Compare commits

...

12 Commits

Author SHA1 Message Date
Ning Sun
2f4a15ec40 ci: ensure commits from main branch for whitelisted git dependencies (#7434)
* chore: update proto to include native histogram

* ci: add a CI check to ensure whitelisted dependencies are using their main branch

* chore: add changes to Cargo.toml to trigger CI

* chore: update proto

* test: update test to include histogram
2025-12-18 14:10:33 +00:00
Lanqing Yang
658332fe68 chore(mito): nit remove extra hashset in gc workers (#7399)
chore(mito): remove extra hashset in gc workers

Signed-off-by: lyang24 <lanqingy93@gmail.com>
2025-12-18 13:09:32 +00:00
shuiyisong
c088d361a4 chore: expose disable_ec2_metadata option (#7439)
chore: add option for disable ec2 metadata

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-12-18 11:55:08 +00:00
shuiyisong
a85864067e chore: remove canonicalize (#7430)
* chore: remove canonicalize

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: add match file name option

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update field name

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: modify tls option

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update config file

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update config md

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: update option to `enable_filename_match`

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: address CR issues

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove option

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: remove unused test

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-12-18 09:39:10 +00:00
LFC
0df69c95aa chore: use official etcd-client (#7432)
Signed-off-by: luofucong <luofc@foxmail.com>
2025-12-18 06:25:48 +00:00
McKnight22
72eede8b38 refactor(cli): unify storage configuration for export command (#7280)
* refactor(cli): unify storage configuration for export command

- Utilize ObjectStoreConfig to unify storage configuration for export command
- Support export command for Fs, S3, OSS, GCS and Azblob
- Fix the Display implementation for SecretString always returned the string
  "SecretString([REDACTED])" even when the internal secret was empty.

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* refactor(cli): unify storage configuration for export command

- Change the encapsulation permissions of each configuration
  options for every storage backend to public access.

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>

* refactor(cli): unify storage configuration for export command

- Update the implementation of ObjectStoreConfig::build_xxx() using macro solutions

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>

* refactor(cli): unify storage configuration for export command

- Introduce config validation for each storage type

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* refactor(cli): unify storage configuration for export command

- Enable trait-based polymorphism for storage type handling
  (from inherent impl to trait impl)
- Extract helper functions to reduce code duplication

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* refactor(cli): unify storage configuration for export command

- Improve SecretString handling and validation
  (Distinguishing between "not provided" and "empty string")
- Add validation when using filesystem storage

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* refactor(cli): unify storage configuration for export command

- Refactor storage field validation with macro

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* refactor(cli): unify storage configuration for export command

- support GCS Application Default Credentials (like GKE, Cloud Run, or local development with ) in export
  (Enabling ADC without validating  or  to be present)
  (Making  optional in GCS validation (defaults to https://storage.googleapis.com))

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* refactor(cli): unify storage configuration for export command

This commit refactors the validation logic for object store configurations in the CLI to leverage clap features and reduce boilerplate.

Key changes:
- Update wrap_with_clap_prefix macro to use clap's requires attribute.
  This ensures that storage-specific options (e.g., --s3-bucket) are only accepted when the corresponding backend is enabled (e.g., --s3).
- Simplify FieldValidator trait by removing the is_provided method, as dependency checks are now handled by clap.
- Introduce validate_backend! macro to standardize the validation of required fields for enabled backends.
- Refactor ExportCommand to remove explicit validation calls (validate_s3, etc.) and rely on the validation within backend constructors.
- Add integration tests for ExportCommand to verify build success with S3, OSS, GCS, and Azblob configurations.

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>

* refactor(cli): unify storage configuration for export command

- Use macros to simplify storage export implementation

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>

* refactor(cli): unify storage configuration for export command

- Rollback StorageExport trait implementation to not using macro for better code clarity and maintainability
- Introduce format_uri helper function to unify URI formatting logic
- Fix OSS URI path bug inherited from legacy code

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>

* refactor(cli): unify storage configuration for export command

- Remove unnecessary async_trait

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>

---------

Signed-off-by: McKnight22 <tao.wang.22@outlook.com>
Co-authored-by: WenyXu <wenymedia@gmail.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
2025-12-18 03:16:53 +00:00
jeremyhi
95eccd6cde feat: introduce granularity for memory manager (#7416)
* feat: introduce granularity for memory manager

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: add unit test

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: remove granularity getter for mamanger

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* Update src/common/memory-manager/src/manager.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* feat: acquire_with_policy for manager

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-12-17 11:08:51 +00:00
fys
0bc5a305be chore: add wait_initialized method for frontend client (#7414)
* chore: add wait_initialized method for frontend client

* fix: some

* fix: cargo fmt

* add comment

* add unit test

* rename

* fix: cargo check

* fix: cr by copilot
2025-12-17 08:13:36 +00:00
discord9
1afcddd5a9 chore: feature gate vector_index (#7428)
Signed-off-by: discord9 <discord9@163.com>
2025-12-17 07:14:25 +00:00
shuiyisong
62808b887b fix: using anonymous s3 access when ak and sk is not provided (#7425)
* chore: allow s3 anon

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: disable ec2 metadata

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-12-17 06:34:29 +00:00
discord9
04ddd40e00 chore: bump version to beta.3 (#7423)
chore: bump to beta.3

Signed-off-by: discord9 <discord9@163.com>
2025-12-17 04:18:23 +00:00
liyang
b4f028be5f chore: change etcd endpoints to array in the test scripts (#7419)
chore: change etcd endpoint

Signed-off-by: liyang <daviderli614@gmail.com>
2025-12-17 03:14:35 +00:00
37 changed files with 2219 additions and 768 deletions

View File

@@ -51,7 +51,7 @@ runs:
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \

View File

@@ -81,7 +81,7 @@ function deploy_greptimedb_cluster() {
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set "meta.backendStorage.etcd.endpoints[0]=etcd.$install_namespace.svc.cluster.local:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
-n "$install_namespace"
@@ -119,7 +119,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set "meta.backendStorage.etcd.endpoints[0]=etcd.$install_namespace.svc.cluster.local:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
--set objectStorage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set objectStorage.s3.region="$AWS_REGION" \

154
.github/workflows/check-git-deps.yml vendored Normal file
View File

@@ -0,0 +1,154 @@
name: Check Git Dependencies on Main Branch
on:
pull_request:
branches: [main]
paths:
- 'Cargo.toml'
push:
branches: [main]
paths:
- 'Cargo.toml'
jobs:
check-git-deps:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v6
- name: Check git dependencies
env:
WHITELIST_DEPS: "greptime-proto,meter-core,meter-macros"
run: |
#!/bin/bash
set -e
echo "Checking whitelisted git dependencies..."
# Function to check if a commit is on main branch
check_commit_on_main() {
local repo_url="$1"
local commit="$2"
local repo_name=$(basename "$repo_url" .git)
echo "Checking $repo_name"
echo "Repo: $repo_url"
echo "Commit: $commit"
# Create a temporary directory for cloning
local temp_dir=$(mktemp -d)
# Clone the repository
if git clone "$repo_url" "$temp_dir" 2>/dev/null; then
cd "$temp_dir"
# Try to determine the main branch name
local main_branch="main"
if ! git rev-parse --verify origin/main >/dev/null 2>&1; then
if git rev-parse --verify origin/master >/dev/null 2>&1; then
main_branch="master"
else
# Try to get the default branch
main_branch=$(git symbolic-ref refs/remotes/origin/HEAD | sed 's@^refs/remotes/origin/@@')
fi
fi
echo "Main branch: $main_branch"
# Check if commit exists
if git cat-file -e "$commit" 2>/dev/null; then
# Check if commit is on main branch
if git merge-base --is-ancestor "$commit" "origin/$main_branch" 2>/dev/null; then
echo "PASS: Commit $commit is on $main_branch branch"
cd - >/dev/null
rm -rf "$temp_dir"
return 0
else
echo "FAIL: Commit $commit is NOT on $main_branch branch"
# Try to find which branch contains this commit
local branch_name=$(git branch -r --contains "$commit" 2>/dev/null | head -1 | sed 's/^[[:space:]]*origin\///' | sed 's/[[:space:]]*$//')
if [[ -n "$branch_name" ]]; then
echo "Found on branch: $branch_name"
fi
cd - >/dev/null
rm -rf "$temp_dir"
return 1
fi
else
echo "FAIL: Commit $commit not found in repository"
cd - >/dev/null
rm -rf "$temp_dir"
return 1
fi
else
echo "FAIL: Failed to clone $repo_url"
rm -rf "$temp_dir"
return 1
fi
}
# Extract whitelisted git dependencies from Cargo.toml
echo "Extracting git dependencies from Cargo.toml..."
# Create temporary array to store dependencies
declare -a deps=()
# Build awk pattern from whitelist
IFS=',' read -ra WHITELIST <<< "$WHITELIST_DEPS"
awk_pattern=""
for dep in "${WHITELIST[@]}"; do
if [[ -n "$awk_pattern" ]]; then
awk_pattern="$awk_pattern|"
fi
awk_pattern="$awk_pattern$dep"
done
# Extract whitelisted dependencies
while IFS= read -r line; do
if [[ -n "$line" ]]; then
deps+=("$line")
fi
done < <(awk -v pattern="$awk_pattern" '
$0 ~ pattern ".*git = \"https:/" {
match($0, /git = "([^"]+)"/, arr)
git_url = arr[1]
if (match($0, /rev = "([^"]+)"/, rev_arr)) {
rev = rev_arr[1]
print git_url " " rev
} else {
# Check next line for rev
getline
if (match($0, /rev = "([^"]+)"/, rev_arr)) {
rev = rev_arr[1]
print git_url " " rev
}
}
}
' Cargo.toml)
echo "Found ${#deps[@]} dependencies to check:"
for dep in "${deps[@]}"; do
echo " $dep"
done
failed=0
for dep in "${deps[@]}"; do
read -r repo_url commit <<< "$dep"
if ! check_commit_on_main "$repo_url" "$commit"; then
failed=1
fi
done
echo "Check completed."
if [[ $failed -eq 1 ]]; then
echo "ERROR: Some git dependencies are not on their main branches!"
echo "Please update the commits to point to main branch commits."
exit 1
else
echo "SUCCESS: All git dependencies are on their main branches!"
fi

160
Cargo.lock generated
View File

@@ -212,7 +212,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow-schema",
"common-base",
@@ -733,7 +733,7 @@ dependencies = [
[[package]]
name = "auth"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -1383,7 +1383,7 @@ dependencies = [
[[package]]
name = "cache"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"catalog",
"common-error",
@@ -1418,7 +1418,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow",
@@ -1763,7 +1763,7 @@ checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675"
[[package]]
name = "cli"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-stream",
"async-trait",
@@ -1786,6 +1786,7 @@ dependencies = [
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-test-util",
"common-time",
"common-version",
"common-wal",
@@ -1816,7 +1817,7 @@ dependencies = [
[[package]]
name = "client"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arc-swap",
@@ -1849,7 +1850,7 @@ dependencies = [
"snafu 0.8.6",
"store-api",
"substrait 0.37.3",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"tokio",
"tokio-stream",
"tonic 0.13.1",
@@ -1889,7 +1890,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"auth",
@@ -2023,7 +2024,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anymap2",
"async-trait",
@@ -2047,14 +2048,14 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"const_format",
]
[[package]]
name = "common-config"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-error",
@@ -2079,7 +2080,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow",
"arrow-schema",
@@ -2114,7 +2115,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2127,7 +2128,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-macro",
"http 1.3.1",
@@ -2138,7 +2139,7 @@ dependencies = [
[[package]]
name = "common-event-recorder"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -2160,7 +2161,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -2182,7 +2183,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -2242,7 +2243,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"common-runtime",
@@ -2259,7 +2260,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-flight",
@@ -2294,7 +2295,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"common-base",
@@ -2314,7 +2315,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"greptime-proto",
"once_cell",
@@ -2325,7 +2326,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anyhow",
"common-error",
@@ -2341,7 +2342,7 @@ dependencies = [
[[package]]
name = "common-memory-manager"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-error",
"common-macro",
@@ -2354,7 +2355,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anymap2",
"api",
@@ -2426,7 +2427,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2435,11 +2436,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
[[package]]
name = "common-pprof"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-error",
"common-macro",
@@ -2451,7 +2452,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-stream",
@@ -2480,7 +2481,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"common-procedure",
@@ -2490,7 +2491,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -2516,7 +2517,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arc-swap",
"common-base",
@@ -2540,7 +2541,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -2569,7 +2570,7 @@ dependencies = [
[[package]]
name = "common-session"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"serde",
"strum 0.27.1",
@@ -2577,7 +2578,7 @@ dependencies = [
[[package]]
name = "common-sql"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-decimal",
@@ -2595,7 +2596,7 @@ dependencies = [
[[package]]
name = "common-stat"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-runtime",
@@ -2610,7 +2611,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"backtrace",
"common-base",
@@ -2639,7 +2640,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"client",
"common-grpc",
@@ -2652,7 +2653,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow",
"chrono",
@@ -2670,7 +2671,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"build-data",
"cargo-manifest",
@@ -2681,7 +2682,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-error",
@@ -2704,7 +2705,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-telemetry",
"serde",
@@ -4012,7 +4013,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-flight",
@@ -4076,7 +4077,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow",
"arrow-array",
@@ -4633,8 +4634,9 @@ dependencies = [
[[package]]
name = "etcd-client"
version = "0.15.0"
source = "git+https://github.com/GreptimeTeam/etcd-client?rev=f62df834f0cffda355eba96691fe1a9a332b75a7#f62df834f0cffda355eba96691fe1a9a332b75a7"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88365f1a5671eb2f7fc240adb216786bc6494b38ce15f1d26ad6eaa303d5e822"
dependencies = [
"http 1.3.1",
"prost 0.13.5",
@@ -4750,7 +4752,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-engine"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -4882,7 +4884,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow",
@@ -4951,7 +4953,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tokio",
"tonic 0.13.1",
@@ -5012,7 +5014,7 @@ checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
[[package]]
name = "frontend"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arc-swap",
@@ -5459,7 +5461,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=0423fa30203187c75e2937a668df1da699c8b96c#0423fa30203187c75e2937a668df1da699c8b96c"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=173efe5ec62722089db7c531c0b0d470a072b915#173efe5ec62722089db7c531c0b0d470a072b915"
dependencies = [
"prost 0.13.5",
"prost-types 0.13.5",
@@ -6227,7 +6229,7 @@ dependencies = [
[[package]]
name = "index"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -7168,7 +7170,7 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "log-query"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"chrono",
"common-error",
@@ -7180,7 +7182,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-stream",
"async-trait",
@@ -7481,7 +7483,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -7509,7 +7511,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -7609,7 +7611,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"aquamarine",
@@ -7706,7 +7708,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"bytes",
@@ -7731,7 +7733,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"aquamarine",
@@ -8471,7 +8473,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anyhow",
"bytes",
@@ -8756,7 +8758,7 @@ dependencies = [
[[package]]
name = "operator"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -8816,7 +8818,7 @@ dependencies = [
"sql",
"sqlparser",
"store-api",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tokio",
"tokio-util",
@@ -9102,7 +9104,7 @@ dependencies = [
[[package]]
name = "partition"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -9459,7 +9461,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -9615,7 +9617,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"auth",
"catalog",
@@ -9917,7 +9919,7 @@ dependencies = [
[[package]]
name = "promql"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"async-trait",
@@ -10200,7 +10202,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-compression 0.4.19",
"async-trait",
@@ -10242,7 +10244,7 @@ dependencies = [
[[package]]
name = "query"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -10309,7 +10311,7 @@ dependencies = [
"sql",
"sqlparser",
"store-api",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tokio",
"tokio-stream",
@@ -11651,7 +11653,7 @@ dependencies = [
[[package]]
name = "servers"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -11779,7 +11781,7 @@ dependencies = [
[[package]]
name = "session"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -12113,7 +12115,7 @@ dependencies = [
[[package]]
name = "sql"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-buffer",
@@ -12173,7 +12175,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -12450,7 +12452,7 @@ dependencies = [
[[package]]
name = "standalone"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"catalog",
@@ -12491,7 +12493,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "store-api"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"aquamarine",
@@ -12704,7 +12706,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"bytes",
@@ -12827,7 +12829,7 @@ dependencies = [
[[package]]
name = "table"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -13096,7 +13098,7 @@ checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
[[package]]
name = "tests-fuzz"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arbitrary",
"async-trait",
@@ -13140,7 +13142,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-flight",
@@ -13215,7 +13217,7 @@ dependencies = [
"sqlx",
"standalone",
"store-api",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tempfile",
"time",

View File

@@ -75,7 +75,7 @@ members = [
resolver = "2"
[workspace.package]
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
edition = "2024"
license = "Apache-2.0"
@@ -143,14 +143,14 @@ derive_builder = "0.20"
derive_more = { version = "2.1", features = ["full"] }
dotenv = "0.15"
either = "1.15"
etcd-client = { git = "https://github.com/GreptimeTeam/etcd-client", rev = "f62df834f0cffda355eba96691fe1a9a332b75a7", features = [
etcd-client = { version = "0.16.1", features = [
"tls",
"tls-roots",
] }
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "0423fa30203187c75e2937a668df1da699c8b96c" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "173efe5ec62722089db7c531c0b0d470a072b915" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -225,6 +225,7 @@ overwrite_entry_start_id = false
# endpoint = "https://s3.amazonaws.com"
# region = "us-west-2"
# enable_virtual_host_style = false
# disable_ec2_metadata = false
# Example of using Oss as the storage.
# [storage]

View File

@@ -131,7 +131,6 @@ key_path = ""
## For now, gRPC tls config does not support auto reload.
watch = false
## MySQL server options.
[mysql]
## Whether to enable.

View File

@@ -332,6 +332,7 @@ max_running_procedures = 128
# endpoint = "https://s3.amazonaws.com"
# region = "us-west-2"
# enable_virtual_host_style = false
# disable_ec2_metadata = false
# Example of using Oss as the storage.
# [storage]

View File

@@ -67,6 +67,7 @@ tracing-appender.workspace = true
[dev-dependencies]
common-meta = { workspace = true, features = ["testing"] }
common-test-util.workspace = true
common-version.workspace = true
serde.workspace = true
tempfile.workspace = true

View File

@@ -15,5 +15,8 @@
mod object_store;
mod store;
pub use object_store::{ObjectStoreConfig, new_fs_object_store};
pub use object_store::{
ObjectStoreConfig, PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection,
PrefixedS3Connection, new_fs_object_store,
};
pub use store::StoreConfig;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_base::secrets::SecretString;
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use object_store::services::{Azblob, Fs, Gcs, Oss, S3};
use object_store::util::{with_instrument_layers, with_retry_layers};
@@ -22,9 +22,69 @@ use snafu::ResultExt;
use crate::error::{self};
/// Trait to convert CLI field types to target struct field types.
/// This enables `Option<SecretString>` (CLI) -> `SecretString` (target) conversions,
/// allowing us to distinguish "not provided" from "provided but empty".
trait IntoField<T> {
fn into_field(self) -> T;
}
/// Identity conversion for types that are the same.
impl<T> IntoField<T> for T {
fn into_field(self) -> T {
self
}
}
/// Convert `Option<SecretString>` to `SecretString`, using default for None.
impl IntoField<SecretString> for Option<SecretString> {
fn into_field(self) -> SecretString {
self.unwrap_or_default()
}
}
/// Trait for checking if a field is effectively empty.
///
/// **`is_empty()`**: Checks if the field has no meaningful value
/// - Used when backend is enabled to validate required fields
/// - `None`, `Some("")`, `false`, or `""` are considered empty
trait FieldValidator {
/// Check if the field is empty (has no meaningful value).
fn is_empty(&self) -> bool;
}
/// String fields: empty if the string is empty
impl FieldValidator for String {
fn is_empty(&self) -> bool {
self.is_empty()
}
}
/// Bool fields: false is considered "empty", true is "provided"
impl FieldValidator for bool {
fn is_empty(&self) -> bool {
!self
}
}
/// Option<String> fields: None or empty content is empty
impl FieldValidator for Option<String> {
fn is_empty(&self) -> bool {
self.as_ref().is_none_or(|s| s.is_empty())
}
}
/// Option<SecretString> fields: None or empty secret is empty
/// For secrets, Some("") is treated as "not provided" for both checks
impl FieldValidator for Option<SecretString> {
fn is_empty(&self) -> bool {
self.as_ref().is_none_or(|s| s.expose_secret().is_empty())
}
}
macro_rules! wrap_with_clap_prefix {
(
$new_name:ident, $prefix:literal, $base:ty, {
$new_name:ident, $prefix:literal, $enable_flag:literal, $base:ty, {
$( $( #[doc = $doc:expr] )? $( #[alias = $alias:literal] )? $field:ident : $type:ty $( = $default:expr )? ),* $(,)?
}
) => {
@@ -34,15 +94,16 @@ macro_rules! wrap_with_clap_prefix {
$(
$( #[doc = $doc] )?
$( #[clap(alias = $alias)] )?
#[clap(long $(, default_value_t = $default )? )]
[<$prefix $field>]: $type,
#[clap(long, requires = $enable_flag $(, default_value_t = $default )? )]
pub [<$prefix $field>]: $type,
)*
}
impl From<$new_name> for $base {
fn from(w: $new_name) -> Self {
Self {
$( $field: w.[<$prefix $field>] ),*
// Use into_field() to handle Option<SecretString> -> SecretString conversion
$( $field: w.[<$prefix $field>].into_field() ),*
}
}
}
@@ -50,9 +111,90 @@ macro_rules! wrap_with_clap_prefix {
};
}
/// Macro for declarative backend validation.
///
/// # Validation Rules
///
/// For each storage backend (S3, OSS, GCS, Azblob), this function validates:
/// **When backend is enabled** (e.g., `--s3`): All required fields must be non-empty
///
/// Note: When backend is disabled, clap's `requires` attribute ensures no configuration
/// fields can be provided at parse time.
///
/// # Syntax
///
/// ```ignore
/// validate_backend!(
/// enable: self.enable_s3,
/// name: "S3",
/// required: [(field1, "name1"), (field2, "name2"), ...],
/// custom_validator: |missing| { ... } // optional
/// )
/// ```
///
/// # Arguments
///
/// - `enable`: Boolean expression indicating if backend is enabled
/// - `name`: Human-readable backend name for error messages
/// - `required`: Array of (field_ref, field_name) tuples for required fields
/// - `custom_validator`: Optional closure for complex validation logic
///
/// # Example
///
/// ```ignore
/// validate_backend!(
/// enable: self.enable_s3,
/// name: "S3",
/// required: [
/// (&self.s3.s3_bucket, "bucket"),
/// (&self.s3.s3_access_key_id, "access key ID"),
/// ]
/// )
/// ```
macro_rules! validate_backend {
(
enable: $enable:expr,
name: $backend_name:expr,
required: [ $( ($field:expr, $field_name:expr) ),* $(,)? ]
$(, custom_validator: $custom_validator:expr)?
) => {{
if $enable {
// Check required fields when backend is enabled
let mut missing = Vec::new();
$(
if FieldValidator::is_empty($field) {
missing.push($field_name);
}
)*
// Run custom validation if provided
$(
$custom_validator(&mut missing);
)?
if !missing.is_empty() {
return Err(BoxedError::new(
error::MissingConfigSnafu {
msg: format!(
"{} {} must be set when --{} is enabled.",
$backend_name,
missing.join(", "),
$backend_name.to_lowercase()
),
}
.build(),
));
}
}
Ok(())
}};
}
wrap_with_clap_prefix! {
PrefixedAzblobConnection,
"azblob-",
"enable_azblob",
AzblobConnection,
{
#[doc = "The container of the object store."]
@@ -60,9 +202,9 @@ wrap_with_clap_prefix! {
#[doc = "The root of the object store."]
root: String = Default::default(),
#[doc = "The account name of the object store."]
account_name: SecretString = Default::default(),
account_name: Option<SecretString>,
#[doc = "The account key of the object store."]
account_key: SecretString = Default::default(),
account_key: Option<SecretString>,
#[doc = "The endpoint of the object store."]
endpoint: String = Default::default(),
#[doc = "The SAS token of the object store."]
@@ -70,9 +212,33 @@ wrap_with_clap_prefix! {
}
}
impl PrefixedAzblobConnection {
pub fn validate(&self) -> Result<(), BoxedError> {
validate_backend!(
enable: true,
name: "AzBlob",
required: [
(&self.azblob_container, "container"),
(&self.azblob_root, "root"),
(&self.azblob_account_name, "account name"),
(&self.azblob_endpoint, "endpoint"),
],
custom_validator: |missing: &mut Vec<&str>| {
// account_key is only required if sas_token is not provided
if self.azblob_sas_token.is_none()
&& self.azblob_account_key.is_empty()
{
missing.push("account key (when sas_token is not provided)");
}
}
)
}
}
wrap_with_clap_prefix! {
PrefixedS3Connection,
"s3-",
"enable_s3",
S3Connection,
{
#[doc = "The bucket of the object store."]
@@ -80,21 +246,39 @@ wrap_with_clap_prefix! {
#[doc = "The root of the object store."]
root: String = Default::default(),
#[doc = "The access key ID of the object store."]
access_key_id: SecretString = Default::default(),
access_key_id: Option<SecretString>,
#[doc = "The secret access key of the object store."]
secret_access_key: SecretString = Default::default(),
secret_access_key: Option<SecretString>,
#[doc = "The endpoint of the object store."]
endpoint: Option<String>,
#[doc = "The region of the object store."]
region: Option<String>,
#[doc = "Enable virtual host style for the object store."]
enable_virtual_host_style: bool = Default::default(),
#[doc = "Disable EC2 metadata service for the object store."]
disable_ec2_metadata: bool = Default::default(),
}
}
impl PrefixedS3Connection {
pub fn validate(&self) -> Result<(), BoxedError> {
validate_backend!(
enable: true,
name: "S3",
required: [
(&self.s3_bucket, "bucket"),
(&self.s3_access_key_id, "access key ID"),
(&self.s3_secret_access_key, "secret access key"),
(&self.s3_region, "region"),
]
)
}
}
wrap_with_clap_prefix! {
PrefixedOssConnection,
"oss-",
"enable_oss",
OssConnection,
{
#[doc = "The bucket of the object store."]
@@ -102,17 +286,33 @@ wrap_with_clap_prefix! {
#[doc = "The root of the object store."]
root: String = Default::default(),
#[doc = "The access key ID of the object store."]
access_key_id: SecretString = Default::default(),
access_key_id: Option<SecretString>,
#[doc = "The access key secret of the object store."]
access_key_secret: SecretString = Default::default(),
access_key_secret: Option<SecretString>,
#[doc = "The endpoint of the object store."]
endpoint: String = Default::default(),
}
}
impl PrefixedOssConnection {
pub fn validate(&self) -> Result<(), BoxedError> {
validate_backend!(
enable: true,
name: "OSS",
required: [
(&self.oss_bucket, "bucket"),
(&self.oss_access_key_id, "access key ID"),
(&self.oss_access_key_secret, "access key secret"),
(&self.oss_endpoint, "endpoint"),
]
)
}
}
wrap_with_clap_prefix! {
PrefixedGcsConnection,
"gcs-",
"enable_gcs",
GcsConnection,
{
#[doc = "The root of the object store."]
@@ -122,40 +322,72 @@ wrap_with_clap_prefix! {
#[doc = "The scope of the object store."]
scope: String = Default::default(),
#[doc = "The credential path of the object store."]
credential_path: SecretString = Default::default(),
credential_path: Option<SecretString>,
#[doc = "The credential of the object store."]
credential: SecretString = Default::default(),
credential: Option<SecretString>,
#[doc = "The endpoint of the object store."]
endpoint: String = Default::default(),
}
}
/// common config for object store.
impl PrefixedGcsConnection {
pub fn validate(&self) -> Result<(), BoxedError> {
validate_backend!(
enable: true,
name: "GCS",
required: [
(&self.gcs_bucket, "bucket"),
(&self.gcs_root, "root"),
(&self.gcs_scope, "scope"),
]
// No custom_validator needed: GCS supports Application Default Credentials (ADC)
// where neither credential_path nor credential is required.
// Endpoint is also optional (defaults to https://storage.googleapis.com).
)
}
}
/// Common config for object store.
///
/// # Dependency Enforcement
///
/// Each backend's configuration fields (e.g., `--s3-bucket`) requires its corresponding
/// enable flag (e.g., `--s3`) to be present. This is enforced by `clap` at parse time
/// using the `requires` attribute.
///
/// For example, attempting to use `--s3-bucket my-bucket` without `--s3` will result in:
/// ```text
/// error: The argument '--s3-bucket <BUCKET>' requires '--s3'
/// ```
///
/// This ensures that users cannot accidentally provide backend-specific configuration
/// without explicitly enabling that backend.
#[derive(clap::Parser, Debug, Clone, PartialEq, Default)]
#[clap(group(clap::ArgGroup::new("storage_backend").required(false).multiple(false)))]
pub struct ObjectStoreConfig {
/// Whether to use S3 object store.
#[clap(long, alias = "s3")]
#[clap(long = "s3", group = "storage_backend")]
pub enable_s3: bool,
#[clap(flatten)]
pub s3: PrefixedS3Connection,
/// Whether to use OSS.
#[clap(long, alias = "oss")]
#[clap(long = "oss", group = "storage_backend")]
pub enable_oss: bool,
#[clap(flatten)]
pub oss: PrefixedOssConnection,
/// Whether to use GCS.
#[clap(long, alias = "gcs")]
#[clap(long = "gcs", group = "storage_backend")]
pub enable_gcs: bool,
#[clap(flatten)]
pub gcs: PrefixedGcsConnection,
/// Whether to use Azure Blob.
#[clap(long, alias = "azblob")]
#[clap(long = "azblob", group = "storage_backend")]
pub enable_azblob: bool,
#[clap(flatten)]
@@ -173,52 +405,66 @@ pub fn new_fs_object_store(root: &str) -> std::result::Result<ObjectStore, Boxed
Ok(with_instrument_layers(object_store, false))
}
macro_rules! gen_object_store_builder {
($method:ident, $field:ident, $conn_type:ty, $service_type:ty) => {
pub fn $method(&self) -> Result<ObjectStore, BoxedError> {
let config = <$conn_type>::from(self.$field.clone());
common_telemetry::info!(
"Building object store with {}: {:?}",
stringify!($field),
config
);
let object_store = ObjectStore::new(<$service_type>::from(&config))
.context(error::InitBackendSnafu)
.map_err(BoxedError::new)?
.finish();
Ok(with_instrument_layers(
with_retry_layers(object_store),
false,
))
}
};
}
impl ObjectStoreConfig {
gen_object_store_builder!(build_s3, s3, S3Connection, S3);
gen_object_store_builder!(build_oss, oss, OssConnection, Oss);
gen_object_store_builder!(build_gcs, gcs, GcsConnection, Gcs);
gen_object_store_builder!(build_azblob, azblob, AzblobConnection, Azblob);
pub fn validate(&self) -> Result<(), BoxedError> {
if self.enable_s3 {
self.s3.validate()?;
}
if self.enable_oss {
self.oss.validate()?;
}
if self.enable_gcs {
self.gcs.validate()?;
}
if self.enable_azblob {
self.azblob.validate()?;
}
Ok(())
}
/// Builds the object store from the config.
pub fn build(&self) -> Result<Option<ObjectStore>, BoxedError> {
let object_store = if self.enable_s3 {
let s3 = S3Connection::from(self.s3.clone());
common_telemetry::info!("Building object store with s3: {:?}", s3);
Some(
ObjectStore::new(S3::from(&s3))
.context(error::InitBackendSnafu)
.map_err(BoxedError::new)?
.finish(),
)
self.validate()?;
if self.enable_s3 {
self.build_s3().map(Some)
} else if self.enable_oss {
let oss = OssConnection::from(self.oss.clone());
common_telemetry::info!("Building object store with oss: {:?}", oss);
Some(
ObjectStore::new(Oss::from(&oss))
.context(error::InitBackendSnafu)
.map_err(BoxedError::new)?
.finish(),
)
self.build_oss().map(Some)
} else if self.enable_gcs {
let gcs = GcsConnection::from(self.gcs.clone());
common_telemetry::info!("Building object store with gcs: {:?}", gcs);
Some(
ObjectStore::new(Gcs::from(&gcs))
.context(error::InitBackendSnafu)
.map_err(BoxedError::new)?
.finish(),
)
self.build_gcs().map(Some)
} else if self.enable_azblob {
let azblob = AzblobConnection::from(self.azblob.clone());
common_telemetry::info!("Building object store with azblob: {:?}", azblob);
Some(
ObjectStore::new(Azblob::from(&azblob))
.context(error::InitBackendSnafu)
.map_err(BoxedError::new)?
.finish(),
)
self.build_azblob().map(Some)
} else {
None
};
let object_store = object_store
.map(|object_store| with_instrument_layers(with_retry_layers(object_store), false));
Ok(object_store)
Ok(None)
}
}
}

View File

@@ -14,6 +14,7 @@
mod export;
mod import;
mod storage_export;
use clap::Subcommand;
use client::DEFAULT_CATALOG_NAME;

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,373 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use common_base::secrets::{ExposeSecret, SecretString};
use common_error::ext::BoxedError;
use crate::common::{
PrefixedAzblobConnection, PrefixedGcsConnection, PrefixedOssConnection, PrefixedS3Connection,
};
/// Helper function to extract secret string from Option<SecretString>.
/// Returns empty string if None.
fn expose_optional_secret(secret: &Option<SecretString>) -> &str {
secret
.as_ref()
.map(|s| s.expose_secret().as_str())
.unwrap_or("")
}
/// Helper function to format root path with leading slash if non-empty.
fn format_root_path(root: &str) -> String {
if root.is_empty() {
String::new()
} else {
format!("/{}", root)
}
}
/// Helper function to mask multiple secrets in a string.
fn mask_secrets(mut sql: String, secrets: &[&str]) -> String {
for secret in secrets {
if !secret.is_empty() {
sql = sql.replace(secret, "[REDACTED]");
}
}
sql
}
/// Helper function to format storage URI.
fn format_uri(scheme: &str, bucket: &str, root: &str, path: &str) -> String {
let root = format_root_path(root);
format!("{}://{}{}/{}", scheme, bucket, root, path)
}
/// Trait for storage backends that can be used for data export.
pub trait StorageExport: Send + Sync {
/// Generate the storage path for COPY DATABASE command.
/// Returns (path, connection_string) where connection_string includes CONNECTION clause.
fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String);
/// Format the output path for logging purposes.
fn format_output_path(&self, file_path: &str) -> String;
/// Mask sensitive information in SQL commands for safe logging.
fn mask_sensitive_info(&self, sql: &str) -> String;
}
macro_rules! define_backend {
($name:ident, $config:ty) => {
#[derive(Clone)]
pub struct $name {
config: $config,
}
impl $name {
pub fn new(config: $config) -> Result<Self, BoxedError> {
config.validate()?;
Ok(Self { config })
}
}
};
}
/// Local file system storage backend.
#[derive(Clone)]
pub struct FsBackend {
output_dir: String,
}
impl FsBackend {
pub fn new(output_dir: String) -> Self {
Self { output_dir }
}
}
impl StorageExport for FsBackend {
fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
if self.output_dir.is_empty() {
unreachable!("output_dir must be set when not using remote storage")
}
let path = PathBuf::from(&self.output_dir)
.join(catalog)
.join(format!("{schema}/"))
.to_string_lossy()
.to_string();
(path, String::new())
}
fn format_output_path(&self, file_path: &str) -> String {
format!("{}/{}", self.output_dir, file_path)
}
fn mask_sensitive_info(&self, sql: &str) -> String {
sql.to_string()
}
}
define_backend!(S3Backend, PrefixedS3Connection);
impl StorageExport for S3Backend {
fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
let s3_path = format_uri(
"s3",
&self.config.s3_bucket,
&self.config.s3_root,
&format!("{}/{}/", catalog, schema),
);
let mut connection_options = vec![
format!(
"ACCESS_KEY_ID='{}'",
expose_optional_secret(&self.config.s3_access_key_id)
),
format!(
"SECRET_ACCESS_KEY='{}'",
expose_optional_secret(&self.config.s3_secret_access_key)
),
];
if let Some(region) = &self.config.s3_region {
connection_options.push(format!("REGION='{}'", region));
}
if let Some(endpoint) = &self.config.s3_endpoint {
connection_options.push(format!("ENDPOINT='{}'", endpoint));
}
let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
(s3_path, connection_str)
}
fn format_output_path(&self, file_path: &str) -> String {
format_uri(
"s3",
&self.config.s3_bucket,
&self.config.s3_root,
file_path,
)
}
fn mask_sensitive_info(&self, sql: &str) -> String {
mask_secrets(
sql.to_string(),
&[
expose_optional_secret(&self.config.s3_access_key_id),
expose_optional_secret(&self.config.s3_secret_access_key),
],
)
}
}
define_backend!(OssBackend, PrefixedOssConnection);
impl StorageExport for OssBackend {
fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
let oss_path = format_uri(
"oss",
&self.config.oss_bucket,
&self.config.oss_root,
&format!("{}/{}/", catalog, schema),
);
let connection_options = [
format!(
"ACCESS_KEY_ID='{}'",
expose_optional_secret(&self.config.oss_access_key_id)
),
format!(
"ACCESS_KEY_SECRET='{}'",
expose_optional_secret(&self.config.oss_access_key_secret)
),
];
let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
(oss_path, connection_str)
}
fn format_output_path(&self, file_path: &str) -> String {
format_uri(
"oss",
&self.config.oss_bucket,
&self.config.oss_root,
file_path,
)
}
fn mask_sensitive_info(&self, sql: &str) -> String {
mask_secrets(
sql.to_string(),
&[
expose_optional_secret(&self.config.oss_access_key_id),
expose_optional_secret(&self.config.oss_access_key_secret),
],
)
}
}
define_backend!(GcsBackend, PrefixedGcsConnection);
impl StorageExport for GcsBackend {
fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
let gcs_path = format_uri(
"gcs",
&self.config.gcs_bucket,
&self.config.gcs_root,
&format!("{}/{}/", catalog, schema),
);
let mut connection_options = Vec::new();
let credential_path = expose_optional_secret(&self.config.gcs_credential_path);
if !credential_path.is_empty() {
connection_options.push(format!("CREDENTIAL_PATH='{}'", credential_path));
}
let credential = expose_optional_secret(&self.config.gcs_credential);
if !credential.is_empty() {
connection_options.push(format!("CREDENTIAL='{}'", credential));
}
if !self.config.gcs_endpoint.is_empty() {
connection_options.push(format!("ENDPOINT='{}'", self.config.gcs_endpoint));
}
let connection_str = if connection_options.is_empty() {
String::new()
} else {
format!(" CONNECTION ({})", connection_options.join(", "))
};
(gcs_path, connection_str)
}
fn format_output_path(&self, file_path: &str) -> String {
format_uri(
"gcs",
&self.config.gcs_bucket,
&self.config.gcs_root,
file_path,
)
}
fn mask_sensitive_info(&self, sql: &str) -> String {
mask_secrets(
sql.to_string(),
&[
expose_optional_secret(&self.config.gcs_credential_path),
expose_optional_secret(&self.config.gcs_credential),
],
)
}
}
define_backend!(AzblobBackend, PrefixedAzblobConnection);
impl StorageExport for AzblobBackend {
fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
let azblob_path = format_uri(
"azblob",
&self.config.azblob_container,
&self.config.azblob_root,
&format!("{}/{}/", catalog, schema),
);
let mut connection_options = vec![
format!(
"ACCOUNT_NAME='{}'",
expose_optional_secret(&self.config.azblob_account_name)
),
format!(
"ACCOUNT_KEY='{}'",
expose_optional_secret(&self.config.azblob_account_key)
),
];
if let Some(sas_token) = &self.config.azblob_sas_token {
connection_options.push(format!("SAS_TOKEN='{}'", sas_token));
}
let connection_str = format!(" CONNECTION ({})", connection_options.join(", "));
(azblob_path, connection_str)
}
fn format_output_path(&self, file_path: &str) -> String {
format_uri(
"azblob",
&self.config.azblob_container,
&self.config.azblob_root,
file_path,
)
}
fn mask_sensitive_info(&self, sql: &str) -> String {
mask_secrets(
sql.to_string(),
&[
expose_optional_secret(&self.config.azblob_account_name),
expose_optional_secret(&self.config.azblob_account_key),
],
)
}
}
#[derive(Clone)]
pub enum StorageType {
Fs(FsBackend),
S3(S3Backend),
Oss(OssBackend),
Gcs(GcsBackend),
Azblob(AzblobBackend),
}
impl StorageExport for StorageType {
fn get_storage_path(&self, catalog: &str, schema: &str) -> (String, String) {
match self {
StorageType::Fs(backend) => backend.get_storage_path(catalog, schema),
StorageType::S3(backend) => backend.get_storage_path(catalog, schema),
StorageType::Oss(backend) => backend.get_storage_path(catalog, schema),
StorageType::Gcs(backend) => backend.get_storage_path(catalog, schema),
StorageType::Azblob(backend) => backend.get_storage_path(catalog, schema),
}
}
fn format_output_path(&self, file_path: &str) -> String {
match self {
StorageType::Fs(backend) => backend.format_output_path(file_path),
StorageType::S3(backend) => backend.format_output_path(file_path),
StorageType::Oss(backend) => backend.format_output_path(file_path),
StorageType::Gcs(backend) => backend.format_output_path(file_path),
StorageType::Azblob(backend) => backend.format_output_path(file_path),
}
}
fn mask_sensitive_info(&self, sql: &str) -> String {
match self {
StorageType::Fs(backend) => backend.mask_sensitive_info(sql),
StorageType::S3(backend) => backend.mask_sensitive_info(sql),
StorageType::Oss(backend) => backend.mask_sensitive_info(sql),
StorageType::Gcs(backend) => backend.mask_sensitive_info(sql),
StorageType::Azblob(backend) => backend.mask_sensitive_info(sql),
}
}
}
impl StorageType {
/// Returns true if the storage backend is remote (not local filesystem).
pub fn is_remote_storage(&self) -> bool {
!matches!(self, StorageType::Fs(_))
}
}

View File

@@ -253,12 +253,6 @@ pub enum Error {
error: ObjectStoreError,
},
#[snafu(display("S3 config need be set"))]
S3ConfigNotSet {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Output directory not set"))]
OutputDirNotSet {
#[snafu(implicit)]
@@ -364,9 +358,9 @@ impl ErrorExt for Error {
Error::Other { source, .. } => source.status_code(),
Error::OpenDal { .. } | Error::InitBackend { .. } => StatusCode::Internal,
Error::S3ConfigNotSet { .. }
| Error::OutputDirNotSet { .. }
| Error::EmptyStoreAddrs { .. } => StatusCode::InvalidArguments,
Error::OutputDirNotSet { .. } | Error::EmptyStoreAddrs { .. } => {
StatusCode::InvalidArguments
}
Error::BuildRuntime { source, .. } => source.status_code(),

View File

@@ -552,9 +552,8 @@ impl StartCommand {
let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
.set_handler(weak_grpc_handler)
.await;
// set the frontend invoker for flownode
let flow_streaming_engine = flownode.flow_engine().streaming_engine();

View File

@@ -59,15 +59,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to canonicalize path: {}", path))]
CanonicalizePath {
path: String,
#[snafu(source)]
error: std::io::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid path '{}': expected a file, not a directory", path))]
InvalidPath {
path: String,
@@ -82,8 +73,7 @@ impl ErrorExt for Error {
Error::TomlFormat { .. }
| Error::LoadLayeredConfig { .. }
| Error::FileWatch { .. }
| Error::InvalidPath { .. }
| Error::CanonicalizePath { .. } => StatusCode::InvalidArguments,
| Error::InvalidPath { .. } => StatusCode::InvalidArguments,
Error::SerdeJson { .. } => StatusCode::Unexpected,
}
}

View File

@@ -30,7 +30,7 @@ use common_telemetry::{error, info, warn};
use notify::{EventKind, RecursiveMode, Watcher};
use snafu::ResultExt;
use crate::error::{CanonicalizePathSnafu, FileWatchSnafu, InvalidPathSnafu, Result};
use crate::error::{FileWatchSnafu, InvalidPathSnafu, Result};
/// Configuration for the file watcher behavior.
#[derive(Debug, Clone, Default)]
@@ -41,15 +41,10 @@ pub struct FileWatcherConfig {
impl FileWatcherConfig {
pub fn new() -> Self {
Self::default()
Default::default()
}
pub fn with_modify_and_create(mut self) -> Self {
self.include_remove_events = false;
self
}
pub fn with_remove_events(mut self) -> Self {
pub fn include_remove_events(mut self) -> Self {
self.include_remove_events = true;
self
}
@@ -93,11 +88,8 @@ impl FileWatcherBuilder {
path: path.display().to_string(),
}
);
// Canonicalize the path for reliable comparison with event paths
let canonical = path.canonicalize().context(CanonicalizePathSnafu {
path: path.display().to_string(),
})?;
self.file_paths.push(canonical);
self.file_paths.push(path.to_path_buf());
Ok(self)
}
@@ -144,7 +136,6 @@ impl FileWatcherBuilder {
}
let config = self.config;
let watched_files: HashSet<PathBuf> = self.file_paths.iter().cloned().collect();
info!(
"Spawning file watcher for paths: {:?} (watching parent directories)",
@@ -165,25 +156,7 @@ impl FileWatcherBuilder {
continue;
}
// Check if any of the event paths match our watched files
let is_watched_file = event.paths.iter().any(|event_path| {
// Try to canonicalize the event path for comparison
// If the file was deleted, canonicalize will fail, so we also
// compare the raw path
if let Ok(canonical) = event_path.canonicalize()
&& watched_files.contains(&canonical)
{
return true;
}
// For deleted files, compare using the raw path
watched_files.contains(event_path)
});
if !is_watched_file {
continue;
}
info!(?event.kind, ?event.paths, "Detected file change");
info!(?event.kind, ?event.paths, "Detected folder change");
callback();
}
Err(err) => {
@@ -301,55 +274,4 @@ mod tests {
"Watcher should have detected file recreation"
);
}
#[test]
fn test_file_watcher_ignores_other_files() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("test_file_watcher_other");
let watched_file = dir.path().join("watched.txt");
let other_file = dir.path().join("other.txt");
// Create both files
std::fs::write(&watched_file, "watched content").unwrap();
std::fs::write(&other_file, "other content").unwrap();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
FileWatcherBuilder::new()
.watch_path(&watched_file)
.unwrap()
.config(FileWatcherConfig::new())
.spawn(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
})
.unwrap();
// Give watcher time to start
std::thread::sleep(Duration::from_millis(100));
// Modify the other file - should NOT trigger callback
std::fs::write(&other_file, "modified other content").unwrap();
// Wait for potential event
std::thread::sleep(Duration::from_millis(500));
assert_eq!(
counter.load(Ordering::SeqCst),
0,
"Watcher should not have detected changes to other files"
);
// Now modify the watched file - SHOULD trigger callback
std::fs::write(&watched_file, "modified watched content").unwrap();
// Wait for the event to be processed
std::thread::sleep(Duration::from_millis(500));
assert!(
counter.load(Ordering::SeqCst) >= 1,
"Watcher should have detected change to watched file"
);
}
}

View File

@@ -27,6 +27,7 @@ const SECRET_ACCESS_KEY: &str = "secret_access_key";
const SESSION_TOKEN: &str = "session_token";
const REGION: &str = "region";
const ENABLE_VIRTUAL_HOST_STYLE: &str = "enable_virtual_host_style";
const DISABLE_EC2_METADATA: &str = "disable_ec2_metadata";
pub fn is_supported_in_s3(key: &str) -> bool {
[
@@ -36,6 +37,7 @@ pub fn is_supported_in_s3(key: &str) -> bool {
SESSION_TOKEN,
REGION,
ENABLE_VIRTUAL_HOST_STYLE,
DISABLE_EC2_METADATA,
]
.contains(&key)
}
@@ -82,6 +84,21 @@ pub fn build_s3_backend(
}
}
if let Some(disable_str) = connection.get(DISABLE_EC2_METADATA) {
let disable = disable_str.as_str().parse::<bool>().map_err(|e| {
error::InvalidConnectionSnafu {
msg: format!(
"failed to parse the option {}={}, {}",
DISABLE_EC2_METADATA, disable_str, e
),
}
.build()
})?;
if disable {
builder = builder.disable_ec2_metadata();
}
}
// TODO(weny): Consider finding a better way to eliminate duplicate code.
Ok(ObjectStore::new(builder)
.context(error::BuildBackendSnafu)?
@@ -109,6 +126,7 @@ mod tests {
assert!(is_supported_in_s3(SESSION_TOKEN));
assert!(is_supported_in_s3(REGION));
assert!(is_supported_in_s3(ENABLE_VIRTUAL_HOST_STYLE));
assert!(is_supported_in_s3(DISABLE_EC2_METADATA));
assert!(!is_supported_in_s3("foo"))
}
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
@@ -35,6 +36,14 @@ pub enum Error {
#[snafu(display("Memory semaphore unexpectedly closed"))]
MemorySemaphoreClosed,
#[snafu(display(
"Timeout waiting for memory quota: requested {requested_bytes} bytes, waited {waited:?}"
))]
MemoryAcquireTimeout {
requested_bytes: u64,
waited: Duration,
},
}
impl ErrorExt for Error {
@@ -44,6 +53,7 @@ impl ErrorExt for Error {
match self {
MemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
MemorySemaphoreClosed => StatusCode::Unexpected,
MemoryAcquireTimeout { .. } => StatusCode::RuntimeResourcesExhausted,
}
}

View File

@@ -0,0 +1,168 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
/// Memory permit granularity for different use cases.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PermitGranularity {
/// 1 KB per permit
///
/// Use for:
/// - HTTP/gRPC request limiting (small, high-concurrency operations)
/// - Small batch operations
/// - Scenarios requiring fine-grained fairness
Kilobyte,
/// 1 MB per permit (default)
///
/// Use for:
/// - Query execution memory management
/// - Compaction memory control
/// - Large, long-running operations
#[default]
Megabyte,
}
impl PermitGranularity {
/// Returns the number of bytes per permit.
#[inline]
pub const fn bytes(self) -> u64 {
match self {
Self::Kilobyte => 1024,
Self::Megabyte => 1024 * 1024,
}
}
/// Returns a human-readable string representation.
pub const fn as_str(self) -> &'static str {
match self {
Self::Kilobyte => "1KB",
Self::Megabyte => "1MB",
}
}
/// Converts bytes to permits based on this granularity.
///
/// Rounds up to ensure the requested bytes are fully covered.
/// Clamped to Semaphore::MAX_PERMITS.
#[inline]
pub fn bytes_to_permits(self, bytes: u64) -> u32 {
use tokio::sync::Semaphore;
let granularity_bytes = self.bytes();
bytes
.saturating_add(granularity_bytes - 1)
.saturating_div(granularity_bytes)
.min(Semaphore::MAX_PERMITS as u64)
.min(u32::MAX as u64) as u32
}
/// Converts permits to bytes based on this granularity.
#[inline]
pub fn permits_to_bytes(self, permits: u32) -> u64 {
(permits as u64).saturating_mul(self.bytes())
}
}
impl fmt::Display for PermitGranularity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bytes_to_permits_kilobyte() {
let granularity = PermitGranularity::Kilobyte;
// Exact multiples
assert_eq!(granularity.bytes_to_permits(1024), 1);
assert_eq!(granularity.bytes_to_permits(2048), 2);
assert_eq!(granularity.bytes_to_permits(10 * 1024), 10);
// Rounds up
assert_eq!(granularity.bytes_to_permits(1), 1);
assert_eq!(granularity.bytes_to_permits(1025), 2);
assert_eq!(granularity.bytes_to_permits(2047), 2);
}
#[test]
fn test_bytes_to_permits_megabyte() {
let granularity = PermitGranularity::Megabyte;
// Exact multiples
assert_eq!(granularity.bytes_to_permits(1024 * 1024), 1);
assert_eq!(granularity.bytes_to_permits(2 * 1024 * 1024), 2);
// Rounds up
assert_eq!(granularity.bytes_to_permits(1), 1);
assert_eq!(granularity.bytes_to_permits(1024), 1);
assert_eq!(granularity.bytes_to_permits(1024 * 1024 + 1), 2);
}
#[test]
fn test_bytes_to_permits_zero_bytes() {
assert_eq!(PermitGranularity::Kilobyte.bytes_to_permits(0), 0);
assert_eq!(PermitGranularity::Megabyte.bytes_to_permits(0), 0);
}
#[test]
fn test_bytes_to_permits_clamps_to_maximum() {
use tokio::sync::Semaphore;
let max_permits = (Semaphore::MAX_PERMITS as u64).min(u32::MAX as u64) as u32;
assert_eq!(
PermitGranularity::Kilobyte.bytes_to_permits(u64::MAX),
max_permits
);
assert_eq!(
PermitGranularity::Megabyte.bytes_to_permits(u64::MAX),
max_permits
);
}
#[test]
fn test_permits_to_bytes() {
assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(1), 1024);
assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(10), 10 * 1024);
assert_eq!(PermitGranularity::Megabyte.permits_to_bytes(1), 1024 * 1024);
assert_eq!(
PermitGranularity::Megabyte.permits_to_bytes(10),
10 * 1024 * 1024
);
}
#[test]
fn test_round_trip_conversion() {
// Kilobyte: bytes -> permits -> bytes (should round up)
let kb = PermitGranularity::Kilobyte;
let permits = kb.bytes_to_permits(1500);
let bytes = kb.permits_to_bytes(permits);
assert!(bytes >= 1500); // Must cover original request
assert_eq!(bytes, 2048); // 2KB
// Megabyte: bytes -> permits -> bytes (should round up)
let mb = PermitGranularity::Megabyte;
let permits = mb.bytes_to_permits(1500);
let bytes = mb.permits_to_bytes(permits);
assert!(bytes >= 1500);
assert_eq!(bytes, 1024 * 1024); // 1MB
}
}

View File

@@ -17,7 +17,7 @@ use std::{fmt, mem};
use common_telemetry::debug;
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use crate::manager::{MemoryMetrics, MemoryQuota, bytes_to_permits, permits_to_bytes};
use crate::manager::{MemoryMetrics, MemoryQuota};
/// Guard representing a slice of reserved memory.
pub struct MemoryGuard<M: MemoryMetrics> {
@@ -49,7 +49,9 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
pub fn granted_bytes(&self) -> u64 {
match &self.state {
GuardState::Unlimited => 0,
GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
GuardState::Limited { permit, quota } => {
quota.permits_to_bytes(permit.num_permits() as u32)
}
}
}
@@ -65,7 +67,7 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
return true;
}
let additional_permits = bytes_to_permits(bytes);
let additional_permits = quota.bytes_to_permits(bytes);
match quota
.semaphore
@@ -99,11 +101,12 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
return true;
}
let release_permits = bytes_to_permits(bytes);
let release_permits = quota.bytes_to_permits(bytes);
match permit.split(release_permits as usize) {
Some(released_permit) => {
let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
let released_bytes =
quota.permits_to_bytes(released_permit.num_permits() as u32);
drop(released_permit);
quota.update_in_use_metric();
debug!("Early released {} bytes from memory guard", released_bytes);
@@ -121,7 +124,7 @@ impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
if let GuardState::Limited { permit, quota } =
mem::replace(&mut self.state, GuardState::Unlimited)
{
let bytes = permits_to_bytes(permit.num_permits() as u32);
let bytes = quota.permits_to_bytes(permit.num_permits() as u32);
drop(permit);
quota.update_in_use_metric();
debug!("Released memory: {} bytes", bytes);

View File

@@ -19,6 +19,7 @@
//! share the same allocation logic while using their own metrics.
mod error;
mod granularity;
mod guard;
mod manager;
mod policy;
@@ -27,8 +28,9 @@ mod policy;
mod tests;
pub use error::{Error, Result};
pub use granularity::PermitGranularity;
pub use guard::MemoryGuard;
pub use manager::{MemoryManager, MemoryMetrics, PERMIT_GRANULARITY_BYTES};
pub use manager::{MemoryManager, MemoryMetrics};
pub use policy::{DEFAULT_MEMORY_WAIT_TIMEOUT, OnExhaustedPolicy};
/// No-op metrics implementation for testing.

View File

@@ -17,11 +17,12 @@ use std::sync::Arc;
use snafu::ensure;
use tokio::sync::{Semaphore, TryAcquireError};
use crate::error::{MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result};
use crate::error::{
MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
};
use crate::granularity::PermitGranularity;
use crate::guard::MemoryGuard;
/// Minimum bytes controlled by one semaphore permit.
pub const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB
use crate::policy::OnExhaustedPolicy;
/// Trait for recording memory usage metrics.
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
@@ -40,6 +41,7 @@ pub struct MemoryManager<M: MemoryMetrics> {
pub(crate) struct MemoryQuota<M: MemoryMetrics> {
pub(crate) semaphore: Arc<Semaphore>,
pub(crate) limit_permits: u32,
pub(crate) granularity: PermitGranularity,
pub(crate) metrics: M,
}
@@ -47,19 +49,25 @@ impl<M: MemoryMetrics> MemoryManager<M> {
/// Creates a new memory manager with the given limit in bytes.
/// `limit_bytes = 0` disables the limit.
pub fn new(limit_bytes: u64, metrics: M) -> Self {
Self::with_granularity(limit_bytes, PermitGranularity::default(), metrics)
}
/// Creates a new memory manager with specified granularity.
pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self {
if limit_bytes == 0 {
metrics.set_limit(0);
return Self { quota: None };
}
let limit_permits = bytes_to_permits(limit_bytes);
let limit_aligned_bytes = permits_to_bytes(limit_permits);
let limit_permits = granularity.bytes_to_permits(limit_bytes);
let limit_aligned_bytes = granularity.permits_to_bytes(limit_permits);
metrics.set_limit(limit_aligned_bytes as i64);
Self {
quota: Some(MemoryQuota {
semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
limit_permits,
granularity,
metrics,
}),
}
@@ -69,7 +77,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn limit_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| permits_to_bytes(quota.limit_permits))
.map(|quota| quota.permits_to_bytes(quota.limit_permits))
.unwrap_or(0)
}
@@ -77,7 +85,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn used_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| permits_to_bytes(quota.used_permits()))
.map(|quota| quota.permits_to_bytes(quota.used_permits()))
.unwrap_or(0)
}
@@ -85,7 +93,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn available_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| permits_to_bytes(quota.available_permits_clamped()))
.map(|quota| quota.permits_to_bytes(quota.available_permits_clamped()))
.unwrap_or(0)
}
@@ -98,13 +106,13 @@ impl<M: MemoryMetrics> MemoryManager<M> {
match &self.quota {
None => Ok(MemoryGuard::unlimited()),
Some(quota) => {
let permits = bytes_to_permits(bytes);
let permits = quota.bytes_to_permits(bytes);
ensure!(
permits <= quota.limit_permits,
MemoryLimitExceededSnafu {
requested_bytes: bytes,
limit_bytes: permits_to_bytes(quota.limit_permits),
limit_bytes: self.limit_bytes()
}
);
@@ -125,7 +133,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
match &self.quota {
None => Some(MemoryGuard::unlimited()),
Some(quota) => {
let permits = bytes_to_permits(bytes);
let permits = quota.bytes_to_permits(bytes);
match quota.semaphore.clone().try_acquire_many_owned(permits) {
Ok(permit) => {
@@ -140,9 +148,56 @@ impl<M: MemoryMetrics> MemoryManager<M> {
}
}
}
/// Acquires memory based on the given policy.
///
/// - For `OnExhaustedPolicy::Wait`: Waits up to the timeout duration for memory to become available
/// - For `OnExhaustedPolicy::Fail`: Returns immediately if memory is not available
///
/// # Errors
/// - `MemoryLimitExceeded`: Requested bytes exceed the total limit (both policies), or memory is currently exhausted (Fail policy only)
/// - `MemoryAcquireTimeout`: Timeout elapsed while waiting for memory (Wait policy only)
/// - `MemorySemaphoreClosed`: The internal semaphore is unexpectedly closed (rare, indicates system issue)
pub async fn acquire_with_policy(
&self,
bytes: u64,
policy: OnExhaustedPolicy,
) -> Result<MemoryGuard<M>> {
match policy {
OnExhaustedPolicy::Wait { timeout } => {
match tokio::time::timeout(timeout, self.acquire(bytes)).await {
Ok(Ok(guard)) => Ok(guard),
Ok(Err(e)) => Err(e),
Err(_elapsed) => {
// Timeout elapsed while waiting
MemoryAcquireTimeoutSnafu {
requested_bytes: bytes,
waited: timeout,
}
.fail()
}
}
}
OnExhaustedPolicy::Fail => self.try_acquire(bytes).ok_or_else(|| {
MemoryLimitExceededSnafu {
requested_bytes: bytes,
limit_bytes: self.limit_bytes(),
}
.build()
}),
}
}
}
impl<M: MemoryMetrics> MemoryQuota<M> {
pub(crate) fn bytes_to_permits(&self, bytes: u64) -> u32 {
self.granularity.bytes_to_permits(bytes)
}
pub(crate) fn permits_to_bytes(&self, permits: u32) -> u64 {
self.granularity.permits_to_bytes(permits)
}
pub(crate) fn used_permits(&self) -> u32 {
self.limit_permits
.saturating_sub(self.available_permits_clamped())
@@ -155,19 +210,7 @@ impl<M: MemoryMetrics> MemoryQuota<M> {
}
pub(crate) fn update_in_use_metric(&self) {
let bytes = permits_to_bytes(self.used_permits());
let bytes = self.permits_to_bytes(self.used_permits());
self.metrics.set_in_use(bytes as i64);
}
}
pub(crate) fn bytes_to_permits(bytes: u64) -> u32 {
bytes
.saturating_add(PERMIT_GRANULARITY_BYTES - 1)
.saturating_div(PERMIT_GRANULARITY_BYTES)
.min(Semaphore::MAX_PERMITS as u64)
.min(u32::MAX as u64) as u32
}
pub(crate) fn permits_to_bytes(permits: u32) -> u64 {
(permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES)
}

View File

@@ -14,7 +14,10 @@
use tokio::time::{Duration, sleep};
use crate::{MemoryManager, NoOpMetrics, PERMIT_GRANULARITY_BYTES};
use crate::{MemoryManager, NoOpMetrics, PermitGranularity};
// Helper constant for tests - use default Megabyte granularity
const PERMIT_GRANULARITY_BYTES: u64 = PermitGranularity::Megabyte.bytes();
#[test]
fn test_try_acquire_unlimited() {

View File

@@ -71,6 +71,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
}),
MetricType::GAUGE => timeseries.push(TimeSeries {
labels: convert_label(m.get_label(), mf_name, None),
@@ -79,6 +80,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
}),
MetricType::HISTOGRAM => {
let h = m.get_histogram();
@@ -97,6 +99,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
inf_seen = true;
@@ -114,6 +117,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
}
timeseries.push(TimeSeries {
@@ -127,6 +131,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
timeseries.push(TimeSeries {
labels: convert_label(
@@ -139,6 +144,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
}
MetricType::SUMMARY => {
@@ -155,6 +161,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
}
timeseries.push(TimeSeries {
@@ -168,6 +175,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
timeseries.push(TimeSeries {
labels: convert_label(
@@ -180,6 +188,7 @@ pub fn convert_metric_to_write_request(
timestamp,
}],
exemplars: vec![],
histograms: vec![],
});
}
MetricType::UNTYPED => {
@@ -274,7 +283,7 @@ mod test {
assert_eq!(
format!("{:?}", write_quest.timeseries),
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }]"#
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
);
let gauge_opts = Opts::new("test_gauge", "test help")
@@ -288,7 +297,7 @@ mod test {
let write_quest = convert_metric_to_write_request(mf, None, 0);
assert_eq!(
format!("{:?}", write_quest.timeseries),
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [] }]"#
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
);
}
@@ -305,20 +314,20 @@ mod test {
.iter()
.map(|x| format!("{:?}", x))
.collect();
let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }"#;
let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }"#;
assert_eq!(write_quest_str.join("\n"), ans);
}
@@ -355,10 +364,10 @@ TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" },
.iter()
.map(|x| format!("{:?}", x))
.collect();
let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }"#;
let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [], histograms: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [], histograms: [] }"#;
assert_eq!(write_quest_str.join("\n"), ans);
}
@@ -385,11 +394,11 @@ TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }],
let write_quest2 = convert_metric_to_write_request(mf, Some(&filter), 0);
assert_eq!(
format!("{:?}", write_quest1.timeseries),
r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [], histograms: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
);
assert_eq!(
format!("{:?}", write_quest2.timeseries),
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [], histograms: [] }]"#
);
}
}

View File

@@ -15,7 +15,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::sync::{Arc, Mutex, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
@@ -38,6 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use tokio::sync::SetOnce;
use crate::batching_mode::BatchingModeOptions;
use crate::error::{
@@ -75,7 +76,19 @@ impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send
}
}
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
#[derive(Debug, Clone)]
pub struct HandlerMutable {
handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
is_initialized: Arc<SetOnce<()>>,
}
impl HandlerMutable {
pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
*self.handler.lock().unwrap() = Some(handler);
// Ignore the error, as we allow the handler to be set multiple times.
let _ = self.is_initialized.set(());
}
}
/// A simple frontend client able to execute sql using grpc protocol
///
@@ -100,7 +113,11 @@ pub enum FrontendClient {
impl FrontendClient {
/// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
let handler = Arc::new(std::sync::Mutex::new(None));
let is_initialized = Arc::new(SetOnce::new());
let handler = HandlerMutable {
handler: Arc::new(Mutex::new(None)),
is_initialized,
};
(
Self::Standalone {
database_client: handler.clone(),
@@ -110,23 +127,13 @@ impl FrontendClient {
)
}
/// Check if the frontend client is initialized.
///
/// In distributed mode, it is always initialized.
/// In standalone mode, it checks if the database client is set.
pub fn is_initialized(&self) -> bool {
match self {
FrontendClient::Distributed { .. } => true,
FrontendClient::Standalone {
database_client, ..
} => {
let guard = database_client.lock();
if let Ok(guard) = guard {
guard.is_some()
} else {
false
}
}
/// Waits until the frontend client is initialized.
pub async fn wait_initialized(&self) {
if let FrontendClient::Standalone {
database_client, ..
} = self
{
database_client.is_initialized.wait().await;
}
}
@@ -158,8 +165,14 @@ impl FrontendClient {
grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
query: QueryOptions,
) -> Self {
let is_initialized = Arc::new(SetOnce::new_with(Some(())));
let handler = HandlerMutable {
handler: Arc::new(Mutex::new(Some(grpc_handler))),
is_initialized: is_initialized.clone(),
};
Self::Standalone {
database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
database_client: handler,
query,
}
}
@@ -341,6 +354,7 @@ impl FrontendClient {
{
let database_client = {
database_client
.handler
.lock()
.map_err(|e| {
UnexpectedSnafu {
@@ -418,6 +432,7 @@ impl FrontendClient {
{
let database_client = {
database_client
.handler
.lock()
.map_err(|e| {
UnexpectedSnafu {
@@ -480,3 +495,73 @@ impl std::fmt::Display for PeerDesc {
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_query::Output;
use tokio::time::timeout;
use super::*;
#[derive(Debug)]
struct NoopHandler;
#[async_trait::async_trait]
impl GrpcQueryHandlerWithBoxedError for NoopHandler {
async fn do_query(
&self,
_query: Request,
_ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
Ok(Output::new_with_affected_rows(0))
}
}
#[tokio::test]
async fn wait_initialized() {
let (client, handler_mut) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
assert!(
timeout(Duration::from_millis(50), client.wait_initialized())
.await
.is_err()
);
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
handler_mut.set_handler(Arc::downgrade(&handler)).await;
timeout(Duration::from_secs(1), client.wait_initialized())
.await
.expect("wait_initialized should complete after handler is set");
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.expect("wait_initialized should be a no-op once initialized");
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
assert!(
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.is_ok()
);
let meta_client = Arc::new(MetaClient::default());
let client = FrontendClient::from_meta_client(
meta_client,
None,
QueryOptions::default(),
BatchingModeOptions::default(),
)
.unwrap();
assert!(
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.is_ok()
);
}
}

View File

@@ -7,6 +7,9 @@ license.workspace = true
[lints]
workspace = true
[features]
vector_index = ["dep:usearch"]
[dependencies]
async-trait.workspace = true
asynchronous-codec = "0.7.0"
@@ -41,7 +44,7 @@ tantivy = { version = "0.24", features = ["zstd-compression"] }
tantivy-jieba = "0.16"
tokio.workspace = true
tokio-util.workspace = true
usearch = { version = "2.21", default-features = false, features = ["fp16lib"] }
usearch = { version = "2.21", default-features = false, features = ["fp16lib"], optional = true }
uuid.workspace = true
[dev-dependencies]

View File

@@ -22,6 +22,7 @@ pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;
pub mod target;
#[cfg(feature = "vector_index")]
pub mod vector;
pub type Bytes = Vec<u8>;

View File

@@ -25,7 +25,7 @@ use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu, MemoryAcquireFailedSnafu};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
use crate::region::RegionRoleState;
@@ -95,80 +95,16 @@ impl CompactionTaskImpl {
async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
let region_id = self.compaction_region.region_id;
let requested_bytes = self.estimated_memory_bytes;
let limit_bytes = self.memory_manager.limit_bytes();
let policy = self.memory_policy;
if limit_bytes > 0 && requested_bytes > limit_bytes {
warn!(
"Compaction for region {} requires {} bytes but limit is {} bytes; cannot satisfy request",
region_id, requested_bytes, limit_bytes
);
return Err(CompactionMemoryExhaustedSnafu {
let _timer = COMPACTION_MEMORY_WAIT.start_timer();
self.memory_manager
.acquire_with_policy(requested_bytes, policy)
.await
.context(CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: "exceed_limit".to_string(),
}
.build());
}
match self.memory_policy {
OnExhaustedPolicy::Wait {
timeout: wait_timeout,
} => {
let timer = COMPACTION_MEMORY_WAIT.start_timer();
match tokio::time::timeout(
wait_timeout,
self.memory_manager.acquire(requested_bytes),
)
.await
{
Ok(Ok(guard)) => {
timer.observe_duration();
Ok(guard)
}
Ok(Err(e)) => {
timer.observe_duration();
Err(e).with_context(|_| MemoryAcquireFailedSnafu {
region_id,
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
})
}
Err(_) => {
timer.observe_duration();
warn!(
"Compaction for region {} waited {:?} for {} bytes but timed out",
region_id, wait_timeout, requested_bytes
);
CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
}
.fail()
}
}
}
OnExhaustedPolicy::Fail => {
// Try to acquire, fail immediately if not available
self.memory_manager
.try_acquire(requested_bytes)
.ok_or_else(|| {
warn!(
"Compaction memory exhausted for region {} (policy=fail, need {} bytes, limit {} bytes)",
region_id, requested_bytes, limit_bytes
);
CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: "fail".to_string(),
}
.build()
})
}
}
policy: format!("{policy:?}"),
})
}
/// Remove expired ssts files, update manifest immediately

View File

@@ -1042,20 +1042,8 @@ pub enum Error {
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
#[snafu(display(
"Compaction memory limit exceeded for region {region_id}: required {required_bytes} bytes, limit {limit_bytes} bytes (policy: {policy})",
))]
#[snafu(display("Compaction memory exhausted for region {region_id} (policy: {policy})",))]
CompactionMemoryExhausted {
region_id: RegionId,
required_bytes: u64,
limit_bytes: u64,
policy: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to acquire memory for region {region_id} (policy: {policy})"))]
MemoryAcquireFailed {
region_id: RegionId,
policy: String,
#[snafu(source)]
@@ -1359,9 +1347,7 @@ impl ErrorExt for Error {
ManualCompactionOverride {} => StatusCode::Cancelled,
CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted,
MemoryAcquireFailed { source, .. } => source.status_code(),
CompactionMemoryExhausted { source, .. } => source.status_code(),
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,

View File

@@ -540,7 +540,7 @@ impl LocalGcWorker {
fn filter_deletable_files(
&self,
entries: Vec<Entry>,
in_use_filenames: &HashSet<&FileId>,
in_use_filenames: &HashSet<FileId>,
may_linger_filenames: &HashSet<&FileId>,
eligible_for_removal: &HashSet<&FileId>,
unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
@@ -641,9 +641,6 @@ impl LocalGcWorker {
.flatten()
.collect::<HashSet<_>>();
// in use filenames, include sst and index files
let in_use_filenames = in_used.iter().collect::<HashSet<_>>();
// When full_file_listing is false, skip expensive list operations and only delete
// files that are tracked in recently_removed_files
if !self.full_file_listing {
@@ -653,7 +650,7 @@ impl LocalGcWorker {
// 3. Have passed the lingering time
let files_to_delete: Vec<FileId> = eligible_for_removal
.iter()
.filter(|file_id| !in_use_filenames.contains(*file_id))
.filter(|file_id| !in_used.contains(*file_id))
.map(|&f| *f)
.collect();
@@ -672,7 +669,7 @@ impl LocalGcWorker {
let (all_unused_files_ready_for_delete, all_in_exist_linger_files) = self
.filter_deletable_files(
all_entries,
&in_use_filenames,
in_used,
&may_linger_filenames,
&eligible_for_removal,
unknown_file_may_linger_until,

View File

@@ -117,6 +117,11 @@ pub struct S3Connection {
/// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name
/// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com
pub enable_virtual_host_style: bool,
/// Disable EC2 metadata service.
/// By default, opendal will use EC2 metadata service to load credentials from the instance metadata,
/// when access key id and secret access key are not provided.
/// If enabled, opendal will *NOT* use EC2 metadata service.
pub disable_ec2_metadata: bool,
}
impl From<&S3Connection> for S3 {
@@ -129,6 +134,10 @@ impl From<&S3Connection> for S3 {
.access_key_id(connection.access_key_id.expose_secret())
.secret_access_key(connection.secret_access_key.expose_secret());
if connection.disable_ec2_metadata {
builder = builder.disable_ec2_metadata();
}
if let Some(endpoint) = &connection.endpoint {
builder = builder.endpoint(endpoint);
}

View File

@@ -26,7 +26,7 @@ use arrow::datatypes::{Float64Type, TimestampMillisecondType};
use common_grpc::precision::Precision;
use common_query::prelude::{greptime_timestamp, greptime_value};
use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::tracing;
use common_telemetry::{tracing, warn};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::{Expr, col, lit, regexp_match};
use datafusion_common::ScalarValue;
@@ -415,6 +415,10 @@ pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertR
table_data.add_row(one_row);
}
}
if !series.histograms.is_empty() {
warn!("Native histograms are not supported yet, data ignored");
}
}
Ok(multi_table_data.into_row_insert_requests())

View File

@@ -362,13 +362,13 @@ mod tests {
cert_path: "/path/to/cert_path".to_string(),
key_path: "/path/to/key_path".to_string(),
ca_cert_path: String::new(),
watch: false
watch: false,
},
TlsOption::new(
Some(Disable),
Some("/path/to/cert_path".to_string()),
Some("/path/to/key_path".to_string()),
false
false,
)
);
}

View File

@@ -259,9 +259,8 @@ impl GreptimeDbStandaloneBuilder {
let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
.set_handler(weak_grpc_handler)
.await;
let flow_streaming_engine = flownode.flow_engine().streaming_engine();
let invoker = flow::FrontendInvoker::build_from(

View File

@@ -1586,6 +1586,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"endpoint =",
"region =",
"enable_virtual_host_style =",
"disable_ec2_metadata =",
"cache_path =",
"cache_capacity =",
"memory_pool_size =",