Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-05 12:52:57 +00:00

Compare commits: v0.15.5...v0.15_reco (23 commits)
| Author | SHA1 | Date |
|---|---|---|
| | b363c044f9 | |
| | f44816cc15 | |
| | c4c2b87615 | |
| | 1f194af999 | |
| | e6482acd47 | |
| | ba5cb48231 | |
| | a8166f800b | |
| | 6fdc0b99b3 | |
| | d3a1c80fbd | |
| | 1434582cc3 | |
| | 01e0ce6f29 | |
| | ce1d0b6c4c | |
| | 686ee9f579 | |
| | 44a368bc4e | |
| | 6c54f2b6c0 | |
| | 9bb37a6a14 | |
| | cfd6c1c3e0 | |
| | c0f40ce8ed | |
| | c9501053e5 | |
| | e3ee08d300 | |
| | 76657e9c89 | |
| | 9629225f56 | |
| | 165f156e69 | |
.github/scripts/check-version.sh (vendored, 42 lines, file removed)
@@ -1,42 +0,0 @@
```bash
#!/bin/bash

# Get current version
CURRENT_VERSION=$1
if [ -z "$CURRENT_VERSION" ]; then
    echo "Error: Failed to get current version"
    exit 1
fi

# Get the latest version from GitHub Releases
API_RESPONSE=$(curl -s "https://api.github.com/repos/GreptimeTeam/greptimedb/releases/latest")

if [ -z "$API_RESPONSE" ] || [ "$(echo "$API_RESPONSE" | jq -r '.message')" = "Not Found" ]; then
    echo "Error: Failed to fetch latest version from GitHub"
    exit 1
fi

# Get the latest version
LATEST_VERSION=$(echo "$API_RESPONSE" | jq -r '.tag_name')

if [ -z "$LATEST_VERSION" ] || [ "$LATEST_VERSION" = "null" ]; then
    echo "Error: No valid version found in GitHub releases"
    exit 1
fi

# Cleaned up version number format (removed possible 'v' prefix and -nightly suffix)
CLEAN_CURRENT=$(echo "$CURRENT_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')
CLEAN_LATEST=$(echo "$LATEST_VERSION" | sed 's/^v//' | sed 's/-nightly-.*//')

echo "Current version: $CLEAN_CURRENT"
echo "Latest release version: $CLEAN_LATEST"

# Use sort -V to compare versions
HIGHER_VERSION=$(printf "%s\n%s" "$CLEAN_CURRENT" "$CLEAN_LATEST" | sort -V | tail -n1)

if [ "$HIGHER_VERSION" = "$CLEAN_CURRENT" ]; then
    echo "Current version ($CLEAN_CURRENT) is NEWER than or EQUAL to latest ($CLEAN_LATEST)"
    echo "should-push-latest-tag=true" >> $GITHUB_OUTPUT
else
    echo "Current version ($CLEAN_CURRENT) is OLDER than latest ($CLEAN_LATEST)"
    echo "should-push-latest-tag=false" >> $GITHUB_OUTPUT
fi
```
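The deleted script's core trick is `sort -V`, which orders version strings by their numeric components. As a rough Rust sketch of the same comparison, hedged and hypothetical (the `clean` helper and literal versions are illustrative stand-ins, not repository code):

```rust
// Sketch: mimic the script's "strip 'v' prefix and '-nightly-*' suffix,
// then compare numerically" logic.
fn clean(v: &str) -> Vec<u64> {
    let v = v.strip_prefix('v').unwrap_or(v);
    let v = v.split("-nightly-").next().unwrap_or(v);
    v.split('.').filter_map(|part| part.parse().ok()).collect()
}

fn main() {
    let current = clean("v0.15.5");
    let latest = clean("v0.15.4-nightly-20250101");
    // Vec<u64> compares element-wise, matching `sort -V` for purely numeric parts.
    let should_push_latest = current >= latest;
    println!("should-push-latest-tag={}", should_push_latest);
}
```

The script wrote the same boolean into `$GITHUB_OUTPUT` so a later job could gate the `latest` Docker tag on it, which is exactly the wiring the release.yml hunks below remove.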
.github/workflows/release.yml (vendored, 11 lines changed)
```diff
@@ -110,8 +110,6 @@ jobs:
       # The 'version' use as the global tag name of the release workflow.
       version: ${{ steps.create-version.outputs.version }}
-      should-push-latest-tag: ${{ steps.check-version.outputs.should-push-latest-tag }}
     steps:
       - name: Checkout
         uses: actions/checkout@v4
@@ -137,11 +135,6 @@
           GITHUB_REF_NAME: ${{ github.ref_name }}
           NIGHTLY_RELEASE_PREFIX: ${{ env.NIGHTLY_RELEASE_PREFIX }}

-      - name: Check version
-        id: check-version
-        run: |
-          ./.github/scripts/check-version.sh "${{ steps.create-version.outputs.version }}"
-
       - name: Allocate linux-amd64 runner
         if: ${{ inputs.build_linux_amd64_artifacts || github.event_name == 'push' || github.event_name == 'schedule' }}
         uses: ./.github/actions/start-runner
@@ -321,7 +314,7 @@
           image-registry-username: ${{ secrets.DOCKERHUB_USERNAME }}
           image-registry-password: ${{ secrets.DOCKERHUB_TOKEN }}
           version: ${{ needs.allocate-runners.outputs.version }}
-          push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
+          push-latest-tag: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}

       - name: Set build image result
         id: set-build-image-result
@@ -368,7 +361,7 @@
           dev-mode: false
           upload-to-s3: true
           update-version-info: true
-          push-latest-tag: ${{ needs.allocate-runners.outputs.should-push-latest-tag == 'true' && github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}
+          push-latest-tag: ${{ github.ref_type == 'tag' && !contains(github.ref_name, 'nightly') && github.event_name != 'schedule' }}

   publish-github-release:
     name: Create GitHub release and upload artifacts
```
Cargo.lock (generated, 158 lines changed)
```diff
@@ -211,7 +211,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"

 [[package]]
 name = "api"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "common-base",
  "common-decimal",
@@ -944,7 +944,7 @@ dependencies = [

 [[package]]
 name = "auth"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "async-trait",
@@ -1586,7 +1586,7 @@ dependencies = [

 [[package]]
 name = "cache"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "catalog",
  "common-error",
@@ -1621,7 +1621,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"

 [[package]]
 name = "catalog"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "arrow 54.2.1",
@@ -1659,8 +1659,6 @@ dependencies = [
  "partition",
  "paste",
  "prometheus",
- "promql-parser",
- "rand 0.9.0",
  "rustc-hash 2.0.0",
  "serde_json",
  "session",
@@ -1961,7 +1959,7 @@ checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97"

 [[package]]
 name = "cli"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-stream",
  "async-trait",
@@ -2006,7 +2004,7 @@ dependencies = [
  "session",
  "snafu 0.8.5",
  "store-api",
- "substrait 0.15.5",
+ "substrait 0.15.4",
  "table",
  "tempfile",
  "tokio",
@@ -2015,7 +2013,7 @@ dependencies = [

 [[package]]
 name = "client"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "arc-swap",
@@ -2045,7 +2043,7 @@ dependencies = [
  "rand 0.9.0",
  "serde_json",
  "snafu 0.8.5",
- "substrait 0.15.5",
+ "substrait 0.15.4",
  "substrait 0.37.3",
  "tokio",
  "tokio-stream",
@@ -2086,7 +2084,7 @@ dependencies = [

 [[package]]
 name = "cmd"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-trait",
  "auth",
@@ -2147,7 +2145,7 @@ dependencies = [
  "snafu 0.8.5",
  "stat",
  "store-api",
- "substrait 0.15.5",
+ "substrait 0.15.4",
  "table",
  "temp-env",
  "tempfile",
@@ -2194,7 +2192,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"

 [[package]]
 name = "common-base"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "anymap2",
  "async-trait",
@@ -2216,11 +2214,11 @@ dependencies = [

 [[package]]
 name = "common-catalog"
-version = "0.15.5"
+version = "0.15.4"

 [[package]]
 name = "common-config"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "common-base",
  "common-error",
@@ -2245,7 +2243,7 @@ dependencies = [

 [[package]]
 name = "common-datasource"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "arrow 54.2.1",
  "arrow-schema 54.3.1",
@@ -2282,7 +2280,7 @@ dependencies = [

 [[package]]
 name = "common-decimal"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "bigdecimal 0.4.8",
  "common-error",
@@ -2295,7 +2293,7 @@ dependencies = [

 [[package]]
 name = "common-error"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "common-macro",
  "http 1.1.0",
@@ -2306,7 +2304,7 @@ dependencies = [

 [[package]]
 name = "common-frontend"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-trait",
  "common-error",
@@ -2315,7 +2313,6 @@ dependencies = [
  "common-meta",
  "greptime-proto",
  "meta-client",
- "session",
  "snafu 0.8.5",
  "tokio",
  "tonic 0.12.3",
@@ -2323,7 +2320,7 @@ dependencies = [

 [[package]]
 name = "common-function"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "ahash 0.8.11",
  "api",
@@ -2376,7 +2373,7 @@ dependencies = [

 [[package]]
 name = "common-greptimedb-telemetry"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-trait",
  "common-runtime",
@@ -2393,7 +2390,7 @@ dependencies = [

 [[package]]
 name = "common-grpc"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "arrow-flight",
@@ -2425,7 +2422,7 @@ dependencies = [

 [[package]]
 name = "common-grpc-expr"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "common-base",
@@ -2444,7 +2441,7 @@ dependencies = [

 [[package]]
 name = "common-macro"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "arc-swap",
  "common-query",
@@ -2458,7 +2455,7 @@ dependencies = [

 [[package]]
 name = "common-mem-prof"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "anyhow",
  "common-error",
@@ -2474,7 +2471,7 @@ dependencies = [

 [[package]]
 name = "common-meta"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "anymap2",
  "api",
@@ -2540,7 +2537,7 @@ dependencies = [

 [[package]]
 name = "common-options"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "common-grpc",
  "humantime-serde",
@@ -2549,11 +2546,11 @@ dependencies = [

 [[package]]
 name = "common-plugins"
-version = "0.15.5"
+version = "0.15.4"

 [[package]]
 name = "common-pprof"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "common-error",
  "common-macro",
@@ -2565,7 +2562,7 @@ dependencies = [

 [[package]]
 name = "common-procedure"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-stream",
  "async-trait",
@@ -2592,7 +2589,7 @@ dependencies = [

 [[package]]
 name = "common-procedure-test"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-trait",
  "common-procedure",
@@ -2602,7 +2599,7 @@ dependencies = [

 [[package]]
 name = "common-query"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "async-trait",
@@ -2628,7 +2625,7 @@ dependencies = [

 [[package]]
 name = "common-recordbatch"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "arc-swap",
  "common-error",
@@ -2648,7 +2645,7 @@ dependencies = [

 [[package]]
 name = "common-runtime"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-trait",
  "clap 4.5.19",
@@ -2678,14 +2675,14 @@ dependencies = [

 [[package]]
 name = "common-session"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "strum 0.27.1",
 ]

 [[package]]
 name = "common-telemetry"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "backtrace",
  "common-error",
@@ -2713,7 +2710,7 @@ dependencies = [

 [[package]]
 name = "common-test-util"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "client",
  "common-grpc",
@@ -2726,7 +2723,7 @@ dependencies = [

 [[package]]
 name = "common-time"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "arrow 54.2.1",
  "chrono",
@@ -2744,7 +2741,7 @@ dependencies = [

 [[package]]
 name = "common-version"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "build-data",
  "cargo-manifest",
@@ -2755,7 +2752,7 @@ dependencies = [

 [[package]]
 name = "common-wal"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "common-base",
  "common-error",
@@ -2778,7 +2775,7 @@ dependencies = [

 [[package]]
 name = "common-workload"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "common-telemetry",
@@ -3734,7 +3731,7 @@ dependencies = [

 [[package]]
 name = "datanode"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "arrow-flight",
@@ -3787,7 +3784,7 @@ dependencies = [
  "session",
  "snafu 0.8.5",
  "store-api",
- "substrait 0.15.5",
+ "substrait 0.15.4",
  "table",
  "tokio",
  "toml 0.8.19",
@@ -3796,7 +3793,7 @@ dependencies = [

 [[package]]
 name = "datatypes"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "arrow 54.2.1",
  "arrow-array 54.2.1",
@@ -4456,7 +4453,7 @@ checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6"

 [[package]]
 name = "file-engine"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "async-trait",
@@ -4593,7 +4590,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"

 [[package]]
 name = "flow"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "arrow 54.2.1",
@@ -4658,7 +4655,7 @@ dependencies = [
  "sql",
  "store-api",
  "strum 0.27.1",
- "substrait 0.15.5",
+ "substrait 0.15.4",
  "table",
  "tokio",
  "tonic 0.12.3",
@@ -4713,7 +4710,7 @@ checksum = "6c2141d6d6c8512188a7891b4b01590a45f6dac67afb4f255c4124dbb86d4eaa"

 [[package]]
 name = "frontend"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "arc-swap",
@@ -4773,7 +4770,7 @@ dependencies = [
  "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
  "store-api",
  "strfmt",
- "substrait 0.15.5",
+ "substrait 0.15.4",
  "table",
  "tokio",
  "tokio-util",
@@ -5163,7 +5160,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=f3103a8c9b8ce162457d0a3e3ca00d53d1a8bd06#f3103a8c9b8ce162457d0a3e3ca00d53d1a8bd06"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=551c70f2b352b6bc55ab769129344aeab8f22d11#551c70f2b352b6bc55ab769129344aeab8f22d11"
 dependencies = [
  "prost 0.13.5",
  "serde",
@@ -5934,7 +5931,7 @@ dependencies = [

 [[package]]
 name = "index"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-trait",
  "asynchronous-codec",
@@ -6819,7 +6816,7 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"

 [[package]]
 name = "log-query"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "chrono",
  "common-error",
@@ -6831,7 +6828,7 @@ dependencies = [

 [[package]]
 name = "log-store"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-stream",
  "async-trait",
@@ -7129,7 +7126,7 @@ dependencies = [

 [[package]]
 name = "meta-client"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "async-trait",
@@ -7157,7 +7154,7 @@ dependencies = [

 [[package]]
 name = "meta-srv"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "async-trait",
@@ -7253,7 +7250,7 @@ dependencies = [

 [[package]]
 name = "metric-engine"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "aquamarine",
@@ -7345,7 +7342,7 @@ dependencies = [

 [[package]]
 name = "mito-codec"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "bytes",
@@ -7368,7 +7365,7 @@ dependencies = [

 [[package]]
 name = "mito2"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "aquamarine",
@@ -8118,7 +8115,7 @@ dependencies = [

 [[package]]
 name = "object-store"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "anyhow",
  "bytes",
@@ -8432,7 +8429,7 @@ dependencies = [

 [[package]]
 name = "operator"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "ahash 0.8.11",
  "api",
@@ -8487,7 +8484,7 @@ dependencies = [
  "sql",
  "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
  "store-api",
- "substrait 0.15.5",
+ "substrait 0.15.4",
  "table",
  "tokio",
  "tokio-util",
@@ -8754,7 +8751,7 @@ dependencies = [

 [[package]]
 name = "partition"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "async-trait",
@@ -9042,7 +9039,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"

 [[package]]
 name = "pipeline"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "ahash 0.8.11",
  "api",
@@ -9185,7 +9182,7 @@ dependencies = [

 [[package]]
 name = "plugins"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "auth",
  "clap 4.5.19",
@@ -9498,7 +9495,7 @@ dependencies = [

 [[package]]
 name = "promql"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "ahash 0.8.11",
  "async-trait",
@@ -9780,7 +9777,7 @@ dependencies = [

 [[package]]
 name = "puffin"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-compression 0.4.13",
  "async-trait",
@@ -9822,7 +9819,7 @@ dependencies = [

 [[package]]
 name = "query"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "ahash 0.8.11",
  "api",
@@ -9888,7 +9885,7 @@ dependencies = [
  "sqlparser 0.54.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=0cf6c04490d59435ee965edd2078e8855bd8471e)",
  "statrs",
  "store-api",
- "substrait 0.15.5",
+ "substrait 0.15.4",
  "table",
  "tokio",
  "tokio-stream",
@@ -11174,7 +11171,7 @@ dependencies = [

 [[package]]
 name = "servers"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "ahash 0.8.11",
  "api",
@@ -11295,7 +11292,7 @@ dependencies = [

 [[package]]
 name = "session"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "arc-swap",
@@ -11634,7 +11631,7 @@ dependencies = [

 [[package]]
 name = "sql"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "chrono",
@@ -11689,7 +11686,7 @@ dependencies = [

 [[package]]
 name = "sqlness-runner"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-trait",
  "clap 4.5.19",
@@ -11989,7 +11986,7 @@ dependencies = [

 [[package]]
 name = "stat"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "nix 0.30.1",
 ]
@@ -12015,7 +12012,7 @@ dependencies = [

 [[package]]
 name = "store-api"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "aquamarine",
@@ -12176,7 +12173,7 @@ dependencies = [

 [[package]]
 name = "substrait"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "async-trait",
  "bytes",
@@ -12356,7 +12353,7 @@ dependencies = [

 [[package]]
 name = "table"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "async-trait",
@@ -12617,7 +12614,7 @@ checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"

 [[package]]
 name = "tests-fuzz"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "arbitrary",
  "async-trait",
@@ -12661,7 +12658,7 @@ dependencies = [

 [[package]]
 name = "tests-integration"
-version = "0.15.5"
+version = "0.15.4"
 dependencies = [
  "api",
  "arrow-flight",
@@ -12728,7 +12725,7 @@ dependencies = [
  "sql",
  "sqlx",
  "store-api",
- "substrait 0.15.5",
+ "substrait 0.15.4",
  "table",
  "tempfile",
  "time",
@@ -12738,7 +12735,6 @@ dependencies = [
  "tonic 0.12.3",
  "tower 0.5.2",
  "url",
  "urlencoding",
  "uuid",
  "yaml-rust",
  "zstd 0.13.2",
```
```diff
@@ -71,7 +71,7 @@ members = [
 resolver = "2"

 [workspace.package]
-version = "0.15.5"
+version = "0.15.4"
 edition = "2021"
 license = "Apache-2.0"

@@ -134,7 +134,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "f3103a8c9b8ce162457d0a3e3ca00d53d1a8bd06" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "551c70f2b352b6bc55ab769129344aeab8f22d11" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
```
```diff
@@ -43,8 +43,6 @@ moka = { workspace = true, features = ["future", "sync"] }
 partition.workspace = true
 paste.workspace = true
 prometheus.workspace = true
-promql-parser.workspace = true
-rand.workspace = true
 rustc-hash.workspace = true
 serde_json.workspace = true
 session.workspace = true
```
```diff
@@ -14,24 +14,17 @@

 use std::collections::hash_map::Entry;
 use std::collections::HashMap;
-use std::fmt::{Debug, Display, Formatter};
+use std::fmt::{Debug, Formatter};
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, RwLock};
-use std::time::{Duration, Instant, UNIX_EPOCH};

 use api::v1::frontend::{KillProcessRequest, ListProcessRequest, ProcessInfo};
 use common_base::cancellation::CancellationHandle;
 use common_frontend::selector::{FrontendSelector, MetaClientSelector};
-use common_frontend::slow_query_event::SlowQueryEvent;
-use common_telemetry::{debug, error, info, warn};
+use common_telemetry::{debug, info, warn};
 use common_time::util::current_time_millis;
 use meta_client::MetaClientRef;
-use promql_parser::parser::EvalStmt;
-use rand::random;
-use session::context::QueryContextRef;
 use snafu::{ensure, OptionExt, ResultExt};
-use sql::statements::statement::Statement;
-use tokio::sync::mpsc::Sender;

 use crate::error;
 use crate::metrics::{PROCESS_KILL_COUNT, PROCESS_LIST_COUNT};
@@ -51,23 +44,6 @@ pub struct ProcessManager {
     frontend_selector: Option<MetaClientSelector>,
 }

-/// Represents a parsed query statement, functionally equivalent to [query::parser::QueryStatement].
-/// This enum is defined here to avoid cyclic dependencies with the query parser module.
-#[derive(Debug, Clone)]
-pub enum QueryStatement {
-    Sql(Statement),
-    Promql(EvalStmt),
-}
-
-impl Display for QueryStatement {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        match self {
-            QueryStatement::Sql(stmt) => write!(f, "{}", stmt),
-            QueryStatement::Promql(eval_stmt) => write!(f, "{}", eval_stmt),
-        }
-    }
-}
-
 impl ProcessManager {
     /// Create a [ProcessManager] instance with server address and kv client.
     pub fn new(server_addr: String, meta_client: Option<MetaClientRef>) -> Self {
@@ -91,7 +67,6 @@ impl ProcessManager {
         query: String,
         client: String,
         query_id: Option<ProcessId>,
-        _slow_query_timer: Option<SlowQueryTimer>,
     ) -> Ticket {
         let id = query_id.unwrap_or_else(|| self.next_id.fetch_add(1, Ordering::Relaxed));
         let process = ProcessInfo {
```
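The `fetch_add` call above is the whole ID allocator: it atomically returns the counter's previous value, so concurrent registrations each get a distinct id without any lock. A minimal, self-contained illustration of the idiom (not repository code):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

fn main() {
    let next_id = AtomicU32::new(0);
    // fetch_add returns the value *before* the increment, so ids are unique
    // and monotonically increasing even with concurrent callers.
    let a = next_id.fetch_add(1, Ordering::Relaxed);
    let b = next_id.fetch_add(1, Ordering::Relaxed);
    assert_eq!((a, b), (0, 1));
}
```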
```diff
@@ -118,7 +93,6 @@ impl ProcessManager {
             manager: self.clone(),
             id,
             cancellation_handle,
-            _slow_query_timer,
         }
     }

@@ -249,7 +223,6 @@ pub struct Ticket {
     pub(crate) manager: ProcessManagerRef,
     pub(crate) id: ProcessId,
     pub cancellation_handle: Arc<CancellationHandle>,
-    _slow_query_timer: Option<SlowQueryTimer>,
 }

 impl Drop for Ticket {
@@ -290,107 +263,6 @@ impl Debug for CancellableProcess {
     }
 }

-/// SlowQueryTimer is used to log slow query when it's dropped.
-/// In drop(), it will check if the query is slow and send the slow query event to the handler.
-pub struct SlowQueryTimer {
-    start: Instant,
-    stmt: QueryStatement,
-    query_ctx: QueryContextRef,
-    threshold: Option<Duration>,
-    sample_ratio: Option<f64>,
-    tx: Sender<SlowQueryEvent>,
-}
-
-impl SlowQueryTimer {
-    pub fn new(
-        stmt: QueryStatement,
-        query_ctx: QueryContextRef,
-        threshold: Option<Duration>,
-        sample_ratio: Option<f64>,
-        tx: Sender<SlowQueryEvent>,
-    ) -> Self {
-        Self {
-            start: Instant::now(),
-            stmt,
-            query_ctx,
-            threshold,
-            sample_ratio,
-            tx,
-        }
-    }
-}
-
-impl SlowQueryTimer {
-    fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
-        let mut slow_query_event = SlowQueryEvent {
-            cost: elapsed.as_millis() as u64,
-            threshold: threshold.as_millis() as u64,
-            query: "".to_string(),
-            query_ctx: self.query_ctx.clone(),
-
-            // The following fields are only used for PromQL queries.
-            is_promql: false,
-            promql_range: None,
-            promql_step: None,
-            promql_start: None,
-            promql_end: None,
-        };
-
-        match &self.stmt {
-            QueryStatement::Promql(stmt) => {
-                slow_query_event.is_promql = true;
-                slow_query_event.query = stmt.expr.to_string();
-                slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
-
-                let start = stmt
-                    .start
-                    .duration_since(UNIX_EPOCH)
-                    .unwrap_or_default()
-                    .as_millis() as i64;
-
-                let end = stmt
-                    .end
-                    .duration_since(UNIX_EPOCH)
-                    .unwrap_or_default()
-                    .as_millis() as i64;
-
-                slow_query_event.promql_range = Some((end - start) as u64);
-                slow_query_event.promql_start = Some(start);
-                slow_query_event.promql_end = Some(end);
-            }
-            QueryStatement::Sql(stmt) => {
-                slow_query_event.query = stmt.to_string();
-            }
-        }
-
-        // Send SlowQueryEvent to the handler.
-        if let Err(e) = self.tx.try_send(slow_query_event) {
-            error!(e; "Failed to send slow query event");
-        }
-    }
-}
-
-impl Drop for SlowQueryTimer {
-    fn drop(&mut self) {
-        if let Some(threshold) = self.threshold {
-            // Calculate the elapsed duration since the timer is created.
-            let elapsed = self.start.elapsed();
-            if elapsed > threshold {
-                if let Some(ratio) = self.sample_ratio {
-                    // Only capture a portion of slow queries based on sample_ratio.
-                    // Generate a random number in [0, 1) and compare it with sample_ratio.
-                    if ratio >= 1.0 || random::<f64>() <= ratio {
-                        self.send_slow_query_event(elapsed, threshold);
-                    }
-                } else {
-                    // Captures all slow queries if sample_ratio is not set.
-                    self.send_slow_query_event(elapsed, threshold);
-                }
-            }
-        }
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
@@ -406,7 +278,6 @@ mod tests {
             "SELECT * FROM table".to_string(),
             "".to_string(),
             None,
-            None,
         );

         let running_processes = process_manager.local_processes(None).unwrap();
@@ -430,7 +301,6 @@ mod tests {
             "SELECT * FROM table".to_string(),
             "client1".to_string(),
             Some(custom_id),
-            None,
         );

         assert_eq!(ticket.id, custom_id);
@@ -451,7 +321,6 @@ mod tests {
             "SELECT * FROM table1".to_string(),
             "client1".to_string(),
             None,
-            None,
         );

         let ticket2 = process_manager.clone().register_query(
@@ -460,7 +329,6 @@ mod tests {
             "SELECT * FROM table2".to_string(),
             "client2".to_string(),
             None,
-            None,
         );

         let running_processes = process_manager.local_processes(Some("public")).unwrap();
@@ -482,7 +350,6 @@ mod tests {
             "SELECT * FROM table1".to_string(),
             "client1".to_string(),
             None,
-            None,
         );

         let _ticket2 = process_manager.clone().register_query(
@@ -491,7 +358,6 @@ mod tests {
             "SELECT * FROM table2".to_string(),
             "client2".to_string(),
             None,
-            None,
         );

         // Test listing processes for specific catalog
@@ -518,7 +384,6 @@ mod tests {
             "SELECT * FROM table".to_string(),
             "client1".to_string(),
             None,
-            None,
         );
         assert_eq!(process_manager.local_processes(None).unwrap().len(), 1);
         process_manager.deregister_query("public".to_string(), ticket.id);
@@ -535,7 +400,6 @@ mod tests {
             "SELECT * FROM table".to_string(),
             "client1".to_string(),
             None,
-            None,
         );

         assert!(!ticket.cancellation_handle.is_cancelled());
@@ -553,7 +417,6 @@ mod tests {
             "SELECT * FROM table".to_string(),
             "client1".to_string(),
             None,
-            None,
         );
         assert!(!ticket.cancellation_handle.is_cancelled());
         let killed = process_manager
@@ -599,7 +462,6 @@ mod tests {
             "SELECT COUNT(*) FROM users WHERE age > 18".to_string(),
             "test_client".to_string(),
             Some(42),
-            None,
         );

         let processes = process_manager.local_processes(None).unwrap();
@@ -626,7 +488,6 @@ mod tests {
             "SELECT * FROM table".to_string(),
             "client1".to_string(),
             None,
-            None,
         );

         // Process should be registered
```
```diff
@@ -12,7 +12,6 @@ common-macro.workspace = true
 common-meta.workspace = true
 greptime-proto.workspace = true
 meta-client.workspace = true
-session.workspace = true
 snafu.workspace = true
 tonic.workspace = true
```
```diff
@@ -19,7 +19,6 @@ use snafu::OptionExt;

 pub mod error;
 pub mod selector;
-pub mod slow_query_event;

 #[derive(Debug, Clone, Eq, PartialEq)]
 pub struct DisplayProcessId {
```
```diff
@@ -1,28 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use session::context::QueryContextRef;
-
-#[derive(Debug)]
-pub struct SlowQueryEvent {
-    pub cost: u64,
-    pub threshold: u64,
-    pub query: String,
-    pub is_promql: bool,
-    pub query_ctx: QueryContextRef,
-    pub promql_range: Option<u64>,
-    pub promql_step: Option<u64>,
-    pub promql_start: Option<i64>,
-    pub promql_end: Option<i64>,
-}
```
```diff
@@ -32,7 +32,7 @@ use std::time::{Duration, SystemTime};
 use async_stream::stream;
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
-use catalog::process_manager::{ProcessManagerRef, QueryStatement as CatalogQueryStatement};
+use catalog::process_manager::ProcessManagerRef;
 use catalog::CatalogManagerRef;
 use client::OutputData;
 use common_base::cancellation::CancellableFuture;
@@ -64,9 +64,8 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
 use query::query_engine::options::{validate_catalog_and_schema, QueryOptions};
 use query::query_engine::DescribeResult;
 use query::QueryEngineRef;
-use servers::error::{
-    self as server_error, AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu, UnexpectedResultSnafu,
-};
+use servers::error as server_error;
+use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
 use servers::interceptor::{
     PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
 };
@@ -184,40 +183,36 @@ impl Instance {
         let query_interceptor = self.plugins.get::<SqlQueryInterceptorRef<Error>>();
         let query_interceptor = query_interceptor.as_ref();

-        if should_capture_statement(Some(&stmt)) {
-            let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
-                recorder.start(CatalogQueryStatement::Sql(stmt.clone()), query_ctx.clone())
-            });
-
-            let ticket = self.process_manager.register_query(
-                query_ctx.current_catalog().to_string(),
-                vec![query_ctx.current_schema()],
-                stmt.to_string(),
-                query_ctx.conn_info().to_string(),
-                Some(query_ctx.process_id()),
-                slow_query_timer,
-            );
-
-            let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
-
-            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
-                .await
-                .map_err(|_| error::CancelledSnafu.build())?
-                .map(|output| {
-                    let Output { meta, data } = output;
-
-                    let data = match data {
-                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
-                            CancellableStreamWrapper::new(stream, ticket),
-                        )),
-                        other => other,
-                    };
-                    Output { data, meta }
-                })
+        let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
+            recorder.start(QueryStatement::Sql(stmt.clone()), query_ctx.clone())
         } else {
-            self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor)
-                .await
-        }
+            None
+        };
+
+        let ticket = self.process_manager.register_query(
+            query_ctx.current_catalog().to_string(),
+            vec![query_ctx.current_schema()],
+            stmt.to_string(),
+            query_ctx.conn_info().to_string(),
+            Some(query_ctx.process_id()),
+        );
+
+        let query_fut = self.exec_statement_with_timeout(stmt, query_ctx, query_interceptor);
+
+        CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
+            .await
+            .map_err(|_| error::CancelledSnafu.build())?
+            .map(|output| {
+                let Output { meta, data } = output;
+
+                let data = match data {
+                    OutputData::Stream(stream) => {
+                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
+                    }
+                    other => other,
+                };
+                Output { data, meta }
+            })
     }

     async fn exec_statement_with_timeout(
```
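Both sides of this hunk race the query future against a cancellation handle and then wrap any output stream so a later kill also stops result streaming. The repository's `CancellableFuture`/`CancellationHandle` types are internal; the sketch below reproduces only the pattern, using `tokio-util`'s `CancellationToken` as a stand-in (assumes `tokio` with the "full" feature plus `tokio-util`; illustrative, not the actual API):

```rust
use tokio::select;
use tokio_util::sync::CancellationToken;

// Stand-in for a query future that takes a while to complete.
async fn run_query() -> Result<&'static str, &'static str> {
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
    Ok("rows")
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.child_token();

    // Simulate a KILL request arriving while the query is running.
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
        token.cancel();
    });

    // Race the query against cancellation, analogous to
    // CancellableFuture::new(query_fut, handle).await above.
    let result = select! {
        _ = child.cancelled() => Err("query cancelled"),
        r = run_query() => r,
    };
    assert_eq!(result, Err("query cancelled"));
}
```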
```diff
@@ -433,54 +428,13 @@ impl SqlQueryHandler for Instance {
         }
     }

-    async fn do_exec_plan(
-        &self,
-        stmt: Option<Statement>,
-        plan: LogicalPlan,
-        query_ctx: QueryContextRef,
-    ) -> Result<Output> {
-        if should_capture_statement(stmt.as_ref()) {
-            // It's safe to unwrap here because we've already checked the type.
-            let stmt = stmt.unwrap();
-            let query = stmt.to_string();
-            let slow_query_timer = self.slow_query_recorder.as_ref().and_then(|recorder| {
-                recorder.start(CatalogQueryStatement::Sql(stmt), query_ctx.clone())
-            });
-
-            let ticket = self.process_manager.register_query(
-                query_ctx.current_catalog().to_string(),
-                vec![query_ctx.current_schema()],
-                query,
-                query_ctx.conn_info().to_string(),
-                Some(query_ctx.process_id()),
-                slow_query_timer,
-            );
-
-            let query_fut = self.query_engine.execute(plan.clone(), query_ctx);
-
-            CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
-                .await
-                .map_err(|_| error::CancelledSnafu.build())?
-                .map(|output| {
-                    let Output { meta, data } = output;
-
-                    let data = match data {
-                        OutputData::Stream(stream) => OutputData::Stream(Box::pin(
-                            CancellableStreamWrapper::new(stream, ticket),
-                        )),
-                        other => other,
-                    };
-                    Output { data, meta }
-                })
-                .context(ExecLogicalPlanSnafu)
-        } else {
-            // plan should be prepared before exec
-            // we'll do check there
-            self.query_engine
-                .execute(plan.clone(), query_ctx)
-                .await
-                .context(ExecLogicalPlanSnafu)
-        }
+    async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
+        // plan should be prepared before exec
+        // we'll do check there
+        self.query_engine
+            .execute(plan.clone(), query_ctx)
+            .await
+            .context(ExecLogicalPlanSnafu)
     }

     #[tracing::instrument(skip_all)]
@@ -574,6 +528,12 @@ impl PrometheusHandler for Instance {
             }
         })?;

+        let _slow_query_timer = if let Some(recorder) = &self.slow_query_recorder {
+            recorder.start(stmt.clone(), query_ctx.clone())
+        } else {
+            None
+        };
+
         let plan = self
             .statement_executor
             .plan(&stmt, query_ctx.clone())
@@ -583,47 +543,10 @@ impl PrometheusHandler for Instance {

         interceptor.pre_execute(query, Some(&plan), query_ctx.clone())?;

-        // Take the EvalStmt from the original QueryStatement and use it to create the CatalogQueryStatement.
-        let query_statement = if let QueryStatement::Promql(eval_stmt) = stmt {
-            CatalogQueryStatement::Promql(eval_stmt)
-        } else {
-            // It should not happen since the query is already parsed successfully.
-            return UnexpectedResultSnafu {
-                reason: "The query should always be promql.".to_string(),
-            }
-            .fail();
-        };
-        let query = query_statement.to_string();
-
-        let slow_query_timer = self
-            .slow_query_recorder
-            .as_ref()
-            .and_then(|recorder| recorder.start(query_statement, query_ctx.clone()));
-
-        let ticket = self.process_manager.register_query(
-            query_ctx.current_catalog().to_string(),
-            vec![query_ctx.current_schema()],
-            query,
-            query_ctx.conn_info().to_string(),
-            Some(query_ctx.process_id()),
-            slow_query_timer,
-        );
-
-        let query_fut = self.statement_executor.exec_plan(plan, query_ctx.clone());
-
-        let output = CancellableFuture::new(query_fut, ticket.cancellation_handle.clone())
+        let output = self
+            .statement_executor
+            .exec_plan(plan, query_ctx.clone())
             .await
-            .map_err(|_| servers::error::CancelledSnafu.build())?
-            .map(|output| {
-                let Output { meta, data } = output;
-                let data = match data {
-                    OutputData::Stream(stream) => {
-                        OutputData::Stream(Box::pin(CancellableStreamWrapper::new(stream, ticket)))
-                    }
-                    other => other,
-                };
-                Output { data, meta }
-            })
             .map_err(BoxedError::new)
             .context(ExecuteQuerySnafu)?;

@@ -841,15 +764,6 @@ fn validate_database(name: &ObjectName, query_ctx: &QueryContextRef) -> Result<(
         .context(SqlExecInterceptedSnafu)
 }

-// Create a query ticket and slow query timer if the statement is a query or readonly statement.
-fn should_capture_statement(stmt: Option<&Statement>) -> bool {
-    if let Some(stmt) = stmt {
-        matches!(stmt, Statement::Query(_)) || stmt.is_readonly()
-    } else {
-        false
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
```
```diff
@@ -119,8 +119,7 @@ impl GrpcQueryHandler for Instance {
                     .await
                     .context(SubstraitDecodeLogicalPlanSnafu)?;
                 let output =
-                    SqlQueryHandler::do_exec_plan(self, None, logical_plan, ctx.clone())
-                        .await?;
+                    SqlQueryHandler::do_exec_plan(self, logical_plan, ctx.clone()).await?;

                 attach_timer(output, timer)
             }
@@ -398,7 +397,7 @@ impl Instance {
            .context(common_query::error::GeneralDataFusionSnafu)
            .context(SubstraitDecodeLogicalPlanSnafu)?;

-        let output = SqlQueryHandler::do_exec_plan(self, None, optimized_plan, ctx.clone()).await?;
+        let output = SqlQueryHandler::do_exec_plan(self, optimized_plan, ctx.clone()).await?;

         Ok(attach_timer(output, timer))
     }
```
```diff
@@ -22,5 +22,5 @@ pub(crate) mod limiter;
 pub(crate) mod metrics;
 pub mod server;
 pub mod service_config;
-pub mod slow_query_recorder;
+pub(crate) mod slow_query_recorder;
 mod stream_wrapper;
```
```diff
@@ -14,21 +14,22 @@

 use std::collections::HashMap;
 use std::sync::Arc;
+use std::time::{Duration, Instant, UNIX_EPOCH};

 use api::v1::value::ValueData;
 use api::v1::{
     ColumnDataType, ColumnDef, ColumnSchema, CreateTableExpr, Row, RowInsertRequest,
     RowInsertRequests, Rows, SemanticType,
 };
-use catalog::process_manager::{QueryStatement as CatalogQueryStatement, SlowQueryTimer};
 use catalog::CatalogManagerRef;
 use common_catalog::consts::{default_engine, DEFAULT_PRIVATE_SCHEMA_NAME};
-use common_frontend::slow_query_event::SlowQueryEvent;
 use common_telemetry::logging::{SlowQueriesRecordType, SlowQueryOptions};
 use common_telemetry::{debug, error, info, slow};
 use common_time::timestamp::{TimeUnit, Timestamp};
 use operator::insert::InserterRef;
 use operator::statement::StatementExecutorRef;
+use query::parser::QueryStatement;
+use rand::random;
 use session::context::{QueryContextBuilder, QueryContextRef};
 use snafu::ResultExt;
 use store_api::mito_engine_options::{APPEND_MODE_KEY, TTL_KEY};
@@ -38,16 +39,16 @@ use tokio::task::JoinHandle;

 use crate::error::{CatalogSnafu, Result, TableOperationSnafu};

-pub const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
-pub const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
-pub const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
-pub const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
-pub const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
-pub const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
-pub const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
-pub const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
-pub const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
-pub const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";
+const SLOW_QUERY_TABLE_NAME: &str = "slow_queries";
+const SLOW_QUERY_TABLE_COST_COLUMN_NAME: &str = "cost";
+const SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME: &str = "threshold";
+const SLOW_QUERY_TABLE_QUERY_COLUMN_NAME: &str = "query";
+const SLOW_QUERY_TABLE_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
+const SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME: &str = "is_promql";
+const SLOW_QUERY_TABLE_PROMQL_START_COLUMN_NAME: &str = "promql_start";
+const SLOW_QUERY_TABLE_PROMQL_END_COLUMN_NAME: &str = "promql_end";
+const SLOW_QUERY_TABLE_PROMQL_RANGE_COLUMN_NAME: &str = "promql_range";
+const SLOW_QUERY_TABLE_PROMQL_STEP_COLUMN_NAME: &str = "promql_step";

 const DEFAULT_SLOW_QUERY_TABLE_TTL: &str = "30d";
 const DEFAULT_SLOW_QUERY_EVENTS_CHANNEL_SIZE: usize = 1024;
@@ -60,6 +61,19 @@ pub struct SlowQueryRecorder {
     _handle: Arc<JoinHandle<()>>,
 }

+#[derive(Debug)]
+struct SlowQueryEvent {
+    cost: u64,
+    threshold: u64,
+    query: String,
+    is_promql: bool,
+    query_ctx: QueryContextRef,
+    promql_range: Option<u64>,
+    promql_step: Option<u64>,
+    promql_start: Option<i64>,
+    promql_end: Option<i64>,
+}
+
 impl SlowQueryRecorder {
     /// Create a new SlowQueryRecorder.
     pub fn new(
@@ -101,17 +115,18 @@ impl SlowQueryRecorder {
     /// The timer sets the start time when created and calculates the elapsed duration when dropped.
     pub fn start(
         &self,
-        stmt: CatalogQueryStatement,
+        stmt: QueryStatement,
         query_ctx: QueryContextRef,
     ) -> Option<SlowQueryTimer> {
         if self.slow_query_opts.enable {
-            Some(SlowQueryTimer::new(
+            Some(SlowQueryTimer {
                 stmt,
                 query_ctx,
-                self.slow_query_opts.threshold,
-                self.slow_query_opts.sample_ratio,
-                self.tx.clone(),
-            ))
+                start: Instant::now(), // Set the initial start time.
+                threshold: self.slow_query_opts.threshold,
+                sample_ratio: self.slow_query_opts.sample_ratio,
+                tx: self.tx.clone(),
+            })
         } else {
             None
         }
@@ -432,3 +447,85 @@ impl SlowQueryEventHandler {
         ]
     }
 }
+
+/// SlowQueryTimer is used to log slow query when it's dropped.
+/// In drop(), it will check if the query is slow and send the slow query event to the handler.
+pub struct SlowQueryTimer {
+    start: Instant,
+    stmt: QueryStatement,
+    query_ctx: QueryContextRef,
+    threshold: Option<Duration>,
+    sample_ratio: Option<f64>,
+    tx: Sender<SlowQueryEvent>,
+}
+
+impl SlowQueryTimer {
+    fn send_slow_query_event(&self, elapsed: Duration, threshold: Duration) {
+        let mut slow_query_event = SlowQueryEvent {
+            cost: elapsed.as_millis() as u64,
+            threshold: threshold.as_millis() as u64,
+            query: "".to_string(),
+            query_ctx: self.query_ctx.clone(),
+
+            // The following fields are only used for PromQL queries.
+            is_promql: false,
+            promql_range: None,
+            promql_step: None,
+            promql_start: None,
+            promql_end: None,
+        };
+
+        match &self.stmt {
+            QueryStatement::Promql(stmt) => {
+                slow_query_event.is_promql = true;
+                slow_query_event.query = stmt.expr.to_string();
+                slow_query_event.promql_step = Some(stmt.interval.as_millis() as u64);
+
+                let start = stmt
+                    .start
+                    .duration_since(UNIX_EPOCH)
+                    .unwrap_or_default()
+                    .as_millis() as i64;
+
+                let end = stmt
+                    .end
+                    .duration_since(UNIX_EPOCH)
+                    .unwrap_or_default()
+                    .as_millis() as i64;
+
+                slow_query_event.promql_range = Some((end - start) as u64);
+                slow_query_event.promql_start = Some(start);
+                slow_query_event.promql_end = Some(end);
+            }
+            QueryStatement::Sql(stmt) => {
+                slow_query_event.query = stmt.to_string();
+            }
+        }
+
+        // Send SlowQueryEvent to the handler.
+        if let Err(e) = self.tx.try_send(slow_query_event) {
+            error!(e; "Failed to send slow query event");
+        }
+    }
+}
+
+impl Drop for SlowQueryTimer {
+    fn drop(&mut self) {
+        if let Some(threshold) = self.threshold {
+            // Calculate the elapsed duration since the timer is created.
+            let elapsed = self.start.elapsed();
+            if elapsed > threshold {
+                if let Some(ratio) = self.sample_ratio {
+                    // Only capture a portion of slow queries based on sample_ratio.
+                    // Generate a random number in [0, 1) and compare it with sample_ratio.
+                    if ratio >= 1.0 || random::<f64>() <= ratio {
+                        self.send_slow_query_event(elapsed, threshold);
+                    }
+                } else {
+                    // Captures all slow queries if sample_ratio is not set.
+                    self.send_slow_query_event(elapsed, threshold);
+                }
+            }
+        }
+    }
+}
```
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
@@ -211,7 +210,6 @@ mod tests {
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
// Cancel before creating the wrapper
|
||||
@@ -236,7 +234,6 @@ mod tests {
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let cancellation_handle = ticket.cancellation_handle.clone();
|
||||
|
||||
@@ -264,7 +261,6 @@ mod tests {
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let cancellation_handle = ticket.cancellation_handle.clone();
|
||||
|
||||
@@ -293,7 +289,6 @@ mod tests {
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let mut cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
@@ -324,7 +319,6 @@ mod tests {
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
|
||||
let cancellable_stream = CancellableStreamWrapper::new(Box::pin(mock_stream), ticket);
|
||||
@@ -353,7 +347,6 @@ mod tests {
|
||||
"query".to_string(),
|
||||
"client".to_string(),
|
||||
None,
|
||||
None,
|
||||
);
|
||||
let cancellation_handle = ticket.cancellation_handle.clone();
|
||||
|
||||
|
||||
@@ -66,10 +66,6 @@ const COLUMN_PREFIX: &str = "__column_";
|
||||
/// itself.
|
||||
pub struct MetadataRegion {
|
||||
pub(crate) mito: MitoEngine,
|
||||
/// The cache for contents(key-value pairs) of region metadata.
|
||||
///
|
||||
/// The cache should be invalidated when any new values are put into the metadata region or any
|
||||
/// values are deleted from the metadata region.
|
||||
cache: Cache<RegionId, RegionMetadataCacheEntry>,
|
||||
/// Logical lock for operations that need to be serialized. Like update & read region columns.
|
||||
///
|
||||
@@ -84,19 +80,14 @@ struct RegionMetadataCacheEntry {
|
||||
size: usize,
|
||||
}
|
||||
|
||||
/// The max size of the region metadata cache.
|
||||
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
|
||||
/// The TTL of the region metadata cache.
|
||||
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
impl MetadataRegion {
|
||||
pub fn new(mito: MitoEngine) -> Self {
|
||||
let cache = Cache::builder()
|
||||
.max_capacity(MAX_CACHE_SIZE)
|
||||
// Use the LRU eviction policy to minimize frequent mito scans.
|
||||
// Recently accessed items are retained longer in the cache.
|
||||
.eviction_policy(EvictionPolicy::lru())
|
||||
.time_to_live(CACHE_TTL)
|
||||
.time_to_live(Duration::from_secs(60))
|
||||
.weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
|
||||
.build();
|
||||
Self {
|
||||
@@ -391,11 +382,11 @@ impl MetadataRegion {
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
|
||||
async fn load_all(&self, region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
|
||||
let scan_req = MetadataRegion::build_read_request();
|
||||
let record_batch_stream = self
|
||||
.mito
|
||||
.scan_to_stream(metadata_region_id, scan_req)
|
||||
.scan_to_stream(region_id, scan_req)
|
||||
.await
|
||||
.context(MitoReadOperationSnafu)?;
|
||||
|
||||
@@ -416,12 +407,12 @@ impl MetadataRegion {
|
||||
|
||||
async fn get_all_with_prefix(
|
||||
&self,
|
||||
metadata_region_id: RegionId,
|
||||
region_id: RegionId,
|
||||
prefix: &str,
|
||||
) -> Result<HashMap<String, String>> {
|
||||
let region_metadata = self
|
||||
.cache
|
||||
.try_get_with(metadata_region_id, self.load_all(metadata_region_id))
|
||||
.try_get_with(region_id, self.load_all(region_id))
|
||||
.await
|
||||
.context(CacheGetSnafu)?;
|
||||
|
||||
@@ -455,17 +446,16 @@ impl MetadataRegion {
|
||||
|
||||
/// Delete the given keys. For performance consideration, this method
|
||||
/// doesn't check if those keys exist or not.
|
||||
async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
|
||||
async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> {
|
||||
let delete_request = Self::build_delete_request(keys);
|
||||
self.mito
|
||||
.handle_request(
|
||||
metadata_region_id,
|
||||
region_id,
|
||||
store_api::region_request::RegionRequest::Delete(delete_request),
|
||||
)
|
||||
.await
|
||||
.context(MitoWriteOperationSnafu)?;
|
||||
// Invalidates the region metadata cache if any values are deleted from the metadata region.
|
||||
self.cache.invalidate(&metadata_region_id).await;
|
||||
self.cache.invalidate(®ion_id).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -556,7 +546,7 @@ impl MetadataRegion {
|
||||
write_region_id: bool,
|
||||
logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
|
||||
) -> Result<()> {
|
||||
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
|
||||
let region_id = utils::to_metadata_region_id(physical_region_id);
|
||||
let iter = logical_regions
|
||||
.into_iter()
|
||||
.flat_map(|(logical_region_id, column_metadatas)| {
|
||||
@@ -583,13 +573,12 @@ impl MetadataRegion {
|
||||
let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
|
||||
self.mito
|
||||
.handle_request(
|
||||
metadata_region_id,
|
||||
region_id,
|
||||
store_api::region_request::RegionRequest::Put(put_request),
|
||||
)
|
||||
.await
|
||||
.context(MitoWriteOperationSnafu)?;
|
||||
// Invalidates the region metadata cache if any new values are put into the metadata region.
|
||||
self.cache.invalidate(&metadata_region_id).await;
|
||||
self.cache.invalidate(®ion_id).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -624,12 +624,6 @@ pub enum Error {

#[snafu(display("Unknown hint: {}", hint))]
UnknownHint { hint: String },

#[snafu(display("Query has been cancelled"))]
Cancelled {
#[snafu(implicit)]
location: Location,
},
}

pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -753,8 +747,6 @@ impl ErrorExt for Error {
DurationOverflow { .. } => StatusCode::InvalidArguments,

HandleOtelArrowRequest { .. } => StatusCode::Internal,

Cancelled { .. } => StatusCode::Cancelled,
}
}

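A self-contained sketch of the snafu pattern used above: a display message on the variant, an implicitly captured `Location`, and a `status_code` match arm like the one in `ErrorExt` (the `StatusCode` enum here is a stand-in for the real one):

```rust
use snafu::{Location, Snafu};

/// Stand-in for the real common_error status codes.
#[derive(Debug, PartialEq)]
enum StatusCode {
    Cancelled,
}

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Query has been cancelled"))]
    Cancelled {
        // snafu captures the construction site automatically.
        #[snafu(implicit)]
        location: Location,
    },
}

/// Mirrors the status-code mapping shown in the hunk above.
fn status_code(err: &Error) -> StatusCode {
    match err {
        Error::Cancelled { .. } => StatusCode::Cancelled,
    }
}

fn main() {
    let err = CancelledSnafu.build();
    assert_eq!(err.to_string(), "Query has been cancelled");
    assert_eq!(status_code(&err), StatusCode::Cancelled);
}
```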
@@ -1213,7 +1213,6 @@ mod test {
use query::parser::PromQuery;
use query::query_engine::DescribeResult;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;
use tokio::time::Instant;

@@ -1244,7 +1243,6 @@ mod test {

async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {

@@ -20,7 +20,6 @@

use datafusion_expr::LogicalPlan;
use datatypes::schema::Schema;
use sql::statements::statement::Statement;

pub mod addrs;
pub mod configurator;
@@ -56,8 +55,6 @@ pub mod tls;
#[derive(Clone)]
pub struct SqlPlan {
query: String,
// Store the parsed statement to determine if it is a query and whether to track it.
statement: Option<Statement>,
plan: Option<LogicalPlan>,
schema: Option<Schema>,
}

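A hypothetical illustration of why `SqlPlan` keeps the parsed statement next to the plan: the shim can decide whether a saved plan is a trackable query without re-parsing the SQL text. Every name below is simplified, and the real decision logic is not part of this diff:

```rust
/// Simplified stand-in for sql::statements::statement::Statement.
enum Statement {
    Query,
    Insert,
}

struct SqlPlan {
    query: String,
    statement: Option<Statement>,
}

/// Hypothetical helper: only read queries are tracked (e.g. by a slow
/// query recorder); DML and unparsed statements are not.
fn should_track(plan: &SqlPlan) -> bool {
    matches!(plan.statement, Some(Statement::Query))
}

fn main() {
    let plan = SqlPlan {
        query: "SELECT 1".to_string(),
        statement: Some(Statement::Query),
    };
    assert!(should_track(&plan));
}
```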
@@ -136,7 +136,6 @@ impl MysqlInstanceShim {
async fn do_exec_plan(
&self,
query: &str,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
@@ -145,7 +144,7 @@ impl MysqlInstanceShim {
{
Ok(output)
} else {
self.query_handler.do_exec_plan(stmt, plan, query_ctx).await
self.query_handler.do_exec_plan(plan, query_ctx).await
}
}

@@ -232,7 +231,6 @@ impl MysqlInstanceShim {
self.save_plan(
SqlPlan {
query: query.to_string(),
statement: Some(statement),
plan: None,
schema: None,
},
@@ -242,7 +240,6 @@ impl MysqlInstanceShim {
self.save_plan(
SqlPlan {
query: query.to_string(),
statement: Some(statement),
plan,
schema,
},
@@ -294,13 +291,8 @@ impl MysqlInstanceShim {

debug!("Mysql execute prepared plan: {}", plan.display_indent());
vec![
self.do_exec_plan(
&sql_plan.query,
sql_plan.statement.clone(),
plan,
query_ctx.clone(),
)
.await,
self.do_exec_plan(&sql_plan.query, plan, query_ctx.clone())
.await,
]
}
None => {

@@ -228,7 +228,6 @@ impl QueryParser for DefaultQueryParser {
if sql.is_empty() || fixtures::matches(sql) {
return Ok(SqlPlan {
query: sql.to_owned(),
statement: None,
plan: None,
schema: None,
});
@@ -249,7 +248,7 @@ impl QueryParser for DefaultQueryParser {

let describe_result = self
.query_handler
.do_describe(stmt.clone(), query_ctx)
.do_describe(stmt, query_ctx)
.await
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;

@@ -265,7 +264,6 @@ impl QueryParser for DefaultQueryParser {

Ok(SqlPlan {
query: sql.to_owned(),
statement: Some(stmt),
plan,
schema,
})
@@ -320,7 +318,7 @@ impl ExtendedQueryHandler for PostgresServerHandlerInner {
)?))
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
self.query_handler
.do_exec_plan(sql_plan.statement.clone(), plan, query_ctx.clone())
.do_exec_plan(plan, query_ctx.clone())
.await
} else {
// manually replace variables in prepared statement when no

@@ -41,7 +41,6 @@ pub trait SqlQueryHandler {

async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error>;
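The hunks around here drop the `stmt` parameter from `do_exec_plan` across the trait, its blanket impl, and the test doubles. A sketch of the trait's shape after the change, with stand-in types for `Output`, `LogicalPlan`, and `QueryContextRef`:

```rust
use async_trait::async_trait;

// Stand-ins for the real servers/query types.
struct LogicalPlan;
struct QueryContextRef;
struct Output;

#[async_trait]
trait SqlQueryHandler {
    type Error: std::error::Error;

    // After the change the handler receives only the plan and the session
    // context; the parsed statement no longer travels with the call.
    async fn do_exec_plan(
        &self,
        plan: LogicalPlan,
        query_ctx: QueryContextRef,
    ) -> Result<Output, Self::Error>;
}
```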
@@ -89,14 +88,9 @@ where
.collect()
}

async fn do_exec_plan(
&self,
stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
self.0
.do_exec_plan(stmt, plan, query_ctx)
.do_exec_plan(plan, query_ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecutePlanSnafu)

@@ -31,7 +31,6 @@ use servers::influxdb::InfluxdbRequest;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::InfluxdbLineProtocolHandler;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;

struct DummyInstance {
@@ -60,7 +59,6 @@ impl SqlQueryHandler for DummyInstance {

async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {

@@ -28,7 +28,6 @@ use servers::opentsdb::codec::DataPoint;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;

struct DummyInstance {
@@ -60,7 +59,6 @@ impl SqlQueryHandler for DummyInstance {

async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {

@@ -35,7 +35,6 @@ use servers::prom_store::{snappy_compress, Metrics};
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::{PromStoreProtocolHandler, PromStoreResponse};
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use tokio::sync::mpsc;

struct DummyInstance {
@@ -88,7 +87,6 @@ impl SqlQueryHandler for DummyInstance {

async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
_plan: LogicalPlan,
_query_ctx: QueryContextRef,
) -> std::result::Result<Output, Self::Error> {

@@ -68,12 +68,7 @@ impl SqlQueryHandler for DummyInstance {
vec![Ok(output)]
}

async fn do_exec_plan(
&self,
_stmt: Option<Statement>,
plan: LogicalPlan,
query_ctx: QueryContextRef,
) -> Result<Output> {
async fn do_exec_plan(&self, plan: LogicalPlan, query_ctx: QueryContextRef) -> Result<Output> {
Ok(self.query_engine.execute(plan, query_ctx).await.unwrap())
}

@@ -100,5 +100,4 @@ session = { workspace = true, features = ["testing"] }
store-api.workspace = true
tokio-postgres = { workspace = true }
url = "2.3"
urlencoding = "2.1"
yaml-rust = "0.4"

@@ -13,7 +13,6 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use cache::{
build_datanode_cache_registry, build_fundamental_cache_registry,
@@ -42,7 +41,6 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::build_wal_options_allocator;
use common_procedure::options::ProcedureConfig;
use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::SlowQueryOptions;
use common_wal::config::{DatanodeWalConfig, MetasrvWalConfig};
use datanode::datanode::DatanodeBuilder;
use flow::{FlownodeBuilder, FrontendClient, GrpcQueryHandlerWithBoxedError};
@@ -329,13 +327,6 @@ impl GreptimeDbStandaloneBuilder {
metadata_store: kv_backend_config,
wal: self.metasrv_wal_config.clone().into(),
grpc: GrpcOptions::default().with_server_addr("127.0.0.1:4001"),
// Enable slow query log with 1s threshold to run the slow query test.
slow_query: Some(SlowQueryOptions {
enable: true,
// Set the threshold to 1s to run the slow query test.
threshold: Some(Duration::from_secs(1)),
..Default::default()
}),
..StandaloneOptions::default()
};

@@ -15,7 +15,6 @@
use std::collections::BTreeMap;
use std::io::Write;
use std::str::FromStr;
use std::time::Duration;

use api::prom_store::remote::label_matcher::Type as MatcherType;
use api::prom_store::remote::{
@@ -24,13 +23,10 @@ use api::prom_store::remote::{
use auth::user_provider_from_option;
use axum::http::{HeaderName, HeaderValue, StatusCode};
use chrono::Utc;
use common_catalog::consts::{
trace_services_table_name, DEFAULT_PRIVATE_SCHEMA_NAME, TRACE_TABLE_NAME,
};
use common_catalog::consts::{trace_services_table_name, TRACE_TABLE_NAME};
use common_error::status_code::StatusCode as ErrorCode;
use flate2::write::GzEncoder;
use flate2::Compression;
use frontend::slow_query_recorder::{SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME};
use log_query::{Context, Limit, LogQuery, TimeFilter};
use loki_proto::logproto::{EntryAdapter, LabelPairAdapter, PushRequest, StreamAdapter};
use loki_proto::prost_types::Timestamp;
@@ -59,7 +55,6 @@ use tests_integration::test_util::{
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
StorageType,
};
use urlencoding::encode;
use yaml_rust::YamlLoader;

#[macro_export]
@@ -93,7 +88,6 @@ macro_rules! http_tests {

test_http_auth,
test_sql_api,
test_http_sql_slow_query,
test_prometheus_promql_api,
test_prom_http_api,
test_metrics_api,
@@ -539,29 +533,6 @@ pub async fn test_sql_api(store_type: StorageType) {
guard.remove_all().await;
}

pub async fn test_http_sql_slow_query(store_type: StorageType) {
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "sql_api").await;
let client = TestClient::new(app).await;

let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte";
let encoded_slow_query = encode(slow_query);

let query_params = format!("/v1/sql?sql={encoded_slow_query}");
let res = client.get(&query_params).send().await;
assert_eq!(res.status(), StatusCode::OK);

// Wait for the slow query to be recorded.
tokio::time::sleep(Duration::from_secs(5)).await;

let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!("SELECT {} FROM {table}", SLOW_QUERY_TABLE_QUERY_COLUMN_NAME);

let expected = format!(r#"[["{}"]]"#, slow_query);
validate_data("test_http_sql_slow_query", &client, &query, &expected).await;

guard.remove_all().await;
}

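The removed HTTP test above submits the statement through `/v1/sql` after percent-encoding it. A small sketch of that encoding step with the `urlencoding` crate (the expected string follows urlencoding's documented rules, not output taken from the test):

```rust
fn main() {
    let sql = "SELECT COUNT(*) FROM slow_cte";
    // Percent-encode the statement so it is safe inside a query string;
    // everything but ASCII alphanumerics and -_.~ gets escaped.
    let encoded = urlencoding::encode(sql);
    assert_eq!(encoded, "SELECT%20COUNT%28%2A%29%20FROM%20slow_cte");
    let url = format!("/v1/sql?sql={encoded}");
    assert!(url.starts_with("/v1/sql?sql=SELECT%20"));
}
```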
pub async fn test_prometheus_promql_api(store_type: StorageType) {
let (app, mut guard) = setup_test_http_app_with_frontend(store_type, "promql_api").await;
let client = TestClient::new(app).await;
@@ -1300,7 +1271,7 @@ write_interval = "30s"
[slow_query]
enable = true
record_type = "system_table"
threshold = "1s"
threshold = "30s"
sample_ratio = 1.0
ttl = "30d"

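The `[slow_query]` TOML block above maps onto an options struct. A hedged serde sketch of deserializing it; field names follow the TOML shown, but the real `SlowQueryOptions` in common_telemetry may type these fields differently (e.g. as durations):

```rust
use serde::Deserialize;

/// Field names mirror the TOML block above; duration-like values are kept
/// as strings here purely for illustration.
#[derive(Debug, Deserialize)]
struct SlowQueryOptions {
    enable: bool,
    record_type: String,
    threshold: String,
    sample_ratio: f64,
    ttl: String,
}

fn main() {
    let src = r#"
        enable = true
        record_type = "system_table"
        threshold = "30s"
        sample_ratio = 1.0
        ttl = "30d"
    "#;
    let opts: SlowQueryOptions = toml::from_str(src).unwrap();
    assert!(opts.enable);
    assert_eq!(opts.threshold, "30s");
}
```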
@@ -16,12 +16,6 @@ use std::collections::HashMap;

use auth::user_provider_from_option;
use chrono::{DateTime, NaiveDate, NaiveDateTime, SecondsFormat, Utc};
use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
use frontend::slow_query_recorder::{
SLOW_QUERY_TABLE_COST_COLUMN_NAME, SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME,
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
};
use sqlx::mysql::{MySqlConnection, MySqlDatabaseError, MySqlPoolOptions};
use sqlx::postgres::{PgDatabaseError, PgPoolOptions};
use sqlx::{Connection, Executor, Row};
@@ -70,12 +64,10 @@ macro_rules! sql_tests {
test_mysql_crud,
test_mysql_timezone,
test_mysql_async_timestamp,
test_mysql_slow_query,
test_postgres_auth,
test_postgres_crud,
test_postgres_timezone,
test_postgres_bytea,
test_postgres_slow_query,
test_postgres_datestyle,
test_postgres_parameter_inference,
test_postgres_array_types,
@@ -588,56 +580,6 @@ pub async fn test_postgres_crud(store_type: StorageType) {
let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}

pub async fn test_mysql_slow_query(store_type: StorageType) {
common_telemetry::init_default_ut_logging();

let (mut guard, fe_mysql_server) =
setup_mysql_server(store_type, "test_mysql_slow_query").await;
let addr = fe_mysql_server.bind_addr().unwrap().to_string();

let pool = MySqlPoolOptions::new()
.max_connections(2)
.connect(&format!("mysql://{addr}/public"))
.await
.unwrap();

// The slow query will run at least longer than 1s.
let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte";

// Simulate a slow query.
sqlx::query(slow_query).fetch_all(&pool).await.unwrap();

// Wait for the slow query to be recorded.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table}",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME
);

let rows = sqlx::query(&query).fetch_all(&pool).await.unwrap();
assert_eq!(rows.len(), 1);

// Check the results.
let row = &rows[0];
let cost: u64 = row.get(0);
let threshold: u64 = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);

assert!(cost > 0 && threshold > 0 && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);

let _ = fe_mysql_server.shutdown().await;
guard.remove_all().await;
}

pub async fn test_postgres_bytea(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_bytea").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();
@@ -708,46 +650,6 @@ pub async fn test_postgres_bytea(store_type: StorageType) {
guard.remove_all().await;
}

pub async fn test_postgres_slow_query(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_slow_query").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();

let pool = PgPoolOptions::new()
.max_connections(2)
.connect(&format!("postgres://{addr}/public"))
.await
.unwrap();

let slow_query = "WITH RECURSIVE slow_cte AS (SELECT 1 AS n, md5(random()) AS hash UNION ALL SELECT n + 1, md5(concat(hash, n)) FROM slow_cte WHERE n < 4500) SELECT COUNT(*) FROM slow_cte";
let _ = sqlx::query(slow_query).fetch_all(&pool).await.unwrap();

// Wait for the slow query to be recorded.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

let table = format!("{}.{}", DEFAULT_PRIVATE_SCHEMA_NAME, SLOW_QUERY_TABLE_NAME);
let query = format!(
"SELECT {}, {}, {}, {} FROM {table}",
SLOW_QUERY_TABLE_COST_COLUMN_NAME,
SLOW_QUERY_TABLE_THRESHOLD_COLUMN_NAME,
SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
SLOW_QUERY_TABLE_IS_PROMQL_COLUMN_NAME
);
let rows = sqlx::query(&query).fetch_all(&pool).await.unwrap();
assert_eq!(rows.len(), 1);
let row = &rows[0];
let cost: i64 = row.get(0);
let threshold: i64 = row.get(1);
let query: String = row.get(2);
let is_promql: bool = row.get(3);

assert!(cost > 0 && threshold > 0 && cost > threshold);
assert_eq!(query, slow_query);
assert!(!is_promql);

let _ = fe_pg_server.shutdown().await;
guard.remove_all().await;
}

pub async fn test_postgres_datestyle(store_type: StorageType) {
let (mut guard, fe_pg_server) = setup_pg_server(store_type, "test_postgres_datestyle").await;
let addr = fe_pg_server.bind_addr().unwrap().to_string();