Compare commits

...

12 Commits

Author SHA1 Message Date
Ning Sun
5c8e8316e5 test: update test to include histogram 2025-12-18 16:13:33 +08:00
Ning Sun
56baef08af Merge branch 'feature/update-proto-native-histogram' into ci/git-deps-check 2025-12-18 14:52:24 +08:00
Ning Sun
223a978b17 chore: update proto 2025-12-18 14:46:00 +08:00
Ning Sun
f6871d8aa2 chore: add changes to Cargo.toml to trigger CI 2025-12-18 12:00:57 +08:00
Ning Sun
b5fc95df42 ci: add a CI check to ensure whitelisted dependencies are using their main branch 2025-12-18 11:57:16 +08:00
Ning Sun
6faf1bcb04 chore: update proto to include native histogram 2025-12-18 11:39:50 +08:00
jeremyhi
95eccd6cde feat: introduce granularity for memory manager (#7416)
* feat: introduce granularity for memory manager

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: add unit test

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: remove granularity getter for mamanger

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* Update src/common/memory-manager/src/manager.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* feat: acquire_with_policy for manager

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-12-17 11:08:51 +00:00
fys
0bc5a305be chore: add wait_initialized method for frontend client (#7414)
* chore: add wait_initialized method for frontend client

* fix: some

* fix: cargo fmt

* add comment

* add unit test

* rename

* fix: cargo check

* fix: cr by copilot
2025-12-17 08:13:36 +00:00
discord9
1afcddd5a9 chore: feature gate vector_index (#7428)
Signed-off-by: discord9 <discord9@163.com>
2025-12-17 07:14:25 +00:00
shuiyisong
62808b887b fix: using anonymous s3 access when ak and sk is not provided (#7425)
* chore: allow s3 anon

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: disable ec2 metadata

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-12-17 06:34:29 +00:00
discord9
04ddd40e00 chore: bump version to beta.3 (#7423)
chore: bump to beta.3

Signed-off-by: discord9 <discord9@163.com>
2025-12-17 04:18:23 +00:00
liyang
b4f028be5f chore: change etcd endpoints to array in the test scripts (#7419)
chore: change etcd endpoint

Signed-off-by: liyang <daviderli614@gmail.com>
2025-12-17 03:14:35 +00:00
21 changed files with 673 additions and 260 deletions

View File

@@ -51,7 +51,7 @@ runs:
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \

View File

@@ -81,7 +81,7 @@ function deploy_greptimedb_cluster() {
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set "meta.backendStorage.etcd.endpoints[0]=etcd.$install_namespace.svc.cluster.local:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
-n "$install_namespace"
@@ -119,7 +119,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set "meta.backendStorage.etcd.endpoints[0]=etcd.$install_namespace.svc.cluster.local:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
--set objectStorage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set objectStorage.s3.region="$AWS_REGION" \

154
.github/workflows/check-git-deps.yml vendored Normal file
View File

@@ -0,0 +1,154 @@
name: Check Git Dependencies on Main Branch
on:
pull_request:
branches: [main]
paths:
- 'Cargo.toml'
push:
branches: [main]
paths:
- 'Cargo.toml'
jobs:
check-git-deps:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v6
- name: Check git dependencies
env:
WHITELIST_DEPS: "greptime-proto,meter-core,meter-macros"
run: |
#!/bin/bash
set -e
echo "Checking whitelisted git dependencies..."
# Function to check if a commit is on main branch
check_commit_on_main() {
local repo_url="$1"
local commit="$2"
local repo_name=$(basename "$repo_url" .git)
echo "Checking $repo_name"
echo "Repo: $repo_url"
echo "Commit: $commit"
# Create a temporary directory for cloning
local temp_dir=$(mktemp -d)
# Clone the repository
if git clone "$repo_url" "$temp_dir" 2>/dev/null; then
cd "$temp_dir"
# Try to determine the main branch name
local main_branch="main"
if ! git rev-parse --verify origin/main >/dev/null 2>&1; then
if git rev-parse --verify origin/master >/dev/null 2>&1; then
main_branch="master"
else
# Try to get the default branch
main_branch=$(git symbolic-ref refs/remotes/origin/HEAD | sed 's@^refs/remotes/origin/@@')
fi
fi
echo "Main branch: $main_branch"
# Check if commit exists
if git cat-file -e "$commit" 2>/dev/null; then
# Check if commit is on main branch
if git merge-base --is-ancestor "$commit" "origin/$main_branch" 2>/dev/null; then
echo "PASS: Commit $commit is on $main_branch branch"
cd - >/dev/null
rm -rf "$temp_dir"
return 0
else
echo "FAIL: Commit $commit is NOT on $main_branch branch"
# Try to find which branch contains this commit
local branch_name=$(git branch -r --contains "$commit" 2>/dev/null | head -1 | sed 's/^[[:space:]]*origin\///' | sed 's/[[:space:]]*$//')
if [[ -n "$branch_name" ]]; then
echo "Found on branch: $branch_name"
fi
cd - >/dev/null
rm -rf "$temp_dir"
return 1
fi
else
echo "FAIL: Commit $commit not found in repository"
cd - >/dev/null
rm -rf "$temp_dir"
return 1
fi
else
echo "FAIL: Failed to clone $repo_url"
rm -rf "$temp_dir"
return 1
fi
}
# Extract whitelisted git dependencies from Cargo.toml
echo "Extracting git dependencies from Cargo.toml..."
# Create temporary array to store dependencies
declare -a deps=()
# Build awk pattern from whitelist
IFS=',' read -ra WHITELIST <<< "$WHITELIST_DEPS"
awk_pattern=""
for dep in "${WHITELIST[@]}"; do
if [[ -n "$awk_pattern" ]]; then
awk_pattern="$awk_pattern|"
fi
awk_pattern="$awk_pattern$dep"
done
# Extract whitelisted dependencies
while IFS= read -r line; do
if [[ -n "$line" ]]; then
deps+=("$line")
fi
done < <(awk -v pattern="$awk_pattern" '
$0 ~ pattern ".*git = \"https:/" {
match($0, /git = "([^"]+)"/, arr)
git_url = arr[1]
if (match($0, /rev = "([^"]+)"/, rev_arr)) {
rev = rev_arr[1]
print git_url " " rev
} else {
# Check next line for rev
getline
if (match($0, /rev = "([^"]+)"/, rev_arr)) {
rev = rev_arr[1]
print git_url " " rev
}
}
}
' Cargo.toml)
echo "Found ${#deps[@]} dependencies to check:"
for dep in "${deps[@]}"; do
echo " $dep"
done
failed=0
for dep in "${deps[@]}"; do
read -r repo_url commit <<< "$dep"
if ! check_commit_on_main "$repo_url" "$commit"; then
failed=1
fi
done
echo "Check completed."
if [[ $failed -eq 1 ]]; then
echo "ERROR: Some git dependencies are not on their main branches!"
echo "Please update the commits to point to main branch commits."
exit 1
else
echo "SUCCESS: All git dependencies are on their main branches!"
fi

154
Cargo.lock generated
View File

@@ -212,7 +212,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow-schema",
"common-base",
@@ -733,7 +733,7 @@ dependencies = [
[[package]]
name = "auth"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -1383,7 +1383,7 @@ dependencies = [
[[package]]
name = "cache"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"catalog",
"common-error",
@@ -1418,7 +1418,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow",
@@ -1763,7 +1763,7 @@ checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675"
[[package]]
name = "cli"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-stream",
"async-trait",
@@ -1816,7 +1816,7 @@ dependencies = [
[[package]]
name = "client"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arc-swap",
@@ -1849,7 +1849,7 @@ dependencies = [
"snafu 0.8.6",
"store-api",
"substrait 0.37.3",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"tokio",
"tokio-stream",
"tonic 0.13.1",
@@ -1889,7 +1889,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"auth",
@@ -2023,7 +2023,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anymap2",
"async-trait",
@@ -2047,14 +2047,14 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"const_format",
]
[[package]]
name = "common-config"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-error",
@@ -2079,7 +2079,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow",
"arrow-schema",
@@ -2114,7 +2114,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2127,7 +2127,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-macro",
"http 1.3.1",
@@ -2138,7 +2138,7 @@ dependencies = [
[[package]]
name = "common-event-recorder"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -2160,7 +2160,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -2182,7 +2182,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -2242,7 +2242,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"common-runtime",
@@ -2259,7 +2259,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-flight",
@@ -2294,7 +2294,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"common-base",
@@ -2314,7 +2314,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"greptime-proto",
"once_cell",
@@ -2325,7 +2325,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anyhow",
"common-error",
@@ -2341,7 +2341,7 @@ dependencies = [
[[package]]
name = "common-memory-manager"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-error",
"common-macro",
@@ -2354,7 +2354,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anymap2",
"api",
@@ -2426,7 +2426,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2435,11 +2435,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
[[package]]
name = "common-pprof"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-error",
"common-macro",
@@ -2451,7 +2451,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-stream",
@@ -2480,7 +2480,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"common-procedure",
@@ -2490,7 +2490,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -2516,7 +2516,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arc-swap",
"common-base",
@@ -2540,7 +2540,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -2569,7 +2569,7 @@ dependencies = [
[[package]]
name = "common-session"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"serde",
"strum 0.27.1",
@@ -2577,7 +2577,7 @@ dependencies = [
[[package]]
name = "common-sql"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-decimal",
@@ -2595,7 +2595,7 @@ dependencies = [
[[package]]
name = "common-stat"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-runtime",
@@ -2610,7 +2610,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"backtrace",
"common-base",
@@ -2639,7 +2639,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"client",
"common-grpc",
@@ -2652,7 +2652,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow",
"chrono",
@@ -2670,7 +2670,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"build-data",
"cargo-manifest",
@@ -2681,7 +2681,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-error",
@@ -2704,7 +2704,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-telemetry",
"serde",
@@ -4012,7 +4012,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-flight",
@@ -4076,7 +4076,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow",
"arrow-array",
@@ -4750,7 +4750,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-engine"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -4882,7 +4882,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow",
@@ -4951,7 +4951,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tokio",
"tonic 0.13.1",
@@ -5012,7 +5012,7 @@ checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
[[package]]
name = "frontend"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arc-swap",
@@ -5459,7 +5459,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0423fa30203187c75e2937a668df1da699c8b96c#0423fa30203187c75e2937a668df1da699c8b96c"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=173efe5ec62722089db7c531c0b0d470a072b915#173efe5ec62722089db7c531c0b0d470a072b915"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",
@@ -6227,7 +6227,7 @@ dependencies = [
[[package]]
name = "index"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -7168,7 +7168,7 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "log-query"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"chrono",
"common-error",
@@ -7180,7 +7180,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-stream",
"async-trait",
@@ -7481,7 +7481,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -7509,7 +7509,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -7609,7 +7609,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"aquamarine",
@@ -7706,7 +7706,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"bytes",
@@ -7731,7 +7731,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"aquamarine",
@@ -8471,7 +8471,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anyhow",
"bytes",
@@ -8756,7 +8756,7 @@ dependencies = [
[[package]]
name = "operator"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -8816,7 +8816,7 @@ dependencies = [
"sql",
"sqlparser",
"store-api",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tokio",
"tokio-util",
@@ -9102,7 +9102,7 @@ dependencies = [
[[package]]
name = "partition"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -9459,7 +9459,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -9615,7 +9615,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"auth",
"catalog",
@@ -9917,7 +9917,7 @@ dependencies = [
[[package]]
name = "promql"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"async-trait",
@@ -10200,7 +10200,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-compression 0.4.19",
"async-trait",
@@ -10242,7 +10242,7 @@ dependencies = [
[[package]]
name = "query"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -10309,7 +10309,7 @@ dependencies = [
"sql",
"sqlparser",
"store-api",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tokio",
"tokio-stream",
@@ -11651,7 +11651,7 @@ dependencies = [
[[package]]
name = "servers"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -11779,7 +11779,7 @@ dependencies = [
[[package]]
name = "session"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -12113,7 +12113,7 @@ dependencies = [
[[package]]
name = "sql"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-buffer",
@@ -12173,7 +12173,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -12450,7 +12450,7 @@ dependencies = [
[[package]]
name = "standalone"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"catalog",
@@ -12491,7 +12491,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "store-api"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"aquamarine",
@@ -12704,7 +12704,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"bytes",
@@ -12827,7 +12827,7 @@ dependencies = [
[[package]]
name = "table"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -13096,7 +13096,7 @@ checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
[[package]]
name = "tests-fuzz"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arbitrary",
"async-trait",
@@ -13140,7 +13140,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-flight",
@@ -13215,7 +13215,7 @@ dependencies = [
"sqlx",
"standalone",
"store-api",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tempfile",
"time",

View File

@@ -75,7 +75,7 @@ members = [
resolver = "2"
[workspace.package]
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
edition = "2024"
license = "Apache-2.0"
@@ -150,7 +150,7 @@ etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62d
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0423fa30203187c75e2937a668df1da699c8b96c" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "173efe5ec62722089db7c531c0b0d470a072b915" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -552,9 +552,8 @@ impl StartCommand {
let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
.set_handler(weak_grpc_handler)
.await;
// set the frontend invoker for flownode
let flow_streaming_engine = flownode.flow_engine().streaming_engine();

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
@@ -35,6 +36,14 @@ pub enum Error {
#[snafu(display("Memory semaphore unexpectedly closed"))]
MemorySemaphoreClosed,
#[snafu(display(
"Timeout waiting for memory quota: requested {requested_bytes} bytes, waited {waited:?}"
))]
MemoryAcquireTimeout {
requested_bytes: u64,
waited: Duration,
},
}
impl ErrorExt for Error {
@@ -44,6 +53,7 @@ impl ErrorExt for Error {
match self {
MemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
MemorySemaphoreClosed => StatusCode::Unexpected,
MemoryAcquireTimeout { .. } => StatusCode::RuntimeResourcesExhausted,
}
}

View File

@@ -0,0 +1,168 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
/// Memory permit granularity for different use cases.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PermitGranularity {
/// 1 KB per permit
///
/// Use for:
/// - HTTP/gRPC request limiting (small, high-concurrency operations)
/// - Small batch operations
/// - Scenarios requiring fine-grained fairness
Kilobyte,
/// 1 MB per permit (default)
///
/// Use for:
/// - Query execution memory management
/// - Compaction memory control
/// - Large, long-running operations
#[default]
Megabyte,
}
impl PermitGranularity {
/// Returns the number of bytes per permit.
#[inline]
pub const fn bytes(self) -> u64 {
match self {
Self::Kilobyte => 1024,
Self::Megabyte => 1024 * 1024,
}
}
/// Returns a human-readable string representation.
pub const fn as_str(self) -> &'static str {
match self {
Self::Kilobyte => "1KB",
Self::Megabyte => "1MB",
}
}
/// Converts bytes to permits based on this granularity.
///
/// Rounds up to ensure the requested bytes are fully covered.
/// Clamped to Semaphore::MAX_PERMITS.
#[inline]
pub fn bytes_to_permits(self, bytes: u64) -> u32 {
use tokio::sync::Semaphore;
let granularity_bytes = self.bytes();
bytes
.saturating_add(granularity_bytes - 1)
.saturating_div(granularity_bytes)
.min(Semaphore::MAX_PERMITS as u64)
.min(u32::MAX as u64) as u32
}
/// Converts permits to bytes based on this granularity.
#[inline]
pub fn permits_to_bytes(self, permits: u32) -> u64 {
(permits as u64).saturating_mul(self.bytes())
}
}
impl fmt::Display for PermitGranularity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bytes_to_permits_kilobyte() {
let granularity = PermitGranularity::Kilobyte;
// Exact multiples
assert_eq!(granularity.bytes_to_permits(1024), 1);
assert_eq!(granularity.bytes_to_permits(2048), 2);
assert_eq!(granularity.bytes_to_permits(10 * 1024), 10);
// Rounds up
assert_eq!(granularity.bytes_to_permits(1), 1);
assert_eq!(granularity.bytes_to_permits(1025), 2);
assert_eq!(granularity.bytes_to_permits(2047), 2);
}
#[test]
fn test_bytes_to_permits_megabyte() {
let granularity = PermitGranularity::Megabyte;
// Exact multiples
assert_eq!(granularity.bytes_to_permits(1024 * 1024), 1);
assert_eq!(granularity.bytes_to_permits(2 * 1024 * 1024), 2);
// Rounds up
assert_eq!(granularity.bytes_to_permits(1), 1);
assert_eq!(granularity.bytes_to_permits(1024), 1);
assert_eq!(granularity.bytes_to_permits(1024 * 1024 + 1), 2);
}
#[test]
fn test_bytes_to_permits_zero_bytes() {
assert_eq!(PermitGranularity::Kilobyte.bytes_to_permits(0), 0);
assert_eq!(PermitGranularity::Megabyte.bytes_to_permits(0), 0);
}
#[test]
fn test_bytes_to_permits_clamps_to_maximum() {
use tokio::sync::Semaphore;
let max_permits = (Semaphore::MAX_PERMITS as u64).min(u32::MAX as u64) as u32;
assert_eq!(
PermitGranularity::Kilobyte.bytes_to_permits(u64::MAX),
max_permits
);
assert_eq!(
PermitGranularity::Megabyte.bytes_to_permits(u64::MAX),
max_permits
);
}
#[test]
fn test_permits_to_bytes() {
assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(1), 1024);
assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(10), 10 * 1024);
assert_eq!(PermitGranularity::Megabyte.permits_to_bytes(1), 1024 * 1024);
assert_eq!(
PermitGranularity::Megabyte.permits_to_bytes(10),
10 * 1024 * 1024
);
}
#[test]
fn test_round_trip_conversion() {
// Kilobyte: bytes -> permits -> bytes (should round up)
let kb = PermitGranularity::Kilobyte;
let permits = kb.bytes_to_permits(1500);
let bytes = kb.permits_to_bytes(permits);
assert!(bytes >= 1500); // Must cover original request
assert_eq!(bytes, 2048); // 2KB
// Megabyte: bytes -> permits -> bytes (should round up)
let mb = PermitGranularity::Megabyte;
let permits = mb.bytes_to_permits(1500);
let bytes = mb.permits_to_bytes(permits);
assert!(bytes >= 1500);
assert_eq!(bytes, 1024 * 1024); // 1MB
}
}

View File

@@ -17,7 +17,7 @@ use std::{fmt, mem};
use common_telemetry::debug;
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use crate::manager::{MemoryMetrics, MemoryQuota, bytes_to_permits, permits_to_bytes};
use crate::manager::{MemoryMetrics, MemoryQuota};
/// Guard representing a slice of reserved memory.
pub struct MemoryGuard<M: MemoryMetrics> {
@@ -49,7 +49,9 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
pub fn granted_bytes(&self) -> u64 {
match &self.state {
GuardState::Unlimited => 0,
GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
GuardState::Limited { permit, quota } => {
quota.permits_to_bytes(permit.num_permits() as u32)
}
}
}
@@ -65,7 +67,7 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
return true;
}
let additional_permits = bytes_to_permits(bytes);
let additional_permits = quota.bytes_to_permits(bytes);
match quota
.semaphore
@@ -99,11 +101,12 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
return true;
}
let release_permits = bytes_to_permits(bytes);
let release_permits = quota.bytes_to_permits(bytes);
match permit.split(release_permits as usize) {
Some(released_permit) => {
let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
let released_bytes =
quota.permits_to_bytes(released_permit.num_permits() as u32);
drop(released_permit);
quota.update_in_use_metric();
debug!("Early released {} bytes from memory guard", released_bytes);
@@ -121,7 +124,7 @@ impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
if let GuardState::Limited { permit, quota } =
mem::replace(&mut self.state, GuardState::Unlimited)
{
let bytes = permits_to_bytes(permit.num_permits() as u32);
let bytes = quota.permits_to_bytes(permit.num_permits() as u32);
drop(permit);
quota.update_in_use_metric();
debug!("Released memory: {} bytes", bytes);

View File

@@ -19,6 +19,7 @@
//! share the same allocation logic while using their own metrics.
mod error;
mod granularity;
mod guard;
mod manager;
mod policy;
@@ -27,8 +28,9 @@ mod policy;
mod tests;
pub use error::{Error, Result};
pub use granularity::PermitGranularity;
pub use guard::MemoryGuard;
pub use manager::{MemoryManager, MemoryMetrics, PERMIT_GRANULARITY_BYTES};
pub use manager::{MemoryManager, MemoryMetrics};
pub use policy::{DEFAULT_MEMORY_WAIT_TIMEOUT, OnExhaustedPolicy};
/// No-op metrics implementation for testing.

View File

@@ -17,11 +17,12 @@ use std::sync::Arc;
use snafu::ensure;
use tokio::sync::{Semaphore, TryAcquireError};
use crate::error::{MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result};
use crate::error::{
MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
};
use crate::granularity::PermitGranularity;
use crate::guard::MemoryGuard;
/// Minimum bytes controlled by one semaphore permit.
pub const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB
use crate::policy::OnExhaustedPolicy;
/// Trait for recording memory usage metrics.
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
@@ -40,6 +41,7 @@ pub struct MemoryManager<M: MemoryMetrics> {
pub(crate) struct MemoryQuota<M: MemoryMetrics> {
pub(crate) semaphore: Arc<Semaphore>,
pub(crate) limit_permits: u32,
pub(crate) granularity: PermitGranularity,
pub(crate) metrics: M,
}
@@ -47,19 +49,25 @@ impl<M: MemoryMetrics> MemoryManager<M> {
/// Creates a new memory manager with the given limit in bytes.
/// `limit_bytes = 0` disables the limit.
pub fn new(limit_bytes: u64, metrics: M) -> Self {
Self::with_granularity(limit_bytes, PermitGranularity::default(), metrics)
}
/// Creates a new memory manager with specified granularity.
pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self {
if limit_bytes == 0 {
metrics.set_limit(0);
return Self { quota: None };
}
let limit_permits = bytes_to_permits(limit_bytes);
let limit_aligned_bytes = permits_to_bytes(limit_permits);
let limit_permits = granularity.bytes_to_permits(limit_bytes);
let limit_aligned_bytes = granularity.permits_to_bytes(limit_permits);
metrics.set_limit(limit_aligned_bytes as i64);
Self {
quota: Some(MemoryQuota {
semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
limit_permits,
granularity,
metrics,
}),
}
@@ -69,7 +77,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn limit_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| permits_to_bytes(quota.limit_permits))
.map(|quota| quota.permits_to_bytes(quota.limit_permits))
.unwrap_or(0)
}
@@ -77,7 +85,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn used_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| permits_to_bytes(quota.used_permits()))
.map(|quota| quota.permits_to_bytes(quota.used_permits()))
.unwrap_or(0)
}
@@ -85,7 +93,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn available_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| permits_to_bytes(quota.available_permits_clamped()))
.map(|quota| quota.permits_to_bytes(quota.available_permits_clamped()))
.unwrap_or(0)
}
@@ -98,13 +106,13 @@ impl<M: MemoryMetrics> MemoryManager<M> {
match &self.quota {
None => Ok(MemoryGuard::unlimited()),
Some(quota) => {
let permits = bytes_to_permits(bytes);
let permits = quota.bytes_to_permits(bytes);
ensure!(
permits <= quota.limit_permits,
MemoryLimitExceededSnafu {
requested_bytes: bytes,
limit_bytes: permits_to_bytes(quota.limit_permits),
limit_bytes: self.limit_bytes()
}
);
@@ -125,7 +133,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
match &self.quota {
None => Some(MemoryGuard::unlimited()),
Some(quota) => {
let permits = bytes_to_permits(bytes);
let permits = quota.bytes_to_permits(bytes);
match quota.semaphore.clone().try_acquire_many_owned(permits) {
Ok(permit) => {
@@ -140,9 +148,56 @@ impl<M: MemoryMetrics> MemoryManager<M> {
}
}
}
/// Acquires memory based on the given policy.
///
/// - For `OnExhaustedPolicy::Wait`: Waits up to the timeout duration for memory to become available
/// - For `OnExhaustedPolicy::Fail`: Returns immediately if memory is not available
///
/// # Errors
/// - `MemoryLimitExceeded`: Requested bytes exceed the total limit (both policies), or memory is currently exhausted (Fail policy only)
/// - `MemoryAcquireTimeout`: Timeout elapsed while waiting for memory (Wait policy only)
/// - `MemorySemaphoreClosed`: The internal semaphore is unexpectedly closed (rare, indicates system issue)
pub async fn acquire_with_policy(
&self,
bytes: u64,
policy: OnExhaustedPolicy,
) -> Result<MemoryGuard<M>> {
match policy {
OnExhaustedPolicy::Wait { timeout } => {
match tokio::time::timeout(timeout, self.acquire(bytes)).await {
Ok(Ok(guard)) => Ok(guard),
Ok(Err(e)) => Err(e),
Err(_elapsed) => {
// Timeout elapsed while waiting
MemoryAcquireTimeoutSnafu {
requested_bytes: bytes,
waited: timeout,
}
.fail()
}
}
}
OnExhaustedPolicy::Fail => self.try_acquire(bytes).ok_or_else(|| {
MemoryLimitExceededSnafu {
requested_bytes: bytes,
limit_bytes: self.limit_bytes(),
}
.build()
}),
}
}
}
impl<M: MemoryMetrics> MemoryQuota<M> {
pub(crate) fn bytes_to_permits(&self, bytes: u64) -> u32 {
self.granularity.bytes_to_permits(bytes)
}
pub(crate) fn permits_to_bytes(&self, permits: u32) -> u64 {
self.granularity.permits_to_bytes(permits)
}
pub(crate) fn used_permits(&self) -> u32 {
self.limit_permits
.saturating_sub(self.available_permits_clamped())
@@ -155,19 +210,7 @@ impl<M: MemoryMetrics> MemoryQuota<M> {
}
pub(crate) fn update_in_use_metric(&self) {
let bytes = permits_to_bytes(self.used_permits());
let bytes = self.permits_to_bytes(self.used_permits());
self.metrics.set_in_use(bytes as i64);
}
}
pub(crate) fn bytes_to_permits(bytes: u64) -> u32 {
bytes
.saturating_add(PERMIT_GRANULARITY_BYTES - 1)
.saturating_div(PERMIT_GRANULARITY_BYTES)
.min(Semaphore::MAX_PERMITS as u64)
.min(u32::MAX as u64) as u32
}
pub(crate) fn permits_to_bytes(permits: u32) -> u64 {
(permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES)
}

View File

@@ -14,7 +14,10 @@
use tokio::time::{Duration, sleep};
use crate::{MemoryManager, NoOpMetrics, PERMIT_GRANULARITY_BYTES};
use crate::{MemoryManager, NoOpMetrics, PermitGranularity};
// Helper constant for tests - use default Megabyte granularity
const PERMIT_GRANULARITY_BYTES: u64 = PermitGranularity::Megabyte.bytes();
#[test]
fn test_try_acquire_unlimited() {

View File

@@ -71,6 +71,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
}),
MetricType::GAUGE => timeseries.push(TimeSeries {
labels: convert_label(m.get_label(), mf_name, None),
@@ -79,6 +80,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
}),
MetricType::HISTOGRAM => {
let h = m.get_histogram();
@@ -97,6 +99,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
inf_seen = true;
@@ -114,6 +117,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
}
timeseries.push(TimeSeries {
@@ -127,6 +131,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
timeseries.push(TimeSeries {
labels: convert_label(
@@ -139,6 +144,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
}
MetricType::SUMMARY => {
@@ -155,6 +161,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
}
timeseries.push(TimeSeries {
@@ -168,6 +175,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
timeseries.push(TimeSeries {
labels: convert_label(
@@ -180,6 +188,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
}
MetricType::UNTYPED => {
@@ -274,7 +283,7 @@ mod test {
assert_eq!(
format!("{:?}", write_quest.timeseries),
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }]"#
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
);
let gauge_opts = Opts::new("test_gauge", "test help")
@@ -288,7 +297,7 @@ mod test {
let write_quest = convert_metric_to_write_request(mf, None, 0);
assert_eq!(
format!("{:?}", write_quest.timeseries),
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [] }]"#
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
);
}
@@ -305,20 +314,20 @@ mod test {
.iter()
.map(|x| format!("{:?}", x))
.collect();
let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }"#;
let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }"#;
assert_eq!(write_quest_str.join("\n"), ans);
}
@@ -355,10 +364,10 @@ TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" },
.iter()
.map(|x| format!("{:?}", x))
.collect();
let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }"#;
let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [], histograms: [] }"#;
assert_eq!(write_quest_str.join("\n"), ans);
}
@@ -385,11 +394,11 @@ TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }],
let write_quest2 = convert_metric_to_write_request(mf, Some(&filter), 0);
assert_eq!(
format!("{:?}", write_quest1.timeseries),
r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
);
assert_eq!(
format!("{:?}", write_quest2.timeseries),
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
);
}
}

View File

@@ -15,7 +15,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::sync::{Arc, Mutex, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
@@ -38,6 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use tokio::sync::SetOnce;
use crate::batching_mode::BatchingModeOptions;
use crate::error::{
@@ -75,7 +76,19 @@ impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send
}
}
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
#[derive(Debug, Clone)]
pub struct HandlerMutable {
handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
is_initialized: Arc<SetOnce<()>>,
}
impl HandlerMutable {
pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
*self.handler.lock().unwrap() = Some(handler);
// Ignore the error, as we allow the handler to be set multiple times.
let _ = self.is_initialized.set(());
}
}
/// A simple frontend client able to execute sql using grpc protocol
///
@@ -100,7 +113,11 @@ pub enum FrontendClient {
impl FrontendClient {
/// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
let handler = Arc::new(std::sync::Mutex::new(None));
let is_initialized = Arc::new(SetOnce::new());
let handler = HandlerMutable {
handler: Arc::new(Mutex::new(None)),
is_initialized,
};
(
Self::Standalone {
database_client: handler.clone(),
@@ -110,23 +127,13 @@ impl FrontendClient {
)
}
/// Check if the frontend client is initialized.
///
/// In distributed mode, it is always initialized.
/// In standalone mode, it checks if the database client is set.
pub fn is_initialized(&self) -> bool {
match self {
FrontendClient::Distributed { .. } => true,
FrontendClient::Standalone {
database_client, ..
} => {
let guard = database_client.lock();
if let Ok(guard) = guard {
guard.is_some()
} else {
false
}
}
/// Waits until the frontend client is initialized.
pub async fn wait_initialized(&self) {
if let FrontendClient::Standalone {
database_client, ..
} = self
{
database_client.is_initialized.wait().await;
}
}
@@ -158,8 +165,14 @@ impl FrontendClient {
grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
query: QueryOptions,
) -> Self {
let is_initialized = Arc::new(SetOnce::new_with(Some(())));
let handler = HandlerMutable {
handler: Arc::new(Mutex::new(Some(grpc_handler))),
is_initialized: is_initialized.clone(),
};
Self::Standalone {
database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
database_client: handler,
query,
}
}
@@ -341,6 +354,7 @@ impl FrontendClient {
{
let database_client = {
database_client
.handler
.lock()
.map_err(|e| {
UnexpectedSnafu {
@@ -418,6 +432,7 @@ impl FrontendClient {
{
let database_client = {
database_client
.handler
.lock()
.map_err(|e| {
UnexpectedSnafu {
@@ -480,3 +495,73 @@ impl std::fmt::Display for PeerDesc {
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_query::Output;
use tokio::time::timeout;
use super::*;
#[derive(Debug)]
struct NoopHandler;
#[async_trait::async_trait]
impl GrpcQueryHandlerWithBoxedError for NoopHandler {
async fn do_query(
&self,
_query: Request,
_ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
Ok(Output::new_with_affected_rows(0))
}
}
#[tokio::test]
async fn wait_initialized() {
let (client, handler_mut) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
assert!(
timeout(Duration::from_millis(50), client.wait_initialized())
.await
.is_err()
);
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
handler_mut.set_handler(Arc::downgrade(&handler)).await;
timeout(Duration::from_secs(1), client.wait_initialized())
.await
.expect("wait_initialized should complete after handler is set");
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.expect("wait_initialized should be a no-op once initialized");
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
assert!(
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.is_ok()
);
let meta_client = Arc::new(MetaClient::default());
let client = FrontendClient::from_meta_client(
meta_client,
None,
QueryOptions::default(),
BatchingModeOptions::default(),
)
.unwrap();
assert!(
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.is_ok()
);
}
}

View File

@@ -7,6 +7,9 @@ license.workspace = true
[lints]
workspace = true
[features]
vector_index = ["dep:usearch"]
[dependencies]
async-trait.workspace = true
asynchronous-codec = "0.7.0"
@@ -41,7 +44,7 @@ tantivy = { version = "0.24", features = ["zstd-compression"] }
tantivy-jieba = "0.16"
tokio.workspace = true
tokio-util.workspace = true
usearch = { version = "2.21", default-features = false, features = ["fp16lib"] }
usearch = { version = "2.21", default-features = false, features = ["fp16lib"], optional = true }
uuid.workspace = true
[dev-dependencies]

View File

@@ -22,6 +22,7 @@ pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;
pub mod target;
#[cfg(feature = "vector_index")]
pub mod vector;
pub type Bytes = Vec<u8>;

View File

@@ -25,7 +25,7 @@ use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu, MemoryAcquireFailedSnafu};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
use crate::region::RegionRoleState;
@@ -95,80 +95,16 @@ impl CompactionTaskImpl {
async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
let region_id = self.compaction_region.region_id;
let requested_bytes = self.estimated_memory_bytes;
let limit_bytes = self.memory_manager.limit_bytes();
let policy = self.memory_policy;
if limit_bytes > 0 && requested_bytes > limit_bytes {
warn!(
"Compaction for region {} requires {} bytes but limit is {} bytes; cannot satisfy request",
region_id, requested_bytes, limit_bytes
);
return Err(CompactionMemoryExhaustedSnafu {
let _timer = COMPACTION_MEMORY_WAIT.start_timer();
self.memory_manager
.acquire_with_policy(requested_bytes, policy)
.await
.context(CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: "exceed_limit".to_string(),
}
.build());
}
match self.memory_policy {
OnExhaustedPolicy::Wait {
timeout: wait_timeout,
} => {
let timer = COMPACTION_MEMORY_WAIT.start_timer();
match tokio::time::timeout(
wait_timeout,
self.memory_manager.acquire(requested_bytes),
)
.await
{
Ok(Ok(guard)) => {
timer.observe_duration();
Ok(guard)
}
Ok(Err(e)) => {
timer.observe_duration();
Err(e).with_context(|_| MemoryAcquireFailedSnafu {
region_id,
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
})
}
Err(_) => {
timer.observe_duration();
warn!(
"Compaction for region {} waited {:?} for {} bytes but timed out",
region_id, wait_timeout, requested_bytes
);
CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
}
.fail()
}
}
}
OnExhaustedPolicy::Fail => {
// Try to acquire, fail immediately if not available
self.memory_manager
.try_acquire(requested_bytes)
.ok_or_else(|| {
warn!(
"Compaction memory exhausted for region {} (policy=fail, need {} bytes, limit {} bytes)",
region_id, requested_bytes, limit_bytes
);
CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: "fail".to_string(),
}
.build()
})
}
}
policy: format!("{policy:?}"),
})
}
/// Remove expired ssts files, update manifest immediately

View File

@@ -1042,20 +1042,8 @@ pub enum Error {
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
#[snafu(display(
"Compaction memory limit exceeded for region {region_id}: required {required_bytes} bytes, limit {limit_bytes} bytes (policy: {policy})",
))]
#[snafu(display("Compaction memory exhausted for region {region_id} (policy: {policy})",))]
CompactionMemoryExhausted {
region_id: RegionId,
required_bytes: u64,
limit_bytes: u64,
policy: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to acquire memory for region {region_id} (policy: {policy})"))]
MemoryAcquireFailed {
region_id: RegionId,
policy: String,
#[snafu(source)]
@@ -1359,9 +1347,7 @@ impl ErrorExt for Error {
ManualCompactionOverride {} => StatusCode::Cancelled,
CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted,
MemoryAcquireFailed { source, .. } => source.status_code(),
CompactionMemoryExhausted { source, .. } => source.status_code(),
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,

View File

@@ -16,6 +16,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_telemetry::tracing::warn;
use opendal::services::{Azblob, Gcs, Oss, S3};
use serde::{Deserialize, Serialize};
@@ -123,11 +124,18 @@ impl From<&S3Connection> for S3 {
fn from(connection: &S3Connection) -> Self {
let root = util::normalize_dir(&connection.root);
let mut builder = S3::default()
.root(&root)
.bucket(&connection.bucket)
.access_key_id(connection.access_key_id.expose_secret())
.secret_access_key(connection.secret_access_key.expose_secret());
let mut builder = S3::default().root(&root).bucket(&connection.bucket);
if !connection.access_key_id.expose_secret().is_empty()
&& !connection.secret_access_key.expose_secret().is_empty()
{
builder = builder
.access_key_id(connection.access_key_id.expose_secret())
.secret_access_key(connection.secret_access_key.expose_secret());
} else {
warn!("No access key id or secret access key provided, using anonymous access");
builder = builder.allow_anonymous().disable_ec2_metadata();
}
if let Some(endpoint) = &connection.endpoint {
builder = builder.endpoint(endpoint);

View File

@@ -26,7 +26,7 @@ use arrow::datatypes::{Float64Type, TimestampMillisecondType};
use common_grpc::precision::Precision;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::tracing;
use common_telemetry::{tracing, warn};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::{Expr, col, lit, regexp_match};
use datafusion_common::ScalarValue;
@@ -415,6 +415,10 @@ pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertR
table_data.add_row(one_row);
}
}
if !series.histograms.is_empty() {
warn!("Native histograms are not supported yet, data ignored");
}
}
Ok(multi_table_data.into_row_insert_requests())

View File

@@ -259,9 +259,8 @@ impl GreptimeDbStandaloneBuilder {
let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
.set_handler(weak_grpc_handler)
.await;
let flow_streaming_engine = flownode.flow_engine().streaming_engine();
let invoker = flow::FrontendInvoker::build_from(