diff --git a/.github/workflows/dev-build.yml b/.github/workflows/dev-build.yml
index d03fbeff14..c3af006f54 100644
--- a/.github/workflows/dev-build.yml
+++ b/.github/workflows/dev-build.yml
@@ -30,7 +30,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
- default: ec2-c6g.4xlarge-arm64
+ default: ec2-c6g.8xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G
diff --git a/.github/workflows/nightly-build.yml b/.github/workflows/nightly-build.yml
index 14ebb6e715..54af32a94b 100644
--- a/.github/workflows/nightly-build.yml
+++ b/.github/workflows/nightly-build.yml
@@ -27,7 +27,7 @@ on:
linux_arm64_runner:
type: choice
description: The runner uses to build linux-arm64 artifacts
- default: ec2-c6g.4xlarge-arm64
+ default: ec2-c6g.8xlarge-arm64
options:
- ec2-c6g.xlarge-arm64 # 4C8G
- ec2-c6g.2xlarge-arm64 # 8C16G
diff --git a/.github/workflows/nightly-jsonbench.yaml b/.github/workflows/nightly-jsonbench.yaml
index 3667ee26a6..a9ce4dd363 100644
--- a/.github/workflows/nightly-jsonbench.yaml
+++ b/.github/workflows/nightly-jsonbench.yaml
@@ -1,19 +1,81 @@
name: Nightly JSONBench
on:
- schedule:
- # Trigger at 00:00(Asia/Shanghai) on every weekday.
- - cron: "0 16 * * 0-4"
+ workflow_run:
+ workflows: [ "GreptimeDB Nightly Build" ]
+ types: [ completed ]
workflow_dispatch:
+ inputs:
+ run_id:
+ description: The nightly build workflow run id to download GreptimeDB artifacts from
+ required: true
+ type: string
+
+permissions:
+ actions: read
+ contents: read
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true
jobs:
+ resolve-artifact:
+ name: Resolve GreptimeDB nightly artifact
+ if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'workflow_dispatch' || github.event.workflow_run.conclusion == 'success') }}
+ runs-on: ubuntu-latest
+ outputs:
+ artifact-name: ${{ steps.find-artifact.outputs.artifact-name }}
+ run-id: ${{ steps.resolve-run-id.outputs.run-id }}
+ steps:
+ - name: Resolve nightly build run id
+ id: resolve-run-id
+ shell: bash
+ env:
+ EVENT_NAME: ${{ github.event_name }}
+ WORKFLOW_RUN_ID: ${{ github.event.workflow_run.id }}
+ INPUT_RUN_ID: ${{ inputs.run_id }}
+ run: |
+ set -euo pipefail
+
+ if [[ "${EVENT_NAME}" == "workflow_dispatch" ]]; then
+ run_id="${INPUT_RUN_ID}"
+ else
+ run_id="${WORKFLOW_RUN_ID}"
+ fi
+
+ if [[ ! "${run_id}" =~ ^[0-9]+$ ]]; then
+ echo "Invalid workflow run id: ${run_id}"
+ exit 1
+ fi
+
+ echo "run-id=${run_id}" >> "${GITHUB_OUTPUT}"
+
+ - name: Find GreptimeDB nightly artifact
+ id: find-artifact
+ shell: bash
+ env:
+ GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ RUN_ID: ${{ steps.resolve-run-id.outputs.run-id }}
+ run: |
+ set -euo pipefail
+
+ artifact_name=$(gh api "repos/${GITHUB_REPOSITORY}/actions/runs/${RUN_ID}/artifacts" --paginate \
+ --jq '.artifacts[] | select(.name | test("^greptime-linux-arm64-nightly-[0-9]{8}-[0-9a-f]+$")) | .name' \
+ | head -n 1)
+
+ if [[ -z "${artifact_name}" ]]; then
+ echo "Cannot find linux arm64 nightly artifact in workflow run ${RUN_ID}."
+ exit 1
+ fi
+
+ echo "Download GreptimeDB artifact: ${artifact_name}"
+ echo "artifact-name=${artifact_name}" >> "${GITHUB_OUTPUT}"
+
allocate-runner:
name: Allocate runner
- if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
+ if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'workflow_dispatch' || github.event.workflow_run.conclusion == 'success') }}
+ needs: [ resolve-artifact ]
runs-on: ubuntu-latest
outputs:
linux-arm64-runner: ${{ steps.start-linux-arm64-runner.outputs.label }}
@@ -43,55 +105,50 @@ jobs:
jsonbench:
name: Run JSONBench
- if: ${{ github.repository == 'GreptimeTeam/greptimedb' }}
- needs: [ allocate-runner ]
+ if: ${{ github.repository == 'GreptimeTeam/greptimedb' && (github.event_name == 'workflow_dispatch' || github.event.workflow_run.conclusion == 'success') }}
+ needs: [ resolve-artifact, allocate-runner ]
runs-on: ${{ needs.allocate-runner.outputs.linux-arm64-runner }}
timeout-minutes: 120
env:
- JSONBENCH_DATA_DIR: /home/runner/data/bluesky
- JSONBENCH_OUTPUT_PREFIX: _ubuntu-latest
+ JSONBENCH_OUTPUT_PREFIX: _linux-arm64
steps:
- - name: Checkout
- uses: actions/checkout@v4
+ - name: Download GreptimeDB nightly artifact
+ uses: actions/download-artifact@v4
with:
- fetch-depth: 0
- persist-credentials: false
+ name: ${{ needs.resolve-artifact.outputs.artifact-name }}
+ path: greptimedb-artifact
+ github-token: ${{ secrets.GITHUB_TOKEN }}
+ run-id: ${{ needs.resolve-artifact.outputs.run-id }}
- - uses: arduino/setup-protoc@v3
- with:
- repo-token: ${{ secrets.GITHUB_TOKEN }}
-
- - uses: actions-rust-lang/setup-rust-toolchain@v1
-
- - name: Rust Cache
- uses: Swatinem/rust-cache@v2
- with:
- shared-key: "nightly-jsonbench"
- cache-all-crates: "true"
- save-if: ${{ github.ref == 'refs/heads/main' }}
-
- - name: Build GreptimeDB
- run: cargo build --profile nightly --bin greptime
-
- - name: Reclaim disk space
+ - name: Prepare GreptimeDB binary
shell: bash
run: |
set -euo pipefail
- mkdir -p "${RUNNER_TEMP}/greptimedb-bin"
- cp ./target/nightly/greptime "${RUNNER_TEMP}/greptimedb-bin/greptime"
- chmod +x "${RUNNER_TEMP}/greptimedb-bin/greptime"
-
- rm -rf ./target
+ tar -xzf "greptimedb-artifact/${{ needs.resolve-artifact.outputs.artifact-name }}.tar.gz"
+ cp "${{ needs.resolve-artifact.outputs.artifact-name }}/greptime" ./greptime
+ chmod +x ./greptime
+ rm -rf greptimedb-artifact "${{ needs.resolve-artifact.outputs.artifact-name }}"
- name: Run JSONBench
+ env:
+ # TODO(LFC): Change to "3" (100m) when JSON2 ingestion performance is optimized.
+ JSONBENCH_DATASET: 2
shell: bash
run: |
set -euo pipefail
- cd "${RUNNER_TEMP}"
- cp "${RUNNER_TEMP}/greptimedb-bin/greptime" ./greptime
- chmod +x ./greptime
+ export JSONBENCH_DATA_DIR="/root/data/bluesky"
+ echo "Use JSONBench data directory ${JSONBENCH_DATA_DIR}"
+
+ echo "Cloning JSONBench"
+ git clone --branch greptimedb-new-json --depth 1 https://github.com/GreptimeTeam/JSONBench.git JSONBench
+
+ echo "Downloading JSONBench dataset choice ${JSONBENCH_DATASET} to ${JSONBENCH_DATA_DIR}"
+ mkdir -p "${JSONBENCH_DATA_DIR}"
+ printf "${JSONBENCH_DATASET}\n" | ./JSONBench/download_data.sh
+ downloaded_files=$(find "${JSONBENCH_DATA_DIR}" -type f | wc -l)
+ echo "Downloaded JSONBench dataset files: ${downloaded_files}"
export GREPTIMEDB_STANDALONE__WAL__DIR=greptimedb_data/wal
export GREPTIMEDB_STANDALONE__STORAGE__DATA_HOME=greptimedb_data
@@ -100,10 +157,12 @@ jobs:
export GREPTIMEDB_STANDALONE__HTTP__BODY_LIMIT=1GB
export GREPTIMEDB_STANDALONE__HTTP__TIMEOUT=500s
+ echo "Starting GreptimeDB standalone"
./greptime standalone start > greptimedb.log 2>&1 &
greptime_pid=$!
trap 'kill "${greptime_pid}" 2>/dev/null || true' EXIT
+ echo "Waiting for GreptimeDB health check"
until curl -s --fail -o /dev/null http://localhost:4000/health; do
if ! kill -0 "${greptime_pid}" 2>/dev/null; then
cat greptimedb.log
@@ -111,12 +170,14 @@ jobs:
fi
sleep 1
done
+ echo "GreptimeDB is ready"
- git clone --branch greptimedb-new-json --depth 1 https://github.com/GreptimeTeam/JSONBench.git JSONBench
cp ./greptime JSONBench/greptimedb/greptime
cd JSONBench/greptimedb
- ./main.sh 3 "${JSONBENCH_DATA_DIR}" success.log error.log "${JSONBENCH_OUTPUT_PREFIX}" false
+ echo "Running JSONBench main.sh with dataset choice ${JSONBENCH_DATASET} and install=false"
+ ./main.sh ${JSONBENCH_DATASET} "${JSONBENCH_DATA_DIR}" success.log error.log "${JSONBENCH_OUTPUT_PREFIX}" false
+ echo "JSONBench finished"
- name: Upload JSONBench results
if: always()
@@ -124,21 +185,21 @@ jobs:
with:
name: jsonbench-results
path: |
- ${{ runner.temp }}/greptimedb.log
- ${{ runner.temp }}/JSONBench/greptimedb/*.log
- ${{ runner.temp }}/JSONBench/greptimedb/*.total_size
- ${{ runner.temp }}/JSONBench/greptimedb/*.data_size
- ${{ runner.temp }}/JSONBench/greptimedb/*.index_size
- ${{ runner.temp }}/JSONBench/greptimedb/*.count
- ${{ runner.temp }}/JSONBench/greptimedb/*.results_runtime
- ${{ runner.temp }}/JSONBench/greptimedb/*.query_results
+ ./greptimedb.log
+ ./JSONBench/greptimedb/*.log
+ ./JSONBench/greptimedb/*.total_size
+ ./JSONBench/greptimedb/*.data_size
+ ./JSONBench/greptimedb/*.index_size
+ ./JSONBench/greptimedb/*.count
+ ./JSONBench/greptimedb/*.results_runtime
+ ./JSONBench/greptimedb/*.query_results
if-no-files-found: ignore
retention-days: 7
stop-linux-arm64-runner:
name: Stop Linux ARM64 runner
# It's always run as the last job in the workflow to make sure that the runner is released.
- if: ${{ always() }}
+ if: ${{ always() && needs.allocate-runner.outputs.linux-arm64-ec2-runner-instance-id != '' }}
runs-on: ubuntu-latest
needs: [
allocate-runner,
diff --git a/Cargo.lock b/Cargo.lock
index a65159d26a..2485a5ceec 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -79,8 +79,9 @@ checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75"
dependencies = [
"cfg-if",
"const-random",
- "getrandom 0.3.3",
+ "getrandom 0.3.4",
"once_cell",
+ "serde",
"version_check",
"zerocopy",
]
@@ -771,7 +772,7 @@ version = "4.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ef1e3e699d84ab1b0911a1010c5c106aa34ae89aeac103be5ce0c3859db1e891"
dependencies = [
- "term",
+ "term 1.0.2",
]
[[package]]
@@ -1427,6 +1428,12 @@ dependencies = [
"syn 2.0.117",
]
+[[package]]
+name = "borrow-or-share"
+version = "0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc0b364ead1874514c8c2855ab558056ebfeb775653e7ae45ff72f28f8f3166c"
+
[[package]]
name = "borsh"
version = "1.5.7"
@@ -1525,6 +1532,12 @@ dependencies = [
"syn 1.0.109",
]
+[[package]]
+name = "bytecount"
+version = "0.6.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "175812e0be2bccb6abe50bb8d566126198344f707e304f45c648fd8f2cc0365e"
+
[[package]]
name = "bytemuck"
version = "1.23.1"
@@ -1635,7 +1648,7 @@ dependencies = [
"paste",
"prometheus 0.14.0",
"promql-parser",
- "rand 0.9.1",
+ "rand 0.9.4",
"serde",
"serde_json",
"session",
@@ -1973,7 +1986,7 @@ dependencies = [
"partition",
"paste",
"query",
- "rand 0.9.1",
+ "rand 0.9.4",
"reqwest 0.13.2",
"serde",
"serde_json",
@@ -2020,7 +2033,7 @@ dependencies = [
"prometheus 0.14.0",
"prost 0.14.1",
"query",
- "rand 0.9.1",
+ "rand 0.9.4",
"serde_json",
"snafu 0.8.6",
"store-api",
@@ -2031,6 +2044,15 @@ dependencies = [
"tracing",
]
+[[package]]
+name = "clipboard-win"
+version = "5.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bde03770d3df201d4fb868f2c9c59e66a3e4e2bd06692a0fe701e7103c7e84d4"
+dependencies = [
+ "error-code",
+]
+
[[package]]
name = "clocksource"
version = "0.8.1"
@@ -2123,7 +2145,7 @@ dependencies = [
"prometheus 0.14.0",
"prost 0.14.1",
"query",
- "rand 0.9.1",
+ "rand 0.9.4",
"regex",
"reqwest 0.13.2",
"serde",
@@ -2462,7 +2484,7 @@ dependencies = [
"hyper-util",
"lazy_static",
"prost 0.14.1",
- "rand 0.9.1",
+ "rand 0.9.4",
"serde",
"serde_json",
"snafu 0.8.6",
@@ -2581,7 +2603,7 @@ dependencies = [
"prometheus 0.14.0",
"prost 0.14.1",
"prost-types 0.14.1",
- "rand 0.9.1",
+ "rand 0.9.4",
"regex",
"rskafka",
"rustls",
@@ -2649,7 +2671,7 @@ dependencies = [
"futures-util",
"humantime-serde",
"object-store",
- "rand 0.9.1",
+ "rand 0.9.4",
"serde",
"serde_json",
"smallvec",
@@ -2833,7 +2855,7 @@ dependencies = [
"common-query",
"common-recordbatch",
"once_cell",
- "rand 0.9.1",
+ "rand 0.9.4",
"tempfile",
]
@@ -2849,7 +2871,7 @@ dependencies = [
"humantime",
"humantime-serde",
"once_cell",
- "rand 0.9.1",
+ "rand 0.9.4",
"serde",
"serde_json",
"snafu 0.8.6",
@@ -3594,6 +3616,12 @@ dependencies = [
"parking_lot_core 0.9.11",
]
+[[package]]
+name = "data-encoding"
+version = "2.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a4ae5f15dda3c708c0ade84bfee31ccab44a3da4f88015ed22f63732abe300c8"
+
[[package]]
name = "datafusion"
version = "53.1.0"
@@ -3638,7 +3666,7 @@ dependencies = [
"object_store",
"parking_lot 0.12.4",
"parquet",
- "rand 0.9.1",
+ "rand 0.9.4",
"regex",
"sqlparser",
"tempfile",
@@ -3755,7 +3783,7 @@ dependencies = [
"liblzma",
"log",
"object_store",
- "rand 0.9.1",
+ "rand 0.9.4",
"tokio",
"tokio-util",
"url",
@@ -3881,7 +3909,7 @@ dependencies = [
"log",
"object_store",
"parking_lot 0.12.4",
- "rand 0.9.1",
+ "rand 0.9.4",
"tempfile",
"url",
]
@@ -3944,7 +3972,7 @@ dependencies = [
"md-5 0.10.6",
"memchr",
"num-traits",
- "rand 0.9.1",
+ "rand 0.9.4",
"regex",
"sha2 0.10.9",
"unicode-segmentation",
@@ -4234,7 +4262,7 @@ dependencies = [
"datafusion-proto-common",
"object_store",
"prost 0.14.1",
- "rand 0.9.1",
+ "rand 0.9.4",
]
[[package]]
@@ -4685,6 +4713,27 @@ dependencies = [
"crypto-common 0.2.1",
]
+[[package]]
+name = "dirs-next"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1"
+dependencies = [
+ "cfg-if",
+ "dirs-sys-next",
+]
+
+[[package]]
+name = "dirs-sys-next"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d"
+dependencies = [
+ "libc",
+ "redox_users",
+ "winapi",
+]
+
[[package]]
name = "displaydoc"
version = "0.2.5"
@@ -4713,14 +4762,14 @@ dependencies = [
[[package]]
name = "dns-lookup"
-version = "2.0.4"
+version = "3.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e5766087c2235fec47fafa4cfecc81e494ee679d0fd4a59887ea0919bfb0e4fc"
+checksum = "6e39034cee21a2f5bbb66ba0e3689819c4bb5d00382a282006e802a7ffa6c41d"
dependencies = [
"cfg-if",
"libc",
- "socket2 0.5.10",
- "windows-sys 0.48.0",
+ "socket2 0.6.0",
+ "windows-sys 0.60.2",
]
[[package]]
@@ -4740,31 +4789,30 @@ dependencies = [
[[package]]
name = "domain"
-version = "0.11.0"
+version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a11dd7f04a6a6d2aea0153c6e31f5ea7af8b2efdf52cdaeea7a9a592c7fefef9"
+checksum = "8c469892dddfeff64ecfdbc64cf059c77fb0decaeccd4d5d484394bdd6312bac"
dependencies = [
"bumpalo",
"bytes",
"domain-macros",
"futures-util",
- "hashbrown 0.14.5",
+ "hashbrown 0.17.1",
+ "jiff",
"log",
- "moka",
"octseq",
- "rand 0.8.5",
+ "rand 0.10.1",
"serde",
"smallvec",
- "time",
"tokio",
"tracing",
]
[[package]]
name = "domain-macros"
-version = "0.11.0"
+version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0e197fdfd2cdb5fdeb7f8ddcf3aed5d5d04ecde2890d448b14ffb716f7376b70"
+checksum = "6fef7ef74e413e36d5364db163ca577ccb56f2f74377705d5f920ee3e1544127"
dependencies = [
"proc-macro2",
"quote",
@@ -4849,6 +4897,15 @@ dependencies = [
"serde",
]
+[[package]]
+name = "email_address"
+version = "0.2.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e079f19b08ca6239f47f8ba8509c11cf3ea30095831f7fed61441475edd8c449"
+dependencies = [
+ "serde",
+]
+
[[package]]
name = "ena"
version = "0.14.3"
@@ -4962,6 +5019,12 @@ dependencies = [
"windows-sys 0.59.0",
]
+[[package]]
+name = "error-code"
+version = "3.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59"
+
[[package]]
name = "etcd-client"
version = "0.17.0"
@@ -5018,6 +5081,12 @@ dependencies = [
"pin-project-lite",
]
+[[package]]
+name = "exitcode"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "de853764b47027c2e862a995c34978ffa63c1501f2e15f987ba11bd4f9bba193"
+
[[package]]
name = "fail"
version = "0.5.1"
@@ -5043,9 +5112,9 @@ checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fancy-regex"
-version = "0.14.0"
+version = "0.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6e24cb5a94bcae1e5408b0effca5cd7172ea3c5755049c5f3af4cd283a165298"
+checksum = "72cf461f865c862bb7dc573f643dd6a2b6842f7c30b07882b56bd148cc2761b8"
dependencies = [
"bit-set",
"regex-automata",
@@ -5274,7 +5343,7 @@ dependencies = [
"prometheus 0.14.0",
"prost 0.14.1",
"query",
- "rand 0.9.1",
+ "rand 0.9.4",
"serde",
"serde_json",
"servers",
@@ -5299,6 +5368,17 @@ dependencies = [
"bitflags 1.3.2",
]
+[[package]]
+name = "fluent-uri"
+version = "0.4.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bc74ac4d8359ae70623506d512209619e5cf8f347124910440dbc221714b328e"
+dependencies = [
+ "borrow-or-share",
+ "ref-cast",
+ "serde",
+]
+
[[package]]
name = "flume"
version = "0.11.1"
@@ -5337,6 +5417,16 @@ dependencies = [
"percent-encoding",
]
+[[package]]
+name = "fraction"
+version = "0.15.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e076045bb43dac435333ed5f04caf35c7463631d0dae2deb2638d94dd0a5b872"
+dependencies = [
+ "lazy_static",
+ "num",
+]
+
[[package]]
name = "fragile"
version = "2.0.1"
@@ -5405,7 +5495,7 @@ dependencies = [
"promql-parser",
"prost 0.14.1",
"query",
- "rand 0.9.1",
+ "rand 0.9.4",
"reqwest 0.13.2",
"serde",
"serde_json",
@@ -5756,21 +5846,21 @@ dependencies = [
"cfg-if",
"js-sys",
"libc",
- "wasi 0.11.1+wasi-snapshot-preview1",
+ "wasi",
"wasm-bindgen",
]
[[package]]
name = "getrandom"
-version = "0.3.3"
+version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
+checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"r-efi",
- "wasi 0.14.2+wasi-0.2.4",
+ "wasip2",
"wasm-bindgen",
]
@@ -5842,9 +5932,9 @@ dependencies = [
[[package]]
name = "grok"
-version = "2.1.0"
+version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c52724b609896f661a3f4641dd3a44dc602958ef615857c12d00756b4e9355b"
+checksum = "6ddab6a9c8bb998cb2fc3101fde8ef561b7c4970db3957be7a8eee1e168f666b"
dependencies = [
"glob",
"onig",
@@ -5984,6 +6074,9 @@ name = "hashbrown"
version = "0.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed5909b6e89a2db4456e54cd5f673791d7eca6732202bbf2a9cc504fe2f9b84a"
+dependencies = [
+ "allocator-api2",
+]
[[package]]
name = "hashlink"
@@ -6650,7 +6743,7 @@ dependencies = [
"pin-project",
"prost 0.14.1",
"puffin",
- "rand 0.9.1",
+ "rand 0.9.4",
"rand_chacha 0.9.0",
"regex",
"regex-automata",
@@ -6841,6 +6934,15 @@ dependencies = [
"derive_utils",
]
+[[package]]
+name = "ipcrypt-rs"
+version = "0.9.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "96e4f67dbfc0f75d7b65953ecf0be3fd84ee0cb1ae72a00a4aa9a2f5518a2c80"
+dependencies = [
+ "aes",
+]
+
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -7016,6 +7118,36 @@ dependencies = [
"windows-sys 0.45.0",
]
+[[package]]
+name = "jni"
+version = "0.22.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5efd9a482cf3a427f00d6b35f14332adc7902ce91efb778580e180ff90fa3498"
+dependencies = [
+ "cfg-if",
+ "combine",
+ "jni-macros",
+ "jni-sys 0.4.1",
+ "log",
+ "simd_cesu8",
+ "thiserror 2.0.17",
+ "walkdir",
+ "windows-link 0.2.1",
+]
+
+[[package]]
+name = "jni-macros"
+version = "0.22.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a00109accc170f0bdb141fed3e393c565b6f5e072365c3bd58f5b062591560a3"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "rustc_version",
+ "simd_cesu8",
+ "syn 2.0.117",
+]
+
[[package]]
name = "jni-sys"
version = "0.3.1"
@@ -7050,7 +7182,7 @@ version = "0.1.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38f262f097c174adebe41eb73d66ae9c06b2844fb0da69969647bbddd9b0538a"
dependencies = [
- "getrandom 0.3.3",
+ "getrandom 0.3.4",
"libc",
]
@@ -7139,11 +7271,38 @@ version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c6e529149475ca0b2820835d3dce8fcc41c6b943ca608d32f35b449255e4627"
dependencies = [
- "fluent-uri",
+ "fluent-uri 0.1.4",
"serde",
"serde_json",
]
+[[package]]
+name = "jsonschema"
+version = "0.38.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89f50532ce4a0ba3ae930212908d8ec50e7806065c059fe9c75da2ece6132294"
+dependencies = [
+ "ahash 0.8.12",
+ "bytecount",
+ "data-encoding",
+ "email_address",
+ "fancy-regex",
+ "fraction",
+ "getrandom 0.3.4",
+ "idna",
+ "itoa",
+ "num-cmp",
+ "num-traits",
+ "percent-encoding",
+ "referencing",
+ "regex",
+ "regex-syntax",
+ "serde",
+ "serde_json",
+ "unicode-general-category",
+ "uuid-simd",
+]
+
[[package]]
name = "jsonwebtoken"
version = "10.3.0"
@@ -7337,7 +7496,7 @@ dependencies = [
"regex-syntax",
"sha3",
"string_cache",
- "term",
+ "term 1.0.2",
"unicode-xid",
"walkdir",
]
@@ -7736,7 +7895,7 @@ dependencies = [
"protobuf 2.28.0",
"protobuf-build",
"raft-engine",
- "rand 0.9.1",
+ "rand 0.9.4",
"rskafka",
"serde",
"serde_json",
@@ -8043,7 +8202,7 @@ dependencies = [
"futures-util",
"humantime-serde",
"meta-srv",
- "rand 0.9.1",
+ "rand 0.9.4",
"serde",
"serde_json",
"session",
@@ -8111,7 +8270,7 @@ dependencies = [
"partition",
"prometheus 0.14.0",
"prost 0.14.1",
- "rand 0.9.1",
+ "rand 0.9.4",
"regex",
"rskafka",
"serde",
@@ -8253,7 +8412,7 @@ checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c"
dependencies = [
"libc",
"log",
- "wasi 0.11.1+wasi-snapshot-preview1",
+ "wasi",
"windows-sys 0.59.0",
]
@@ -8343,7 +8502,7 @@ dependencies = [
"prometheus 0.14.0",
"prost 0.14.1",
"puffin",
- "rand 0.9.1",
+ "rand 0.9.4",
"rayon",
"regex",
"roaring",
@@ -8656,6 +8815,12 @@ dependencies = [
"winapi",
]
+[[package]]
+name = "ndk-context"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "27b02d87554356db9e9a873add8782d4ea6e3e58ea071a9adb9a2e8ddb884a8b"
+
[[package]]
name = "neli"
version = "0.6.5"
@@ -8758,6 +8923,15 @@ dependencies = [
"memchr",
]
+[[package]]
+name = "nom-language"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2de2bc5b451bfedaef92c90b8939a8fff5770bdcc1fafd6239d086aab8fa6b29"
+dependencies = [
+ "nom 8.0.0",
+]
+
[[package]]
name = "notify"
version = "8.0.0"
@@ -8841,6 +9015,12 @@ dependencies = [
"zeroize",
]
+[[package]]
+name = "num-cmp"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "63335b2e2c34fae2fb0aa2cecfd9f0832a1e24b3b32ecec612c3426d46dc8aaa"
+
[[package]]
name = "num-complex"
version = "0.4.6"
@@ -9001,6 +9181,31 @@ dependencies = [
"libc",
]
+[[package]]
+name = "objc2"
+version = "0.6.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3a12a8ed07aefc768292f076dc3ac8c48f3781c8f2d5851dd3d98950e8c5a89f"
+dependencies = [
+ "objc2-encode",
+]
+
+[[package]]
+name = "objc2-encode"
+version = "4.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33"
+
+[[package]]
+name = "objc2-foundation"
+version = "0.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e3e0adef53c21f888deb4fa59fc59f7eb17404926ee8a6f59f5df0fd7f9f3272"
+dependencies = [
+ "bitflags 2.11.1",
+ "objc2",
+]
+
[[package]]
name = "object"
version = "0.36.7"
@@ -9040,7 +9245,7 @@ dependencies = [
"object_store_opendal",
"opendal",
"prometheus 0.14.0",
- "rand 0.9.1",
+ "rand 0.9.4",
"reqwest 0.13.2",
"serde",
"snafu 0.8.6",
@@ -9094,9 +9299,9 @@ dependencies = [
[[package]]
name = "octseq"
-version = "0.5.2"
+version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "126c3ca37c9c44cec575247f43a3e4374d8927684f129d2beeb0d2cef262fe12"
+checksum = "182eab3e1cd9cdc0ecf1ce3342d9844f3dc7d098f0694569bfdf327b612d69fd"
dependencies = [
"bytes",
"serde",
@@ -9552,7 +9757,7 @@ dependencies = [
"futures-util",
"opentelemetry 0.30.0",
"percent-encoding",
- "rand 0.9.1",
+ "rand 0.9.4",
"serde_json",
"thiserror 2.0.17",
"tokio",
@@ -9570,7 +9775,7 @@ dependencies = [
"futures-util",
"opentelemetry 0.31.0",
"percent-encoding",
- "rand 0.9.1",
+ "rand 0.9.4",
"thiserror 2.0.17",
]
@@ -9748,7 +9953,7 @@ dependencies = [
"paste",
"prost 0.14.1",
"prost-build 0.14.1",
- "rand 0.9.1",
+ "rand 0.9.4",
"replace_with",
"serde",
"smallvec",
@@ -10154,7 +10359,7 @@ dependencies = [
"md5",
"pg_interval_2",
"postgres-types",
- "rand 0.10.0",
+ "rand 0.10.1",
"rust_decimal",
"rustls-pki-types",
"ryu",
@@ -10531,7 +10736,7 @@ dependencies = [
"hmac",
"md-5 0.10.6",
"memchr",
- "rand 0.9.1",
+ "rand 0.9.4",
"sha2 0.10.9",
"stringprep",
]
@@ -10680,6 +10885,19 @@ dependencies = [
"syn 2.0.117",
]
+[[package]]
+name = "prettytable-rs"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "eea25e07510aa6ab6547308ebe3c036016d162b8da920dbb079e3ba8acf3d95a"
+dependencies = [
+ "encode_unicode",
+ "is-terminal",
+ "lazy_static",
+ "term 0.7.0",
+ "unicode-width 0.1.14",
+]
+
[[package]]
name = "proc-macro-crate"
version = "1.3.1"
@@ -10845,7 +11063,7 @@ checksum = "bee689443a2bd0a16ab0348b52ee43e3b2d1b1f931c8aa5c9f8de4c86fbe8c40"
dependencies = [
"bitflags 2.11.1",
"num-traits",
- "rand 0.9.1",
+ "rand 0.9.4",
"rand_chacha 0.9.0",
"rand_xorshift",
"regex-syntax",
@@ -11272,7 +11490,7 @@ dependencies = [
"promql",
"promql-parser",
"prost 0.14.1",
- "rand 0.9.1",
+ "rand 0.9.4",
"regex",
"serde",
"serde_json",
@@ -11345,9 +11563,9 @@ checksum = "434b42fec591c96ef50e21e886936e66d3cc3f737104fdb9b737c40ffb94c098"
dependencies = [
"aws-lc-rs",
"bytes",
- "getrandom 0.3.3",
+ "getrandom 0.3.4",
"lru-slab",
- "rand 0.9.1",
+ "rand 0.9.4",
"ring",
"rustc-hash 2.1.1",
"rustls",
@@ -11453,9 +11671,9 @@ dependencies = [
[[package]]
name = "rand"
-version = "0.9.1"
+version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9fbfd9d094a40bf3ae768db9361049ace4c0e04a4fd6b359518bd7b73a73dd97"
+checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.3",
@@ -11463,9 +11681,9 @@ dependencies = [
[[package]]
name = "rand"
-version = "0.10.0"
+version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bc266eb313df6c5c09c1c7b1fbe2510961e5bcd3add930c1e31f7ed9da0feff8"
+checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207"
dependencies = [
"chacha20 0.10.0",
"getrandom 0.4.1",
@@ -11508,7 +11726,7 @@ version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38"
dependencies = [
- "getrandom 0.3.3",
+ "getrandom 0.3.4",
]
[[package]]
@@ -11601,6 +11819,17 @@ dependencies = [
"bitflags 2.11.1",
]
+[[package]]
+name = "redox_users"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ba009ff324d1fc1b900bd1fdb31564febe58a8ccc8a6fdbb93b543d33b13ca43"
+dependencies = [
+ "getrandom 0.2.16",
+ "libredox",
+ "thiserror 1.0.69",
+]
+
[[package]]
name = "ref-cast"
version = "1.0.24"
@@ -11621,6 +11850,21 @@ dependencies = [
"syn 2.0.117",
]
+[[package]]
+name = "referencing"
+version = "0.38.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "15a8af0c6bb8eaf8b07cb06fc31ff30ca6fe19fb99afa476c276d8b24f365b0b"
+dependencies = [
+ "ahash 0.8.12",
+ "fluent-uri 0.4.1",
+ "getrandom 0.3.4",
+ "hashbrown 0.16.1",
+ "parking_lot 0.12.4",
+ "percent-encoding",
+ "serde_json",
+]
+
[[package]]
name = "regex"
version = "1.12.2"
@@ -11686,6 +11930,15 @@ version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
+[[package]]
+name = "relative-path"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bca40a312222d8ba74837cb474edef44b37f561da5f773981007a10bbaa992b0"
+dependencies = [
+ "serde",
+]
+
[[package]]
name = "rend"
version = "0.4.2"
@@ -11825,6 +12078,7 @@ dependencies = [
"futures-channel",
"futures-core",
"futures-util",
+ "h2 0.4.11",
"http 1.3.1",
"http-body 1.0.1",
"http-body-util",
@@ -11837,6 +12091,7 @@ dependencies = [
"pin-project-lite",
"quinn",
"rustls",
+ "rustls-native-certs 0.8.1",
"rustls-pki-types",
"serde",
"serde_json",
@@ -11898,6 +12153,50 @@ dependencies = [
"web-sys",
]
+[[package]]
+name = "reqwest-middleware"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "57f17d28a6e6acfe1733fe24bcd30774d13bffa4b8a22535b4c8c98423088d4e"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "http 1.3.1",
+ "reqwest 0.12.28",
+ "serde",
+ "thiserror 1.0.69",
+ "tower-service",
+]
+
+[[package]]
+name = "reqwest-retry"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "105747e3a037fe5bf17458d794de91149e575b6183fc72c85623a44abb9683f5"
+dependencies = [
+ "anyhow",
+ "async-trait",
+ "futures",
+ "getrandom 0.2.16",
+ "http 1.3.1",
+ "hyper 1.6.0",
+ "reqwest 0.12.28",
+ "reqwest-middleware",
+ "retry-policies",
+ "thiserror 2.0.17",
+ "tokio",
+ "wasmtimer",
+]
+
+[[package]]
+name = "retry-policies"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dc05fbf560421a0357a750cbe78c7ca19d4923918490daabba313d5dbc871e47"
+dependencies = [
+ "rand 0.10.1",
+]
+
[[package]]
name = "rgb"
version = "0.8.50"
@@ -11985,9 +12284,12 @@ dependencies = [
[[package]]
name = "roxmltree"
-version = "0.20.0"
+version = "0.21.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97"
+checksum = "f1964b10c76125c36f8afe190065a4bf9a87bf324842c05701330bba9f1cacbb"
+dependencies = [
+ "memchr",
+]
[[package]]
name = "rsa"
@@ -12042,7 +12344,7 @@ dependencies = [
"integer-encoding 4.0.2",
"lz4",
"parking_lot 0.12.4",
- "rand 0.9.1",
+ "rand 0.9.4",
"rsasl",
"rustls",
"snap",
@@ -12088,7 +12390,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
- "relative-path",
+ "relative-path 1.9.3",
"rustc_version",
"syn 2.0.117",
"unicode-ident",
@@ -12309,7 +12611,7 @@ checksum = "1d99feebc72bae7ab76ba994bb5e121b8d83d910ca40b36e0921f53becc41784"
dependencies = [
"core-foundation 0.10.1",
"core-foundation-sys",
- "jni",
+ "jni 0.21.1",
"log",
"once_cell",
"rustls",
@@ -12346,6 +12648,25 @@ version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a0d197bd2c9dc6e53b84da9556a69ba4cdfab8619eb41a8bd1cc2027a0f6b1d"
+[[package]]
+name = "rustyline"
+version = "17.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e902948a25149d50edc1a8e0141aad50f54e22ba83ff988cf8f7c9ef07f50564"
+dependencies = [
+ "bitflags 2.11.1",
+ "cfg-if",
+ "clipboard-win",
+ "libc",
+ "log",
+ "memchr",
+ "nix 0.30.1",
+ "unicode-segmentation",
+ "unicode-width 0.2.1",
+ "utf8parse",
+ "windows-sys 0.60.2",
+]
+
[[package]]
name = "ryu"
version = "1.0.20"
@@ -12724,6 +13045,19 @@ dependencies = [
"unsafe-libyaml",
]
+[[package]]
+name = "serde_yaml_ng"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7b4db627b98b36d4203a7b458cf3573730f2bb591b28871d916dfa9efabfd41f"
+dependencies = [
+ "indexmap 2.13.0",
+ "itoa",
+ "ryu",
+ "serde",
+ "unsafe-libyaml",
+]
+
[[package]]
name = "servers"
version = "1.1.0"
@@ -12820,7 +13154,7 @@ dependencies = [
"prost 0.14.1",
"query",
"quoted-string",
- "rand 0.9.1",
+ "rand 0.9.4",
"regex",
"reqwest 0.13.2",
"rust-embed",
@@ -12998,7 +13332,7 @@ version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c962f626b54771990066e5435ec8331d1462576cd2d1e62f24076ae014f92112"
dependencies = [
- "getrandom 0.3.3",
+ "getrandom 0.3.4",
"halfbrown",
"ref-cast",
"serde",
@@ -13007,6 +13341,16 @@ dependencies = [
"value-trait",
]
+[[package]]
+name = "simd_cesu8"
+version = "1.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "94f90157bb87cddf702797c5dadfa0be7d266cdf49e22da2fcaa32eff75b2c33"
+dependencies = [
+ "rustc_version",
+ "simdutf8",
+]
+
[[package]]
name = "simdutf8"
version = "0.1.5"
@@ -13898,12 +14242,12 @@ dependencies = [
[[package]]
name = "syslog_loose"
-version = "0.21.0"
+version = "0.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "161028c00842709450114c39db3b29f44c898055ed8833bb9b535aba7facf30e"
+checksum = "d6ec4df26907adce53e94eac201a9ba38744baea3bc97f34ffd591d5646231a6"
dependencies = [
"chrono",
- "nom 7.1.3",
+ "nom 8.0.0",
]
[[package]]
@@ -14147,12 +14491,23 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16"
dependencies = [
"fastrand",
- "getrandom 0.3.3",
+ "getrandom 0.3.4",
"once_cell",
"rustix 1.0.7",
"windows-sys 0.61.2",
]
+[[package]]
+name = "term"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f"
+dependencies = [
+ "dirs-next",
+ "rustversion",
+ "winapi",
+]
+
[[package]]
name = "term"
version = "1.0.2"
@@ -14205,7 +14560,7 @@ dependencies = [
"nix 0.28.0",
"partition",
"paste",
- "rand 0.9.1",
+ "rand 0.9.4",
"rand_chacha 0.9.0",
"reqwest 0.13.2",
"rustls",
@@ -14292,7 +14647,7 @@ dependencies = [
"plugins",
"prost 0.14.1",
"query",
- "rand 0.9.1",
+ "rand 0.9.4",
"rstest",
"rstest_reuse",
"sea-query",
@@ -14582,7 +14937,7 @@ dependencies = [
"pin-project-lite",
"postgres-protocol",
"postgres-types",
- "rand 0.9.1",
+ "rand 0.9.4",
"socket2 0.5.10",
"tokio",
"tokio-util",
@@ -15233,6 +15588,12 @@ version = "0.3.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5"
+[[package]]
+name = "unicode-general-category"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b993bddc193ae5bd0d623b49ec06ac3e9312875fdae725a975c51db1cc1677f"
+
[[package]]
name = "unicode-ident"
version = "1.0.22"
@@ -15366,11 +15727,21 @@ checksum = "b672338555252d43fd2240c714dc444b8c6fb0a5c5335e65a07bba7742735ddb"
dependencies = [
"getrandom 0.4.1",
"js-sys",
- "rand 0.9.1",
+ "rand 0.9.4",
"serde_core",
"wasm-bindgen",
]
+[[package]]
+name = "uuid-simd"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "23b082222b4f6619906941c17eb2297fff4c2fb96cb60164170522942a200bd8"
+dependencies = [
+ "outref",
+ "vsimd",
+]
+
[[package]]
name = "valuable"
version = "0.1.1"
@@ -15453,9 +15824,9 @@ dependencies = [
[[package]]
name = "vrl"
-version = "0.25.0"
+version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4f49394b948406ea1564aa00152e011d87a38ad35d277ebddda257a9ee39c419"
+checksum = "925a4d3321b18a200c82c3ec02ee2be2b4bf16db07a5ce7e2a9a888b795ea862"
dependencies = [
"aes",
"aes-siv",
@@ -15485,8 +15856,10 @@ dependencies = [
"domain",
"dyn-clone",
"encoding_rs",
+ "exitcode",
"fancy-regex",
"flate2",
+ "getrandom 0.3.4",
"grok",
"hex",
"hmac",
@@ -15496,12 +15869,15 @@ dependencies = [
"indexmap 2.13.0",
"indoc",
"influxdb-line-protocol",
+ "ipcrypt-rs",
"itertools 0.14.0",
+ "jsonschema",
"lalrpop",
"lalrpop-util",
"lz4_flex 0.11.6",
"md-5 0.10.6",
- "nom 7.1.3",
+ "nom 8.0.0",
+ "nom-language",
"ofb",
"onig",
"ordered-float 4.6.0",
@@ -15510,20 +15886,27 @@ dependencies = [
"percent-encoding",
"pest",
"pest_derive",
+ "prettytable-rs",
"prost 0.13.5",
"prost-reflect",
"psl",
"psl-types",
"publicsuffix",
"quoted_printable",
- "rand 0.8.5",
+ "rand 0.9.4",
"regex",
+ "relative-path 2.0.1",
+ "reqwest 0.12.28",
+ "reqwest-middleware",
+ "reqwest-retry",
"roxmltree",
"rust_decimal",
+ "rustyline",
"seahash",
"serde",
"serde_json",
"serde_yaml",
+ "serde_yaml_ng",
"sha-1",
"sha2 0.10.9",
"sha3",
@@ -15531,6 +15914,8 @@ dependencies = [
"snafu 0.8.6",
"snap",
"strip-ansi-escapes",
+ "strum 0.26.3",
+ "strum_macros 0.26.4",
"syslog_loose",
"termcolor",
"thiserror 2.0.17",
@@ -15541,7 +15926,9 @@ dependencies = [
"url",
"utf8-width",
"uuid",
+ "webbrowser",
"woothee",
+ "xxhash-rust",
"zstd",
]
@@ -15585,15 +15972,6 @@ version = "0.11.1+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
-[[package]]
-name = "wasi"
-version = "0.14.2+wasi-0.2.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3"
-dependencies = [
- "wit-bindgen-rt",
-]
-
[[package]]
name = "wasip2"
version = "1.0.2+wasi-0.2.9"
@@ -15734,6 +16112,20 @@ dependencies = [
"semver",
]
+[[package]]
+name = "wasmtimer"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1c598d6b99ea013e35844697fc4670d08339d5cda15588f193c6beedd12f644b"
+dependencies = [
+ "futures",
+ "js-sys",
+ "parking_lot 0.12.4",
+ "pin-utils",
+ "slab",
+ "wasm-bindgen",
+]
+
[[package]]
name = "web-sys"
version = "0.3.95"
@@ -15754,6 +16146,22 @@ dependencies = [
"wasm-bindgen",
]
+[[package]]
+name = "webbrowser"
+version = "1.2.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0fc95580916af1e68ff6a7be07446fc5db73ebf71cf092de939bbf5f7e189f72"
+dependencies = [
+ "core-foundation 0.10.1",
+ "jni 0.22.4",
+ "log",
+ "ndk-context",
+ "objc2",
+ "objc2-foundation",
+ "url",
+ "web-sys",
+]
+
[[package]]
name = "webpki"
version = "0.22.4"
@@ -16040,6 +16448,15 @@ dependencies = [
"windows-targets 0.52.6",
]
+[[package]]
+name = "windows-sys"
+version = "0.60.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb"
+dependencies = [
+ "windows-targets 0.53.5",
+]
+
[[package]]
name = "windows-sys"
version = "0.61.2"
@@ -16088,13 +16505,30 @@ dependencies = [
"windows_aarch64_gnullvm 0.52.6",
"windows_aarch64_msvc 0.52.6",
"windows_i686_gnu 0.52.6",
- "windows_i686_gnullvm",
+ "windows_i686_gnullvm 0.52.6",
"windows_i686_msvc 0.52.6",
"windows_x86_64_gnu 0.52.6",
"windows_x86_64_gnullvm 0.52.6",
"windows_x86_64_msvc 0.52.6",
]
+[[package]]
+name = "windows-targets"
+version = "0.53.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4945f9f551b88e0d65f3db0bc25c33b8acea4d9e41163edf90dcd0b19f9069f3"
+dependencies = [
+ "windows-link 0.2.1",
+ "windows_aarch64_gnullvm 0.53.1",
+ "windows_aarch64_msvc 0.53.1",
+ "windows_i686_gnu 0.53.1",
+ "windows_i686_gnullvm 0.53.1",
+ "windows_i686_msvc 0.53.1",
+ "windows_x86_64_gnu 0.53.1",
+ "windows_x86_64_gnullvm 0.53.1",
+ "windows_x86_64_msvc 0.53.1",
+]
+
[[package]]
name = "windows-threading"
version = "0.1.0"
@@ -16122,6 +16556,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
+[[package]]
+name = "windows_aarch64_gnullvm"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53"
+
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
@@ -16140,6 +16580,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
+[[package]]
+name = "windows_aarch64_msvc"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006"
+
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
@@ -16158,12 +16604,24 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
+[[package]]
+name = "windows_i686_gnu"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "960e6da069d81e09becb0ca57a65220ddff016ff2d6af6a223cf372a506593a3"
+
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
+[[package]]
+name = "windows_i686_gnullvm"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c"
+
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
@@ -16182,6 +16640,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
+[[package]]
+name = "windows_i686_msvc"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2"
+
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
@@ -16200,6 +16664,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
+[[package]]
+name = "windows_x86_64_gnu"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499"
+
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
@@ -16218,6 +16688,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
+[[package]]
+name = "windows_x86_64_gnullvm"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1"
+
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
@@ -16236,6 +16712,12 @@ version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
+[[package]]
+name = "windows_x86_64_msvc"
+version = "0.53.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
+
[[package]]
name = "winnow"
version = "0.5.40"
@@ -16283,15 +16765,6 @@ dependencies = [
"wit-parser",
]
-[[package]]
-name = "wit-bindgen-rt"
-version = "0.39.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1"
-dependencies = [
- "bitflags 2.11.1",
-]
-
[[package]]
name = "wit-bindgen-rust"
version = "0.51.0"
@@ -16445,6 +16918,12 @@ dependencies = [
"rustix 1.0.7",
]
+[[package]]
+name = "xxhash-rust"
+version = "0.8.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
+
[[package]]
name = "yaml-rust"
version = "0.4.5"
diff --git a/Cargo.toml b/Cargo.toml
index 56200a24d6..ee8d4dcf11 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -259,7 +259,7 @@ tracing-opentelemetry = "0.31.0"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
typetag = "0.2"
uuid = { version = "1.17", features = ["serde", "v4", "v7", "fast-rng"] }
-vrl = "0.25"
+vrl = "0.33"
zstd = "0.13"
# DO_NOT_REMOVE_THIS: END_OF_EXTERNAL_DEPENDENCIES
diff --git a/config/config.md b/config/config.md
index d9cffaf122..df06d2153c 100644
--- a/config/config.md
+++ b/config/config.md
@@ -451,6 +451,7 @@
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. |
+| `concurrent_query_limiter_timeout` | String | `100ms` | Timeout to acquire a permit from the concurrent query limiter when `max_concurrent_queries` is reached. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index d558918daf..9351c4e85d 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -20,6 +20,9 @@ init_regions_parallelism = 16
## The maximum concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0
+## Timeout to acquire a permit from the concurrent query limiter when `max_concurrent_queries` is reached.
+concurrent_query_limiter_timeout = "100ms"
+
## Enable telemetry to collect anonymous usage data. Enabled by default.
#+ enable_telemetry = true
diff --git a/src/cli/src/data/export_v2/command.rs b/src/cli/src/data/export_v2/command.rs
index db0f576a4e..bb027bbef1 100644
--- a/src/cli/src/data/export_v2/command.rs
+++ b/src/cli/src/data/export_v2/command.rs
@@ -1077,7 +1077,9 @@ async fn verify_snapshot(storage: &OpenDalStorage) -> Result {
));
}
let data_files = storage.list_files_recursive("data/").await?;
- if let Some(path) = data_files.first() {
+ // Report the lexicographically smallest path so the message is stable
+ // regardless of listing order across backends.
+ if let Some(path) = data_files.iter().min() {
report.push_error(format!(
"Schema-only snapshot should not contain data files (found '{}')",
path
@@ -1103,75 +1105,113 @@ fn summarize_chunks(manifest: &Manifest) -> VerifyChunkSummary {
}
}
+/// A data file declared by a completed chunk that is expected to exist in storage.
+#[derive(Debug)]
+struct ChunkFile {
+ chunk_id: u32,
+ path: String,
+}
+
+/// Expected snapshot contents derived purely from the manifest (no object-store IO).
+///
+/// Separating planning from scanning makes it obvious which problems come from
+/// the manifest alone and which require comparing against actual storage.
+#[derive(Debug, Default)]
+struct VerifyPlan {
+ /// Valid data files declared by completed chunks; each must exist in storage.
+ files_to_check: Vec,
+ /// All syntactically-safe data paths declared by any chunk, regardless of
+ /// status. Used as the orphan-detection baseline so a listed-but-invalid
+ /// file is not also reported as unexpected.
+ claimed_data_files: HashSet,
+ /// Total data-file references in completed chunks (valid + invalid).
+ data_files_total: usize,
+ /// Problems detectable from the manifest alone.
+ problems: Vec,
+}
+
+/// Actual data files discovered under `data/` (the only object-store IO in
+/// chunk/data-file verification).
+#[derive(Debug)]
+struct VerifyDataScan {
+ existing_data_files: HashSet,
+}
+
+/// Result of reconciling the manifest plan against the storage scan.
+#[derive(Debug, Default)]
+struct VerifyOutcome {
+ data_files_total: usize,
+ data_files_verified: usize,
+ problems: Vec,
+}
+
async fn verify_chunks_and_data_files(
storage: &OpenDalStorage,
report: &mut VerifyReport,
) -> Result<()> {
- let existing_files: HashSet<_> = storage
- .list_files_recursive("data/")
- .await?
- .into_iter()
- .collect();
- let mut data_files_total = 0;
- let mut data_files_verified = 0;
- let mut problems = Vec::new();
- let mut seen_chunk_ids = HashSet::new();
- let mut claimed_data_files = HashSet::new();
+ let plan = build_verify_plan(&report.manifest);
+ let scan = scan_data_files(storage).await?;
+ let outcome = reconcile_plan_with_scan(plan, &scan);
- for chunk in &report.manifest.chunks {
+ report.data_files_total = outcome.data_files_total;
+ report.data_files_verified = outcome.data_files_verified;
+ report.problems.extend(outcome.problems);
+
+ Ok(())
+}
+
+/// Builds the expected-state plan from the manifest. Pure; performs no IO.
+fn build_verify_plan(manifest: &Manifest) -> VerifyPlan {
+ let mut plan = VerifyPlan::default();
+ let mut seen_chunk_ids = HashSet::new();
+
+ for chunk in &manifest.chunks {
if !seen_chunk_ids.insert(chunk.id) {
- problems.push(VerifyProblem {
+ plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: duplicate chunk id", chunk.id),
});
}
for file in &chunk.files {
if let Some(path) = safe_manifest_data_file_path(file) {
- claimed_data_files.insert(path.to_string());
+ plan.claimed_data_files.insert(path.to_string());
}
}
match chunk.status {
ChunkStatus::Completed => {
if chunk.files.is_empty() {
- problems.push(VerifyProblem {
+ plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: completed chunk has no data files", chunk.id),
});
continue;
}
- let allowed_prefixes = report
- .manifest
+ let allowed_prefixes = manifest
.schemas
.iter()
.map(|schema| data_dir_for_schema_chunk(schema, chunk.id))
.collect::>();
for file in &chunk.files {
- data_files_total += 1;
- let Some(path) = valid_manifest_data_file_path(file, &allowed_prefixes) else {
- problems.push(VerifyProblem {
+ plan.data_files_total += 1;
+ match valid_manifest_data_file_path(file, &allowed_prefixes) {
+ Some(path) => plan.files_to_check.push(ChunkFile {
+ chunk_id: chunk.id,
+ path: path.to_string(),
+ }),
+ None => plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!(
"Chunk {}: invalid data file path '{}'",
chunk.id, file
),
- });
- continue;
- };
-
- if existing_files.contains(path) {
- data_files_verified += 1;
- } else {
- problems.push(VerifyProblem {
- severity: VerifySeverity::Error,
- message: format!("Chunk {}: missing file '{}'", chunk.id, path),
- });
+ }),
}
}
}
ChunkStatus::Skipped => {
if !chunk.files.is_empty() {
- problems.push(VerifyProblem {
+ plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!(
"Chunk {}: skipped chunk should not list data files",
@@ -1181,20 +1221,20 @@ async fn verify_chunks_and_data_files(
}
}
ChunkStatus::Pending => {
- problems.push(VerifyProblem {
+ plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: status is 'pending'", chunk.id),
});
}
ChunkStatus::InProgress => {
- problems.push(VerifyProblem {
+ plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: status is 'in_progress'", chunk.id),
});
}
ChunkStatus::Failed => {
let reason = chunk.error.as_deref().unwrap_or("unknown error");
- problems.push(VerifyProblem {
+ plan.problems.push(VerifyProblem {
severity: VerifySeverity::Error,
message: format!("Chunk {}: status is 'failed' (error: {})", chunk.id, reason),
});
@@ -1202,20 +1242,60 @@ async fn verify_chunks_and_data_files(
}
}
- for path in &existing_files {
- if !claimed_data_files.contains(path) {
+ plan
+}
+
+/// Lists all data files under `data/`. This is the only object-store IO in
+/// chunk/data-file verification.
+async fn scan_data_files(storage: &OpenDalStorage) -> Result {
+ let existing_data_files = storage
+ .list_files_recursive("data/")
+ .await?
+ .into_iter()
+ .collect();
+ Ok(VerifyDataScan {
+ existing_data_files,
+ })
+}
+
+/// Reconciles the manifest plan against the storage scan. Pure; performs no IO.
+///
+/// Emits missing-file problems for expected files absent from storage and
+/// unexpected-file problems for storage files no chunk claims. Unexpected files
+/// are sorted by path so output is deterministic regardless of listing order.
+fn reconcile_plan_with_scan(plan: VerifyPlan, scan: &VerifyDataScan) -> VerifyOutcome {
+ let mut problems = plan.problems;
+ let mut data_files_verified = 0;
+
+ for file in &plan.files_to_check {
+ if scan.existing_data_files.contains(&file.path) {
+ data_files_verified += 1;
+ } else {
problems.push(VerifyProblem {
severity: VerifySeverity::Error,
- message: format!("Unexpected data file '{}' is not listed in manifest", path),
+ message: format!("Chunk {}: missing file '{}'", file.chunk_id, file.path),
});
}
}
- report.data_files_total = data_files_total;
- report.data_files_verified = data_files_verified;
- report.problems.extend(problems);
+ let mut orphans: Vec<&String> = scan
+ .existing_data_files
+ .iter()
+ .filter(|path| !plan.claimed_data_files.contains(*path))
+ .collect();
+ orphans.sort();
+ for path in orphans {
+ problems.push(VerifyProblem {
+ severity: VerifySeverity::Error,
+ message: format!("Unexpected data file '{}' is not listed in manifest", path),
+ });
+ }
- Ok(())
+ VerifyOutcome {
+ data_files_total: plan.data_files_total,
+ data_files_verified,
+ problems,
+ }
}
fn valid_manifest_data_file_path<'a>(
@@ -2294,6 +2374,90 @@ mod tests {
);
}
+ #[test]
+ fn test_build_verify_plan_classifies_chunks_without_io() {
+ let mut manifest = test_manifest(
+ chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
+ false,
+ true,
+ );
+ // test_manifest(complete) gives: chunk 1 completed (1 file), chunk 2 skipped.
+ let mut failed = ChunkMeta::new(3, TimeRange::unbounded());
+ failed.mark_failed("boom".to_string());
+ manifest.chunks.push(failed);
+ manifest
+ .chunks
+ .push(ChunkMeta::new(4, TimeRange::unbounded()));
+
+ let plan = build_verify_plan(&manifest);
+
+ assert_eq!(plan.files_to_check.len(), 1);
+ assert_eq!(plan.files_to_check[0].chunk_id, 1);
+ assert_eq!(plan.files_to_check[0].path, "data/public/1/file.parquet");
+ assert_eq!(plan.data_files_total, 1);
+ assert!(
+ plan.claimed_data_files
+ .contains("data/public/1/file.parquet")
+ );
+ assert_eq!(plan.problems.len(), 2);
+ assert!(
+ plan.problems
+ .iter()
+ .any(|problem| problem.message.contains("status is 'failed'"))
+ );
+ assert!(
+ plan.problems
+ .iter()
+ .any(|problem| problem.message.contains("status is 'pending'"))
+ );
+ }
+
+ #[tokio::test]
+ async fn test_verify_snapshot_produces_deterministic_problem_output() {
+ let dir = tempdir().unwrap();
+ let manifest = test_manifest(
+ chrono::Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap(),
+ false,
+ true,
+ );
+ write_root_manifest(dir.path(), manifest);
+ write_snapshot_file(dir.path(), "schema/schemas.json", b"[]");
+ write_default_ddl_files(dir.path());
+ write_snapshot_file(dir.path(), "data/public/1/file.parquet", b"data");
+ // Many orphan files under a known chunk prefix to stress ordering.
+ for i in 0..50 {
+ write_snapshot_file(
+ dir.path(),
+ &format!("data/public/1/orphan_{:02}.parquet", i),
+ b"x",
+ );
+ }
+
+ let storage = file_storage_for_dir(dir.path());
+ let messages = |report: &VerifyReport| {
+ report
+ .problems
+ .iter()
+ .map(|problem| problem.message.clone())
+ .collect::>()
+ };
+ let first = messages(&verify_snapshot(&storage).await.unwrap());
+ let second = messages(&verify_snapshot(&storage).await.unwrap());
+
+ // Output is identical across runs despite HashSet-based scanning.
+ assert_eq!(first, second);
+
+ let orphans = first
+ .iter()
+ .filter(|message| message.contains("Unexpected data file"))
+ .cloned()
+ .collect::>();
+ assert_eq!(orphans.len(), 50);
+ let mut sorted = orphans.clone();
+ sorted.sort();
+ assert_eq!(orphans, sorted);
+ }
+
fn write_test_manifest(root: &std::path::Path, dir: &str, manifest: Manifest) {
let snapshot_dir = root.join(dir);
std::fs::create_dir_all(&snapshot_dir).unwrap();
diff --git a/src/cmd/src/datanode/scanbench.rs b/src/cmd/src/datanode/scanbench.rs
index b26705991c..b2a715ad31 100644
--- a/src/cmd/src/datanode/scanbench.rs
+++ b/src/cmd/src/datanode/scanbench.rs
@@ -524,6 +524,7 @@ impl ScanbenchCommand {
options: HashMap::default(),
skip_wal_replay: !self.enable_wal,
checkpoint: None,
+ requirements: Default::default(),
};
engine
diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index e36f94c0d2..d9d7b8b648 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -61,6 +61,7 @@ pub const FORMAT_COMPRESSION_TYPE: &str = "compression_type";
pub const FORMAT_DELIMITER: &str = "delimiter";
pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
pub const FORMAT_HAS_HEADER: &str = "has_header";
+pub const FORMAT_SKIP_BAD_RECORDS: &str = "skip_bad_records";
pub const FORMAT_TYPE: &str = "format";
pub const FILE_PATTERN: &str = "pattern";
pub const TIMESTAMP_FORMAT: &str = "timestamp_format";
diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs
index 77ea553f35..2b39051b48 100644
--- a/src/common/datasource/src/file_format/csv.rs
+++ b/src/common/datasource/src/file_format/csv.rs
@@ -13,15 +13,24 @@
// limitations under the License.
use std::collections::HashMap;
+use std::io;
use std::str::FromStr;
+use std::sync::Arc;
+use std::task::Poll;
use arrow::csv::reader::Format;
use arrow::csv::{self, WriterBuilder};
+use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
-use arrow_schema::Schema;
+use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
+use bytes::{Buf, Bytes};
use common_runtime;
+use common_telemetry::warn;
use datafusion::physical_plan::SendableRecordBatchStream;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use futures::StreamExt;
+use futures::stream::BoxStream;
use object_store::ObjectStore;
use snafu::ResultExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;
@@ -34,9 +43,12 @@ use crate::file_format::{self, FileFormat, stream_to_file};
use crate::share_buffer::SharedBuffer;
use crate::util::normalize_infer_schema;
+const SKIP_BAD_RECORDS_BATCH_SIZE: usize = 1;
+
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CsvFormat {
pub has_header: bool,
+ pub skip_bad_records: bool,
pub delimiter: u8,
pub schema_infer_max_record: Option,
pub compression_type: CompressionType,
@@ -76,13 +88,11 @@ impl TryFrom<&HashMap> for CsvFormat {
})?);
};
if let Some(has_header) = value.get(file_format::FORMAT_HAS_HEADER) {
- format.has_header = has_header.parse().map_err(|_| {
- error::ParseFormatSnafu {
- key: file_format::FORMAT_HAS_HEADER,
- value: has_header,
- }
- .build()
- })?;
+ format.has_header = parse_bool(file_format::FORMAT_HAS_HEADER, has_header)?;
+ };
+ if let Some(skip_bad_records) = value.get(file_format::FORMAT_SKIP_BAD_RECORDS) {
+ format.skip_bad_records =
+ parse_bool(file_format::FORMAT_SKIP_BAD_RECORDS, skip_bad_records)?;
};
if let Some(timestamp_format) = value.get(file_format::TIMESTAMP_FORMAT) {
format.timestamp_format = Some(timestamp_format.clone());
@@ -97,10 +107,17 @@ impl TryFrom<&HashMap> for CsvFormat {
}
}
+fn parse_bool(key: &'static str, value: &str) -> Result {
+ value
+ .parse()
+ .map_err(|_| error::ParseFormatSnafu { key, value }.build())
+}
+
impl Default for CsvFormat {
fn default() -> Self {
Self {
has_header: true,
+ skip_bad_records: false,
delimiter: b',',
schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
compression_type: CompressionType::Uncompressed,
@@ -189,10 +206,136 @@ impl DfRecordBatchEncoder for csv::Writer {
}
}
+/// Builds a CSV stream that can skip selected record-level parse/cast errors.
+///
+/// This recovery path intentionally uses one-record batches. It is slower than
+/// normal CSV scanning, but keeps each parse/cast failure isolated to a single
+/// record. Arrow's CSV decoder clears buffered rows before type parsing, so a
+/// failed multi-row flush cannot be safely retried row by row without replaying
+/// input bytes.
+pub async fn tolerant_csv_stream(
+ store: &ObjectStore,
+ path: &str,
+ schema: SchemaRef,
+ projection: Vec,
+ format: &CsvFormat,
+) -> Result {
+ let meta = store
+ .stat(path)
+ .await
+ .context(error::ReadObjectSnafu { path })?;
+
+ let reader = store
+ .reader(path)
+ .await
+ .context(error::ReadObjectSnafu { path })?
+ .into_bytes_stream(0..meta.content_length())
+ .await
+ .context(error::ReadObjectSnafu { path })?;
+
+ let reader = format.compression_type.convert_stream(reader).boxed();
+ tolerant_csv_stream_from_reader(
+ reader,
+ path,
+ schema,
+ projection,
+ format.has_header,
+ format.delimiter,
+ )
+}
+
+fn tolerant_csv_stream_from_reader(
+ reader: BoxStream<'static, io::Result>,
+ path: &str,
+ schema: SchemaRef,
+ projection: Vec,
+ has_header: bool,
+ delimiter: u8,
+) -> Result {
+ let projected_schema = Arc::new(
+ schema
+ .project(&projection)
+ .context(error::InferSchemaSnafu)?,
+ );
+ let mut decoder = csv::ReaderBuilder::new(schema)
+ .with_header(has_header)
+ .with_delimiter(delimiter)
+ .with_batch_size(SKIP_BAD_RECORDS_BATCH_SIZE)
+ .with_projection(projection)
+ .build_decoder();
+
+ let path = path.to_string();
+ let mut upstream = reader.fuse();
+ let mut buffered = Bytes::new();
+ let mut input_finished = false;
+ let stream = futures::stream::poll_fn(move |cx| {
+ loop {
+ while !input_finished {
+ if buffered.is_empty() {
+ match futures::ready!(upstream.poll_next_unpin(cx)) {
+ Some(Ok(bytes)) if bytes.is_empty() => continue,
+ Some(Ok(bytes)) => buffered = bytes,
+ Some(Err(error)) => return Poll::Ready(Some(Err(error.into()))),
+ None => input_finished = true,
+ }
+ }
+
+ let decoded = decoder.decode(buffered.as_ref())?;
+ if decoded > 0 {
+ buffered.advance(decoded);
+ continue;
+ }
+
+ if decoder.capacity() == 0 || input_finished {
+ break;
+ }
+
+ if buffered.is_empty() {
+ continue;
+ }
+
+ return Poll::Ready(Some(Err(ArrowError::ParseError(
+ "CSV decoder made no progress while input bytes remain".to_string(),
+ ))));
+ }
+
+ match decoder.flush() {
+ Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))),
+ Ok(None) if input_finished => return Poll::Ready(None),
+ Ok(None) => continue,
+ Err(error) if is_skippable_arrow_error(&error) => {
+ warn!(
+ "Skipping bad CSV record while copying from {}: {}",
+ path, error
+ );
+ }
+ Err(error) => return Poll::Ready(Some(Err(error))),
+ }
+ }
+ })
+ .map(|result: std::result::Result| result.map_err(Into::into));
+
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
+ projected_schema,
+ stream,
+ )))
+}
+
+pub fn is_skippable_arrow_error(error: &ArrowError) -> bool {
+ matches!(
+ error,
+ ArrowError::ParseError(_)
+ | ArrowError::CastError(_)
+ | ArrowError::ComputeError(_)
+ | ArrowError::InvalidArgumentError(_)
+ )
+}
+
#[cfg(test)]
mod tests {
use std::sync::Arc;
+ use arrow_schema::{DataType, Field};
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_test_util::find_workspace_path;
@@ -205,7 +348,7 @@ mod tests {
use super::*;
use crate::file_format::{
FORMAT_COMPRESSION_TYPE, FORMAT_DELIMITER, FORMAT_HAS_HEADER,
- FORMAT_SCHEMA_INFER_MAX_RECORD, FileFormat, file_to_stream,
+ FORMAT_SCHEMA_INFER_MAX_RECORD, FORMAT_SKIP_BAD_RECORDS, FileFormat, file_to_stream,
};
use crate::test_util::{format_schema, test_store};
@@ -331,11 +474,29 @@ mod tests {
schema_infer_max_record: Some(2000),
delimiter: b'\t',
has_header: false,
+ skip_bad_records: false,
timestamp_format: None,
time_format: None,
date_format: None
}
);
+
+ let map = HashMap::from([(FORMAT_SKIP_BAD_RECORDS.to_string(), "true".to_string())]);
+ let format = CsvFormat::try_from(&map).unwrap();
+
+ assert_eq!(
+ format,
+ CsvFormat {
+ skip_bad_records: true,
+ ..CsvFormat::default()
+ }
+ );
+ }
+
+ #[test]
+ fn test_try_from_rejects_invalid_bool_options() {
+ let map = HashMap::from([(FORMAT_SKIP_BAD_RECORDS.to_string(), "yes".to_string())]);
+ assert!(CsvFormat::try_from(&map).is_err());
}
#[tokio::test]
@@ -496,4 +657,63 @@ mod tests {
assert_eq!(expected, pretty_print);
}
}
+
+ #[tokio::test]
+ async fn test_tolerant_csv_stream_continues_after_parse_error() {
+ let temp_dir = common_test_util::temp_dir::create_temp_dir("test_tolerant_csv_stream");
+ let csv_file_path = temp_dir.path().join("input.csv");
+ std::fs::write(
+ &csv_file_path,
+ "id,name,value\n1,Alice,10.5\nbad,Bad,20.0\nworse,Bad,21.0\n2,Bob,30.5",
+ )
+ .unwrap();
+
+ let store = test_store("/");
+ let schema = Arc::new(arrow_schema::Schema::new(vec![
+ Field::new("id", DataType::UInt32, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("value", DataType::Float64, false),
+ ]));
+ let path = csv_file_path.to_str().unwrap();
+
+ let stream =
+ tolerant_csv_stream(&store, path, schema, vec![0, 1, 2], &CsvFormat::default())
+ .await
+ .unwrap();
+ let batches = stream.try_collect::>().await.unwrap();
+ let pretty_print = arrow::util::pretty::pretty_format_batches(&batches)
+ .unwrap()
+ .to_string();
+ let expected = r#"+----+-------+-------+
+| id | name | value |
++----+-------+-------+
+| 1 | Alice | 10.5 |
+| 2 | Bob | 30.5 |
++----+-------+-------+"#;
+ assert_eq!(expected, pretty_print);
+ }
+
+ #[tokio::test]
+ async fn test_tolerant_csv_stream_fails_on_structural_csv_error() {
+ let temp_dir =
+ common_test_util::temp_dir::create_temp_dir("test_tolerant_csv_stream_csv_error");
+ let csv_file_path = temp_dir.path().join("input.csv");
+ std::fs::write(&csv_file_path, "id,name,value\n1,Alice,10.5\n2,Bob\n").unwrap();
+
+ let store = test_store("/");
+ let schema = Arc::new(arrow_schema::Schema::new(vec![
+ Field::new("id", DataType::UInt32, false),
+ Field::new("name", DataType::Utf8, false),
+ Field::new("value", DataType::Float64, false),
+ ]));
+ let path = csv_file_path.to_str().unwrap();
+
+ let stream =
+ tolerant_csv_stream(&store, path, schema, vec![0, 1, 2], &CsvFormat::default())
+ .await
+ .unwrap();
+ let error = stream.try_collect::>().await.unwrap_err();
+
+ assert!(error.to_string().contains("incorrect number of fields"));
+ }
}
diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs
index 3fa6b1bad0..6872b9ad55 100644
--- a/src/common/meta/src/instruction.rs
+++ b/src/common/meta/src/instruction.rs
@@ -18,7 +18,7 @@ use std::time::Duration;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use store_api::region_engine::SyncRegionFromRequest;
-use store_api::region_request::RegionFlushReason;
+use store_api::region_request::{RegionFlushReason, RegionRequirements};
use store_api::storage::{FileRefsManifest, GcReport, RegionId, RegionNumber};
use strum::Display;
use table::metadata::TableId;
@@ -179,12 +179,24 @@ impl Display for OpenRegion {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
- "OpenRegion(region_ident={}, region_storage_path={})",
- self.region_ident, self.region_storage_path
+ "OpenRegion(region_ident={}, region_storage_path={}, reason={:?})",
+ self.region_ident, self.region_storage_path, self.reason
)
}
}
+/// The reason why an open region instruction is triggered.
+#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
+pub enum OpenRegionReason {
+ /// Open triggered before region migration.
+ RegionMigration,
+ /// Open triggered by region failover.
+ RegionFailover,
+ /// Open triggered when adding a follower region.
+ #[cfg(feature = "enterprise")]
+ RegionFollower,
+}
+
#[serde_with::serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct OpenRegion {
@@ -196,6 +208,10 @@ pub struct OpenRegion {
pub region_wal_options: HashMap,
#[serde(default)]
pub skip_wal_replay: bool,
+ #[serde(default, skip_serializing_if = "Option::is_none")]
+ pub reason: Option,
+ #[serde(default)]
+ pub requirements: RegionRequirements,
}
impl OpenRegion {
@@ -205,6 +221,8 @@ impl OpenRegion {
region_options: HashMap,
region_wal_options: HashMap,
skip_wal_replay: bool,
+ reason: Option,
+ requirements: RegionRequirements,
) -> Self {
Self {
region_ident,
@@ -212,6 +230,8 @@ impl OpenRegion {
region_options,
region_wal_options,
skip_wal_replay,
+ reason,
+ requirements,
}
}
}
@@ -1126,11 +1146,13 @@ mod tests {
HashMap::new(),
HashMap::new(),
false,
+ None,
+ RegionRequirements::empty(),
)]);
let serialized = serde_json::to_string(&open_region).unwrap();
assert_eq!(
- r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false}]}"#,
+ r#"{"OpenRegions":[{"region_ident":{"datanode_id":2,"table_id":1024,"region_number":1,"engine":"mito2"},"region_storage_path":"test/foo","region_options":{},"region_wal_options":{},"skip_wal_replay":false,"requirements":{"object_storage":false}}]}"#,
serialized
);
@@ -1213,6 +1235,8 @@ mod tests {
HashMap::new(),
HashMap::new(),
false,
+ None,
+ RegionRequirements::empty(),
)]);
assert_eq!(open_region_instruction, open_region);
@@ -1368,10 +1392,41 @@ mod tests {
region_options,
region_wal_options: HashMap::new(),
skip_wal_replay: false,
+ reason: None,
+ requirements: RegionRequirements::empty(),
};
assert_eq!(expected, deserialized);
}
+ #[test]
+ fn test_serialize_open_region_with_reason_and_requirements() {
+ let open_region = OpenRegion::new(
+ RegionIdent {
+ datanode_id: 2,
+ table_id: 1024,
+ region_number: 1,
+ engine: "mito2".to_string(),
+ },
+ "test/foo",
+ HashMap::new(),
+ HashMap::new(),
+ false,
+ Some(OpenRegionReason::RegionMigration),
+ RegionRequirements::object_storage(),
+ );
+
+ let serialized = serde_json::to_string(&open_region).unwrap();
+ assert!(serialized.contains(r#""reason":"RegionMigration""#));
+ assert!(serialized.contains(r#""object_storage":true"#));
+
+ let deserialized: OpenRegion = serde_json::from_str(&serialized).unwrap();
+ assert_eq!(Some(OpenRegionReason::RegionMigration), deserialized.reason);
+ assert_eq!(
+ RegionRequirements::object_storage(),
+ deserialized.requirements
+ );
+ }
+
#[test]
fn test_flush_regions_creation() {
let region_id = RegionId::new(1024, 1);
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 2ce306006b..b757c95121 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -14,6 +14,8 @@
//! Datanode configurations
+use std::time::Duration;
+
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::memory::MemoryOptions;
@@ -75,6 +77,10 @@ pub struct DatanodeOptions {
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub max_concurrent_queries: usize,
+ /// Timeout to acquire a permit from the concurrent query limiter when
+ /// `max_concurrent_queries` is reached. Only effective when the limiter is enabled.
+ #[serde(with = "humantime_serde")]
+ pub concurrent_query_limiter_timeout: Duration,
/// Options for different store engines.
pub region_engine: Vec,
pub logging: LoggingOptions,
@@ -131,6 +137,7 @@ impl Default for DatanodeOptions {
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
max_concurrent_queries: 0,
+ concurrent_query_limiter_timeout: Duration::from_millis(100),
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 9a2fe3d982..12d7c5109c 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -445,8 +445,7 @@ impl DatanodeBuilder {
event_listener,
table_provider_factory,
opts.max_concurrent_queries,
- //TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
- Duration::from_millis(100),
+ opts.concurrent_query_limiter_timeout,
opts.grpc.flight_compression,
);
diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs
index 10948a3e7c..79e0baaef3 100644
--- a/src/datanode/src/heartbeat/handler.rs
+++ b/src/datanode/src/heartbeat/handler.rs
@@ -313,7 +313,7 @@ mod tests {
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionRole;
- use store_api::region_request::{RegionCloseRequest, RegionRequest};
+ use store_api::region_request::{RegionCloseRequest, RegionRequest, RegionRequirements};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{self, Receiver};
@@ -442,6 +442,8 @@ mod tests {
HashMap::new(),
HashMap::new(),
false,
+ None,
+ RegionRequirements::empty(),
)])
}
diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs
index 56c07a3efe..9c483e588d 100644
--- a/src/datanode/src/heartbeat/handler/open_region.rs
+++ b/src/datanode/src/heartbeat/handler/open_region.rs
@@ -14,6 +14,7 @@
use common_meta::instruction::{InstructionReply, OpenRegion, SimpleReply};
use common_meta::wal_provider::prepare_wal_options;
+use common_telemetry::info;
use store_api::path_utils::table_dir;
use store_api::region_request::{PathType, RegionOpenRequest};
use store_api::storage::RegionId;
@@ -41,8 +42,13 @@ impl InstructionHandler for OpenRegionsHandler {
mut region_options,
region_wal_options,
skip_wal_replay,
+ reason,
+ requirements,
} = open_region;
let region_id = RegionId::new(region_ident.table_id, region_ident.region_number);
+ info!(
+ "Received open region instruction, region_id: {region_id}, reason: {reason:?}"
+ );
prepare_wal_options(&mut region_options, region_id, ®ion_wal_options);
let request = RegionOpenRequest {
engine: region_ident.engine,
@@ -51,6 +57,7 @@ impl InstructionHandler for OpenRegionsHandler {
options: region_options,
skip_wal_replay,
checkpoint: None,
+ requirements,
};
(region_id, request)
})
@@ -85,7 +92,7 @@ mod tests {
use mito2::engine::MITO_ENGINE_NAME;
use mito2::test_util::{CreateRequestBuilder, TestEnv};
use store_api::path_utils::table_dir;
- use store_api::region_request::{RegionCloseRequest, RegionRequest};
+ use store_api::region_request::{RegionCloseRequest, RegionRequest, RegionRequirements};
use store_api::storage::RegionId;
use crate::heartbeat::handler::RegionHeartbeatResponseHandler;
@@ -98,17 +105,21 @@ mod tests {
) -> Instruction {
let region_idents = region_ids
.into_iter()
- .map(|region_id| OpenRegion {
- region_ident: RegionIdent {
- datanode_id: 0,
- table_id: region_id.table_id(),
- region_number: region_id.region_number(),
- engine: MITO_ENGINE_NAME.to_string(),
- },
- region_storage_path: storage_path.to_string(),
- region_options: HashMap::new(),
- region_wal_options: HashMap::new(),
- skip_wal_replay: false,
+ .map(|region_id| {
+ OpenRegion::new(
+ RegionIdent {
+ datanode_id: 0,
+ table_id: region_id.table_id(),
+ region_number: region_id.region_number(),
+ engine: MITO_ENGINE_NAME.to_string(),
+ },
+ storage_path,
+ HashMap::new(),
+ HashMap::new(),
+ false,
+ None,
+ RegionRequirements::empty(),
+ )
})
.collect();
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index d5711e1761..ce831353d1 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -49,6 +49,7 @@ use common_telemetry::{debug, error, info, warn};
use dashmap::DashMap;
use datafusion::datasource::TableProvider;
use datafusion_common::tree_node::TreeNode;
+use datatypes::schema::SchemaRef;
use either::Either;
use futures_util::Stream;
use futures_util::future::try_join_all;
@@ -82,7 +83,7 @@ use store_api::region_request::{
RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
-use tokio::sync::{Semaphore, SemaphorePermit};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::timeout;
use tonic::{Request, Response, Result as TonicResult};
@@ -257,7 +258,7 @@ impl RegionServer {
request: api::v1::region::QueryRequest,
query_ctx: QueryContextRef,
) -> Result {
- let _permit = if let Some(p) = &self.inner.parallelism {
+ let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
} else {
None
@@ -298,14 +299,13 @@ impl RegionServer {
)
.await?;
- Ok(wrap_flow_region_watermark_stream(
- stream, region_id, &query_ctx,
- ))
+ let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx);
+ Ok(maybe_guard_stream(stream, permit))
}
#[tracing::instrument(skip_all)]
pub async fn handle_read(&self, request: QueryRequest) -> Result {
- let _permit = if let Some(p) = &self.inner.parallelism {
+ let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
} else {
None
@@ -332,9 +332,8 @@ impl RegionServer {
.handle_read(QueryRequest { plan, ..request }, query_ctx.clone())
.await?;
- Ok(wrap_flow_region_watermark_stream(
- stream, region_id, &query_ctx,
- ))
+ let stream = wrap_flow_region_watermark_stream(stream, region_id, &query_ctx);
+ Ok(maybe_guard_stream(stream, permit))
}
/// Returns all opened and reportable regions.
@@ -1058,7 +1057,7 @@ struct RegionServerInner {
}
struct RegionServerParallelism {
- semaphore: Semaphore,
+ semaphore: Arc,
timeout: Duration,
}
@@ -1071,19 +1070,68 @@ impl RegionServerParallelism {
return None;
}
Some(RegionServerParallelism {
- semaphore: Semaphore::new(max_concurrent_queries),
+ semaphore: Arc::new(Semaphore::new(max_concurrent_queries)),
timeout: concurrent_query_limiter_timeout,
})
}
- pub async fn acquire(&self) -> Result> {
- timeout(self.timeout, self.semaphore.acquire())
+ pub async fn acquire(&self) -> Result {
+ timeout(self.timeout, self.semaphore.clone().acquire_owned())
.await
.context(ConcurrentQueryLimiterTimeoutSnafu)?
.context(ConcurrentQueryLimiterClosedSnafu)
}
}
+/// Wraps a record batch stream and holds a concurrency permit until the stream is
+/// fully consumed (dropped), so `max_concurrent_queries` bounds the number of
+/// in-flight read streams, not just query planning.
+struct PermitGuardedStream {
+ inner: SendableRecordBatchStream,
+ _permit: OwnedSemaphorePermit,
+}
+
+impl RecordBatchStream for PermitGuardedStream {
+ fn name(&self) -> &str {
+ self.inner.name()
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.inner.schema()
+ }
+
+ fn output_ordering(&self) -> Option<&[OrderOption]> {
+ self.inner.output_ordering()
+ }
+
+ fn metrics(&self) -> Option {
+ self.inner.metrics()
+ }
+}
+
+impl Stream for PermitGuardedStream {
+ type Item = common_recordbatch::error::Result;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll