Compare commits

..

23 Commits

Author SHA1 Message Date
WenyXu
b363c044f9 feat(metric-engine): add metadata region cache
feat: use lru

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 12:26:48 +00:00
Weny Xu
f44816cc15 feat: add metrics for reconciliation procedures (#6652)
* feat: add metrics for reconciliation procedures

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: improve error handling

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix(datanode): handle ignore_nonexistent_region flag in open_all_regions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* refactor: merge metrics

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: minor refactor

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 12:23:56 +00:00
jeremyhi
c4c2b87615 fix: sequence peek with remote value (#6648)
* fix: sequence peek with remote value

* chore: more ut

* chore: add more ut
2025-08-06 12:23:35 +00:00
Weny Xu
1f194af999 fix: fix sequence peek method to return correct values when sequence is not initialized (#6643)
fix: improve sequence peek method to handle uninitialized sequences

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 12:23:22 +00:00
Weny Xu
e6482acd47 feat: introduce reconciliation interface (#6614)
* feat: introduce reconcile interface

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: upgrade proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 12:23:14 +00:00
Weny Xu
ba5cb48231 feat: introduce reconcile catalog procedure (#6613)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 12:18:19 +00:00
Weny Xu
a8166f800b refactor: remove procedure executor from DDL manager (#6625)
* refactor: remove procedure executor from DDL manager

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: clippy

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 12:18:08 +00:00
Weny Xu
6fdc0b99b3 feat: introduce reconcile logical tables procedure (#6588)
* feat: introduce reconcile logical tables procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: lock logical tables

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 12:14:00 +00:00
Weny Xu
d3a1c80fbd feat: introduce reconcile database procedure (#6612)
* feat: introduce reconcile database procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: hold the schema lock

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add todo

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: rename to `fast_fail`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add logs

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 12:13:54 +00:00
Weny Xu
1434582cc3 feat: introduce reconcile table procedure (#6584)
* feat: introduce `SyncColumns`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: introduce reconcile table procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: add comments

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 12:13:28 +00:00
Weny Xu
01e0ce6f29 feat: ignore internal keys in metadata snapshots (#6606)
feat: ignore dumping internal keys

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:56:38 +00:00
Weny Xu
ce1d0b6c4c feat: allow setting next table id via http api (#6597)
* feat: allow resetting next table id via http api

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:56:32 +00:00
Weny Xu
686ee9f579 feat: allow ignoring nonexistent regions in recovery mode (#6592)
* feat: allow ignoring nonexistent regions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: ignore nonexistent regions during startup in recovery mode

Signed-off-by: WenyXu <wenymedia@gmail.com>

* feat: allow enabling recovery mode via http api

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:56:24 +00:00
Lanqing Yang
44a368bc4e feat: move metasrv admin to http server while keeping tonic for backward compatibility (#6466)
* feat: move metasrv admin to http server while keeping tonic for backward compatibility

Signed-off-by: lyang24 <lanqingy93@gmail.com>

* refactor with nest method

Signed-off-by: lyang24 <lanqingy93@gmail.com>

---------

Signed-off-by: lyang24 <lanqingy93@gmail.com>
Co-authored-by: lyang24 <lanqingy@usc.edu>
2025-08-06 11:56:16 +00:00
Weny Xu
6c54f2b6c0 feat: implement pause/resume functionality for procedure manager (#6393)
* feat: implement pause/resume functionality for procedure manager

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:52:34 +00:00
Weny Xu
9bb37a6a14 refactor: support multiple index operations in single alter region request (#6487)
* refactor: support multiple index operations in single alter region request

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update greptime-proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:48:29 +00:00
Yingwen
cfd6c1c3e0 feat: Support ListMetadataRequest to retrieve regions' metadata (#6348)
* feat: support list metadata in region server

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add test for list region metadata

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: return null if region not exists

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update greptime-proto

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2025-08-06 11:44:59 +00:00
Weny Xu
c0f40ce8ed feat: add table reconciliation utilities (#6519)
* feat: add table reconciliation utilities

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit tests

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update comment

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:30:19 +00:00
Weny Xu
c9501053e5 fix: fix state transition in create table procedure (#6523)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:30:09 +00:00
Weny Xu
e3ee08d300 refactor(meta): separate validation and execution logic in alter logical tables procedure (#6478)
* refactor(meta): separate validation and execution logic in alter logical tables procedure

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:22:05 +00:00
Weny Xu
76657e9c89 refactor(meta): extract AlterTableExecutor from AlterTableProcedure (#6470)
* refactor(meta): extract `AlterTableExecutor` from `AlterTableProcedure`

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:21:59 +00:00
Weny Xu
9629225f56 feat: add column metadata to response extensions (#6451)
Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:21:48 +00:00
Weny Xu
165f156e69 feat: persist column ids in table metadata (#6457)
* feat: persist column ids in table metadata

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
2025-08-06 11:17:58 +00:00
29 changed files with 270 additions and 679 deletions

View File

@@ -1,42 +0,0 @@
#!/bin/bash
# Get current version
CURRENT_VERSION=$1
if [ -z "$CURRENT_VERSION" ]; then
echo "Error: Failed to get current version"
exit 1
fi
# Get the latest version from GitHub Releases
API_RESPONSE=$(curl -s "https://api.github.com/repos/GreptimeTeam/greptimedb/releases/latest")
if [ -z "$API_RESPONSE" ] || [ "$(echo "$API_RESPONSE" | jq -r '.message')" = "Not Found" ]; then
echo "Error: Failed to fetch latest version from GitHub"
exit 1
fi
# Get the latest version
LATEST_VERSION=$(echo "$API_RESPONSE" | jq -r '.tag_name')
if [ -z "$LATEST_VERSION" ] || [ "$LATEST_VERSION" = "null" ]; then
echo "Error: No valid version found in GitHub releases"
exit 1
fi
# Clean up the version number format (strip a possible 'v' prefix and the -nightly suffix)
CLEAN_CURRENT=$(echo "$CURRENT_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')
CLEAN_LATEST=$(echo "$LATEST_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')
echo "Current version: $CLEAN_CURRENT"
echo "Latest release version: $CLEAN_LATEST"
# Use sort -V to compare versions
HIGHER_VERSION=$(printf "%s\n%s" "$CLEAN_CURRENT" "$CLEAN_LATEST" | sort -V | tail -n1)
if [ "$HIGHER_VERSION" = "$CLEAN_CURRENT" ]; then
echo "Current version ($CLEAN_CURRENT) is NEWER than or EQUAL to latest ($CLEAN_LATEST)"
echo "should-push-latest-tag=true" >> $GITHUB_OUTPUT
else
echo "Current version ($CLEAN_CURRENT) is OLDER than latest ($CLEAN_LATEST)"
echo "should-push-latest-tag=false" >> $GITHUB_OUTPUT
fi
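
For reference, a minimal Rust sketch of the decision this script encodes — a hypothetical illustration, not part of the diff, assuming the semver crate in place of the script's sed/sort -V:

use semver::Version;

// Strip a leading 'v' and any "-nightly-..." suffix, then compare the two versions,
// mirroring the cleanup and `sort -V` comparison in the script above.
fn should_push_latest(current: &str, latest_release: &str) -> bool {
    let clean = |s: &str| {
        let s = s.strip_prefix('v').unwrap_or(s);
        s.split("-nightly").next().unwrap_or(s).to_string()
    };
    let current = Version::parse(&clean(current)).expect("invalid current version");
    let latest = Version::parse(&clean(latest_release)).expect("invalid latest version");
    // should-push-latest-tag=true when the current version is newer than or equal to
    // the latest GitHub release.
    current >= latest
}

fn main() {
    assert!(should_push_latest("v0.15.5", "v0.15.4"));
    assert!(!should_push_latest("v0.15.4-nightly-20250806", "v0.15.5"));
}

The release workflow below consumes the script's should-push-latest-tag output to decide whether images and artifacts are also tagged as latest.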

View File

@@ -110,8 +110,6 @@ jobs:
# The 'version' use as the global tag name of the release workflow.
version: ${{ steps.create-version.outputs.version }}
should-push-latest-tag: ${{ steps.check-version.outputs.should-push-latest-tag }}
steps:
- name: Checkout
uses: actions/checkout@v4
@@ -137,11 +135,6 @@ jobs:
GITHUB_REF_NAME: ${{ github.ref_name }}
NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}
- name: Check version
id: check-version
run: |
./.github/scripts/check-version.sh "${{ steps.create-version.outputs.version }}"
- name: Allocate linux-amd64 runner
if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
uses: ./.github/actions/start-runner
@@ -321,7 +314,7 @@ jobs:
image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
image-registry-password: ${{ secrets.DOCKERHUB_TOKEN }}
version: ${{ needs.allocate-runners.outputs.version }}
push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
push-latest-tag: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
- name: Set build image result
id: set-build-image-result
@@ -368,7 +361,7 @@ jobs:
dev-mode: false
upload-to-s3: true
update-version-info: true
push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
push-latest-tag: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
publish-github-release:
name: Create GitHub release and upload artifacts

Cargo.lock (generated) · 158 changed lines
View File

@@ -211,7 +211,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"common-base",
"common-decimal",
@@ -944,7 +944,7 @@ dependencies = [
[[package]]
name = "auth"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -1586,7 +1586,7 @@ dependencies = [
[[package]]
name = "cache"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"catalog",
"common-error",
@@ -1621,7 +1621,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"arrow 54.2.1",
@@ -1659,8 +1659,6 @@ dependencies = [
"partition",
"paste",
"prometheus",
"promql-parser",
"rand 0.9.0",
"rustc-hash 2.0.0",
"serde_json",
"session",
@@ -1961,7 +1959,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"
[[package]]
name = "cli"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-stream",
"async-trait",
@@ -2006,7 +2004,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.5",
"substrait 0.15.4",
"table",
"tempfile",
"tokio",
@@ -2015,7 +2013,7 @@ dependencies = [
[[package]]
name = "client"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"arc-swap",
@@ -2045,7 +2043,7 @@ dependencies = [
"rand 0.9.0",
"serde_json",
"snafu 0.8.5",
"substrait 0.15.5",
"substrait 0.15.4",
"substrait 0.37.3",
"tokio",
"tokio-stream",
@@ -2086,7 +2084,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-trait",
"auth",
@@ -2147,7 +2145,7 @@ dependencies = [
"snafu 0.8.5",
"stat",
"store-api",
"substrait 0.15.5",
"substrait 0.15.4",
"table",
"temp-env",
"tempfile",
@@ -2194,7 +2192,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"anymap2",
"async-trait",
@@ -2216,11 +2214,11 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "0.15.5"
version = "0.15.4"
[[package]]
name = "common-config"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"common-base",
"common-error",
@@ -2245,7 +2243,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"arrow 54.2.1",
"arrow-schema 54.3.1",
@@ -2282,7 +2280,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2295,7 +2293,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"common-macro",
"http 1.1.0",
@@ -2306,7 +2304,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-trait",
"common-error",
@@ -2315,7 +2313,6 @@ dependencies = [
"common-meta",
"greptime-proto",
"meta-client",
"session",
"snafu 0.8.5",
"tokio",
"tonic 0.12.3",
@@ -2323,7 +2320,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -2376,7 +2373,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-trait",
"common-runtime",
@@ -2393,7 +2390,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"arrow-flight",
@@ -2425,7 +2422,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"common-base",
@@ -2444,7 +2441,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"arc-swap",
"common-query",
@@ -2458,7 +2455,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"anyhow",
"common-error",
@@ -2474,7 +2471,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"anymap2",
"api",
@@ -2540,7 +2537,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2549,11 +2546,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "0.15.5"
version = "0.15.4"
[[package]]
name = "common-pprof"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"common-error",
"common-macro",
@@ -2565,7 +2562,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-stream",
"async-trait",
@@ -2592,7 +2589,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-trait",
"common-procedure",
@@ -2602,7 +2599,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -2628,7 +2625,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"arc-swap",
"common-error",
@@ -2648,7 +2645,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -2678,14 +2675,14 @@ dependencies = [
[[package]]
name = "common-session"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"strum 0.27.1",
]
[[package]]
name = "common-telemetry"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"backtrace",
"common-error",
@@ -2713,7 +2710,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"client",
"common-grpc",
@@ -2726,7 +2723,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"arrow 54.2.1",
"chrono",
@@ -2744,7 +2741,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"build-data",
"cargo-manifest",
@@ -2755,7 +2752,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"common-base",
"common-error",
@@ -2778,7 +2775,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"common-telemetry",
@@ -3734,7 +3731,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"arrow-flight",
@@ -3787,7 +3784,7 @@ dependencies = [
"session",
"snafu 0.8.5",
"store-api",
"substrait 0.15.5",
"substrait 0.15.4",
"table",
"tokio",
"toml 0.8.19",
@@ -3796,7 +3793,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"arrow 54.2.1",
"arrow-array 54.2.1",
@@ -4456,7 +4453,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"
[[package]]
name = "file-engine"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -4593,7 +4590,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"arrow 54.2.1",
@@ -4658,7 +4655,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 0.15.5",
"substrait 0.15.4",
"table",
"tokio",
"tonic 0.12.3",
@@ -4713,7 +4710,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"
[[package]]
name = "frontend"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"arc-swap",
@@ -4773,7 +4770,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"strfmt",
"substrait 0.15.5",
"substrait 0.15.4",
"table",
"tokio",
"tokio-util",
@@ -5163,7 +5160,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f3103a8c9b8ce162457d0a3e3ca00d53d1a8bd06#f3103a8c9b8ce162457d0a3e3ca00d53d1a8bd06"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=551c70f2b352b6bc55ab769129344aeab8f22d11#551c70f2b352b6bc55ab769129344aeab8f22d11"
dependencies = [
"prost 0.13.5",
"serde",
@@ -5934,7 +5931,7 @@ dependencies = [
[[package]]
name = "index"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -6819,7 +6816,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "log-query"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"chrono",
"common-error",
@@ -6831,7 +6828,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-stream",
"async-trait",
@@ -7129,7 +7126,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -7157,7 +7154,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -7253,7 +7250,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"aquamarine",
@@ -7345,7 +7342,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"bytes",
@@ -7368,7 +7365,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"aquamarine",
@@ -8118,7 +8115,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"anyhow",
"bytes",
@@ -8432,7 +8429,7 @@ dependencies = [
[[package]]
name = "operator"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -8487,7 +8484,7 @@ dependencies = [
"sql",
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"store-api",
"substrait 0.15.5",
"substrait 0.15.4",
"table",
"tokio",
"tokio-util",
@@ -8754,7 +8751,7 @@ dependencies = [
[[package]]
name = "partition"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -9042,7 +9039,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9185,7 +9182,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"auth",
"clap 4.5.19",
@@ -9498,7 +9495,7 @@ dependencies = [
[[package]]
name = "promql"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"async-trait",
@@ -9780,7 +9777,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-compression 0.4.13",
"async-trait",
@@ -9822,7 +9819,7 @@ dependencies = [
[[package]]
name = "query"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -9888,7 +9885,7 @@ dependencies = [
"sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
"statrs",
"store-api",
"substrait 0.15.5",
"substrait 0.15.4",
"table",
"tokio",
"tokio-stream",
@@ -11174,7 +11171,7 @@ dependencies = [
[[package]]
name = "servers"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"ahash 0.8.11",
"api",
@@ -11295,7 +11292,7 @@ dependencies = [
[[package]]
name = "session"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"arc-swap",
@@ -11634,7 +11631,7 @@ dependencies = [
[[package]]
name = "sql"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"chrono",
@@ -11689,7 +11686,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-trait",
"clap 4.5.19",
@@ -11989,7 +11986,7 @@ dependencies = [
[[package]]
name = "stat"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"nix 0.30.1",
]
@@ -12015,7 +12012,7 @@ dependencies = [
[[package]]
name = "store-api"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"aquamarine",
@@ -12176,7 +12173,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"async-trait",
"bytes",
@@ -12356,7 +12353,7 @@ dependencies = [
[[package]]
name = "table"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"async-trait",
@@ -12617,7 +12614,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "tests-fuzz"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"arbitrary",
"async-trait",
@@ -12661,7 +12658,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "0.15.5"
version = "0.15.4"
dependencies = [
"api",
"arrow-flight",
@@ -12728,7 +12725,7 @@ dependencies = [
"sql",
"sqlx",
"store-api",
"substrait 0.15.5",
"substrait 0.15.4",
"table",
"tempfile",
"time",
@@ -12738,7 +12735,6 @@ dependencies = [
"tonic 0.12.3",
"tower 0.5.2",
"url",
"urlencoding",
"uuid",
"yaml-rust",
"zstd 0.13.2",

View File

@@ -71,7 +71,7 @@ members = [
resolver = "2"
[workspace.package]
version = "0.15.5"
version = "0.15.4"
edition = "2021"
license = "Apache-2.0"
@@ -134,7 +134,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f3103a8c9b8ce162457d0a3e3ca00d53d1a8bd06" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "551c70f2b352b6bc55ab769129344aeab8f22d11" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -43,8 +43,6 @@ moka = { workspace = true, features = ["future", "sync"] }
partition.workspace = true
paste.workspace = true
prometheus.workspace = true
promql-parser.workspace = true
rand.workspace = true
rustc-hash.workspace = true
serde_json.workspace = true
session.workspace = true

View File

@@ -14,24 +14,17 @@
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::fmt::{Debug, Formatter};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant, UNIX_EPOCH};
use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
use common_base::cancellation::CancellationHandle;
use common_frontend::selector::{FrontendSelector, MetaClientSelector};
use common_frontend::slow_query_event::SlowQueryEvent;
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{debug, info, warn};
use common_time::util::current_time_millis;
use meta_client::MetaClientRef;
use promql_parser::parser::EvalStmt;
use rand::random;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use sql::statements::statement::Statement;
use tokio::sync::mpsc::Sender;
use crate::error;
use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
@@ -51,23 +44,6 @@ pub struct ProcessManager {
frontend_selector: Option<MetaClientSelector>,
}
/// Represents a parsed query statement, functionally equivalent to [query::parser::QueryStatement].
/// This enum is defined here to avoid cyclic dependencies with the query parser module.
#[derive(Debug, Clone)]
pub enum QueryStatement {
Sql(Statement),
Promql(EvalStmt),
}
impl Display for QueryStatement {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
QueryStatement::Sql(stmt) => write!(f, "{}", stmt),
QueryStatement::Promql(eval_stmt) => write!(f, "{}", eval_stmt),
}
}
}
impl ProcessManager {
/// Create a [ProcessManager] instance with server address and kv client.
pub fn new(server_addr: String, meta_client: Option<MetaClientRef>) -> Self {
@@ -91,7 +67,6 @@ impl ProcessManager {
query: String,
client: String,
query_id: Option<ProcessId>,
_slow_query_timer: Option<SlowQueryTimer>,
) -> Ticket {
let id = query_id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed));
let process = ProcessInfo {
@@ -118,7 +93,6 @@ impl ProcessManager {
manager: self.clone(),
id,
cancellation_handle,
_slow_query_timer,
}
}
@@ -249,7 +223,6 @@ pub struct Ticket {
pub(crate) manager: ProcessManagerRef,
pub(crate) id: ProcessId,
pub cancellation_handle: Arc<CancellationHandle>,
_slow_query_timer: Option<SlowQueryTimer>,
}
impl Drop for Ticket {
@@ -290,107 +263,6 @@ impl Debug for CancellableProcess {
}
}
/// SlowQueryTimer is used to log slow query when it's dropped.
/// In drop(), it will check if the query is slow and send the slow query event to the handler.
pub struct SlowQueryTimer {
start: Instant,
stmt: QueryStatement,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
}
impl SlowQueryTimer {
pub fn new(
stmt: QueryStatement,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
) -> Self {
Self {
start: Instant::now(),
stmt,
query_ctx,
threshold,
sample_ratio,
tx,
}
}
}
impl SlowQueryTimer {
fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
let mut slow_query_event = SlowQueryEvent {
cost: elapsed.as_millis() as u64,
threshold: threshold.as_millis() as u64,
query: "".to_string(),
query_ctx: self.query_ctx.clone(),
// The following fields are only used for PromQL queries.
is_promql: false,
promql_range: None,
promql_step: None,
promql_start: None,
promql_end: None,
};
match &self.stmt {
QueryStatement::Promql(stmt) => {
slow_query_event.is_promql = true;
slow_query_event.query = stmt.expr.to_string();
slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
let start = stmt
.start
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
let end = stmt
.end
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
slow_query_event.promql_range = Some((end - start) as u64);
slow_query_event.promql_start = Some(start);
slow_query_event.promql_end = Some(end);
}
QueryStatement::Sql(stmt) => {
slow_query_event.query = stmt.to_string();
}
}
// Send SlowQueryEvent to the handler.
if let Err(e) = self.tx.try_send(slow_query_event) {
error!(e; "Failed to send slow query event");
}
}
}
impl Drop for SlowQueryTimer {
fn drop(&mut self) {
if let Some(threshold) = self.threshold {
// Calculate the elapsed duration since the timer was created.
let elapsed = self.start.elapsed();
if elapsed > threshold {
if let Some(ratio) = self.sample_ratio {
// Only capture a portion of slow queries based on sample_ratio.
// Generate a random number in [0, 1) and compare it with sample_ratio.
if ratio >= 1.0 || random::<f64>() <= ratio {
self.send_slow_query_event(elapsed, threshold);
}
} else {
// Captures all slow queries if sample_ratio is not set.
self.send_slow_query_event(elapsed, threshold);
}
}
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -406,7 +278,6 @@ mod tests {
"SELECT * FROM table".to_string(),
"".to_string(),
None,
None,
);
let running_processes = process_manager.local_processes(None).unwrap();
@@ -430,7 +301,6 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
Some(custom_id),
None,
);
assert_eq!(ticket.id, custom_id);
@@ -451,7 +321,6 @@ mod tests {
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
None,
);
let ticket2 = process_manager.clone().register_query(
@@ -460,7 +329,6 @@ mod tests {
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
None,
);
let running_processes = process_manager.local_processes(Some("public")).unwrap();
@@ -482,7 +350,6 @@ mod tests {
"SELECT * FROM table1".to_string(),
"client1".to_string(),
None,
None,
);
let _ticket2 = process_manager.clone().register_query(
@@ -491,7 +358,6 @@ mod tests {
"SELECT * FROM table2".to_string(),
"client2".to_string(),
None,
None,
);
// Test listing processes for specific catalog
@@ -518,7 +384,6 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
None,
);
assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
process_manager.deregister_query("public".to_string(), ticket.id);
@@ -535,7 +400,6 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
None,
);
assert!(!ticket.cancellation_handle.is_cancelled());
@@ -553,7 +417,6 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
None,
);
assert!(!ticket.cancellation_handle.is_cancelled());
let killed = process_manager
@@ -599,7 +462,6 @@ mod tests {
"SELECT COUNT(*) FROM users WHERE age > 18".to_string(),
"test_client".to_string(),
Some(42),
None,
);
let processes = process_manager.local_processes(None).unwrap();
@@ -626,7 +488,6 @@ mod tests {
"SELECT * FROM table".to_string(),
"client1".to_string(),
None,
None,
);
// Process should be registered

View File

@@ -12,7 +12,6 @@ common-macro.workspace = true
common-meta.workspace = true
greptime-proto.workspace = true
meta-client.workspace = true
session.workspace = true
snafu.workspace = true
tonic.workspace = true

View File

@@ -19,7 +19,6 @@ use snafu::OptionExt;
pub mod error;
pub mod selector;
pub mod slow_query_event;
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DisplayProcessId {

View File

@@ -1,28 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use session::context::QueryContextRef;
#[derive(Debug)]
pub struct SlowQueryEvent {
pub cost: u64,
pub threshold: u64,
pub query: String,
pub is_promql: bool,
pub query_ctx: QueryContextRef,
pub promql_range: Option<u64>,
pub promql_step: Option<u64>,
pub promql_start: Option<i64>,
pub promql_end: Option<i64>,
}

View File

@@ -32,7 +32,7 @@ use std::time::{Duration, SystemTime};
use async_stream::stream;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::process_manager::{ProcessManagerRef, QueryStatement as CatalogQueryStatement};
use catalog::process_manager::ProcessManagerRef;
use catalog::CatalogManagerRef;
use client::OutputData;
use common_base::cancellation::CancellableFuture;
@@ -64,9 +64,8 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
use query::query_engine::DescribeResult;
use query::QueryEngineRef;
use servers::error::{
self as server_error, AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
};
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::interceptor::{
PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
@@ -184,40 +183,36 @@ impl Instance {
let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
let query_interceptor = query_interceptor.as_ref();
if should_capture_statement(Some(&stmt)) {
let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
recorder.start(CatalogQueryStatement::Sql(stmt.clone()), query_ctx.clone())
});
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
stmt.to_string(),
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
slow_query_timer,
);
let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
.map_err(|_| error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => OutputData::Stream(Box::pin(
CancellableStreamWrapper::new(stream, ticket),
)),
other => other,
};
Output { data, meta }
})
let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
} else {
self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
.await
}
None
};
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
stmt.to_string(),
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
);
let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
.map_err(|_| error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => {
OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
}
other => other,
};
Output { data, meta }
})
}
async fn exec_statement_with_timeout(
@@ -433,54 +428,13 @@ impl SqlQueryHandler for Instance {
}
}
async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
if should_capture_statement(stmt.as_ref()) {
// It's safe to unwrap here because we've already checked the type.
let stmt = stmt.unwrap();
let query = stmt.to_string();
let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
recorder.start(CatalogQueryStatement::Sql(stmt), query_ctx.clone())
});
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
query,
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
slow_query_timer,
);
let query_fut = self.query_engine.execute(plan.clone(), query_ctx);
CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
.await
.map_err(|_| error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => OutputData::Stream(Box::pin(
CancellableStreamWrapper::new(stream, ticket),
)),
other => other,
};
Output { data, meta }
})
.context(ExecLogicalPlanSnafu)
} else {
// plan should be prepared before exec
// we'll do check there
self.query_engine
.execute(plan.clone(), query_ctx)
.await
.context(ExecLogicalPlanSnafu)
}
async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
// plan should be prepared before exec
// we'll do check there
self.query_engine
.execute(plan.clone(), query_ctx)
.await
.context(ExecLogicalPlanSnafu)
}
#[tracing::instrument(skip_all)]
@@ -574,6 +528,12 @@ impl PrometheusHandler for Instance {
}
})?;
let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
recorder.start(stmt.clone(), query_ctx.clone())
} else {
None
};
let plan = self
.statement_executor
.plan(&stmt, query_ctx.clone())
@@ -583,47 +543,10 @@ impl PrometheusHandler for Instance {
interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;
// Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
let query_statement = if let QueryStatement::Promql(eval_stmt) = stmt {
CatalogQueryStatement::Promql(eval_stmt)
} else {
// It should not happen since the query is already parsed successfully.
return UnexpectedResultSnafu {
reason: "The query should always be promql.".to_string(),
}
.fail();
};
let query = query_statement.to_string();
let slow_query_timer = self
.slow_query_recorder
.as_ref()
.and_then(|recorder| recorder.start(query_statement, query_ctx.clone()));
let ticket = self.process_manager.register_query(
query_ctx.current_catalog().to_string(),
vec![query_ctx.current_schema()],
query,
query_ctx.conn_info().to_string(),
Some(query_ctx.process_id()),
slow_query_timer,
);
let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
let output = self
.statement_executor
.exec_plan(plan, query_ctx.clone())
.await
.map_err(|_| servers::error::CancelledSnafu.build())?
.map(|output| {
let Output { meta, data } = output;
let data = match data {
OutputData::Stream(stream) => {
OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
}
other => other,
};
Output { data, meta }
})
.map_err(BoxedError::new)
.context(ExecuteQuerySnafu)?;
@@ -841,15 +764,6 @@ fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<(
.context(SqlExecInterceptedSnafu)
}
// Create a query ticket and slow query timer if the statement is a query or readonly statement.
fn should_capture_statement(stmt: Option<&Statement>) -> bool {
if let Some(stmt) = stmt {
matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
} else {
false
}
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
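
Both sides of the exec_statement hunk above rely on the same process-ticket pattern: the statement future is raced against the ticket's cancellation handle via common_base::cancellation::CancellableFuture, and streaming output is wrapped in CancellableStreamWrapper so that a KILL also stops an in-flight stream. Below is a simplified, self-contained stand-in for that race — hypothetical code, using a tokio oneshot channel where the real implementation uses the ticket's CancellationHandle:

use tokio::sync::oneshot;
use tokio::time::{sleep, Duration};

// Race a query future against a cancellation signal; whichever completes first wins.
async fn run_cancellable<F, T>(fut: F, cancel: oneshot::Receiver<()>) -> Result<T, &'static str>
where
    F: std::future::Future<Output = T>,
{
    tokio::select! {
        out = fut => Ok(out),
        _ = cancel => Err("query cancelled"),
    }
}

#[tokio::main]
async fn main() {
    let (kill, cancelled) = oneshot::channel();
    // Simulate a KILL request arriving while the statement is still running.
    tokio::spawn(async move {
        sleep(Duration::from_millis(10)).await;
        let _ = kill.send(());
    });
    let result = run_cancellable(
        async {
            sleep(Duration::from_secs(5)).await; // the long-running "query"
            42
        },
        cancelled,
    )
    .await;
    assert_eq!(result, Err("query cancelled"));
}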

View File

@@ -119,8 +119,7 @@ impl GrpcQueryHandler for Instance {
.await
.context(SubstraitDecodeLogicalPlanSnafu)?;
let output =
SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone())
.await?;
SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;
attach_timer(output, timer)
}
@@ -398,7 +397,7 @@ impl Instance {
.context(common_query::error::GeneralDataFusionSnafu)
.context(SubstraitDecodeLogicalPlanSnafu)?;
let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?;
let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?;
Ok(attach_timer(output, timer))
}

View File

@@ -22,5 +22,5 @@ pub(crate) mod limiter;
pub(crate) mod metrics;
pub mod server;
pub mod service_config;
pub mod slow_query_recorder;
pub(crate) mod slow_query_recorder;
mod stream_wrapper;

View File

@@ -14,21 +14,22 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, UNIX_EPOCH};
use api::v1::value::ValueData;
use api::v1::{
ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest,
RowInsertRequests, Rows, SemanticType,
};
use catalog::process_manager::{QueryStatement as CatalogQueryStatement, SlowQueryTimer};
use catalog::CatalogManagerRef;
use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
use common_frontend::slow_query_event::SlowQueryEvent;
use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
use common_telemetry::{debug, error, info, slow};
use common_time::timestamp::{TimeUnit, Timestamp};
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::parser::QueryStatement;
use rand::random;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::ResultExt;
use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
@@ -38,16 +39,16 @@ use tokio::task::JoinHandle;
use crate::error::{CatalogSnafu, Result, TableOperationSnafu};
pub const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
pub const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
pub const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
pub const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
pub const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
pub const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
pub const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
pub const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
pub const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
pub const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d";
const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024;
@@ -60,6 +61,19 @@ pub struct SlowQueryRecorder {
_handle: Arc<JoinHandle<()>>,
}
#[derive(Debug)]
struct SlowQueryEvent {
cost: u64,
threshold: u64,
query: String,
is_promql: bool,
query_ctx: QueryContextRef,
promql_range: Option<u64>,
promql_step: Option<u64>,
promql_start: Option<i64>,
promql_end: Option<i64>,
}
impl SlowQueryRecorder {
/// Create a new SlowQueryRecorder.
pub fn new(
@@ -101,17 +115,18 @@ impl SlowQueryRecorder {
/// The timer sets the start time when created and calculates the elapsed duration when dropped.
pub fn start(
&self,
stmt: CatalogQueryStatement,
stmt: QueryStatement,
query_ctx: QueryContextRef,
) -> Option<SlowQueryTimer> {
if self.slow_query_opts.enable {
Some(SlowQueryTimer::new(
Some(SlowQueryTimer {
stmt,
query_ctx,
self.slow_query_opts.threshold,
self.slow_query_opts.sample_ratio,
self.tx.clone(),
))
start: Instant::now(), // Set the initial start time.
threshold: self.slow_query_opts.threshold,
sample_ratio: self.slow_query_opts.sample_ratio,
tx: self.tx.clone(),
})
} else {
None
}
@@ -432,3 +447,85 @@ impl SlowQueryEventHandler {
]
}
}
/// SlowQueryTimer is used to log slow query when it's dropped.
/// In drop(), it will check if the query is slow and send the slow query event to the handler.
pub struct SlowQueryTimer {
start: Instant,
stmt: QueryStatement,
query_ctx: QueryContextRef,
threshold: Option<Duration>,
sample_ratio: Option<f64>,
tx: Sender<SlowQueryEvent>,
}
impl SlowQueryTimer {
fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
let mut slow_query_event = SlowQueryEvent {
cost: elapsed.as_millis() as u64,
threshold: threshold.as_millis() as u64,
query: "".to_string(),
query_ctx: self.query_ctx.clone(),
// The following fields are only used for PromQL queries.
is_promql: false,
promql_range: None,
promql_step: None,
promql_start: None,
promql_end: None,
};
match &self.stmt {
QueryStatement::Promql(stmt) => {
slow_query_event.is_promql = true;
slow_query_event.query = stmt.expr.to_string();
slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
let start = stmt
.start
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
let end = stmt
.end
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
slow_query_event.promql_range = Some((end - start) as u64);
slow_query_event.promql_start = Some(start);
slow_query_event.promql_end = Some(end);
}
QueryStatement::Sql(stmt) => {
slow_query_event.query = stmt.to_string();
}
}
// Send SlowQueryEvent to the handler.
if let Err(e) = self.tx.try_send(slow_query_event) {
error!(e; "Failed to send slow query event");
}
}
}
impl Drop for SlowQueryTimer {
fn drop(&mut self) {
if let Some(threshold) = self.threshold {
// Calculate the elapsed duration since the timer was created.
let elapsed = self.start.elapsed();
if elapsed > threshold {
if let Some(ratio) = self.sample_ratio {
// Only capture a portion of slow queries based on sample_ratio.
// Generate a random number in [0, 1) and compare it with sample_ratio.
if ratio >= 1.0 || random::<f64>() <= ratio {
self.send_slow_query_event(elapsed, threshold);
}
} else {
// Captures all slow queries if sample_ratio is not set.
self.send_slow_query_event(elapsed, threshold);
}
}
}
}
}
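
The Drop implementation above reports a query only once it exceeds the configured threshold and, when sample_ratio is set, only for a sampled fraction of those slow queries. A tiny standalone check of that sampling predicate — hypothetical, assuming only the rand crate:

use rand::random;

// Same predicate as SlowQueryTimer::drop: a ratio >= 1.0 captures every slow query,
// otherwise a slow query is captured with probability `ratio`; no ratio captures all.
fn should_capture(sample_ratio: Option<f64>) -> bool {
    match sample_ratio {
        Some(ratio) => ratio >= 1.0 || random::<f64>() <= ratio,
        None => true,
    }
}

fn main() {
    let trials = 100_000;
    let captured = (0..trials).filter(|_| should_capture(Some(0.1))).count();
    // Expect roughly 10% of slow queries to be recorded.
    println!("captured {captured} of {trials}");
}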

View File

@@ -187,7 +187,6 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
@@ -211,7 +210,6 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
// Cancel before creating the wrapper
@@ -236,7 +234,6 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
@@ -264,7 +261,6 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();
@@ -293,7 +289,6 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
@@ -324,7 +319,6 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
@@ -353,7 +347,6 @@ mod tests {
"query".to_string(),
"client".to_string(),
None,
None,
);
let cancellation_handle = ticket.cancellation_handle.clone();

View File

@@ -66,10 +66,6 @@ const COLUMN_PREFIX: &str = "__column_";
/// itself.
pub struct MetadataRegion {
pub(crate) mito: MitoEngine,
/// The cache for contents(key-value pairs) of region metadata.
///
/// The cache should be invalidated when any new values are put into the metadata region or any
/// values are deleted from the metadata region.
cache: Cache<RegionId, RegionMetadataCacheEntry>,
/// Logical lock for operations that need to be serialized. Like update & read region columns.
///
@@ -84,19 +80,14 @@ struct RegionMetadataCacheEntry {
size: usize,
}
/// The max size of the region metadata cache.
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
/// The TTL of the region metadata cache.
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
impl MetadataRegion {
pub fn new(mito: MitoEngine) -> Self {
let cache = Cache::builder()
.max_capacity(MAX_CACHE_SIZE)
// Use the LRU eviction policy to minimize frequent mito scans.
// Recently accessed items are retained longer in the cache.
.eviction_policy(EvictionPolicy::lru())
.time_to_live(CACHE_TTL)
.time_to_live(Duration::from_secs(60))
.weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
.build();
Self {
@@ -391,11 +382,11 @@ impl MetadataRegion {
}
}
async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
async fn load_all(&self, region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
let scan_req = MetadataRegion::build_read_request();
let record_batch_stream = self
.mito
.scan_to_stream(metadata_region_id, scan_req)
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
@@ -416,12 +407,12 @@ impl MetadataRegion {
async fn get_all_with_prefix(
&self,
metadata_region_id: RegionId,
region_id: RegionId,
prefix: &str,
) -> Result<HashMap<String, String>> {
let region_metadata = self
.cache
.try_get_with(metadata_region_id, self.load_all(metadata_region_id))
.try_get_with(region_id, self.load_all(region_id))
.await
.context(CacheGetSnafu)?;
@@ -455,17 +446,16 @@ impl MetadataRegion {
/// Delete the given keys. For performance consideration, this method
/// doesn't check if those keys exist or not.
async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> {
let delete_request = Self::build_delete_request(keys);
self.mito
.handle_request(
metadata_region_id,
region_id,
store_api::region_request::RegionRequest::Delete(delete_request),
)
.await
.context(MitoWriteOperationSnafu)?;
// Invalidates the region metadata cache if any values are deleted from the metadata region.
self.cache.invalidate(&metadata_region_id).await;
self.cache.invalidate(&region_id).await;
Ok(())
}
@@ -556,7 +546,7 @@ impl MetadataRegion {
write_region_id: bool,
logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
) -> Result<()> {
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let region_id = utils::to_metadata_region_id(physical_region_id);
let iter = logical_regions
.into_iter()
.flat_map(|(logical_region_id, column_metadatas)| {
@@ -583,13 +573,12 @@ impl MetadataRegion {
let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
self.mito
.handle_request(
metadata_region_id,
region_id,
store_api::region_request::RegionRequest::Put(put_request),
)
.await
.context(MitoWriteOperationSnafu)?;
// Invalidates the region metadata cache if any new values are put into the metadata region.
self.cache.invalidate(&metadata_region_id).await;
self.cache.invalidate(&region_id).await;
Ok(())
}
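
The metadata region cache from the top commit (b363c044f9) is a moka cache keyed by region id, with LRU eviction, a bounded capacity driven by a per-entry weigher, a TTL, and explicit invalidation after puts and deletes. A self-contained sketch of that construction — hypothetical names, assuming the moka and tokio crates:

use std::time::Duration;

use moka::future::Cache;
use moka::policy::EvictionPolicy;

#[derive(Clone)]
struct RegionMetadataCacheEntry {
    entries: Vec<(String, String)>, // decoded key-value pairs of the metadata region (simplified)
    size: usize,                    // approximate size in bytes, used by the weigher
}

#[tokio::main]
async fn main() {
    // Mirrors MetadataRegion::new above: bounded capacity, LRU eviction so recently
    // read regions stay cached, a TTL, and a size-based weigher.
    let cache: Cache<u64, RegionMetadataCacheEntry> = Cache::builder()
        .max_capacity(128 * 1024 * 1024)           // MAX_CACHE_SIZE (128 MiB)
        .eviction_policy(EvictionPolicy::lru())
        .time_to_live(Duration::from_secs(5 * 60)) // CACHE_TTL
        .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
        .build();

    // Readers populate the cache on a miss, as get_all_with_prefix does via load_all
    // (the real code uses try_get_with with a fallible loader).
    let entry = cache
        .get_with(42u64, async {
            RegionMetadataCacheEntry {
                entries: vec![("__column_foo".to_string(), "{}".to_string())],
                size: 64,
            }
        })
        .await;
    println!("cached {} key-value pairs ({} bytes)", entry.entries.len(), entry.size);

    // Writers invalidate the cached entry after a put or delete to the metadata region.
    cache.invalidate(&42u64).await;
}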

View File

@@ -624,12 +624,6 @@ pub enum Error {
#[snafu(display("Unknown hint: {}", hint))]
UnknownHint { hint: String },
#[snafu(display("Query has been cancelled"))]
Cancelled {
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -753,8 +747,6 @@ impl ErrorExt for Error {
DurationOverflow { .. } => StatusCode::InvalidArguments,
HandleOtelArrowRequest { .. } => StatusCode::Internal,
Cancelled { .. } => StatusCode::Cancelled,
}
}

View File

@@ -1213,7 +1213,6 @@ mod test {
use query::parser::PromQuery;
use query::query_engine::DescribeResult;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
use tokio::time::Instant;
@@ -1244,7 +1243,6 @@ mod test {
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {

View File

@@ -20,7 +20,6 @@
use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
use sql::statements::statement::Statement;
pub mod addrs;
pub mod configurator;
@@ -56,8 +55,6 @@ pub mod tls;
#[derive(Clone)]
pub struct SqlPlan {
query: String,
// Store the parsed statement to determine if it is a query and whether to track it.
statement: Option<Statement>,
plan: Option<LogicalPlan>,
schema: Option<Schema>,
}

View File

@@ -136,7 +136,6 @@ impl MysqlInstanceShim {
async fn do_exec_plan(
&self,
query: &str,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
@@ -145,7 +144,7 @@ impl MysqlInstanceShim {
{
Ok(output)
} else {
self.query_handler.do_exec_plan(stmt, plan, query_ctx).await
self.query_handler.do_exec_plan(plan, query_ctx).await
}
}
@@ -232,7 +231,6 @@ impl MysqlInstanceShim {
self.save_plan(
SqlPlan {
query: query.to_string(),
statement: Some(statement),
plan: None,
schema: None,
},
@@ -242,7 +240,6 @@ impl MysqlInstanceShim {
self.save_plan(
SqlPlan {
query: query.to_string(),
statement: Some(statement),
plan,
schema,
},
@@ -294,13 +291,8 @@ impl MysqlInstanceShim {
debug!("Mysql execute prepared plan: {}", plan.display_indent());
vec![
self.do_exec_plan(
&sql_plan.query,
sql_plan.statement.clone(),
plan,
query_ctx.clone(),
)
.await,
self.do_exec_plan(&sql_plan.query, plan, query_ctx.clone())
.await,
]
}
None => {

View File

@@ -228,7 +228,6 @@ impl QueryParser for DefaultQueryParser {
if sql.is_empty() || fixtures::matches(sql) {
return Ok(SqlPlan {
query: sql.to_owned(),
statement: None,
plan: None,
schema: None,
});
@@ -249,7 +248,7 @@ impl QueryParser for DefaultQueryParser {
let describe_result = self
.query_handler
.do_describe(stmt.clone(), query_ctx)
.do_describe(stmt, query_ctx)
.await
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
@@ -265,7 +264,6 @@ impl QueryParser for DefaultQueryParser {
Ok(SqlPlan {
query: sql.to_owned(),
statement: Some(stmt),
plan,
schema,
})
@@ -320,7 +318,7 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner {
)?))
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
self.query_handler
.do_exec_plan(sql_plan.statement.clone(), plan, query_ctx.clone())
.do_exec_plan(plan, query_ctx.clone())
.await
} else {
// manually replace variables in prepared statement when no

View File

@@ -41,7 +41,6 @@ pub trait SqlQueryHandler {
async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error>;
@@ -89,14 +88,9 @@ where
.collect()
}
async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
self.0
.do_exec_plan(stmt, plan, query_ctx)
.do_exec_plan(plan, query_ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecutePlanSnafu)
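For reference, the handler's plan-execution entry point after this hunk takes only the plan and the query context. A minimal sketch of the resulting trait item follows, assuming the trait is annotated with `async_trait` (not shown in the hunk); the associated `Error` bound and the remaining handler methods are elided.
#[async_trait]
pub trait SqlQueryHandler {
    // Bound elided; unchanged by this diff.
    type Error;

    // The parsed Statement is no longer threaded through; execution now
    // receives only the logical plan and the per-request query context.
    async fn do_exec_plan(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> std::result::Result<Output, Self::Error>;

    // ... other handler methods unchanged ...
}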

@@ -31,7 +31,6 @@ use servers::influxdb::InfluxdbRequest;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
struct DummyInstance {
@@ -60,7 +59,6 @@ impl SqlQueryHandler for DummyInstance {
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {

@@ -28,7 +28,6 @@ use servers::opentsdb::codec::DataPoint;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
struct DummyInstance {
@@ -60,7 +59,6 @@ impl SqlQueryHandler for DummyInstance {
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {

@@ -35,7 +35,6 @@ use servers::prom_store::{snappy_compress, Metrics};
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse};
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
struct DummyInstance {
@@ -88,7 +87,6 @@ impl SqlQueryHandler for DummyInstance {
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {

@@ -68,12 +68,7 @@ impl SqlQueryHandler for DummyInstance {
vec![Ok(output)]
}
async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
Ok(self.query_engine.execute(plan, query_ctx).await.unwrap())
}

@@ -100,5 +100,4 @@ session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres = { workspace = true }
url = "2.3"
urlencoding = "2.1"
yaml-rust = "0.4"

@@ -13,7 +13,6 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use cache::{
build_datanode_cache_registry, build_fundamental_cache_registry,
@@ -42,7 +41,6 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::SlowQueryOptions;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
@@ -329,13 +327,6 @@ impl GreptimeDbStandaloneBuilder {
metadata_store: kv_backend_config,
wal: self.metasrv_wal_config.clone().into(),
grpc: GrpcOptions::default().with_server_addr("127.0.0.1:4001"),
// Enable slow query log with 1s threshold to run the slow query test.
slow_query: Some(SlowQueryOptions {
enable: true,
// Set the threshold to 1s to run the slow query test.
threshold: Some(Duration::from_secs(1)),
..Default::default()
}),
..StandaloneOptions::default()
};

@@ -15,7 +15,6 @@
use std::collections::BTreeMap;
use std::io::Write;
use std::str::FromStr;
use std::time::Duration;
use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{
@@ -24,13 +23,10 @@ use api::prom_store::remote::{
use auth::user_provider_from_option;
use axum::http::{HeaderName, HeaderValue, StatusCode};
use chrono::Utc;
use common_catalog::consts::{
trace_services_table_name, DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME,
};
use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use frontend::slow_query_recorder::{SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME};
use log_query::{Context, Limit, LogQuery, TimeFilter};
use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAdapter};
use loki_proto::prost_types::Timestamp;
@@ -59,7 +55,6 @@ use tests_integration::test_util::{
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
StorageType,
};
use urlencoding::encode;
use yaml_rust::YamlLoader;
#[macro_export]
@@ -93,7 +88,6 @@ macro_rules! http_tests {
test_http_auth,
test_sql_api,
test_http_sql_slow_query,
test_prometheus_promql_api,
test_prom_http_api,
test_metrics_api,
@@ -539,29 +533,6 @@ pub async fn test_sql_api(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_http_sql_slow_query(store_type: StorageType) {
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await;
let client = TestClient::new(app).await;
let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte";
let encoded_slow_query = encode(slow_query);
let query_params = format!("/v1/sql?sql={encoded_slow_query}");
let res = client.get(&query_params).send().await;
assert_eq!(res.status(), StatusCode::OK);
// Wait for the slow query to be recorded.
tokio::time::sleep(Duration::from_secs(5)).await;
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!("SELECT {} FROM {table}", SLOW_QUERY_TABLE_QUERY_COLUMN_NAME);
let expected = format!(r#"[["{}"]]"#, slow_query);
validate_data("test_http_sql_slow_query", &client, &query, &expected).await;
guard.remove_all().await;
}
pub async fn test_prometheus_promql_api(store_type: StorageType) {
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "promql_api").await;
let client = TestClient::new(app).await;
@@ -1300,7 +1271,7 @@ write_interval = "30s"
[slow_query]
enable = true
record_type = "system_table"
threshold = "1s"
threshold = "30s"
sample_ratio = 1.0
ttl = "30d"

@@ -16,12 +16,6 @@ use std::collections::HashMap;
use auth::user_provider_from_option;
use chrono::{DateTime, NaiveDate, NaiveDateTime, SecondsFormat, Utc};
use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
use frontend::slow_query_recorder::{
SLOW_QUERY_TABLE_COST_COLUMN_NAME, SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
};
use sqlx::mysql::{MySqlConnection, MySqlDatabaseError, MySqlPoolOptions};
use sqlx::postgres::{PgDatabaseError, PgPoolOptions};
use sqlx::{Connection, Executor, Row};
@@ -70,12 +64,10 @@ macro_rules! sql_tests {
test_mysql_crud,
test_mysql_timezone,
test_mysql_async_timestamp,
test_mysql_slow_query,
test_postgres_auth,
test_postgres_crud,
test_postgres_timezone,
test_postgres_bytea,
test_postgres_slow_query,
test_postgres_datestyle,
test_postgres_parameter_inference,
test_postgres_array_types,
@@ -588,56 +580,6 @@ pub async fn test_postgres_crud(store_type: StorageType) {
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_mysql_slow_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (mut guard, fe_mysql_server) =
setup_mysql_server(store_type, "test_mysql_slow_query").await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();
let pool = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap();
// The slow query will run at least longer than 1s.
let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte";
// Simulate a slow query.
sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
// Wait for the slow query to be recorded.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table}",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME
);
let rows = sqlx::query(&query).fetch_all(&pool).await.unwrap();
assert_eq!(rows.len(), 1);
// Check the results.
let row = &rows[0];
let cost: u64 = row.get(0);
let threshold: u64 = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);
assert!(cost > 0 && threshold > 0 && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);
let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_bytea(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_bytea").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
@@ -708,46 +650,6 @@ pub async fn test_postgres_bytea(store_type: StorageType) {
guard.remove_all().await;
}
pub async fn test_postgres_slow_query(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_slow_query").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await
.unwrap();
let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte";
let _ = sqlx::query(slow_query).fetch_all(&pool).await.unwrap();
// Wait for the slow query to be recorded.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table}",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME
);
let rows = sqlx::query(&query).fetch_all(&pool).await.unwrap();
assert_eq!(rows.len(), 1);
let row = &rows[0];
let cost: i64 = row.get(0);
let threshold: i64 = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);
assert!(cost > 0 && threshold > 0 && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}
pub async fn test_postgres_datestyle(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_datestyle").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();