Compare commits

...

6 Commits

Author SHA1 Message Date
jeremyhi
95eccd6cde feat: introduce granularity for memory manager (#7416)
* feat: introduce granularity for memory manager

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: add unit test

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: remove granularity getter for manager

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* Update src/common/memory-manager/src/manager.rs

Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>

* feat: acquire_with_policy for manager

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
Co-authored-by: Lei, HUANG <6406592+v0y4g3r@users.noreply.github.com>
2025-12-17 11:08:51 +00:00
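
A minimal usage sketch of the API this commit introduces, based on the common-memory-manager diffs further below; the crate path, the 64 MB limit, and the helper names are illustrative, not part of the change itself.

use std::time::Duration;

use common_memory_manager::{
    MemoryManager, MemoryMetrics, OnExhaustedPolicy, PermitGranularity, Result,
};

// A 64 MB budget accounted in 1 KB permits, suited to small, high-concurrency requests.
fn new_request_memory_manager<M: MemoryMetrics>(metrics: M) -> MemoryManager<M> {
    MemoryManager::with_granularity(64 * 1024 * 1024, PermitGranularity::Kilobyte, metrics)
}

// Reserve 512 KB, waiting up to 5 s for quota; the guard returns the permits on drop.
async fn reserve_for_request<M: MemoryMetrics>(manager: &MemoryManager<M>) -> Result<()> {
    let policy = OnExhaustedPolicy::Wait {
        timeout: Duration::from_secs(5),
    };
    let _guard = manager.acquire_with_policy(512 * 1024, policy).await?;
    // ... run the memory-bounded work while `_guard` is alive ...
    Ok(())
}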
fys
0bc5a305be chore: add wait_initialized method for frontend client (#7414)
* chore: add wait_initialized method for frontend client

* fix: some

* fix: cargo fmt

* add comment

* add unit test

* rename

* fix: cargo check

* fix: cr by copilot
2025-12-17 08:13:36 +00:00
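
A hedged sketch of the initialization handshake this commit adds, mirroring the unit test in the flow-crate diff further below; FrontendClient, HandlerMutable, and QueryOptions come from that diff, while the wrapper function and the grpc_handler argument are assumptions for illustration.

use std::sync::Arc;

// `grpc_handler` is assumed to be built by the caller, as in the standalone startup code.
async fn run_once_frontend_ready(grpc_handler: Arc<dyn GrpcQueryHandlerWithBoxedError>) {
    let (client, handler_slot) = FrontendClient::from_empty_grpc_handler(QueryOptions::default());

    // Standalone clients block here until a handler is published; distributed ones return at once.
    let waiter = tokio::spawn(async move { client.wait_initialized().await });

    // Publishing the handler sets the internal SetOnce flag and wakes every waiter.
    handler_slot.set_handler(Arc::downgrade(&grpc_handler)).await;
    waiter.await.expect("waiter task failed");
}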
discord9
1afcddd5a9 chore: feature gate vector_index (#7428)
Signed-off-by: discord9 <discord9@163.com>
2025-12-17 07:14:25 +00:00
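
Since the index crate's vector support is now optional (see the Cargo.toml and lib.rs diffs further below), code that reaches into index::vector has to be gated the same way; a small illustrative sketch follows, where the function names are placeholders and a consuming crate would normally forward its own feature to index/vector_index.

// Enable the optional dependency when building the index crate:
//   cargo build -p index --features vector_index
#[cfg(feature = "vector_index")]
fn build_vector_index() {
    // uses index::vector (and, transitively, usearch); compiled only when the feature is on
}

#[cfg(not(feature = "vector_index"))]
fn build_vector_index() {
    // vector indexing compiled out; keep this a no-op or surface an "unsupported" error
}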
shuiyisong
62808b887b fix: using anonymous s3 access when ak and sk are not provided (#7425)
* chore: allow s3 anon

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

* chore: disable ec2 metadata

Signed-off-by: shuiyisong <xixing.sys@gmail.com>

---------

Signed-off-by: shuiyisong <xixing.sys@gmail.com>
2025-12-17 06:34:29 +00:00
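
The datasource diff near the end of this comparison condenses to the following standalone opendal sketch; the bucket, region, and function name are placeholders, and only the credential branch reflects the actual change: without an access key pair the builder switches to anonymous access and skips the EC2 metadata (IMDS) credential lookup.

use opendal::Operator;
use opendal::services::S3;

fn build_s3_operator(access_key_id: &str, secret_access_key: &str) -> opendal::Result<Operator> {
    let mut builder = S3::default().bucket("example-bucket").region("us-east-1");
    if !access_key_id.is_empty() && !secret_access_key.is_empty() {
        builder = builder
            .access_key_id(access_key_id)
            .secret_access_key(secret_access_key);
    } else {
        // Public buckets: skip request signing and the EC2 metadata credential probe.
        builder = builder.allow_anonymous().disable_ec2_metadata();
    }
    Ok(Operator::new(builder)?.finish())
}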
discord9
04ddd40e00 chore: bump version to beta.3 (#7423)
chore: bump to beta.3

Signed-off-by: discord9 <discord9@163.com>
2025-12-17 04:18:23 +00:00
liyang
b4f028be5f chore: change etcd endpoints to array in the test scripts (#7419)
chore: change etcd endpoint

Signed-off-by: liyang <daviderli614@gmail.com>
2025-12-17 03:14:35 +00:00
18 changed files with 481 additions and 235 deletions

View File

@@ -51,7 +51,7 @@ runs:
run: |
helm upgrade \
--install my-greptimedb \
--set meta.backendStorage.etcd.endpoints=${{ inputs.etcd-endpoints }} \
--set 'meta.backendStorage.etcd.endpoints[0]=${{ inputs.etcd-endpoints }}' \
--set meta.enableRegionFailover=${{ inputs.enable-region-failover }} \
--set image.registry=${{ inputs.image-registry }} \
--set image.repository=${{ inputs.image-repository }} \

View File

@@ -81,7 +81,7 @@ function deploy_greptimedb_cluster() {
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set "meta.backendStorage.etcd.endpoints[0]=etcd.$install_namespace.svc.cluster.local:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
-n "$install_namespace"
@@ -119,7 +119,7 @@ function deploy_greptimedb_cluster_with_s3_storage() {
--create-namespace \
--set image.tag="$GREPTIMEDB_IMAGE_TAG" \
--set initializer.tag="$GREPTIMEDB_INITIALIZER_IMAGE_TAG" \
--set meta.backendStorage.etcd.endpoints="etcd.$install_namespace:2379" \
--set "meta.backendStorage.etcd.endpoints[0]=etcd.$install_namespace.svc.cluster.local:2379" \
--set meta.backendStorage.etcd.storeKeyPrefix="$cluster_name" \
--set objectStorage.s3.bucket="$AWS_CI_TEST_BUCKET" \
--set objectStorage.s3.region="$AWS_REGION" \

Cargo.lock (generated, 152 lines changed)
View File

@@ -212,7 +212,7 @@ checksum = "d301b3b94cb4b2f23d7917810addbbaff90738e0ca2be692bd027e70d7e0330c"
[[package]]
name = "api"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow-schema",
"common-base",
@@ -733,7 +733,7 @@ dependencies = [
[[package]]
name = "auth"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -1383,7 +1383,7 @@ dependencies = [
[[package]]
name = "cache"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"catalog",
"common-error",
@@ -1418,7 +1418,7 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "catalog"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow",
@@ -1763,7 +1763,7 @@ checksum = "b94f61472cee1439c0b966b47e3aca9ae07e45d070759512cd390ea2bebc6675"
[[package]]
name = "cli"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-stream",
"async-trait",
@@ -1816,7 +1816,7 @@ dependencies = [
[[package]]
name = "client"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arc-swap",
@@ -1849,7 +1849,7 @@ dependencies = [
"snafu 0.8.6",
"store-api",
"substrait 0.37.3",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"tokio",
"tokio-stream",
"tonic 0.13.1",
@@ -1889,7 +1889,7 @@ dependencies = [
[[package]]
name = "cmd"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"auth",
@@ -2023,7 +2023,7 @@ checksum = "55b672471b4e9f9e95499ea597ff64941a309b2cdbffcc46f2cc5e2d971fd335"
[[package]]
name = "common-base"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anymap2",
"async-trait",
@@ -2047,14 +2047,14 @@ dependencies = [
[[package]]
name = "common-catalog"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"const_format",
]
[[package]]
name = "common-config"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-error",
@@ -2079,7 +2079,7 @@ dependencies = [
[[package]]
name = "common-datasource"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow",
"arrow-schema",
@@ -2114,7 +2114,7 @@ dependencies = [
[[package]]
name = "common-decimal"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"bigdecimal 0.4.8",
"common-error",
@@ -2127,7 +2127,7 @@ dependencies = [
[[package]]
name = "common-error"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-macro",
"http 1.3.1",
@@ -2138,7 +2138,7 @@ dependencies = [
[[package]]
name = "common-event-recorder"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -2160,7 +2160,7 @@ dependencies = [
[[package]]
name = "common-frontend"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -2182,7 +2182,7 @@ dependencies = [
[[package]]
name = "common-function"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -2242,7 +2242,7 @@ dependencies = [
[[package]]
name = "common-greptimedb-telemetry"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"common-runtime",
@@ -2259,7 +2259,7 @@ dependencies = [
[[package]]
name = "common-grpc"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-flight",
@@ -2294,7 +2294,7 @@ dependencies = [
[[package]]
name = "common-grpc-expr"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"common-base",
@@ -2314,7 +2314,7 @@ dependencies = [
[[package]]
name = "common-macro"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"greptime-proto",
"once_cell",
@@ -2325,7 +2325,7 @@ dependencies = [
[[package]]
name = "common-mem-prof"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anyhow",
"common-error",
@@ -2341,7 +2341,7 @@ dependencies = [
[[package]]
name = "common-memory-manager"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-error",
"common-macro",
@@ -2354,7 +2354,7 @@ dependencies = [
[[package]]
name = "common-meta"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anymap2",
"api",
@@ -2426,7 +2426,7 @@ dependencies = [
[[package]]
name = "common-options"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-grpc",
"humantime-serde",
@@ -2435,11 +2435,11 @@ dependencies = [
[[package]]
name = "common-plugins"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
[[package]]
name = "common-pprof"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-error",
"common-macro",
@@ -2451,7 +2451,7 @@ dependencies = [
[[package]]
name = "common-procedure"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-stream",
@@ -2480,7 +2480,7 @@ dependencies = [
[[package]]
name = "common-procedure-test"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"common-procedure",
@@ -2490,7 +2490,7 @@ dependencies = [
[[package]]
name = "common-query"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -2516,7 +2516,7 @@ dependencies = [
[[package]]
name = "common-recordbatch"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arc-swap",
"common-base",
@@ -2540,7 +2540,7 @@ dependencies = [
[[package]]
name = "common-runtime"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -2569,7 +2569,7 @@ dependencies = [
[[package]]
name = "common-session"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"serde",
"strum 0.27.1",
@@ -2577,7 +2577,7 @@ dependencies = [
[[package]]
name = "common-sql"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-decimal",
@@ -2595,7 +2595,7 @@ dependencies = [
[[package]]
name = "common-stat"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-runtime",
@@ -2610,7 +2610,7 @@ dependencies = [
[[package]]
name = "common-telemetry"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"backtrace",
"common-base",
@@ -2639,7 +2639,7 @@ dependencies = [
[[package]]
name = "common-test-util"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"client",
"common-grpc",
@@ -2652,7 +2652,7 @@ dependencies = [
[[package]]
name = "common-time"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow",
"chrono",
@@ -2670,7 +2670,7 @@ dependencies = [
[[package]]
name = "common-version"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"build-data",
"cargo-manifest",
@@ -2681,7 +2681,7 @@ dependencies = [
[[package]]
name = "common-wal"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-base",
"common-error",
@@ -2704,7 +2704,7 @@ dependencies = [
[[package]]
name = "common-workload"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"common-telemetry",
"serde",
@@ -4012,7 +4012,7 @@ dependencies = [
[[package]]
name = "datanode"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-flight",
@@ -4076,7 +4076,7 @@ dependencies = [
[[package]]
name = "datatypes"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arrow",
"arrow-array",
@@ -4750,7 +4750,7 @@ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
[[package]]
name = "file-engine"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -4882,7 +4882,7 @@ checksum = "8bf7cc16383c4b8d58b9905a8509f02926ce3058053c056376248d958c9df1e8"
[[package]]
name = "flow"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow",
@@ -4951,7 +4951,7 @@ dependencies = [
"sql",
"store-api",
"strum 0.27.1",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tokio",
"tonic 0.13.1",
@@ -5012,7 +5012,7 @@ checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
[[package]]
name = "frontend"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arc-swap",
@@ -6227,7 +6227,7 @@ dependencies = [
[[package]]
name = "index"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"asynchronous-codec",
@@ -7168,7 +7168,7 @@ checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "log-query"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"chrono",
"common-error",
@@ -7180,7 +7180,7 @@ dependencies = [
[[package]]
name = "log-store"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-stream",
"async-trait",
@@ -7481,7 +7481,7 @@ dependencies = [
[[package]]
name = "meta-client"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -7509,7 +7509,7 @@ dependencies = [
[[package]]
name = "meta-srv"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -7609,7 +7609,7 @@ dependencies = [
[[package]]
name = "metric-engine"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"aquamarine",
@@ -7706,7 +7706,7 @@ dependencies = [
[[package]]
name = "mito-codec"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"bytes",
@@ -7731,7 +7731,7 @@ dependencies = [
[[package]]
name = "mito2"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"aquamarine",
@@ -8471,7 +8471,7 @@ dependencies = [
[[package]]
name = "object-store"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"anyhow",
"bytes",
@@ -8756,7 +8756,7 @@ dependencies = [
[[package]]
name = "operator"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -8816,7 +8816,7 @@ dependencies = [
"sql",
"sqlparser",
"store-api",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tokio",
"tokio-util",
@@ -9102,7 +9102,7 @@ dependencies = [
[[package]]
name = "partition"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -9459,7 +9459,7 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pipeline"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -9615,7 +9615,7 @@ dependencies = [
[[package]]
name = "plugins"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"auth",
"catalog",
@@ -9917,7 +9917,7 @@ dependencies = [
[[package]]
name = "promql"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"async-trait",
@@ -10200,7 +10200,7 @@ dependencies = [
[[package]]
name = "puffin"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-compression 0.4.19",
"async-trait",
@@ -10242,7 +10242,7 @@ dependencies = [
[[package]]
name = "query"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -10309,7 +10309,7 @@ dependencies = [
"sql",
"sqlparser",
"store-api",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tokio",
"tokio-stream",
@@ -11651,7 +11651,7 @@ dependencies = [
[[package]]
name = "servers"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -11779,7 +11779,7 @@ dependencies = [
[[package]]
name = "session"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"ahash 0.8.12",
"api",
@@ -12113,7 +12113,7 @@ dependencies = [
[[package]]
name = "sql"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-buffer",
@@ -12173,7 +12173,7 @@ dependencies = [
[[package]]
name = "sqlness-runner"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"clap 4.5.40",
@@ -12450,7 +12450,7 @@ dependencies = [
[[package]]
name = "standalone"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"catalog",
@@ -12491,7 +12491,7 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "store-api"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"aquamarine",
@@ -12704,7 +12704,7 @@ dependencies = [
[[package]]
name = "substrait"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"async-trait",
"bytes",
@@ -12827,7 +12827,7 @@ dependencies = [
[[package]]
name = "table"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"async-trait",
@@ -13096,7 +13096,7 @@ checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
[[package]]
name = "tests-fuzz"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"arbitrary",
"async-trait",
@@ -13140,7 +13140,7 @@ dependencies = [
[[package]]
name = "tests-integration"
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
dependencies = [
"api",
"arrow-flight",
@@ -13215,7 +13215,7 @@ dependencies = [
"sqlx",
"standalone",
"store-api",
"substrait 1.0.0-beta.2",
"substrait 1.0.0-beta.3",
"table",
"tempfile",
"time",

View File

@@ -75,7 +75,7 @@ members = [
resolver = "2"
[workspace.package]
version = "1.0.0-beta.2"
version = "1.0.0-beta.3"
edition = "2024"
license = "Apache-2.0"

View File

@@ -552,9 +552,8 @@ impl StartCommand {
let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
.set_handler(weak_grpc_handler)
.await;
// set the frontend invoker for flownode
let flow_streaming_engine = flownode.flow_engine().streaming_engine();

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::time::Duration;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
@@ -35,6 +36,14 @@ pub enum Error {
#[snafu(display("Memory semaphore unexpectedly closed"))]
MemorySemaphoreClosed,
#[snafu(display(
"Timeout waiting for memory quota: requested {requested_bytes} bytes, waited {waited:?}"
))]
MemoryAcquireTimeout {
requested_bytes: u64,
waited: Duration,
},
}
impl ErrorExt for Error {
@@ -44,6 +53,7 @@ impl ErrorExt for Error {
match self {
MemoryLimitExceeded { .. } => StatusCode::RuntimeResourcesExhausted,
MemorySemaphoreClosed => StatusCode::Unexpected,
MemoryAcquireTimeout { .. } => StatusCode::RuntimeResourcesExhausted,
}
}

View File

@@ -0,0 +1,168 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
/// Memory permit granularity for different use cases.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum PermitGranularity {
/// 1 KB per permit
///
/// Use for:
/// - HTTP/gRPC request limiting (small, high-concurrency operations)
/// - Small batch operations
/// - Scenarios requiring fine-grained fairness
Kilobyte,
/// 1 MB per permit (default)
///
/// Use for:
/// - Query execution memory management
/// - Compaction memory control
/// - Large, long-running operations
#[default]
Megabyte,
}
impl PermitGranularity {
/// Returns the number of bytes per permit.
#[inline]
pub const fn bytes(self) -> u64 {
match self {
Self::Kilobyte => 1024,
Self::Megabyte => 1024 * 1024,
}
}
/// Returns a human-readable string representation.
pub const fn as_str(self) -> &'static str {
match self {
Self::Kilobyte => "1KB",
Self::Megabyte => "1MB",
}
}
/// Converts bytes to permits based on this granularity.
///
/// Rounds up to ensure the requested bytes are fully covered.
/// Clamped to Semaphore::MAX_PERMITS.
#[inline]
pub fn bytes_to_permits(self, bytes: u64) -> u32 {
use tokio::sync::Semaphore;
let granularity_bytes = self.bytes();
bytes
.saturating_add(granularity_bytes - 1)
.saturating_div(granularity_bytes)
.min(Semaphore::MAX_PERMITS as u64)
.min(u32::MAX as u64) as u32
}
/// Converts permits to bytes based on this granularity.
#[inline]
pub fn permits_to_bytes(self, permits: u32) -> u64 {
(permits as u64).saturating_mul(self.bytes())
}
}
impl fmt::Display for PermitGranularity {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.as_str())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bytes_to_permits_kilobyte() {
let granularity = PermitGranularity::Kilobyte;
// Exact multiples
assert_eq!(granularity.bytes_to_permits(1024), 1);
assert_eq!(granularity.bytes_to_permits(2048), 2);
assert_eq!(granularity.bytes_to_permits(10 * 1024), 10);
// Rounds up
assert_eq!(granularity.bytes_to_permits(1), 1);
assert_eq!(granularity.bytes_to_permits(1025), 2);
assert_eq!(granularity.bytes_to_permits(2047), 2);
}
#[test]
fn test_bytes_to_permits_megabyte() {
let granularity = PermitGranularity::Megabyte;
// Exact multiples
assert_eq!(granularity.bytes_to_permits(1024 * 1024), 1);
assert_eq!(granularity.bytes_to_permits(2 * 1024 * 1024), 2);
// Rounds up
assert_eq!(granularity.bytes_to_permits(1), 1);
assert_eq!(granularity.bytes_to_permits(1024), 1);
assert_eq!(granularity.bytes_to_permits(1024 * 1024 + 1), 2);
}
#[test]
fn test_bytes_to_permits_zero_bytes() {
assert_eq!(PermitGranularity::Kilobyte.bytes_to_permits(0), 0);
assert_eq!(PermitGranularity::Megabyte.bytes_to_permits(0), 0);
}
#[test]
fn test_bytes_to_permits_clamps_to_maximum() {
use tokio::sync::Semaphore;
let max_permits = (Semaphore::MAX_PERMITS as u64).min(u32::MAX as u64) as u32;
assert_eq!(
PermitGranularity::Kilobyte.bytes_to_permits(u64::MAX),
max_permits
);
assert_eq!(
PermitGranularity::Megabyte.bytes_to_permits(u64::MAX),
max_permits
);
}
#[test]
fn test_permits_to_bytes() {
assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(1), 1024);
assert_eq!(PermitGranularity::Kilobyte.permits_to_bytes(10), 10 * 1024);
assert_eq!(PermitGranularity::Megabyte.permits_to_bytes(1), 1024 * 1024);
assert_eq!(
PermitGranularity::Megabyte.permits_to_bytes(10),
10 * 1024 * 1024
);
}
#[test]
fn test_round_trip_conversion() {
// Kilobyte: bytes -> permits -> bytes (should round up)
let kb = PermitGranularity::Kilobyte;
let permits = kb.bytes_to_permits(1500);
let bytes = kb.permits_to_bytes(permits);
assert!(bytes >= 1500); // Must cover original request
assert_eq!(bytes, 2048); // 2KB
// Megabyte: bytes -> permits -> bytes (should round up)
let mb = PermitGranularity::Megabyte;
let permits = mb.bytes_to_permits(1500);
let bytes = mb.permits_to_bytes(permits);
assert!(bytes >= 1500);
assert_eq!(bytes, 1024 * 1024); // 1MB
}
}

View File

@@ -17,7 +17,7 @@ use std::{fmt, mem};
use common_telemetry::debug;
use tokio::sync::{OwnedSemaphorePermit, TryAcquireError};
use crate::manager::{MemoryMetrics, MemoryQuota, bytes_to_permits, permits_to_bytes};
use crate::manager::{MemoryMetrics, MemoryQuota};
/// Guard representing a slice of reserved memory.
pub struct MemoryGuard<M: MemoryMetrics> {
@@ -49,7 +49,9 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
pub fn granted_bytes(&self) -> u64 {
match &self.state {
GuardState::Unlimited => 0,
GuardState::Limited { permit, .. } => permits_to_bytes(permit.num_permits() as u32),
GuardState::Limited { permit, quota } => {
quota.permits_to_bytes(permit.num_permits() as u32)
}
}
}
@@ -65,7 +67,7 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
return true;
}
let additional_permits = bytes_to_permits(bytes);
let additional_permits = quota.bytes_to_permits(bytes);
match quota
.semaphore
@@ -99,11 +101,12 @@ impl<M: MemoryMetrics> MemoryGuard<M> {
return true;
}
let release_permits = bytes_to_permits(bytes);
let release_permits = quota.bytes_to_permits(bytes);
match permit.split(release_permits as usize) {
Some(released_permit) => {
let released_bytes = permits_to_bytes(released_permit.num_permits() as u32);
let released_bytes =
quota.permits_to_bytes(released_permit.num_permits() as u32);
drop(released_permit);
quota.update_in_use_metric();
debug!("Early released {} bytes from memory guard", released_bytes);
@@ -121,7 +124,7 @@ impl<M: MemoryMetrics> Drop for MemoryGuard<M> {
if let GuardState::Limited { permit, quota } =
mem::replace(&mut self.state, GuardState::Unlimited)
{
let bytes = permits_to_bytes(permit.num_permits() as u32);
let bytes = quota.permits_to_bytes(permit.num_permits() as u32);
drop(permit);
quota.update_in_use_metric();
debug!("Released memory: {} bytes", bytes);

View File

@@ -19,6 +19,7 @@
//! share the same allocation logic while using their own metrics.
mod error;
mod granularity;
mod guard;
mod manager;
mod policy;
@@ -27,8 +28,9 @@ mod policy;
mod tests;
pub use error::{Error, Result};
pub use granularity::PermitGranularity;
pub use guard::MemoryGuard;
pub use manager::{MemoryManager, MemoryMetrics, PERMIT_GRANULARITY_BYTES};
pub use manager::{MemoryManager, MemoryMetrics};
pub use policy::{DEFAULT_MEMORY_WAIT_TIMEOUT, OnExhaustedPolicy};
/// No-op metrics implementation for testing.

View File

@@ -17,11 +17,12 @@ use std::sync::Arc;
use snafu::ensure;
use tokio::sync::{Semaphore, TryAcquireError};
use crate::error::{MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result};
use crate::error::{
MemoryAcquireTimeoutSnafu, MemoryLimitExceededSnafu, MemorySemaphoreClosedSnafu, Result,
};
use crate::granularity::PermitGranularity;
use crate::guard::MemoryGuard;
/// Minimum bytes controlled by one semaphore permit.
pub const PERMIT_GRANULARITY_BYTES: u64 = 1 << 20; // 1 MB
use crate::policy::OnExhaustedPolicy;
/// Trait for recording memory usage metrics.
pub trait MemoryMetrics: Clone + Send + Sync + 'static {
@@ -40,6 +41,7 @@ pub struct MemoryManager<M: MemoryMetrics> {
pub(crate) struct MemoryQuota<M: MemoryMetrics> {
pub(crate) semaphore: Arc<Semaphore>,
pub(crate) limit_permits: u32,
pub(crate) granularity: PermitGranularity,
pub(crate) metrics: M,
}
@@ -47,19 +49,25 @@ impl<M: MemoryMetrics> MemoryManager<M> {
/// Creates a new memory manager with the given limit in bytes.
/// `limit_bytes = 0` disables the limit.
pub fn new(limit_bytes: u64, metrics: M) -> Self {
Self::with_granularity(limit_bytes, PermitGranularity::default(), metrics)
}
/// Creates a new memory manager with specified granularity.
pub fn with_granularity(limit_bytes: u64, granularity: PermitGranularity, metrics: M) -> Self {
if limit_bytes == 0 {
metrics.set_limit(0);
return Self { quota: None };
}
let limit_permits = bytes_to_permits(limit_bytes);
let limit_aligned_bytes = permits_to_bytes(limit_permits);
let limit_permits = granularity.bytes_to_permits(limit_bytes);
let limit_aligned_bytes = granularity.permits_to_bytes(limit_permits);
metrics.set_limit(limit_aligned_bytes as i64);
Self {
quota: Some(MemoryQuota {
semaphore: Arc::new(Semaphore::new(limit_permits as usize)),
limit_permits,
granularity,
metrics,
}),
}
@@ -69,7 +77,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn limit_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| permits_to_bytes(quota.limit_permits))
.map(|quota| quota.permits_to_bytes(quota.limit_permits))
.unwrap_or(0)
}
@@ -77,7 +85,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn used_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| permits_to_bytes(quota.used_permits()))
.map(|quota| quota.permits_to_bytes(quota.used_permits()))
.unwrap_or(0)
}
@@ -85,7 +93,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
pub fn available_bytes(&self) -> u64 {
self.quota
.as_ref()
.map(|quota| permits_to_bytes(quota.available_permits_clamped()))
.map(|quota| quota.permits_to_bytes(quota.available_permits_clamped()))
.unwrap_or(0)
}
@@ -98,13 +106,13 @@ impl<M: MemoryMetrics> MemoryManager<M> {
match &self.quota {
None => Ok(MemoryGuard::unlimited()),
Some(quota) => {
let permits = bytes_to_permits(bytes);
let permits = quota.bytes_to_permits(bytes);
ensure!(
permits <= quota.limit_permits,
MemoryLimitExceededSnafu {
requested_bytes: bytes,
limit_bytes: permits_to_bytes(quota.limit_permits),
limit_bytes: self.limit_bytes()
}
);
@@ -125,7 +133,7 @@ impl<M: MemoryMetrics> MemoryManager<M> {
match &self.quota {
None => Some(MemoryGuard::unlimited()),
Some(quota) => {
let permits = bytes_to_permits(bytes);
let permits = quota.bytes_to_permits(bytes);
match quota.semaphore.clone().try_acquire_many_owned(permits) {
Ok(permit) => {
@@ -140,9 +148,56 @@ impl<M: MemoryMetrics> MemoryManager<M> {
}
}
}
/// Acquires memory based on the given policy.
///
/// - For `OnExhaustedPolicy::Wait`: Waits up to the timeout duration for memory to become available
/// - For `OnExhaustedPolicy::Fail`: Returns immediately if memory is not available
///
/// # Errors
/// - `MemoryLimitExceeded`: Requested bytes exceed the total limit (both policies), or memory is currently exhausted (Fail policy only)
/// - `MemoryAcquireTimeout`: Timeout elapsed while waiting for memory (Wait policy only)
/// - `MemorySemaphoreClosed`: The internal semaphore is unexpectedly closed (rare, indicates system issue)
pub async fn acquire_with_policy(
&self,
bytes: u64,
policy: OnExhaustedPolicy,
) -> Result<MemoryGuard<M>> {
match policy {
OnExhaustedPolicy::Wait { timeout } => {
match tokio::time::timeout(timeout, self.acquire(bytes)).await {
Ok(Ok(guard)) => Ok(guard),
Ok(Err(e)) => Err(e),
Err(_elapsed) => {
// Timeout elapsed while waiting
MemoryAcquireTimeoutSnafu {
requested_bytes: bytes,
waited: timeout,
}
.fail()
}
}
}
OnExhaustedPolicy::Fail => self.try_acquire(bytes).ok_or_else(|| {
MemoryLimitExceededSnafu {
requested_bytes: bytes,
limit_bytes: self.limit_bytes(),
}
.build()
}),
}
}
}
impl<M: MemoryMetrics> MemoryQuota<M> {
pub(crate) fn bytes_to_permits(&self, bytes: u64) -> u32 {
self.granularity.bytes_to_permits(bytes)
}
pub(crate) fn permits_to_bytes(&self, permits: u32) -> u64 {
self.granularity.permits_to_bytes(permits)
}
pub(crate) fn used_permits(&self) -> u32 {
self.limit_permits
.saturating_sub(self.available_permits_clamped())
@@ -155,19 +210,7 @@ impl<M: MemoryMetrics> MemoryQuota<M> {
}
pub(crate) fn update_in_use_metric(&self) {
let bytes = permits_to_bytes(self.used_permits());
let bytes = self.permits_to_bytes(self.used_permits());
self.metrics.set_in_use(bytes as i64);
}
}
pub(crate) fn bytes_to_permits(bytes: u64) -> u32 {
bytes
.saturating_add(PERMIT_GRANULARITY_BYTES - 1)
.saturating_div(PERMIT_GRANULARITY_BYTES)
.min(Semaphore::MAX_PERMITS as u64)
.min(u32::MAX as u64) as u32
}
pub(crate) fn permits_to_bytes(permits: u32) -> u64 {
(permits as u64).saturating_mul(PERMIT_GRANULARITY_BYTES)
}
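
For callers of the new acquire_with_policy, the error surface above splits into a timeout case and an exhaustion case; a hedged sketch of how a caller might branch on them follows (field names match the error.rs diff, while the crate path and the helper itself are illustrative).

use common_memory_manager::{Error, MemoryGuard, MemoryManager, MemoryMetrics, OnExhaustedPolicy};

// Fail-fast reservation that distinguishes the timeout variant from plain exhaustion.
async fn try_reserve<M: MemoryMetrics>(
    manager: &MemoryManager<M>,
    bytes: u64,
) -> Option<MemoryGuard<M>> {
    match manager.acquire_with_policy(bytes, OnExhaustedPolicy::Fail).await {
        Ok(guard) => Some(guard),
        // Produced only under the Wait policy; callers may retry or shed load.
        Err(Error::MemoryAcquireTimeout { requested_bytes, waited, .. }) => {
            eprintln!("timed out after {waited:?} waiting for {requested_bytes} bytes");
            None
        }
        // MemoryLimitExceeded (request over the limit, or quota exhausted under Fail)
        // and MemorySemaphoreClosed land here.
        Err(other) => {
            eprintln!("memory not granted: {other}");
            None
        }
    }
}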

View File

@@ -14,7 +14,10 @@
use tokio::time::{Duration, sleep};
use crate::{MemoryManager, NoOpMetrics, PERMIT_GRANULARITY_BYTES};
use crate::{MemoryManager, NoOpMetrics, PermitGranularity};
// Helper constant for tests - use default Megabyte granularity
const PERMIT_GRANULARITY_BYTES: u64 = PermitGranularity::Megabyte.bytes();
#[test]
fn test_try_acquire_unlimited() {

View File

@@ -15,7 +15,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::sync::{Arc, Mutex, Weak};
use std::time::SystemTime;
use api::v1::greptime_request::Request;
@@ -38,6 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use session::hints::READ_PREFERENCE_HINT;
use snafu::{OptionExt, ResultExt};
use tokio::sync::SetOnce;
use crate::batching_mode::BatchingModeOptions;
use crate::error::{
@@ -75,7 +76,19 @@ impl<E: ErrorExt + Send + Sync + 'static, T: GrpcQueryHandler<Error = E> + Send
}
}
type HandlerMutable = Arc<std::sync::Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>;
#[derive(Debug, Clone)]
pub struct HandlerMutable {
handler: Arc<Mutex<Option<Weak<dyn GrpcQueryHandlerWithBoxedError>>>>,
is_initialized: Arc<SetOnce<()>>,
}
impl HandlerMutable {
pub async fn set_handler(&self, handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) {
*self.handler.lock().unwrap() = Some(handler);
// Ignore the error, as we allow the handler to be set multiple times.
let _ = self.is_initialized.set(());
}
}
/// A simple frontend client able to execute sql using grpc protocol
///
@@ -100,7 +113,11 @@ pub enum FrontendClient {
impl FrontendClient {
/// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
let handler = Arc::new(std::sync::Mutex::new(None));
let is_initialized = Arc::new(SetOnce::new());
let handler = HandlerMutable {
handler: Arc::new(Mutex::new(None)),
is_initialized,
};
(
Self::Standalone {
database_client: handler.clone(),
@@ -110,23 +127,13 @@ impl FrontendClient {
)
}
/// Check if the frontend client is initialized.
///
/// In distributed mode, it is always initialized.
/// In standalone mode, it checks if the database client is set.
pub fn is_initialized(&self) -> bool {
match self {
FrontendClient::Distributed { .. } => true,
FrontendClient::Standalone {
database_client, ..
} => {
let guard = database_client.lock();
if let Ok(guard) = guard {
guard.is_some()
} else {
false
}
}
/// Waits until the frontend client is initialized.
pub async fn wait_initialized(&self) {
if let FrontendClient::Standalone {
database_client, ..
} = self
{
database_client.is_initialized.wait().await;
}
}
@@ -158,8 +165,14 @@ impl FrontendClient {
grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
query: QueryOptions,
) -> Self {
let is_initialized = Arc::new(SetOnce::new_with(Some(())));
let handler = HandlerMutable {
handler: Arc::new(Mutex::new(Some(grpc_handler))),
is_initialized: is_initialized.clone(),
};
Self::Standalone {
database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
database_client: handler,
query,
}
}
@@ -341,6 +354,7 @@ impl FrontendClient {
{
let database_client = {
database_client
.handler
.lock()
.map_err(|e| {
UnexpectedSnafu {
@@ -418,6 +432,7 @@ impl FrontendClient {
{
let database_client = {
database_client
.handler
.lock()
.map_err(|e| {
UnexpectedSnafu {
@@ -480,3 +495,73 @@ impl std::fmt::Display for PeerDesc {
}
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use common_query::Output;
use tokio::time::timeout;
use super::*;
#[derive(Debug)]
struct NoopHandler;
#[async_trait::async_trait]
impl GrpcQueryHandlerWithBoxedError for NoopHandler {
async fn do_query(
&self,
_query: Request,
_ctx: QueryContextRef,
) -> std::result::Result<Output, BoxedError> {
Ok(Output::new_with_affected_rows(0))
}
}
#[tokio::test]
async fn wait_initialized() {
let (client, handler_mut) =
FrontendClient::from_empty_grpc_handler(QueryOptions::default());
assert!(
timeout(Duration::from_millis(50), client.wait_initialized())
.await
.is_err()
);
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
handler_mut.set_handler(Arc::downgrade(&handler)).await;
timeout(Duration::from_secs(1), client.wait_initialized())
.await
.expect("wait_initialized should complete after handler is set");
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.expect("wait_initialized should be a no-op once initialized");
let handler: Arc<dyn GrpcQueryHandlerWithBoxedError> = Arc::new(NoopHandler);
let client =
FrontendClient::from_grpc_handler(Arc::downgrade(&handler), QueryOptions::default());
assert!(
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.is_ok()
);
let meta_client = Arc::new(MetaClient::default());
let client = FrontendClient::from_meta_client(
meta_client,
None,
QueryOptions::default(),
BatchingModeOptions::default(),
)
.unwrap();
assert!(
timeout(Duration::from_millis(10), client.wait_initialized())
.await
.is_ok()
);
}
}

View File

@@ -7,6 +7,9 @@ license.workspace = true
[lints]
workspace = true
[features]
vector_index = ["dep:usearch"]
[dependencies]
async-trait.workspace = true
asynchronous-codec = "0.7.0"
@@ -41,7 +44,7 @@ tantivy = { version = "0.24", features = ["zstd-compression"] }
tantivy-jieba = "0.16"
tokio.workspace = true
tokio-util.workspace = true
usearch = { version = "2.21", default-features = false, features = ["fp16lib"] }
usearch = { version = "2.21", default-features = false, features = ["fp16lib"], optional = true }
uuid.workspace = true
[dev-dependencies]

View File

@@ -22,6 +22,7 @@ pub mod external_provider;
pub mod fulltext_index;
pub mod inverted_index;
pub mod target;
#[cfg(feature = "vector_index")]
pub mod vector;
pub type Bytes = Vec<u8>;

View File

@@ -25,7 +25,7 @@ use tokio::sync::mpsc;
use crate::compaction::compactor::{CompactionRegion, Compactor};
use crate::compaction::memory_manager::{CompactionMemoryGuard, CompactionMemoryManager};
use crate::compaction::picker::{CompactionTask, PickerOutput};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu, MemoryAcquireFailedSnafu};
use crate::error::{CompactRegionSnafu, CompactionMemoryExhaustedSnafu};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_MEMORY_WAIT, COMPACTION_STAGE_ELAPSED};
use crate::region::RegionRoleState;
@@ -95,80 +95,16 @@ impl CompactionTaskImpl {
async fn acquire_memory_with_policy(&self) -> error::Result<CompactionMemoryGuard> {
let region_id = self.compaction_region.region_id;
let requested_bytes = self.estimated_memory_bytes;
let limit_bytes = self.memory_manager.limit_bytes();
let policy = self.memory_policy;
if limit_bytes > 0 && requested_bytes > limit_bytes {
warn!(
"Compaction for region {} requires {} bytes but limit is {} bytes; cannot satisfy request",
region_id, requested_bytes, limit_bytes
);
return Err(CompactionMemoryExhaustedSnafu {
let _timer = COMPACTION_MEMORY_WAIT.start_timer();
self.memory_manager
.acquire_with_policy(requested_bytes, policy)
.await
.context(CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: "exceed_limit".to_string(),
}
.build());
}
match self.memory_policy {
OnExhaustedPolicy::Wait {
timeout: wait_timeout,
} => {
let timer = COMPACTION_MEMORY_WAIT.start_timer();
match tokio::time::timeout(
wait_timeout,
self.memory_manager.acquire(requested_bytes),
)
.await
{
Ok(Ok(guard)) => {
timer.observe_duration();
Ok(guard)
}
Ok(Err(e)) => {
timer.observe_duration();
Err(e).with_context(|_| MemoryAcquireFailedSnafu {
region_id,
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
})
}
Err(_) => {
timer.observe_duration();
warn!(
"Compaction for region {} waited {:?} for {} bytes but timed out",
region_id, wait_timeout, requested_bytes
);
CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: format!("wait_timeout({}ms)", wait_timeout.as_millis()),
}
.fail()
}
}
}
OnExhaustedPolicy::Fail => {
// Try to acquire, fail immediately if not available
self.memory_manager
.try_acquire(requested_bytes)
.ok_or_else(|| {
warn!(
"Compaction memory exhausted for region {} (policy=fail, need {} bytes, limit {} bytes)",
region_id, requested_bytes, limit_bytes
);
CompactionMemoryExhaustedSnafu {
region_id,
required_bytes: requested_bytes,
limit_bytes,
policy: "fail".to_string(),
}
.build()
})
}
}
policy: format!("{policy:?}"),
})
}
/// Remove expired ssts files, update manifest immediately

View File

@@ -1042,20 +1042,8 @@ pub enum Error {
#[snafu(display("Manual compaction is override by following operations."))]
ManualCompactionOverride {},
#[snafu(display(
"Compaction memory limit exceeded for region {region_id}: required {required_bytes} bytes, limit {limit_bytes} bytes (policy: {policy})",
))]
#[snafu(display("Compaction memory exhausted for region {region_id} (policy: {policy})",))]
CompactionMemoryExhausted {
region_id: RegionId,
required_bytes: u64,
limit_bytes: u64,
policy: String,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to acquire memory for region {region_id} (policy: {policy})"))]
MemoryAcquireFailed {
region_id: RegionId,
policy: String,
#[snafu(source)]
@@ -1359,9 +1347,7 @@ impl ErrorExt for Error {
ManualCompactionOverride {} => StatusCode::Cancelled,
CompactionMemoryExhausted { .. } => StatusCode::RuntimeResourcesExhausted,
MemoryAcquireFailed { source, .. } => source.status_code(),
CompactionMemoryExhausted { source, .. } => source.status_code(),
IncompatibleWalProviderChange { .. } => StatusCode::InvalidArguments,

View File

@@ -16,6 +16,7 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use common_telemetry::tracing::warn;
use opendal::services::{Azblob, Gcs, Oss, S3};
use serde::{Deserialize, Serialize};
@@ -123,11 +124,18 @@ impl From<&S3Connection> for S3 {
fn from(connection: &S3Connection) -> Self {
let root = util::normalize_dir(&connection.root);
let mut builder = S3::default()
.root(&root)
.bucket(&connection.bucket)
.access_key_id(connection.access_key_id.expose_secret())
.secret_access_key(connection.secret_access_key.expose_secret());
let mut builder = S3::default().root(&root).bucket(&connection.bucket);
if !connection.access_key_id.expose_secret().is_empty()
&& !connection.secret_access_key.expose_secret().is_empty()
{
builder = builder
.access_key_id(connection.access_key_id.expose_secret())
.secret_access_key(connection.secret_access_key.expose_secret());
} else {
warn!("No access key id or secret access key provided, using anonymous access");
builder = builder.allow_anonymous().disable_ec2_metadata();
}
if let Some(endpoint) = &connection.endpoint {
builder = builder.endpoint(endpoint);

View File

@@ -259,9 +259,8 @@ impl GreptimeDbStandaloneBuilder {
let grpc_handler = instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
let weak_grpc_handler = Arc::downgrade(&grpc_handler);
frontend_instance_handler
.lock()
.unwrap()
.replace(weak_grpc_handler);
.set_handler(weak_grpc_handler)
.await;
let flow_streaming_engine = flownode.flow_engine().streaming_engine();
let invoker = flow::FrontendInvoker::build_from(