feat!: memory limiter unification write path (#7437)

* feat: remove option max_in_flight_write_bytes

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* feat: replace RequestMemoryLimiter

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: add integration test

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: fix test

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* fix: by AI comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* refactor: global permit pool on writing

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

* chore: by ai comment

Signed-off-by: jeremyhi <fengjiachun@gmail.com>

---------

Signed-off-by: jeremyhi <fengjiachun@gmail.com>
This commit is contained in:
jeremyhi
2025-12-23 10:18:49 +08:00
committed by GitHub
parent a8b512dded
commit 6a6b34c709
41 changed files with 586 additions and 869 deletions

4
Cargo.lock generated
View File

@@ -5036,6 +5036,7 @@ dependencies = [
"common-function",
"common-grpc",
"common-macro",
"common-memory-manager",
"common-meta",
"common-options",
"common-procedure",
@@ -11690,6 +11691,7 @@ dependencies = [
"common-grpc",
"common-macro",
"common-mem-prof",
"common-memory-manager",
"common-meta",
"common-plugins",
"common-pprof",
@@ -12482,6 +12484,7 @@ dependencies = [
"common-config",
"common-error",
"common-macro",
"common-memory-manager",
"common-meta",
"common-options",
"common-procedure",
@@ -13184,6 +13187,7 @@ dependencies = [
"common-event-recorder",
"common-frontend",
"common-grpc",
"common-memory-manager",
"common-meta",
"common-procedure",
"common-query",

View File

@@ -14,11 +14,12 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.<br/>Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited.<br/>NOTE: This setting affects scan_memory_limit's privileged tier allocation.<br/>When set, 70% of queries get privileged memory access (full scan_memory_limit).<br/>The remaining 30% get standard tier access (70% of scan_memory_limit). |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -26,14 +27,12 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.max_total_body_memory` | String | Unset | Maximum total memory for all concurrent HTTP request bodies.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_total_message_memory` | String | Unset | Maximum total memory for all concurrent gRPC request messages.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.<br/>The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.<br/>Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
@@ -227,7 +226,8 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted.<br/>Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -238,7 +238,6 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit.<br/>The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.<br/>Set to 0 to disable limit. |
| `http.max_total_body_memory` | String | Unset | Maximum total memory for all concurrent HTTP request bodies.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default<br/>This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests.<br/>Available options:<br/>- strict: deny invalid UTF-8 strings (default).<br/>- lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD).<br/>- unchecked: do not valid strings. |
@@ -246,7 +245,6 @@
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host.<br/>If left empty or unset, the server will automatically use the IP address of the first network interface<br/>on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
| `grpc.max_total_message_memory` | String | Unset | Maximum total memory for all concurrent gRPC request messages.<br/>Set to 0 to disable the limit. Default: "0" (unlimited) |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options:<br/>- `none`: disable all compression<br/>- `transport`: only enable gRPC transport compression (zstd)<br/>- `arrow_ipc`: only enable Arrow IPC compression (lz4)<br/>- `all`: enable all compression.<br/>Default to `none` |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection.<br/>The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.<br/>Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |

View File

@@ -6,9 +6,15 @@ default_timezone = "UTC"
## @toml2docs:none-default
default_column_prefix = "greptime"
## The maximum in-flight write bytes.
## Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"
#+ max_in_flight_write_bytes = "1GB"
## Policy when write bytes quota is exhausted.
## Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail"
## @toml2docs:none-default
#+ write_bytes_exhausted_policy = "wait"
## The runtime options.
#+ [runtime]
@@ -35,10 +41,6 @@ timeout = "0s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## Maximum total memory for all concurrent HTTP request bodies.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_body_memory = "1GB"
## HTTP CORS support, it's turned on by default
## This allows browser to access http APIs without CORS restrictions
enable_cors = true
@@ -62,10 +64,6 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Maximum total memory for all concurrent gRPC request messages.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_message_memory = "1GB"
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)

View File

@@ -6,6 +6,16 @@ default_timezone = "UTC"
## @toml2docs:none-default
default_column_prefix = "greptime"
## Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "1GB"
## Policy when write bytes quota is exhausted.
## Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail"
## @toml2docs:none-default
#+ write_bytes_exhausted_policy = "wait"
## Initialize all regions in the background during the startup.
## By default, it provides services after all regions have been initialized.
init_regions_in_background = false
@@ -22,10 +32,6 @@ max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
#+ enable_telemetry = true
## The maximum in-flight write bytes.
## @toml2docs:none-default
#+ max_in_flight_write_bytes = "500MB"
## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.
@@ -43,10 +49,6 @@ timeout = "0s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
## Maximum total memory for all concurrent HTTP request bodies.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_body_memory = "1GB"
## HTTP CORS support, it's turned on by default
## This allows browser to access http APIs without CORS restrictions
enable_cors = true
@@ -67,10 +69,6 @@ prom_validation_mode = "strict"
bind_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
## Maximum total memory for all concurrent gRPC request messages.
## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
#+ max_total_message_memory = "1GB"
## The maximum connection age for gRPC connection.
## The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
## Refer to https://grpc.io/docs/guides/keepalive/ for more details.

View File

@@ -37,6 +37,12 @@ pub struct MemoryManager<M: MemoryMetrics> {
quota: Option<MemoryQuota<M>>,
}
impl<M: MemoryMetrics + Default> Default for MemoryManager<M> {
fn default() -> Self {
Self::new(0, M::default())
}
}
#[derive(Clone)]
pub(crate) struct MemoryQuota<M: MemoryMetrics> {
pub(crate) semaphore: Arc<Semaphore>,

View File

@@ -490,7 +490,6 @@ impl<'a> FlownodeServiceBuilder<'a> {
let config = GrpcServerConfig {
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
max_total_message_memory: opts.grpc.max_total_message_memory.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
max_connection_age: opts.grpc.max_connection_age,
};

View File

@@ -32,6 +32,7 @@ common-frontend.workspace = true
common-function.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-memory-manager.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-procedure.workspace = true

View File

@@ -357,14 +357,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to acquire more permits from limiter"))]
AcquireLimiter {
#[snafu(source)]
error: tokio::sync::AcquireError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Service suspended"))]
Suspended {
#[snafu(implicit)]
@@ -449,8 +441,6 @@ impl ErrorExt for Error {
Error::StatementTimeout { .. } => StatusCode::Cancelled,
Error::AcquireLimiter { .. } => StatusCode::Internal,
Error::Suspended { .. } => StatusCode::Suspended,
}
}

View File

@@ -17,6 +17,7 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_config::config::Configurable;
use common_event_recorder::EventRecorderOptions;
use common_memory_manager::OnExhaustedPolicy;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
@@ -45,6 +46,12 @@ pub struct FrontendOptions {
pub default_timezone: Option<String>,
pub default_column_prefix: Option<String>,
pub heartbeat: HeartbeatOptions,
/// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
/// Set to 0 to disable the limit. Default: "0" (unlimited)
pub max_in_flight_write_bytes: ReadableSize,
/// Policy when write bytes quota is exhausted.
/// Options: "wait" (default, 10s), "wait(<duration>)", "fail"
pub write_bytes_exhausted_policy: OnExhaustedPolicy,
pub http: HttpOptions,
pub grpc: GrpcOptions,
/// The internal gRPC options for the frontend service.
@@ -63,7 +70,6 @@ pub struct FrontendOptions {
pub user_provider: Option<String>,
pub tracing: TracingOptions,
pub query: QueryOptions,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: SlowQueryOptions,
pub memory: MemoryOptions,
/// The event recorder options.
@@ -77,6 +83,8 @@ impl Default for FrontendOptions {
default_timezone: None,
default_column_prefix: None,
heartbeat: HeartbeatOptions::frontend_default(),
max_in_flight_write_bytes: ReadableSize(0),
write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
internal_grpc: None,
@@ -93,7 +101,6 @@ impl Default for FrontendOptions {
user_provider: None,
tracing: TracingOptions::default(),
query: QueryOptions::default(),
max_in_flight_write_bytes: None,
slow_query: SlowQueryOptions::default(),
memory: MemoryOptions::default(),
event_recorder: EventRecorderOptions::default(),

View File

@@ -97,7 +97,6 @@ use crate::error::{
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
StatementTimeoutSnafu, TableOperationSnafu,
};
use crate::limiter::LimiterRef;
use crate::stream_wrapper::CancellableStreamWrapper;
lazy_static! {
@@ -118,7 +117,6 @@ pub struct Instance {
deleter: DeleterRef,
table_metadata_manager: TableMetadataManagerRef,
event_recorder: Option<EventRecorderRef>,
limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
slow_query_options: SlowQueryOptions,
suspend: Arc<AtomicBool>,

View File

@@ -49,7 +49,6 @@ use crate::events::EventHandlerImpl;
use crate::frontend::FrontendOptions;
use crate::instance::Instance;
use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::limiter::Limiter;
/// The frontend [`Instance`] builder.
pub struct FrontendBuilder {
@@ -248,14 +247,6 @@ impl FrontendBuilder {
self.options.event_recorder.ttl,
))));
// Create the limiter if the max_in_flight_write_bytes is set.
let limiter = self
.options
.max_in_flight_write_bytes
.map(|max_in_flight_write_bytes| {
Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes() as usize))
});
Ok(Instance {
catalog_manager: self.catalog_manager,
pipeline_operator,
@@ -266,7 +257,6 @@ impl FrontendBuilder {
deleter,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
event_recorder: Some(event_recorder),
limiter,
process_manager,
otlp_metrics_table_legacy_cache: DashMap::new(),
slow_query_options: self.options.slow_query.clone(),

View File

@@ -71,12 +71,6 @@ impl GrpcQueryHandler for Instance {
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
.context(PermissionSnafu)?;
let _guard = if let Some(limiter) = &self.limiter {
Some(limiter.limit_request(&request).await?)
} else {
None
};
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => {

View File

@@ -22,7 +22,7 @@ use common_error::ext::BoxedError;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use servers::error::{
AuthSnafu, CatalogSnafu, Error, OtherSnafu, TimestampOverflowSnafu, UnexpectedResultSnafu,
AuthSnafu, CatalogSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu,
};
use servers::influxdb::InfluxdbRequest;
use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
@@ -59,18 +59,6 @@ impl InfluxdbLineProtocolHandler for Instance {
.post_lines_conversion(requests, ctx.clone())
.await?;
let _guard = if let Some(limiter) = &self.limiter {
Some(
limiter
.limit_row_inserts(&requests)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};
self.handle_influx_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)

View File

@@ -23,8 +23,7 @@ use datatypes::timestamp::TimestampNanosecond;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, OtherSnafu, PipelineSnafu,
Result as ServerResult,
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
@@ -124,18 +123,6 @@ impl Instance {
log: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
let _guard = if let Some(limiter) = &self.limiter {
Some(
limiter
.limit_row_inserts(&log)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};
self.inserter
.handle_log_inserts(log, ctx, self.statement_executor.as_ref())
.await
@@ -148,18 +135,6 @@ impl Instance {
rows: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
let _guard = if let Some(limiter) = &self.limiter {
Some(
limiter
.limit_row_inserts(&rows)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};
self.inserter
.handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
.await

View File

@@ -16,7 +16,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu};
use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu};
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
@@ -41,18 +41,6 @@ impl OpentsdbProtocolHandler for Instance {
let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
let _guard = if let Some(limiter) = &self.limiter {
Some(
limiter
.limit_row_inserts(&requests)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};
// OpenTSDB is single value.
let output = self
.handle_row_inserts(requests, ctx, true, true)

View File

@@ -24,7 +24,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
@@ -83,18 +83,6 @@ impl OpenTelemetryProtocolHandler for Instance {
ctx
};
let _guard = if let Some(limiter) = &self.limiter {
Some(
limiter
.limit_row_inserts(&requests)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};
// If the user uses the legacy path, it is by default without metric engine.
if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
self.handle_row_inserts(requests, ctx, false, false)
@@ -191,18 +179,6 @@ impl OpenTelemetryProtocolHandler for Instance {
)
.await?;
let _guard = if let Some(limiter) = &self.limiter {
Some(
limiter
.limit_ctx_req(&opt_req)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};
let mut outputs = vec![];
for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {

View File

@@ -175,18 +175,6 @@ impl PromStoreProtocolHandler for Instance {
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;
let _guard = if let Some(limiter) = &self.limiter {
Some(
limiter
.limit_row_inserts(&request)
.await
.map_err(BoxedError::new)
.context(error::OtherSnafu)?,
)
} else {
None
};
let output = if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)

View File

@@ -19,7 +19,6 @@ pub mod events;
pub mod frontend;
pub mod heartbeat;
pub mod instance;
pub(crate) mod limiter;
pub(crate) mod metrics;
pub mod server;
pub mod service_config;

View File

@@ -1,332 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::column::Values;
use api::v1::greptime_request::Request;
use api::v1::value::ValueData;
use api::v1::{
Decimal128, InsertRequests, IntervalMonthDayNano, JsonValue, RowInsertRequest,
RowInsertRequests, json_value,
};
use pipeline::ContextReq;
use snafu::ResultExt;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use crate::error::{AcquireLimiterSnafu, Result};
pub(crate) type LimiterRef = Arc<Limiter>;
/// A frontend request limiter that controls the total size of in-flight write
/// requests.
pub(crate) struct Limiter {
max_in_flight_write_bytes: usize,
byte_counter: Arc<Semaphore>,
}
impl Limiter {
pub fn new(max_in_flight_write_bytes: usize) -> Self {
Self {
byte_counter: Arc::new(Semaphore::new(max_in_flight_write_bytes)),
max_in_flight_write_bytes,
}
}
pub async fn limit_request(&self, request: &Request) -> Result<OwnedSemaphorePermit> {
let size = match request {
Request::Inserts(requests) => self.insert_requests_data_size(requests),
Request::RowInserts(requests) => {
self.rows_insert_requests_data_size(requests.inserts.iter())
}
_ => 0,
};
self.limit_in_flight_write_bytes(size).await
}
pub async fn limit_row_inserts(
&self,
requests: &RowInsertRequests,
) -> Result<OwnedSemaphorePermit> {
let size = self.rows_insert_requests_data_size(requests.inserts.iter());
self.limit_in_flight_write_bytes(size).await
}
pub async fn limit_ctx_req(&self, opt_req: &ContextReq) -> Result<OwnedSemaphorePermit> {
let size = self.rows_insert_requests_data_size(opt_req.ref_all_req());
self.limit_in_flight_write_bytes(size).await
}
/// Await until more inflight bytes are available
pub async fn limit_in_flight_write_bytes(&self, bytes: usize) -> Result<OwnedSemaphorePermit> {
self.byte_counter
.clone()
.acquire_many_owned(bytes as u32)
.await
.context(AcquireLimiterSnafu)
}
/// Returns the current in-flight write bytes.
#[allow(dead_code)]
pub fn in_flight_write_bytes(&self) -> usize {
self.max_in_flight_write_bytes - self.byte_counter.available_permits()
}
fn insert_requests_data_size(&self, request: &InsertRequests) -> usize {
let mut size: usize = 0;
for insert in &request.inserts {
for column in &insert.columns {
if let Some(values) = &column.values {
size += Self::size_of_column_values(values);
}
}
}
size
}
fn rows_insert_requests_data_size<'a>(
&self,
inserts: impl Iterator<Item = &'a RowInsertRequest>,
) -> usize {
let mut size: usize = 0;
for insert in inserts {
if let Some(rows) = &insert.rows {
for row in &rows.rows {
for value in &row.values {
if let Some(value) = &value.value_data {
size += Self::size_of_value_data(value);
}
}
}
}
}
size
}
fn size_of_column_values(values: &Values) -> usize {
let mut size: usize = 0;
size += values.i8_values.len() * size_of::<i32>();
size += values.i16_values.len() * size_of::<i32>();
size += values.i32_values.len() * size_of::<i32>();
size += values.i64_values.len() * size_of::<i64>();
size += values.u8_values.len() * size_of::<u32>();
size += values.u16_values.len() * size_of::<u32>();
size += values.u32_values.len() * size_of::<u32>();
size += values.u64_values.len() * size_of::<u64>();
size += values.f32_values.len() * size_of::<f32>();
size += values.f64_values.len() * size_of::<f64>();
size += values.bool_values.len() * size_of::<bool>();
size += values
.binary_values
.iter()
.map(|v| v.len() * size_of::<u8>())
.sum::<usize>();
size += values.string_values.iter().map(|v| v.len()).sum::<usize>();
size += values.date_values.len() * size_of::<i32>();
size += values.datetime_values.len() * size_of::<i64>();
size += values.timestamp_second_values.len() * size_of::<i64>();
size += values.timestamp_millisecond_values.len() * size_of::<i64>();
size += values.timestamp_microsecond_values.len() * size_of::<i64>();
size += values.timestamp_nanosecond_values.len() * size_of::<i64>();
size += values.time_second_values.len() * size_of::<i64>();
size += values.time_millisecond_values.len() * size_of::<i64>();
size += values.time_microsecond_values.len() * size_of::<i64>();
size += values.time_nanosecond_values.len() * size_of::<i64>();
size += values.interval_year_month_values.len() * size_of::<i64>();
size += values.interval_day_time_values.len() * size_of::<i64>();
size += values.interval_month_day_nano_values.len() * size_of::<IntervalMonthDayNano>();
size += values.decimal128_values.len() * size_of::<Decimal128>();
size += values
.list_values
.iter()
.map(|v| {
v.items
.iter()
.map(|item| {
item.value_data
.as_ref()
.map(Self::size_of_value_data)
.unwrap_or(0)
})
.sum::<usize>()
})
.sum::<usize>();
size += values
.struct_values
.iter()
.map(|v| {
v.items
.iter()
.map(|item| {
item.value_data
.as_ref()
.map(Self::size_of_value_data)
.unwrap_or(0)
})
.sum::<usize>()
})
.sum::<usize>();
size
}
fn size_of_value_data(value: &ValueData) -> usize {
match value {
ValueData::I8Value(_) => size_of::<i32>(),
ValueData::I16Value(_) => size_of::<i32>(),
ValueData::I32Value(_) => size_of::<i32>(),
ValueData::I64Value(_) => size_of::<i64>(),
ValueData::U8Value(_) => size_of::<u32>(),
ValueData::U16Value(_) => size_of::<u32>(),
ValueData::U32Value(_) => size_of::<u32>(),
ValueData::U64Value(_) => size_of::<u64>(),
ValueData::F32Value(_) => size_of::<f32>(),
ValueData::F64Value(_) => size_of::<f64>(),
ValueData::BoolValue(_) => size_of::<bool>(),
ValueData::BinaryValue(v) => v.len() * size_of::<u8>(),
ValueData::StringValue(v) => v.len(),
ValueData::DateValue(_) => size_of::<i32>(),
ValueData::DatetimeValue(_) => size_of::<i64>(),
ValueData::TimestampSecondValue(_) => size_of::<i64>(),
ValueData::TimestampMillisecondValue(_) => size_of::<i64>(),
ValueData::TimestampMicrosecondValue(_) => size_of::<i64>(),
ValueData::TimestampNanosecondValue(_) => size_of::<i64>(),
ValueData::TimeSecondValue(_) => size_of::<i64>(),
ValueData::TimeMillisecondValue(_) => size_of::<i64>(),
ValueData::TimeMicrosecondValue(_) => size_of::<i64>(),
ValueData::TimeNanosecondValue(_) => size_of::<i64>(),
ValueData::IntervalYearMonthValue(_) => size_of::<i32>(),
ValueData::IntervalDayTimeValue(_) => size_of::<i64>(),
ValueData::IntervalMonthDayNanoValue(_) => size_of::<IntervalMonthDayNano>(),
ValueData::Decimal128Value(_) => size_of::<Decimal128>(),
ValueData::ListValue(list_values) => list_values
.items
.iter()
.map(|item| {
item.value_data
.as_ref()
.map(Self::size_of_value_data)
.unwrap_or(0)
})
.sum(),
ValueData::StructValue(struct_values) => struct_values
.items
.iter()
.map(|item| {
item.value_data
.as_ref()
.map(Self::size_of_value_data)
.unwrap_or(0)
})
.sum(),
ValueData::JsonValue(v) => {
fn calc(v: &JsonValue) -> usize {
let Some(value) = v.value.as_ref() else {
return 0;
};
match value {
json_value::Value::Boolean(_) => size_of::<bool>(),
json_value::Value::Int(_) => size_of::<i64>(),
json_value::Value::Uint(_) => size_of::<u64>(),
json_value::Value::Float(_) => size_of::<f64>(),
json_value::Value::Str(s) => s.len(),
json_value::Value::Array(array) => array.items.iter().map(calc).sum(),
json_value::Value::Object(object) => object
.entries
.iter()
.flat_map(|entry| {
entry.value.as_ref().map(|v| entry.key.len() + calc(v))
})
.sum(),
}
}
calc(v)
}
}
}
}
#[cfg(test)]
mod tests {
use api::v1::column::Values;
use api::v1::greptime_request::Request;
use api::v1::{Column, InsertRequest};
use super::*;
fn generate_request(size: usize) -> Request {
let i8_values = vec![0; size / 4];
Request::Inserts(InsertRequests {
inserts: vec![InsertRequest {
columns: vec![Column {
values: Some(Values {
i8_values,
..Default::default()
}),
..Default::default()
}],
..Default::default()
}],
})
}
#[tokio::test]
async fn test_limiter() {
let limiter_ref: LimiterRef = Arc::new(Limiter::new(1024));
let tasks_count = 10;
let request_data_size = 100;
let mut handles = vec![];
// Generate multiple requests to test the limiter.
for _ in 0..tasks_count {
let limiter = limiter_ref.clone();
let handle = tokio::spawn(async move {
let result = limiter
.limit_request(&generate_request(request_data_size))
.await;
assert!(result.is_ok());
});
handles.push(handle);
}
// Wait for all threads to complete.
for handle in handles {
handle.await.unwrap();
}
}
#[tokio::test]
async fn test_in_flight_write_bytes() {
let limiter_ref: LimiterRef = Arc::new(Limiter::new(1024));
let req1 = generate_request(100);
let result1 = limiter_ref
.limit_request(&req1)
.await
.expect("failed to acquire permits");
assert_eq!(limiter_ref.in_flight_write_bytes(), 100);
let req2 = generate_request(200);
let result2 = limiter_ref
.limit_request(&req2)
.await
.expect("failed to acquire permits");
assert_eq!(limiter_ref.in_flight_write_bytes(), 300);
drop(result1);
assert_eq!(limiter_ref.in_flight_write_bytes(), 200);
drop(result2);
assert_eq!(limiter_ref.in_flight_write_bytes(), 0);
}
}

View File

@@ -40,6 +40,7 @@ use servers::otel_arrow::OtelArrowServiceHandler;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
use servers::request_memory_limiter::ServerMemoryLimiter;
use servers::server::{Server, ServerHandlers};
use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
use snafu::ResultExt;
@@ -76,15 +77,25 @@ where
}
}
pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
pub fn grpc_server_builder(
&self,
opts: &GrpcOptions,
request_memory_limiter: ServerMemoryLimiter,
) -> Result<GrpcServerBuilder> {
let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
.with_memory_limiter(request_memory_limiter)
.with_tls_config(opts.tls.clone())
.context(error::InvalidTlsConfigSnafu)?;
Ok(builder)
}
pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
pub fn http_server_builder(
&self,
opts: &FrontendOptions,
request_memory_limiter: ServerMemoryLimiter,
) -> HttpServerBuilder {
let mut builder = HttpServerBuilder::new(opts.http.clone())
.with_memory_limiter(request_memory_limiter)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
let validator = self.plugins.get::<LogValidatorRef>();
@@ -169,11 +180,12 @@ where
meta_client: &Option<MetaClientOptions>,
name: Option<String>,
external: bool,
request_memory_limiter: ServerMemoryLimiter,
) -> Result<GrpcServer> {
let builder = if let Some(builder) = self.grpc_server_builder.take() {
builder
} else {
self.grpc_server_builder(grpc)?
self.grpc_server_builder(grpc, request_memory_limiter)?
};
let user_provider = if external {
@@ -235,11 +247,16 @@ where
Ok(grpc_server)
}
fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
fn build_http_server(
&mut self,
opts: &FrontendOptions,
toml: String,
request_memory_limiter: ServerMemoryLimiter,
) -> Result<HttpServer> {
let builder = if let Some(builder) = self.http_server_builder.take() {
builder
} else {
self.http_server_builder(opts)
self.http_server_builder(opts, request_memory_limiter)
};
let http_server = builder
@@ -257,6 +274,12 @@ where
let toml = opts.to_toml().context(TomlFormatSnafu)?;
let opts: FrontendOptions = opts.into();
// Create request memory limiter for all server protocols
let request_memory_limiter = ServerMemoryLimiter::new(
opts.max_in_flight_write_bytes.as_bytes(),
opts.write_bytes_exhausted_policy,
);
let handlers = ServerHandlers::default();
let user_provider = self.plugins.get::<UserProviderRef>();
@@ -264,7 +287,13 @@ where
{
// Always init GRPC server
let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
let grpc_server = self.build_grpc_server(
&opts.grpc,
&opts.meta_client,
None,
true,
request_memory_limiter.clone(),
)?;
handlers.insert((Box::new(grpc_server), grpc_addr));
}
@@ -276,6 +305,7 @@ where
&opts.meta_client,
Some("INTERNAL_GRPC_SERVER".to_string()),
false,
request_memory_limiter.clone(),
)?;
handlers.insert((Box::new(grpc_server), grpc_addr));
}
@@ -284,7 +314,8 @@ where
// Always init HTTP server
let http_options = &opts.http;
let http_addr = parse_addr(&http_options.addr)?;
let http_server = self.build_http_server(&opts, toml)?;
let http_server =
self.build_http_server(&opts, toml, request_memory_limiter.clone())?;
handlers.insert((Box::new(http_server), http_addr));
}

View File

@@ -41,6 +41,7 @@ common-frontend.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-mem-prof = { workspace = true, optional = true }
common-memory-manager.workspace = true
common-meta.workspace = true
common-plugins.workspace = true
common-pprof = { workspace = true, optional = true }

View File

@@ -95,6 +95,13 @@ pub enum Error {
error: tonic::transport::Error,
},
#[snafu(display("Request memory limit exceeded"))]
MemoryLimitExceeded {
#[snafu(implicit)]
location: Location,
source: common_memory_manager::Error,
},
#[snafu(display("{} server is already started", server))]
AlreadyStarted {
server: String,
@@ -785,6 +792,8 @@ impl ErrorExt for Error {
Cancelled { .. } => StatusCode::Cancelled,
Suspended { .. } => StatusCode::Suspended,
MemoryLimitExceeded { .. } => StatusCode::RateLimited,
}
}

View File

@@ -52,7 +52,6 @@ use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, T
use crate::metrics::MetricsMiddlewareLayer;
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::request_limiter::RequestMemoryLimiter;
use crate::server::Server;
use crate::tls::TlsOption;
@@ -69,8 +68,6 @@ pub struct GrpcOptions {
pub max_recv_message_size: ReadableSize,
/// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
/// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
pub max_total_message_memory: ReadableSize,
/// Compression mode in Arrow Flight service.
pub flight_compression: FlightCompression,
pub runtime_size: usize,
@@ -126,7 +123,6 @@ impl GrpcOptions {
GrpcServerConfig {
max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
max_send_message_size: self.max_send_message_size.as_bytes() as usize,
max_total_message_memory: self.max_total_message_memory.as_bytes() as usize,
tls: self.tls.clone(),
max_connection_age: self.max_connection_age,
}
@@ -145,7 +141,6 @@ impl Default for GrpcOptions {
server_addr: String::new(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
max_total_message_memory: ReadableSize(0),
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
@@ -167,7 +162,6 @@ impl GrpcOptions {
server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
max_total_message_memory: ReadableSize(0),
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
@@ -234,7 +228,6 @@ pub struct GrpcServer {
bind_addr: Option<SocketAddr>,
name: Option<String>,
config: GrpcServerConfig,
memory_limiter: RequestMemoryLimiter,
}
/// Grpc Server configuration
@@ -244,8 +237,6 @@ pub struct GrpcServerConfig {
pub max_recv_message_size: usize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
/// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
pub max_total_message_memory: usize,
pub tls: TlsOption,
/// Maximum time that a channel may exist.
/// Useful when the server wants to control the reconnection of its clients.
@@ -258,7 +249,6 @@ impl Default for GrpcServerConfig {
Self {
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
max_total_message_memory: 0,
tls: TlsOption::default(),
max_connection_age: None,
}
@@ -298,11 +288,6 @@ impl GrpcServer {
}
Ok(())
}
/// Get the memory limiter for monitoring current memory usage
pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
&self.memory_limiter
}
}
pub struct HealthCheckHandler;

View File

@@ -46,7 +46,7 @@ use crate::grpc::{GrpcServer, GrpcServerConfig};
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
use crate::request_limiter::RequestMemoryLimiter;
use crate::request_memory_limiter::ServerMemoryLimiter;
use crate::tls::TlsOption;
/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
@@ -92,12 +92,14 @@ pub struct GrpcServerBuilder {
HeaderInterceptor,
>,
>,
memory_limiter: RequestMemoryLimiter,
memory_limiter: ServerMemoryLimiter,
}
impl GrpcServerBuilder {
pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
let memory_limiter = RequestMemoryLimiter::new(config.max_total_message_memory);
// Create a default unlimited limiter (can be overridden with with_memory_limiter)
let memory_limiter = ServerMemoryLimiter::default();
Self {
name: None,
config,
@@ -109,6 +111,12 @@ impl GrpcServerBuilder {
}
}
/// Set a global memory limiter for all server protocols.
pub fn with_memory_limiter(mut self, limiter: ServerMemoryLimiter) -> Self {
self.memory_limiter = limiter;
self
}
pub fn config(&self) -> &GrpcServerConfig {
&self.config
}
@@ -117,7 +125,7 @@ impl GrpcServerBuilder {
&self.runtime
}
pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
pub fn memory_limiter(&self) -> &ServerMemoryLimiter {
&self.memory_limiter
}
@@ -238,7 +246,6 @@ impl GrpcServerBuilder {
bind_addr: None,
name: self.name,
config: self.config,
memory_limiter: self.memory_limiter,
}
}
}

View File

@@ -26,8 +26,7 @@ use tonic::{Request, Response, Status, Streaming};
use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::{TonicResult, cancellation};
use crate::hint_headers;
use crate::metrics::{METRIC_GRPC_MEMORY_USAGE_BYTES, METRIC_GRPC_REQUESTS_REJECTED_TOTAL};
use crate::request_limiter::RequestMemoryLimiter;
use crate::request_memory_limiter::ServerMemoryLimiter;
pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
@@ -52,25 +51,12 @@ impl GreptimeDatabase for DatabaseService {
remote_addr, hints
);
let _guard = request
.extensions()
.get::<RequestMemoryLimiter>()
.filter(|limiter| limiter.is_enabled())
.and_then(|limiter| {
let message_size = request.get_ref().encoded_len();
limiter
.try_acquire(message_size)
.map(|guard| {
guard.inspect(|g| {
METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
})
})
.inspect_err(|_| {
METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
})
.transpose()
})
.transpose()?;
let _guard = if let Some(limiter) = request.extensions().get::<ServerMemoryLimiter>() {
let message_size = request.get_ref().encoded_len() as u64;
Some(limiter.acquire(message_size).await?)
} else {
None
};
let handler = self.handler.clone();
let request_future = async move {
@@ -119,7 +105,7 @@ impl GreptimeDatabase for DatabaseService {
remote_addr, hints
);
let limiter = request.extensions().get::<RequestMemoryLimiter>().cloned();
let limiter = request.extensions().get::<ServerMemoryLimiter>().cloned();
let handler = self.handler.clone();
let request_future = async move {
@@ -129,24 +115,12 @@ impl GreptimeDatabase for DatabaseService {
while let Some(request) = stream.next().await {
let request = request?;
let _guard = limiter
.as_ref()
.filter(|limiter| limiter.is_enabled())
.and_then(|limiter| {
let message_size = request.encoded_len();
limiter
.try_acquire(message_size)
.map(|guard| {
guard.inspect(|g| {
METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
})
})
.inspect_err(|_| {
METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
})
.transpose()
})
.transpose()?;
let _guard = if let Some(limiter_ref) = &limiter {
let message_size = request.encoded_len() as u64;
Some(limiter_ref.acquire(message_size).await?)
} else {
None
};
let output = handler.handle_request(request, hints.clone()).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,

View File

@@ -29,6 +29,7 @@ use bytes;
use bytes::Bytes;
use common_grpc::flight::do_put::{DoPutMetadata, DoPutResponse};
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_memory_manager::MemoryGuard;
use common_query::{Output, OutputData};
use common_recordbatch::DfRecordBatch;
use common_telemetry::debug;
@@ -39,7 +40,7 @@ use futures::{Stream, future, ready};
use futures_util::{StreamExt, TryStreamExt};
use prost::Message;
use session::context::{QueryContext, QueryContextRef};
use snafu::{ResultExt, ensure};
use snafu::{IntoError, ResultExt, ensure};
use table::table_name::TableName;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
@@ -49,8 +50,8 @@ use crate::error::{InvalidParameterSnafu, Result, ToJsonSnafu};
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
use crate::grpc::greptime_handler::{GreptimeRequestHandler, get_request_type};
use crate::grpc::{FlightCompression, TonicResult, context_auth};
use crate::metrics::{METRIC_GRPC_MEMORY_USAGE_BYTES, METRIC_GRPC_REQUESTS_REJECTED_TOTAL};
use crate::request_limiter::{RequestMemoryGuard, RequestMemoryLimiter};
use crate::request_memory_limiter::ServerMemoryLimiter;
use crate::request_memory_metrics::RequestMemoryMetrics;
use crate::{error, hint_headers};
pub type TonicStream<T> = Pin<Box<dyn Stream<Item = TonicResult<T>> + Send + 'static>>;
@@ -219,7 +220,7 @@ impl FlightCraft for GreptimeRequestHandler {
) -> TonicResult<Response<TonicStream<PutResult>>> {
let (headers, extensions, stream) = request.into_parts();
let limiter = extensions.get::<RequestMemoryLimiter>().cloned();
let limiter = extensions.get::<ServerMemoryLimiter>().cloned();
let query_ctx = context_auth::create_query_context_from_grpc_metadata(&headers)?;
context_auth::check_auth(self.user_provider.clone(), &headers, query_ctx.clone()).await?;
@@ -260,7 +261,7 @@ pub struct PutRecordBatchRequest {
pub record_batch: DfRecordBatch,
pub schema_bytes: Bytes,
pub flight_data: FlightData,
pub(crate) _guard: Option<RequestMemoryGuard>,
pub(crate) _guard: Option<MemoryGuard<RequestMemoryMetrics>>,
}
impl PutRecordBatchRequest {
@@ -270,28 +271,24 @@ impl PutRecordBatchRequest {
request_id: i64,
schema_bytes: Bytes,
flight_data: FlightData,
limiter: Option<&RequestMemoryLimiter>,
limiter: Option<&ServerMemoryLimiter>,
) -> Result<Self> {
let memory_usage = flight_data.data_body.len()
+ flight_data.app_metadata.len()
+ flight_data.data_header.len();
let _guard = limiter
.filter(|limiter| limiter.is_enabled())
.map(|limiter| {
limiter
.try_acquire(memory_usage)
.map(|guard| {
guard.inspect(|g| {
METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
})
})
.inspect_err(|_| {
METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
})
})
.transpose()?
.flatten();
let _guard = if let Some(limiter) = limiter {
let guard = limiter.try_acquire(memory_usage as u64).ok_or_else(|| {
let inner_err = common_memory_manager::Error::MemoryLimitExceeded {
requested_bytes: memory_usage as u64,
limit_bytes: limiter.limit_bytes(),
};
error::MemoryLimitExceededSnafu.into_error(inner_err)
})?;
Some(guard)
} else {
None
};
Ok(Self {
table_name,
@@ -308,7 +305,7 @@ pub struct PutRecordBatchRequestStream {
flight_data_stream: Streaming<FlightData>,
catalog: String,
schema_name: String,
limiter: Option<RequestMemoryLimiter>,
limiter: Option<ServerMemoryLimiter>,
// Client now lazily sends schema data so we cannot eagerly wait for it.
// Instead, we need to decode while receiving record batches.
state: StreamState,
@@ -331,7 +328,7 @@ impl PutRecordBatchRequestStream {
flight_data_stream: Streaming<FlightData>,
catalog: String,
schema: String,
limiter: Option<RequestMemoryLimiter>,
limiter: Option<ServerMemoryLimiter>,
) -> TonicResult<Self> {
Ok(Self {
flight_data_stream,
@@ -395,7 +392,6 @@ impl Stream for PutRecordBatchRequestStream {
match poll {
Some(Ok(flight_data)) => {
// Clone limiter once to avoid borrowing issues
let limiter = self.limiter.clone();
match &mut self.state {

View File

@@ -18,15 +18,15 @@ use futures::future::BoxFuture;
use tonic::server::NamedService;
use tower::{Layer, Service};
use crate::request_limiter::RequestMemoryLimiter;
use crate::request_memory_limiter::ServerMemoryLimiter;
#[derive(Clone)]
pub struct MemoryLimiterExtensionLayer {
limiter: RequestMemoryLimiter,
limiter: ServerMemoryLimiter,
}
impl MemoryLimiterExtensionLayer {
pub fn new(limiter: RequestMemoryLimiter) -> Self {
pub fn new(limiter: ServerMemoryLimiter) -> Self {
Self { limiter }
}
}
@@ -45,7 +45,7 @@ impl<S> Layer<S> for MemoryLimiterExtensionLayer {
#[derive(Clone)]
pub struct MemoryLimiterExtensionService<S> {
inner: S,
limiter: RequestMemoryLimiter,
limiter: ServerMemoryLimiter,
}
impl<S: NamedService> NamedService for MemoryLimiterExtensionService<S> {

View File

@@ -83,7 +83,7 @@ use crate::query_handler::{
OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
PromStoreProtocolHandlerRef,
};
use crate::request_limiter::RequestMemoryLimiter;
use crate::request_memory_limiter::ServerMemoryLimiter;
use crate::server::Server;
pub mod authorize;
@@ -134,7 +134,7 @@ pub struct HttpServer {
router: StdMutex<Router>,
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
memory_limiter: RequestMemoryLimiter,
memory_limiter: ServerMemoryLimiter,
// plugins
plugins: Plugins,
@@ -157,9 +157,6 @@ pub struct HttpOptions {
pub body_limit: ReadableSize,
/// Maximum total memory for all concurrent HTTP request bodies. 0 disables the limit.
pub max_total_body_memory: ReadableSize,
/// Validation mode while decoding Prometheus remote write requests.
pub prom_validation_mode: PromValidationMode,
@@ -204,7 +201,6 @@ impl Default for HttpOptions {
timeout: Duration::from_secs(0),
disable_dashboard: false,
body_limit: DEFAULT_BODY_LIMIT,
max_total_body_memory: ReadableSize(0),
cors_allowed_origins: Vec::new(),
enable_cors: true,
prom_validation_mode: PromValidationMode::Strict,
@@ -539,12 +535,12 @@ pub struct GreptimeOptionsConfigState {
pub greptime_config_options: String,
}
#[derive(Default)]
pub struct HttpServerBuilder {
options: HttpOptions,
plugins: Plugins,
user_provider: Option<UserProviderRef>,
router: Router,
memory_limiter: ServerMemoryLimiter,
}
impl HttpServerBuilder {
@@ -554,9 +550,16 @@ impl HttpServerBuilder {
plugins: Plugins::default(),
user_provider: None,
router: Router::new(),
memory_limiter: ServerMemoryLimiter::default(),
}
}
/// Set a global memory limiter for all server protocols.
pub fn with_memory_limiter(mut self, limiter: ServerMemoryLimiter) -> Self {
self.memory_limiter = limiter;
self
}
pub fn with_sql_handler(self, sql_handler: ServerSqlQueryHandlerRef) -> Self {
let sql_router = HttpServer::route_sql(ApiState { sql_handler });
@@ -750,8 +753,6 @@ impl HttpServerBuilder {
}
pub fn build(self) -> HttpServer {
let memory_limiter =
RequestMemoryLimiter::new(self.options.max_total_body_memory.as_bytes() as usize);
HttpServer {
options: self.options,
user_provider: self.user_provider,
@@ -759,7 +760,7 @@ impl HttpServerBuilder {
plugins: self.plugins,
router: StdMutex::new(self.router),
bind_addr: None,
memory_limiter,
memory_limiter: self.memory_limiter,
}
}
}

View File

@@ -19,11 +19,10 @@ use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use http::StatusCode;
use crate::metrics::{METRIC_HTTP_MEMORY_USAGE_BYTES, METRIC_HTTP_REQUESTS_REJECTED_TOTAL};
use crate::request_limiter::RequestMemoryLimiter;
use crate::request_memory_limiter::ServerMemoryLimiter;
pub async fn memory_limit_middleware(
State(limiter): State<RequestMemoryLimiter>,
State(limiter): State<ServerMemoryLimiter>,
req: Request,
next: Next,
) -> Response {
@@ -31,15 +30,12 @@ pub async fn memory_limit_middleware(
.headers()
.get(http::header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
.and_then(|v| v.parse::<usize>().ok())
.and_then(|v| v.parse::<u64>().ok())
.unwrap_or(0);
let _guard = match limiter.try_acquire(content_length) {
Ok(guard) => guard.inspect(|g| {
METRIC_HTTP_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
}),
let _guard = match limiter.acquire(content_length).await {
Ok(guard) => guard,
Err(e) => {
METRIC_HTTP_REQUESTS_REJECTED_TOTAL.inc();
return (
StatusCode::TOO_MANY_REQUESTS,
format!("Request body memory limit exceeded: {}", e),

View File

@@ -50,7 +50,8 @@ pub mod prometheus_handler;
pub mod proto;
pub mod query_handler;
pub mod repeated_field;
pub mod request_limiter;
pub mod request_memory_limiter;
pub mod request_memory_metrics;
mod row_writer;
pub mod server;
pub mod tls;

View File

@@ -299,24 +299,24 @@ lazy_static! {
"servers handle bulk insert elapsed",
).unwrap();
pub static ref METRIC_HTTP_MEMORY_USAGE_BYTES: IntGauge = register_int_gauge!(
"greptime_servers_http_memory_usage_bytes",
"current http request memory usage in bytes"
// Unified request memory metrics
/// Current memory in use by all concurrent requests (HTTP, gRPC, Flight).
pub static ref REQUEST_MEMORY_IN_USE: IntGauge = register_int_gauge!(
"greptime_servers_request_memory_in_use_bytes",
"bytes currently reserved for all concurrent request bodies and messages"
).unwrap();
pub static ref METRIC_HTTP_REQUESTS_REJECTED_TOTAL: IntCounter = register_int_counter!(
"greptime_servers_http_requests_rejected_total",
"total number of http requests rejected due to memory limit"
/// Maximum configured memory for all concurrent requests.
pub static ref REQUEST_MEMORY_LIMIT: IntGauge = register_int_gauge!(
"greptime_servers_request_memory_limit_bytes",
"maximum bytes allowed for all concurrent request bodies and messages"
).unwrap();
pub static ref METRIC_GRPC_MEMORY_USAGE_BYTES: IntGauge = register_int_gauge!(
"greptime_servers_grpc_memory_usage_bytes",
"current grpc request memory usage in bytes"
).unwrap();
pub static ref METRIC_GRPC_REQUESTS_REJECTED_TOTAL: IntCounter = register_int_counter!(
"greptime_servers_grpc_requests_rejected_total",
"total number of grpc requests rejected due to memory limit"
/// Total number of rejected requests due to memory exhaustion.
pub static ref REQUEST_MEMORY_REJECTED: IntCounterVec = register_int_counter_vec!(
"greptime_servers_request_memory_rejected_total",
"number of requests rejected due to memory limit",
&["reason"]
).unwrap();
}

View File

@@ -1,230 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Request memory limiter for controlling total memory usage of concurrent requests.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::error::{Result, TooManyConcurrentRequestsSnafu};
/// Limiter for total memory usage of concurrent request bodies.
///
/// Tracks the total memory used by all concurrent request bodies
/// and rejects new requests when the limit is reached.
#[derive(Clone, Default)]
pub struct RequestMemoryLimiter {
inner: Option<Arc<LimiterInner>>,
}
struct LimiterInner {
current_usage: AtomicUsize,
max_memory: usize,
}
impl RequestMemoryLimiter {
/// Create a new memory limiter.
///
/// # Arguments
/// * `max_memory` - Maximum total memory for all concurrent request bodies in bytes (0 = unlimited)
pub fn new(max_memory: usize) -> Self {
if max_memory == 0 {
return Self { inner: None };
}
Self {
inner: Some(Arc::new(LimiterInner {
current_usage: AtomicUsize::new(0),
max_memory,
})),
}
}
/// Try to acquire memory for a request of given size.
///
/// Returns `Ok(RequestMemoryGuard)` if memory was acquired successfully.
/// Returns `Err` if the memory limit would be exceeded.
pub fn try_acquire(&self, request_size: usize) -> Result<Option<RequestMemoryGuard>> {
let Some(inner) = self.inner.as_ref() else {
return Ok(None);
};
let mut new_usage = 0;
let result =
inner
.current_usage
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
new_usage = current.saturating_add(request_size);
if new_usage <= inner.max_memory {
Some(new_usage)
} else {
None
}
});
match result {
Ok(_) => Ok(Some(RequestMemoryGuard {
size: request_size,
limiter: Arc::clone(inner),
usage_snapshot: new_usage,
})),
Err(_current) => TooManyConcurrentRequestsSnafu {
limit: inner.max_memory,
request_size,
}
.fail(),
}
}
/// Check if limiter is enabled
pub fn is_enabled(&self) -> bool {
self.inner.is_some()
}
/// Get current memory usage
pub fn current_usage(&self) -> usize {
self.inner
.as_ref()
.map(|inner| inner.current_usage.load(Ordering::Relaxed))
.unwrap_or(0)
}
/// Get max memory limit
pub fn max_memory(&self) -> usize {
self.inner
.as_ref()
.map(|inner| inner.max_memory)
.unwrap_or(0)
}
}
/// RAII guard that releases memory when dropped
pub struct RequestMemoryGuard {
size: usize,
limiter: Arc<LimiterInner>,
usage_snapshot: usize,
}
impl RequestMemoryGuard {
/// Returns the total memory usage snapshot at the time this guard was acquired.
pub fn current_usage(&self) -> usize {
self.usage_snapshot
}
}
impl Drop for RequestMemoryGuard {
fn drop(&mut self) {
self.limiter
.current_usage
.fetch_sub(self.size, Ordering::Release);
}
}
#[cfg(test)]
mod tests {
use tokio::sync::Barrier;
use super::*;
#[test]
fn test_limiter_disabled() {
let limiter = RequestMemoryLimiter::new(0);
assert!(!limiter.is_enabled());
assert!(limiter.try_acquire(1000000).unwrap().is_none());
assert_eq!(limiter.current_usage(), 0);
}
#[test]
fn test_limiter_basic() {
let limiter = RequestMemoryLimiter::new(1000);
assert!(limiter.is_enabled());
assert_eq!(limiter.max_memory(), 1000);
assert_eq!(limiter.current_usage(), 0);
// Acquire 400 bytes
let _guard1 = limiter.try_acquire(400).unwrap();
assert_eq!(limiter.current_usage(), 400);
// Acquire another 500 bytes
let _guard2 = limiter.try_acquire(500).unwrap();
assert_eq!(limiter.current_usage(), 900);
// Try to acquire 200 bytes - should fail (900 + 200 > 1000)
let result = limiter.try_acquire(200);
assert!(result.is_err());
assert_eq!(limiter.current_usage(), 900);
// Drop first guard
drop(_guard1);
assert_eq!(limiter.current_usage(), 500);
// Now we can acquire 200 bytes
let _guard3 = limiter.try_acquire(200).unwrap();
assert_eq!(limiter.current_usage(), 700);
}
#[test]
fn test_limiter_exact_limit() {
let limiter = RequestMemoryLimiter::new(1000);
// Acquire exactly the limit
let _guard = limiter.try_acquire(1000).unwrap();
assert_eq!(limiter.current_usage(), 1000);
// Try to acquire 1 more byte - should fail
let result = limiter.try_acquire(1);
assert!(result.is_err());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_limiter_concurrent() {
let limiter = RequestMemoryLimiter::new(1000);
let barrier = Arc::new(Barrier::new(11)); // 10 tasks + main
let mut handles = vec![];
// Spawn 10 tasks each trying to acquire 200 bytes
for _ in 0..10 {
let limiter_clone = limiter.clone();
let barrier_clone = barrier.clone();
let handle = tokio::spawn(async move {
barrier_clone.wait().await;
limiter_clone.try_acquire(200)
});
handles.push(handle);
}
// Let all tasks start together
barrier.wait().await;
let mut success_count = 0;
let mut fail_count = 0;
let mut guards = Vec::new();
for handle in handles {
match handle.await.unwrap() {
Ok(Some(guard)) => {
success_count += 1;
guards.push(guard);
}
Err(_) => fail_count += 1,
Ok(None) => unreachable!(),
}
}
// Only 5 tasks should succeed (5 * 200 = 1000)
assert_eq!(success_count, 5);
assert_eq!(fail_count, 5);
drop(guards);
}
}

View File

@@ -0,0 +1,76 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Unified memory limiter for all server request protocols.
use std::sync::Arc;
use common_memory_manager::{MemoryGuard, MemoryManager, OnExhaustedPolicy, PermitGranularity};
use snafu::ResultExt;
use crate::error::{MemoryLimitExceededSnafu, Result};
use crate::request_memory_metrics::RequestMemoryMetrics;
/// Unified memory limiter for all server request protocols.
///
/// Manages a global memory pool for HTTP requests, gRPC messages, and
/// Arrow Flight batches without distinguishing between them.
#[derive(Clone)]
pub struct ServerMemoryLimiter {
manager: Arc<MemoryManager<RequestMemoryMetrics>>,
policy: OnExhaustedPolicy,
}
impl Default for ServerMemoryLimiter {
/// Creates a limiter with unlimited memory (0 bytes) and default policy.
fn default() -> Self {
Self::new(0, OnExhaustedPolicy::default())
}
}
impl ServerMemoryLimiter {
/// Creates a new unified memory limiter.
///
/// # Arguments
///
/// * `total_bytes` - Maximum total memory for all concurrent requests (0 = unlimited)
/// * `policy` - Policy when memory quota is exhausted
pub fn new(total_bytes: u64, policy: OnExhaustedPolicy) -> Self {
let manager = Arc::new(MemoryManager::with_granularity(
total_bytes,
PermitGranularity::Kilobyte,
RequestMemoryMetrics,
));
Self { manager, policy }
}
/// Acquire memory for a request.
pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<RequestMemoryMetrics>> {
self.manager
.acquire_with_policy(bytes, self.policy)
.await
.context(MemoryLimitExceededSnafu)
}
/// Try to acquire memory without waiting.
pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<RequestMemoryMetrics>> {
self.manager.try_acquire(bytes)
}
/// Returns total memory limit in bytes (0 if unlimited).
pub fn limit_bytes(&self) -> u64 {
self.manager.limit_bytes()
}
}

View File

@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Unified metrics adapter for all server protocols.
use common_memory_manager::MemoryMetrics;
use crate::metrics::{REQUEST_MEMORY_IN_USE, REQUEST_MEMORY_LIMIT, REQUEST_MEMORY_REJECTED};
/// Metrics adapter for unified request memory tracking.
///
/// This adapter tracks memory usage for all server protocols (HTTP, gRPC, Arrow Flight)
/// without distinguishing between them. All requests contribute to the same set of metrics.
#[derive(Clone, Copy, Debug, Default)]
pub struct RequestMemoryMetrics;
impl MemoryMetrics for RequestMemoryMetrics {
fn set_limit(&self, bytes: i64) {
REQUEST_MEMORY_LIMIT.set(bytes);
}
fn set_in_use(&self, bytes: i64) {
REQUEST_MEMORY_IN_USE.set(bytes);
}
fn inc_rejected(&self, reason: &str) {
REQUEST_MEMORY_REJECTED.with_label_values(&[reason]).inc();
}
}

View File

@@ -15,6 +15,7 @@ common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-memory-manager.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-procedure.workspace = true

View File

@@ -14,6 +14,7 @@
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, KvBackendConfig};
use common_memory_manager::OnExhaustedPolicy;
use common_options::memory::MemoryOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use common_wal::config::DatanodeWalConfig;
@@ -37,6 +38,12 @@ pub struct StandaloneOptions {
pub enable_telemetry: bool,
pub default_timezone: Option<String>,
pub default_column_prefix: Option<String>,
/// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
/// Set to 0 to disable the limit. Default: "0" (unlimited)
pub max_in_flight_write_bytes: ReadableSize,
/// Policy when write bytes quota is exhausted.
/// Options: "wait" (default, 10s), "wait(<duration>)", "fail"
pub write_bytes_exhausted_policy: OnExhaustedPolicy,
pub http: HttpOptions,
pub grpc: GrpcOptions,
pub mysql: MysqlOptions,
@@ -57,7 +64,6 @@ pub struct StandaloneOptions {
pub tracing: TracingOptions,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: SlowQueryOptions,
pub query: QueryOptions,
pub memory: MemoryOptions,
@@ -69,6 +75,8 @@ impl Default for StandaloneOptions {
enable_telemetry: true,
default_timezone: None,
default_column_prefix: None,
max_in_flight_write_bytes: ReadableSize(0),
write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
mysql: MysqlOptions::default(),
@@ -91,7 +99,6 @@ impl Default for StandaloneOptions {
tracing: TracingOptions::default(),
init_regions_in_background: false,
init_regions_parallelism: 16,
max_in_flight_write_bytes: None,
slow_query: SlowQueryOptions::default(),
query: QueryOptions::default(),
memory: MemoryOptions::default(),
@@ -120,6 +127,8 @@ impl StandaloneOptions {
let cloned_opts = self.clone();
FrontendOptions {
default_timezone: cloned_opts.default_timezone,
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
write_bytes_exhausted_policy: cloned_opts.write_bytes_exhausted_policy,
http: cloned_opts.http,
grpc: cloned_opts.grpc,
mysql: cloned_opts.mysql,
@@ -131,7 +140,6 @@ impl StandaloneOptions {
meta_client: None,
logging: cloned_opts.logging,
user_provider: cloned_opts.user_provider,
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
slow_query: cloned_opts.slow_query,
..Default::default()
}

View File

@@ -30,6 +30,7 @@ common-error.workspace = true
common-event-recorder.workspace = true
common-frontend.workspace = true
common-grpc.workspace = true
common-memory-manager.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-procedure.workspace = true
common-query.workspace = true

View File

@@ -143,6 +143,7 @@ mod tests {
"test_grpc_max_connection_age",
None,
Some(config),
None,
)
.await;
let addr = server.bind_addr().unwrap().to_string();

View File

@@ -49,6 +49,7 @@ use servers::otel_arrow::OtelArrowServiceHandler;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
use servers::request_memory_limiter::ServerMemoryLimiter;
use servers::server::Server;
use servers::tls::ReloadableTlsServerConfig;
use session::context::QueryContext;
@@ -438,6 +439,23 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
store_type: StorageType,
name: &str,
user_provider: Option<UserProviderRef>,
) -> (Router, TestGuard) {
setup_test_http_app_with_frontend_and_custom_options(
store_type,
name,
user_provider,
None,
None,
)
.await
}
pub async fn setup_test_http_app_with_frontend_and_custom_options(
store_type: StorageType,
name: &str,
user_provider: Option<UserProviderRef>,
http_opts: Option<HttpOptions>,
memory_limiter: Option<ServerMemoryLimiter>,
) -> (Router, TestGuard) {
let plugins = Plugins::new();
if let Some(user_provider) = user_provider.clone() {
@@ -449,14 +467,12 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
create_test_table(instance.fe_instance(), "demo").await;
let http_opts = HttpOptions {
let http_opts = http_opts.unwrap_or_else(|| HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
};
});
let mut http_server = HttpServerBuilder::new(http_opts);
http_server = http_server
let mut http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(
instance.fe_instance().clone(),
))
@@ -471,6 +487,10 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
http_server = http_server.with_user_provider(user_provider);
}
if let Some(limiter) = memory_limiter {
http_server = http_server.with_memory_limiter(limiter);
}
let http_server = http_server.build();
let app = http_server.build(http_server.make_app()).unwrap();
@@ -561,7 +581,7 @@ pub async fn setup_grpc_server(
store_type: StorageType,
name: &str,
) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
setup_grpc_server_with(store_type, name, None, None).await
setup_grpc_server_with(store_type, name, None, None, None).await
}
pub async fn setup_grpc_server_with_user_provider(
@@ -569,7 +589,7 @@ pub async fn setup_grpc_server_with_user_provider(
name: &str,
user_provider: Option<UserProviderRef>,
) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
setup_grpc_server_with(store_type, name, user_provider, None).await
setup_grpc_server_with(store_type, name, user_provider, None, None).await
}
pub async fn setup_grpc_server_with(
@@ -577,6 +597,7 @@ pub async fn setup_grpc_server_with(
name: &str,
user_provider: Option<UserProviderRef>,
grpc_config: Option<GrpcServerConfig>,
memory_limiter: Option<servers::request_memory_limiter::ServerMemoryLimiter>,
) -> (GreptimeDbStandalone, Arc<GrpcServer>) {
let instance = setup_standalone_instance(name, store_type).await;
@@ -598,7 +619,13 @@ pub async fn setup_grpc_server_with(
let flight_handler = Arc::new(greptime_request_handler.clone());
let grpc_config = grpc_config.unwrap_or_default();
let grpc_builder = GrpcServerBuilder::new(grpc_config.clone(), runtime)
let mut grpc_builder = GrpcServerBuilder::new(grpc_config.clone(), runtime);
if let Some(limiter) = memory_limiter {
grpc_builder = grpc_builder.with_memory_limiter(limiter);
}
let grpc_builder = grpc_builder
.database_handler(greptime_request_handler)
.flight_handler(flight_handler)
.prometheus_handler(fe_instance_ref.clone(), user_provider.clone())

View File

@@ -25,6 +25,7 @@ use auth::user_provider_from_option;
use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputData};
use common_catalog::consts::MITO_ENGINE;
use common_grpc::channel_manager::ClientTlsOption;
use common_memory_manager::OnExhaustedPolicy;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_runtime::Runtime;
@@ -38,6 +39,7 @@ use servers::http::prometheus::{
PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusJsonResponse,
PrometheusResponse,
};
use servers::request_memory_limiter::ServerMemoryLimiter;
use servers::server::Server;
use servers::tls::{TlsMode, TlsOption};
use tests_integration::test_util::{
@@ -144,8 +146,14 @@ pub async fn test_grpc_message_size_ok(store_type: StorageType) {
max_send_message_size: 1024,
..Default::default()
};
let (_db, fe_grpc_server) =
setup_grpc_server_with(store_type, "test_grpc_message_size_ok", None, Some(config)).await;
let (_db, fe_grpc_server) = setup_grpc_server_with(
store_type,
"test_grpc_message_size_ok",
None,
Some(config),
None,
)
.await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let grpc_client = Client::with_urls(vec![addr]);
@@ -164,8 +172,14 @@ pub async fn test_grpc_zstd_compression(store_type: StorageType) {
max_send_message_size: 1024,
..Default::default()
};
let (_db, fe_grpc_server) =
setup_grpc_server_with(store_type, "test_grpc_zstd_compression", None, Some(config)).await;
let (_db, fe_grpc_server) = setup_grpc_server_with(
store_type,
"test_grpc_zstd_compression",
None,
Some(config),
None,
)
.await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let grpc_client = Client::with_urls(vec![addr]);
@@ -188,6 +202,7 @@ pub async fn test_grpc_message_size_limit_send(store_type: StorageType) {
"test_grpc_message_size_limit_send",
None,
Some(config),
None,
)
.await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
@@ -213,6 +228,7 @@ pub async fn test_grpc_message_size_limit_recv(store_type: StorageType) {
"test_grpc_message_size_limit_recv",
None,
Some(config),
None,
)
.await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
@@ -886,7 +902,7 @@ pub async fn test_grpc_timezone(store_type: StorageType) {
..Default::default()
};
let (_db, fe_grpc_server) =
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;
setup_grpc_server_with(store_type, "auto_create_table", None, Some(config), None).await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let grpc_client = Client::with_urls(vec![addr]);
@@ -959,12 +975,11 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
max_total_message_memory: 1024 * 1024 * 1024,
tls,
max_connection_age: None,
};
let (_db, fe_grpc_server) =
setup_grpc_server_with(store_type, "tls_create_table", None, Some(config)).await;
setup_grpc_server_with(store_type, "tls_create_table", None, Some(config), None).await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let mut client_tls = ClientTlsOption {
@@ -1003,7 +1018,6 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
max_total_message_memory: 1024 * 1024 * 1024,
tls,
max_connection_age: None,
};
@@ -1021,12 +1035,23 @@ pub async fn test_grpc_memory_limit(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024 * 1024,
max_send_message_size: 1024 * 1024,
max_total_message_memory: 200,
tls: Default::default(),
max_connection_age: None,
};
let (_db, fe_grpc_server) =
setup_grpc_server_with(store_type, "test_grpc_memory_limit", None, Some(config)).await;
// Create memory limiter with 2KB limit and fail-fast policy.
// Note: MemoryManager uses 1KB granularity (PermitGranularity::Kilobyte),
// so 2KB = 2 permits. Small/medium requests should fit, large should fail.
let memory_limiter = ServerMemoryLimiter::new(2048, OnExhaustedPolicy::Fail);
let (_db, fe_grpc_server) = setup_grpc_server_with(
store_type,
"test_grpc_memory_limit",
None,
Some(config),
Some(memory_limiter),
)
.await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let grpc_client = Client::with_urls([&addr]);
@@ -1120,13 +1145,63 @@ pub async fn test_grpc_memory_limit(store_type: StorageType) {
.await;
assert!(result.is_ok());
// Test that large request exceeds limit
let large_rows: Vec<Row> = (0..100)
// Test that medium request in the 200-1024 byte range should also succeed
// (due to 1KB granularity alignment)
let medium_rows: Vec<Row> = (0..5)
.map(|i| Row {
values: vec![
Value {
value_data: Some(ValueData::StringValue(format!("host{}", i))),
},
Value {
value_data: Some(ValueData::TimestampMillisecondValue(2000 + i)),
},
Value {
value_data: Some(ValueData::F64Value(i as f64 * 2.5)),
},
],
})
.collect();
let medium_row_insert = RowInsertRequest {
table_name: table_name.to_owned(),
rows: Some(api::v1::Rows {
schema: column_schemas
.iter()
.map(|c| api::v1::ColumnSchema {
column_name: c.name.clone(),
datatype: c.data_type,
semantic_type: c.semantic_type,
datatype_extension: None,
options: None,
})
.collect(),
rows: medium_rows,
}),
};
let result = db
.row_inserts(RowInsertRequests {
inserts: vec![medium_row_insert],
})
.await;
assert!(
result.is_ok(),
"Medium request (~500 bytes) should succeed within aligned 1KB limit"
);
// Test that large request exceeds limit (> 1KB aligned limit)
// Create a very large string to ensure we definitely exceed 1KB
// Use 100 rows with very long strings (>50 chars each) = definitely >5KB total
let large_rows: Vec<Row> = (0..100)
.map(|i| Row {
values: vec![
Value {
value_data: Some(ValueData::StringValue(format!(
"this_is_a_very_long_hostname_string_designed_to_make_the_request_exceed_memory_limit_row_number_{}",
i
))),
},
Value {
value_data: Some(ValueData::TimestampMillisecondValue(1000 + i)),
},
@@ -1159,12 +1234,15 @@ pub async fn test_grpc_memory_limit(store_type: StorageType) {
inserts: vec![large_row_insert],
})
.await;
assert!(result.is_err());
assert!(
result.is_err(),
"Large request should exceed 1KB limit and fail"
);
let err = result.unwrap_err();
let err_msg = err.to_string();
assert!(
err_msg.contains("Too many concurrent"),
"Expected memory limit error, got: {}",
err_msg.contains("Memory limit exceeded"),
"Expected 'Memory limit exceeded' error, got: {}",
err_msg
);

View File

@@ -32,6 +32,7 @@ use common_error::status_code::StatusCode as ErrorCode;
use common_frontend::slow_query_event::{
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
};
use common_memory_manager::OnExhaustedPolicy;
use flate2::Compression;
use flate2::write::GzEncoder;
use log_query::{Context, Limit, LogQuery, TimeFilter};
@@ -55,6 +56,7 @@ use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::prom_store::{self, mock_timeseries_new_label};
use servers::request_memory_limiter::ServerMemoryLimiter;
use table::table_name::TableName;
use tests_integration::test_util::{
StorageType, setup_test_http_app, setup_test_http_app_with_frontend,
@@ -143,6 +145,7 @@ macro_rules! http_tests {
test_jaeger_query_api_for_trace_v1,
test_influxdb_write,
test_http_memory_limit,
);
)*
};
@@ -1379,6 +1382,8 @@ providers = []"#,
let expected_toml_str = format!(
r#"
enable_telemetry = true
max_in_flight_write_bytes = "0KiB"
write_bytes_exhausted_policy = "wait"
init_regions_in_background = false
init_regions_parallelism = 16
@@ -3319,7 +3324,7 @@ processors:
events = del(.events)
base_host = del(.host)
base_timestamp = del(.timestamp)
# Map each event to a complete row object
map_values(array!(events)) -> |event| {
{
@@ -7117,3 +7122,147 @@ fn compress_vec_with_gzip(data: Vec<u8>) -> Vec<u8> {
encoder.write_all(&data).unwrap();
encoder.finish().unwrap()
}
pub async fn test_http_memory_limit(store_type: StorageType) {
use tests_integration::test_util::setup_test_http_app_with_frontend_and_custom_options;
common_telemetry::init_default_ut_logging();
let http_opts = servers::http::HttpOptions {
addr: format!("127.0.0.1:{}", common_test_util::ports::get_port()),
..Default::default()
};
// Create memory limiter with 2KB limit and fail-fast policy.
// Note: MemoryManager uses 1KB granularity (PermitGranularity::Kilobyte),
// so 2KB = 2 permits. Small/medium requests should fit, large should fail.
let memory_limiter = ServerMemoryLimiter::new(2048, OnExhaustedPolicy::Fail);
let (app, mut guard) = setup_test_http_app_with_frontend_and_custom_options(
store_type,
"test_http_memory_limit",
None,
Some(http_opts),
Some(memory_limiter),
)
.await;
let client = TestClient::new(app).await;
// Create table first
let res = client
.get("/v1/sql?sql=CREATE TABLE test_mem_limit(host STRING, cpu DOUBLE, ts TIMESTAMP TIME INDEX)")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
// Test 1: Small POST request should succeed (uses actual memory limiting path)
// Note: GET requests have no body, so they bypass body memory limiting entirely.
let small_insert = "INSERT INTO test_mem_limit VALUES ('host1', 1.0, 0)";
let res = client
.post("/v1/sql")
.header("Content-Type", "application/x-www-form-urlencoded")
.body(format!("sql={}", small_insert))
.send()
.await;
assert_eq!(res.status(), StatusCode::OK, "Small POST should succeed");
// Test 1B: Medium request in the 500-1024 byte range should also succeed
// (due to 1KB granularity alignment)
let medium_values: Vec<String> = (0..8)
.map(|i| format!("('host{}', {}.5, {})", i, i, i * 1000))
.collect();
let medium_insert = format!(
"INSERT INTO test_mem_limit VALUES {}",
medium_values.join(", ")
);
let res = client
.post("/v1/sql")
.header("Content-Type", "application/x-www-form-urlencoded")
.body(format!("sql={}", medium_insert))
.send()
.await;
assert_eq!(
res.status(),
StatusCode::OK,
"Medium request (~700 bytes) should succeed within aligned 1KB limit"
);
// Test 2: Large write request should be rejected (exceeds 2KB limit)
// Generate a large INSERT with many rows and long strings to definitely exceed 2KB
let large_values: Vec<String> = (0..100)
.map(|i| {
format!(
"('this_is_a_very_long_hostname_string_to_increase_body_size_row_{}', {}.5, {})",
i,
i,
i * 1000
)
})
.collect();
let large_insert = format!(
"INSERT INTO test_mem_limit VALUES {}",
large_values.join(", ")
);
let res = client
.post("/v1/sql")
.header("Content-Type", "application/x-www-form-urlencoded")
.body(format!("sql={}", large_insert))
.send()
.await;
assert_eq!(
res.status(),
StatusCode::TOO_MANY_REQUESTS,
"Large write should be rejected with 429"
);
let error_body = res.text().await;
assert!(
error_body.contains("Request body memory limit exceeded"),
"Error message should be 'Request body memory limit exceeded', got: {}",
error_body
);
// Test 3A: Small InfluxDB write should succeed
let small_influx = "test_influx,host=host1 cpu=1.5 1000000000";
let res = client
.post("/v1/influxdb/write?db=public")
.body(small_influx)
.send()
.await;
assert_eq!(
res.status(),
StatusCode::NO_CONTENT,
"Small InfluxDB write should succeed"
);
// Test 3B: Large InfluxDB write should be rejected
let large_influx_body = (0..100)
.map(|i| {
format!(
"test_influx,host=host{} cpu={}.5 {}",
i,
i,
(i as i64) * 1000000000
)
})
.collect::<Vec<_>>()
.join("\n");
let res = client
.post("/v1/influxdb/write?db=public")
.body(large_influx_body)
.send()
.await;
assert_eq!(
res.status(),
StatusCode::TOO_MANY_REQUESTS,
"Large InfluxDB write should be rejected"
);
guard.remove_all().await;
}