diff --git a/Cargo.lock b/Cargo.lock
index 4cf1583019..02d3f2e1cf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5036,6 +5036,7 @@ dependencies = [
"common-function",
"common-grpc",
"common-macro",
+ "common-memory-manager",
"common-meta",
"common-options",
"common-procedure",
@@ -11690,6 +11691,7 @@ dependencies = [
"common-grpc",
"common-macro",
"common-mem-prof",
+ "common-memory-manager",
"common-meta",
"common-plugins",
"common-pprof",
@@ -12482,6 +12484,7 @@ dependencies = [
"common-config",
"common-error",
"common-macro",
+ "common-memory-manager",
"common-meta",
"common-options",
"common-procedure",
@@ -13184,6 +13187,7 @@ dependencies = [
"common-event-recorder",
"common-frontend",
"common-grpc",
+ "common-memory-manager",
"common-meta",
"common-procedure",
"common-query",
diff --git a/config/config.md b/config/config.md
index 779c5a7ea8..62e78f56c9 100644
--- a/config/config.md
+++ b/config/config.md
@@ -14,11 +14,12 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
+| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight). Set to 0 to disable the limit. Default: "0" (unlimited) |
+| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted. Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup. By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum current queries allowed to be executed. Zero means unlimited. NOTE: This setting affects scan_memory_limit's privileged tier allocation. When set, 70% of queries get privileged memory access (full scan_memory_limit). The remaining 30% get standard tier access (70% of scan_memory_limit). |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
-| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -26,14 +27,12 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit. The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`. Set to 0 to disable limit. |
-| `http.max_total_body_memory` | String | Unset | Maximum total memory for all concurrent HTTP request bodies. Set to 0 to disable the limit. Default: "0" (unlimited) |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests. Available options: - strict: deny invalid UTF-8 strings (default). - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD). - unchecked: do not valid strings. |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
-| `grpc.max_total_message_memory` | String | Unset | Maximum total memory for all concurrent gRPC request messages. Set to 0 to disable the limit. Default: "0" (unlimited) |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection. The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour. Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
| `grpc.tls.mode` | String | `disable` | TLS mode. |
@@ -227,7 +226,8 @@
| --- | -----| ------- | ----------- |
| `default_timezone` | String | Unset | The default timezone of the server. |
| `default_column_prefix` | String | Unset | The default column prefix for auto-created time index and value columns. |
-| `max_in_flight_write_bytes` | String | Unset | The maximum in-flight write bytes. |
+| `max_in_flight_write_bytes` | String | Unset | Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight). Set to 0 to disable the limit. Default: "0" (unlimited) |
+| `write_bytes_exhausted_policy` | String | Unset | Policy when write bytes quota is exhausted. Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail" |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -238,7 +238,6 @@
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
| `http.timeout` | String | `0s` | HTTP request timeout. Set to 0 to disable timeout. |
| `http.body_limit` | String | `64MB` | HTTP request body limit. The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`. Set to 0 to disable limit. |
-| `http.max_total_body_memory` | String | Unset | Maximum total memory for all concurrent HTTP request bodies. Set to 0 to disable the limit. Default: "0" (unlimited) |
| `http.enable_cors` | Bool | `true` | HTTP CORS support, it's turned on by default This allows browser to access http APIs without CORS restrictions |
| `http.cors_allowed_origins` | Array | Unset | Customize allowed origins for HTTP CORS. |
| `http.prom_validation_mode` | String | `strict` | Whether to enable validation for Prometheus remote write requests. Available options: - strict: deny invalid UTF-8 strings (default). - lossy: allow invalid UTF-8 strings, replace invalid characters with REPLACEMENT_CHARACTER(U+FFFD). - unchecked: do not valid strings. |
@@ -246,7 +245,6 @@
| `grpc.bind_addr` | String | `127.0.0.1:4001` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:4001` | The address advertised to the metasrv, and used for connections from outside the host. If left empty or unset, the server will automatically use the IP address of the first network interface on the host, with the same port number as the one specified in `grpc.bind_addr`. |
| `grpc.runtime_size` | Integer | `8` | The number of server worker threads. |
-| `grpc.max_total_message_memory` | String | Unset | Maximum total memory for all concurrent gRPC request messages. Set to 0 to disable the limit. Default: "0" (unlimited) |
| `grpc.flight_compression` | String | `arrow_ipc` | Compression mode for frontend side Arrow IPC service. Available options: - `none`: disable all compression - `transport`: only enable gRPC transport compression (zstd) - `arrow_ipc`: only enable Arrow IPC compression (lz4) - `all`: enable all compression. Default to `none` |
| `grpc.max_connection_age` | String | Unset | The maximum connection age for gRPC connection. The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour. Refer to https://grpc.io/docs/guides/keepalive/ for more details. |
| `grpc.tls` | -- | -- | gRPC server TLS options, see `mysql.tls` section. |
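The two new top-level options work together: `max_in_flight_write_bytes` sets the shared quota and `write_bytes_exhausted_policy` decides what happens when a write cannot reserve its bytes. As a reading aid only, here is a small self-contained sketch of the documented policy strings; the actual `OnExhaustedPolicy` type and its parser live in `common-memory-manager` and may accept more formats than shown here:

```rust
use std::time::Duration;

// Hypothetical mirror of the documented policy values; the real
// `OnExhaustedPolicy` in `common-memory-manager` may differ.
#[derive(Debug, PartialEq)]
enum ExhaustedPolicy {
    Wait(Duration), // queue the request until quota frees up, bounded by the timeout
    Fail,           // reject immediately
}

fn parse_policy(s: &str) -> Option<ExhaustedPolicy> {
    match s.trim() {
        "fail" => Some(ExhaustedPolicy::Fail),
        // Plain "wait" uses the documented 10s default timeout.
        "wait" => Some(ExhaustedPolicy::Wait(Duration::from_secs(10))),
        other => {
            // "wait(30s)" style; only whole seconds are handled in this sketch.
            let inner = other.strip_prefix("wait(")?.strip_suffix(')')?;
            let secs: u64 = inner.strip_suffix('s')?.parse().ok()?;
            Some(ExhaustedPolicy::Wait(Duration::from_secs(secs)))
        }
    }
}

fn main() {
    assert_eq!(parse_policy("wait(30s)"), Some(ExhaustedPolicy::Wait(Duration::from_secs(30))));
    assert_eq!(parse_policy("fail"), Some(ExhaustedPolicy::Fail));
}
```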
diff --git a/config/frontend.example.toml b/config/frontend.example.toml
index 5b9fe9f27b..435504b122 100644
--- a/config/frontend.example.toml
+++ b/config/frontend.example.toml
@@ -6,9 +6,15 @@ default_timezone = "UTC"
## @toml2docs:none-default
default_column_prefix = "greptime"
-## The maximum in-flight write bytes.
+## Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
+## Set to 0 to disable the limit. Default: "0" (unlimited)
## @toml2docs:none-default
-#+ max_in_flight_write_bytes = "500MB"
+#+ max_in_flight_write_bytes = "1GB"
+
+## Policy when write bytes quota is exhausted.
+## Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail"
+## @toml2docs:none-default
+#+ write_bytes_exhausted_policy = "wait"
## The runtime options.
#+ [runtime]
@@ -35,10 +41,6 @@ timeout = "0s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
-## Maximum total memory for all concurrent HTTP request bodies.
-## Set to 0 to disable the limit. Default: "0" (unlimited)
-## @toml2docs:none-default
-#+ max_total_body_memory = "1GB"
## HTTP CORS support, it's turned on by default
## This allows browser to access http APIs without CORS restrictions
enable_cors = true
@@ -62,10 +64,6 @@ bind_addr = "127.0.0.1:4001"
server_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
-## Maximum total memory for all concurrent gRPC request messages.
-## Set to 0 to disable the limit. Default: "0" (unlimited)
-## @toml2docs:none-default
-#+ max_total_message_memory = "1GB"
## Compression mode for frontend side Arrow IPC service. Available options:
## - `none`: disable all compression
## - `transport`: only enable gRPC transport compression (zstd)
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 6eb9a70f40..ef96406316 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -6,6 +6,16 @@ default_timezone = "UTC"
## @toml2docs:none-default
default_column_prefix = "greptime"
+## Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
+## Set to 0 to disable the limit. Default: "0" (unlimited)
+## @toml2docs:none-default
+#+ max_in_flight_write_bytes = "1GB"
+
+## Policy when write bytes quota is exhausted.
+## Options: "wait" (default, 10s timeout), "wait(<duration>)" (e.g., "wait(30s)"), "fail"
+## @toml2docs:none-default
+#+ write_bytes_exhausted_policy = "wait"
+
## Initialize all regions in the background during the startup.
## By default, it provides services after all regions have been initialized.
init_regions_in_background = false
@@ -22,10 +32,6 @@ max_concurrent_queries = 0
## Enable telemetry to collect anonymous usage data. Enabled by default.
#+ enable_telemetry = true
-## The maximum in-flight write bytes.
-## @toml2docs:none-default
-#+ max_in_flight_write_bytes = "500MB"
-
## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.
@@ -43,10 +49,6 @@ timeout = "0s"
## The following units are supported: `B`, `KB`, `KiB`, `MB`, `MiB`, `GB`, `GiB`, `TB`, `TiB`, `PB`, `PiB`.
## Set to 0 to disable limit.
body_limit = "64MB"
-## Maximum total memory for all concurrent HTTP request bodies.
-## Set to 0 to disable the limit. Default: "0" (unlimited)
-## @toml2docs:none-default
-#+ max_total_body_memory = "1GB"
## HTTP CORS support, it's turned on by default
## This allows browser to access http APIs without CORS restrictions
enable_cors = true
@@ -67,10 +69,6 @@ prom_validation_mode = "strict"
bind_addr = "127.0.0.1:4001"
## The number of server worker threads.
runtime_size = 8
-## Maximum total memory for all concurrent gRPC request messages.
-## Set to 0 to disable the limit. Default: "0" (unlimited)
-## @toml2docs:none-default
-#+ max_total_message_memory = "1GB"
## The maximum connection age for gRPC connection.
## The value can be a human-readable time string. For example: `10m` for ten minutes or `1h` for one hour.
## Refer to https://grpc.io/docs/guides/keepalive/ for more details.
diff --git a/src/common/memory-manager/src/manager.rs b/src/common/memory-manager/src/manager.rs
index b29e08445c..50360d2a31 100644
--- a/src/common/memory-manager/src/manager.rs
+++ b/src/common/memory-manager/src/manager.rs
@@ -37,6 +37,12 @@ pub struct MemoryManager {
quota: Option>,
}
+impl Default for MemoryManager {
+ fn default() -> Self {
+ Self::new(0, M::default())
+ }
+}
+
#[derive(Clone)]
pub(crate) struct MemoryQuota {
pub(crate) semaphore: Arc<Semaphore>,
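The new `Default` means a manager built with a limit of 0, i.e. no quota at all. A minimal stand-alone sketch of that convention follows (a tokio semaphore standing in for the real quota; this is not the crate's `MemoryManager` API):

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Minimal stand-in for the "zero means unlimited" convention.
#[derive(Clone, Default)]
struct ByteQuota {
    semaphore: Option<Arc<Semaphore>>, // None = unlimited
}

impl ByteQuota {
    fn new(limit_bytes: usize) -> Self {
        let semaphore = (limit_bytes > 0).then(|| Arc::new(Semaphore::new(limit_bytes)));
        Self { semaphore }
    }

    async fn acquire(&self, bytes: usize) -> Option<OwnedSemaphorePermit> {
        match &self.semaphore {
            // One permit per byte; dropping the permit returns the bytes.
            Some(s) => s.clone().acquire_many_owned(bytes as u32).await.ok(),
            None => None, // unlimited: nothing to hold
        }
    }
}

#[tokio::main]
async fn main() {
    let unlimited = ByteQuota::default(); // limit 0: acquire never blocks, no guard needed
    assert!(unlimited.acquire(1 << 20).await.is_none());

    let limited = ByteQuota::new(1024);
    let guard = limited.acquire(512).await; // holds 512 bytes until dropped
    assert!(guard.is_some());
}
```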
diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs
index eae97756a5..3f46203ba0 100644
--- a/src/flow/src/server.rs
+++ b/src/flow/src/server.rs
@@ -490,7 +490,6 @@ impl<'a> FlownodeServiceBuilder<'a> {
let config = GrpcServerConfig {
max_recv_message_size: opts.grpc.max_recv_message_size.as_bytes() as usize,
max_send_message_size: opts.grpc.max_send_message_size.as_bytes() as usize,
- max_total_message_memory: opts.grpc.max_total_message_memory.as_bytes() as usize,
tls: opts.grpc.tls.clone(),
max_connection_age: opts.grpc.max_connection_age,
};
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index c8c78f7d74..03b0d35130 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -32,6 +32,7 @@ common-frontend.workspace = true
common-function.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
+common-memory-manager.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-procedure.workspace = true
diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs
index 03b809d999..d148e6aa1b 100644
--- a/src/frontend/src/error.rs
+++ b/src/frontend/src/error.rs
@@ -357,14 +357,6 @@ pub enum Error {
location: Location,
},
- #[snafu(display("Failed to acquire more permits from limiter"))]
- AcquireLimiter {
- #[snafu(source)]
- error: tokio::sync::AcquireError,
- #[snafu(implicit)]
- location: Location,
- },
-
#[snafu(display("Service suspended"))]
Suspended {
#[snafu(implicit)]
@@ -449,8 +441,6 @@ impl ErrorExt for Error {
Error::StatementTimeout { .. } => StatusCode::Cancelled,
- Error::AcquireLimiter { .. } => StatusCode::Internal,
-
Error::Suspended { .. } => StatusCode::Suspended,
}
}
diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs
index 88a81c9576..8d698d65b1 100644
--- a/src/frontend/src/frontend.rs
+++ b/src/frontend/src/frontend.rs
@@ -17,6 +17,7 @@ use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_config::config::Configurable;
use common_event_recorder::EventRecorderOptions;
+use common_memory_manager::OnExhaustedPolicy;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
@@ -45,6 +46,12 @@ pub struct FrontendOptions {
pub default_timezone: Option<String>,
pub default_column_prefix: Option<String>,
pub heartbeat: HeartbeatOptions,
+ /// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
+ /// Set to 0 to disable the limit. Default: "0" (unlimited)
+ pub max_in_flight_write_bytes: ReadableSize,
+ /// Policy when write bytes quota is exhausted.
+ /// Options: "wait" (default, 10s), "wait(<duration>)", "fail"
+ pub write_bytes_exhausted_policy: OnExhaustedPolicy,
pub http: HttpOptions,
pub grpc: GrpcOptions,
/// The internal gRPC options for the frontend service.
@@ -63,7 +70,6 @@ pub struct FrontendOptions {
pub user_provider: Option<String>,
pub tracing: TracingOptions,
pub query: QueryOptions,
- pub max_in_flight_write_bytes: Option<ReadableSize>,
pub slow_query: SlowQueryOptions,
pub memory: MemoryOptions,
/// The event recorder options.
@@ -77,6 +83,8 @@ impl Default for FrontendOptions {
default_timezone: None,
default_column_prefix: None,
heartbeat: HeartbeatOptions::frontend_default(),
+ max_in_flight_write_bytes: ReadableSize(0),
+ write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
internal_grpc: None,
@@ -93,7 +101,6 @@ impl Default for FrontendOptions {
user_provider: None,
tracing: TracingOptions::default(),
query: QueryOptions::default(),
- max_in_flight_write_bytes: None,
slow_query: SlowQueryOptions::default(),
memory: MemoryOptions::default(),
event_recorder: EventRecorderOptions::default(),
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 73b9ed72a8..3c2479fdb5 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -97,7 +97,6 @@ use crate::error::{
ParseSqlSnafu, PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu,
StatementTimeoutSnafu, TableOperationSnafu,
};
-use crate::limiter::LimiterRef;
use crate::stream_wrapper::CancellableStreamWrapper;
lazy_static! {
@@ -118,7 +117,6 @@ pub struct Instance {
deleter: DeleterRef,
table_metadata_manager: TableMetadataManagerRef,
event_recorder: Option,
- limiter: Option<LimiterRef>,
process_manager: ProcessManagerRef,
slow_query_options: SlowQueryOptions,
suspend: Arc,
diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs
index bd3547b371..e384465d1b 100644
--- a/src/frontend/src/instance/builder.rs
+++ b/src/frontend/src/instance/builder.rs
@@ -49,7 +49,6 @@ use crate::events::EventHandlerImpl;
use crate::frontend::FrontendOptions;
use crate::instance::Instance;
use crate::instance::region_query::FrontendRegionQueryHandler;
-use crate::limiter::Limiter;
/// The frontend [`Instance`] builder.
pub struct FrontendBuilder {
@@ -248,14 +247,6 @@ impl FrontendBuilder {
self.options.event_recorder.ttl,
))));
- // Create the limiter if the max_in_flight_write_bytes is set.
- let limiter = self
- .options
- .max_in_flight_write_bytes
- .map(|max_in_flight_write_bytes| {
- Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes() as usize))
- });
-
Ok(Instance {
catalog_manager: self.catalog_manager,
pipeline_operator,
@@ -266,7 +257,6 @@ impl FrontendBuilder {
deleter,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
event_recorder: Some(event_recorder),
- limiter,
process_manager,
otlp_metrics_table_legacy_cache: DashMap::new(),
slow_query_options: self.options.slow_query.clone(),
diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs
index 9d3e3ac85c..93be8ae283 100644
--- a/src/frontend/src/instance/grpc.rs
+++ b/src/frontend/src/instance/grpc.rs
@@ -71,12 +71,6 @@ impl GrpcQueryHandler for Instance {
.check_permission(ctx.current_user(), PermissionReq::GrpcRequest(&request))
.context(PermissionSnafu)?;
- let _guard = if let Some(limiter) = &self.limiter {
- Some(limiter.limit_request(&request).await?)
- } else {
- None
- };
-
let output = match request {
Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?,
Request::RowInserts(requests) => {
diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs
index 1b8e0c9f46..0c63688262 100644
--- a/src/frontend/src/instance/influxdb.rs
+++ b/src/frontend/src/instance/influxdb.rs
@@ -22,7 +22,7 @@ use common_error::ext::BoxedError;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use servers::error::{
- AuthSnafu, CatalogSnafu, Error, OtherSnafu, TimestampOverflowSnafu, UnexpectedResultSnafu,
+ AuthSnafu, CatalogSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu,
};
use servers::influxdb::InfluxdbRequest;
use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
@@ -59,18 +59,6 @@ impl InfluxdbLineProtocolHandler for Instance {
.post_lines_conversion(requests, ctx.clone())
.await?;
- let _guard = if let Some(limiter) = &self.limiter {
- Some(
- limiter
- .limit_row_inserts(&requests)
- .await
- .map_err(BoxedError::new)
- .context(OtherSnafu)?,
- )
- } else {
- None
- };
-
self.handle_influx_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs
index 946f121c37..ed3654559f 100644
--- a/src/frontend/src/instance/log_handler.rs
+++ b/src/frontend/src/instance/log_handler.rs
@@ -23,8 +23,7 @@ use datatypes::timestamp::TimestampNanosecond;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
- AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, OtherSnafu, PipelineSnafu,
- Result as ServerResult,
+ AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, PipelineSnafu, Result as ServerResult,
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
@@ -124,18 +123,6 @@ impl Instance {
log: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
- let _guard = if let Some(limiter) = &self.limiter {
- Some(
- limiter
- .limit_row_inserts(&log)
- .await
- .map_err(BoxedError::new)
- .context(OtherSnafu)?,
- )
- } else {
- None
- };
-
self.inserter
.handle_log_inserts(log, ctx, self.statement_executor.as_ref())
.await
@@ -148,18 +135,6 @@ impl Instance {
rows: RowInsertRequests,
ctx: QueryContextRef,
) -> ServerResult<Output> {
- let _guard = if let Some(limiter) = &self.limiter {
- Some(
- limiter
- .limit_row_inserts(&rows)
- .await
- .map_err(BoxedError::new)
- .context(OtherSnafu)?,
- )
- } else {
- None
- };
-
self.inserter
.handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
.await
diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs
index d7deb840cb..cd6105933d 100644
--- a/src/frontend/src/instance/opentsdb.rs
+++ b/src/frontend/src/instance/opentsdb.rs
@@ -16,7 +16,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
-use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu};
+use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu};
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
@@ -41,18 +41,6 @@ impl OpentsdbProtocolHandler for Instance {
let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
- let _guard = if let Some(limiter) = &self.limiter {
- Some(
- limiter
- .limit_row_inserts(&requests)
- .await
- .map_err(BoxedError::new)
- .context(OtherSnafu)?,
- )
- } else {
- None
- };
-
// OpenTSDB is single value.
let output = self
.handle_row_inserts(requests, ctx, true, true)
diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs
index f0b152e08d..52df274780 100644
--- a/src/frontend/src/instance/otlp.rs
+++ b/src/frontend/src/instance/otlp.rs
@@ -24,7 +24,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
-use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult};
+use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
@@ -83,18 +83,6 @@ impl OpenTelemetryProtocolHandler for Instance {
ctx
};
- let _guard = if let Some(limiter) = &self.limiter {
- Some(
- limiter
- .limit_row_inserts(&requests)
- .await
- .map_err(BoxedError::new)
- .context(OtherSnafu)?,
- )
- } else {
- None
- };
-
// If the user uses the legacy path, it is by default without metric engine.
if metric_ctx.is_legacy || !metric_ctx.with_metric_engine {
self.handle_row_inserts(requests, ctx, false, false)
@@ -191,18 +179,6 @@ impl OpenTelemetryProtocolHandler for Instance {
)
.await?;
- let _guard = if let Some(limiter) = &self.limiter {
- Some(
- limiter
- .limit_ctx_req(&opt_req)
- .await
- .map_err(BoxedError::new)
- .context(OtherSnafu)?,
- )
- } else {
- None
- };
-
let mut outputs = vec![];
for (temp_ctx, requests) in opt_req.as_req_iter(ctx) {
diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs
index e9c9499372..8a965a2453 100644
--- a/src/frontend/src/instance/prom_store.rs
+++ b/src/frontend/src/instance/prom_store.rs
@@ -175,18 +175,6 @@ impl PromStoreProtocolHandler for Instance {
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;
- let _guard = if let Some(limiter) = &self.limiter {
- Some(
- limiter
- .limit_row_inserts(&request)
- .await
- .map_err(BoxedError::new)
- .context(error::OtherSnafu)?,
- )
- } else {
- None
- };
-
let output = if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs
index a77799f405..16321795b7 100644
--- a/src/frontend/src/lib.rs
+++ b/src/frontend/src/lib.rs
@@ -19,7 +19,6 @@ pub mod events;
pub mod frontend;
pub mod heartbeat;
pub mod instance;
-pub(crate) mod limiter;
pub(crate) mod metrics;
pub mod server;
pub mod service_config;
diff --git a/src/frontend/src/limiter.rs b/src/frontend/src/limiter.rs
deleted file mode 100644
index 1055267b2d..0000000000
--- a/src/frontend/src/limiter.rs
+++ /dev/null
@@ -1,332 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::sync::Arc;
-
-use api::v1::column::Values;
-use api::v1::greptime_request::Request;
-use api::v1::value::ValueData;
-use api::v1::{
- Decimal128, InsertRequests, IntervalMonthDayNano, JsonValue, RowInsertRequest,
- RowInsertRequests, json_value,
-};
-use pipeline::ContextReq;
-use snafu::ResultExt;
-use tokio::sync::{OwnedSemaphorePermit, Semaphore};
-
-use crate::error::{AcquireLimiterSnafu, Result};
-
-pub(crate) type LimiterRef = Arc<Limiter>;
-
-/// A frontend request limiter that controls the total size of in-flight write
-/// requests.
-pub(crate) struct Limiter {
- max_in_flight_write_bytes: usize,
- byte_counter: Arc<Semaphore>,
-}
-
-impl Limiter {
- pub fn new(max_in_flight_write_bytes: usize) -> Self {
- Self {
- byte_counter: Arc::new(Semaphore::new(max_in_flight_write_bytes)),
- max_in_flight_write_bytes,
- }
- }
-
- pub async fn limit_request(&self, request: &Request) -> Result<OwnedSemaphorePermit> {
- let size = match request {
- Request::Inserts(requests) => self.insert_requests_data_size(requests),
- Request::RowInserts(requests) => {
- self.rows_insert_requests_data_size(requests.inserts.iter())
- }
- _ => 0,
- };
- self.limit_in_flight_write_bytes(size).await
- }
-
- pub async fn limit_row_inserts(
- &self,
- requests: &RowInsertRequests,
- ) -> Result<OwnedSemaphorePermit> {
- let size = self.rows_insert_requests_data_size(requests.inserts.iter());
- self.limit_in_flight_write_bytes(size).await
- }
-
- pub async fn limit_ctx_req(&self, opt_req: &ContextReq) -> Result<OwnedSemaphorePermit> {
- let size = self.rows_insert_requests_data_size(opt_req.ref_all_req());
- self.limit_in_flight_write_bytes(size).await
- }
-
- /// Await until more inflight bytes are available
- pub async fn limit_in_flight_write_bytes(&self, bytes: usize) -> Result<OwnedSemaphorePermit> {
- self.byte_counter
- .clone()
- .acquire_many_owned(bytes as u32)
- .await
- .context(AcquireLimiterSnafu)
- }
-
- /// Returns the current in-flight write bytes.
- #[allow(dead_code)]
- pub fn in_flight_write_bytes(&self) -> usize {
- self.max_in_flight_write_bytes - self.byte_counter.available_permits()
- }
-
- fn insert_requests_data_size(&self, request: &InsertRequests) -> usize {
- let mut size: usize = 0;
- for insert in &request.inserts {
- for column in &insert.columns {
- if let Some(values) = &column.values {
- size += Self::size_of_column_values(values);
- }
- }
- }
- size
- }
-
- fn rows_insert_requests_data_size<'a>(
- &self,
- inserts: impl Iterator- ,
- ) -> usize {
- let mut size: usize = 0;
- for insert in inserts {
- if let Some(rows) = &insert.rows {
- for row in &rows.rows {
- for value in &row.values {
- if let Some(value) = &value.value_data {
- size += Self::size_of_value_data(value);
- }
- }
- }
- }
- }
- size
- }
-
- fn size_of_column_values(values: &Values) -> usize {
- let mut size: usize = 0;
- size += values.i8_values.len() * size_of::<i8>();
- size += values.i16_values.len() * size_of::<i16>();
- size += values.i32_values.len() * size_of::<i32>();
- size += values.i64_values.len() * size_of::<i64>();
- size += values.u8_values.len() * size_of::<u8>();
- size += values.u16_values.len() * size_of::<u16>();
- size += values.u32_values.len() * size_of::<u32>();
- size += values.u64_values.len() * size_of::<u64>();
- size += values.f32_values.len() * size_of::<f32>();
- size += values.f64_values.len() * size_of::<f64>();
- size += values.bool_values.len() * size_of::<bool>();
- size += values
- .binary_values
- .iter()
- .map(|v| v.len() * size_of::<u8>())
- .sum::<usize>();
- size += values.string_values.iter().map(|v| v.len()).sum::<usize>();
- size += values.date_values.len() * size_of::<i32>();
- size += values.datetime_values.len() * size_of::<i64>();
- size += values.timestamp_second_values.len() * size_of::<i64>();
- size += values.timestamp_millisecond_values.len() * size_of::<i64>();
- size += values.timestamp_microsecond_values.len() * size_of::<i64>();
- size += values.timestamp_nanosecond_values.len() * size_of::<i64>();
- size += values.time_second_values.len() * size_of::<i64>();
- size += values.time_millisecond_values.len() * size_of::<i64>();
- size += values.time_microsecond_values.len() * size_of::<i64>();
- size += values.time_nanosecond_values.len() * size_of::<i64>();
- size += values.interval_year_month_values.len() * size_of::<i32>();
- size += values.interval_day_time_values.len() * size_of::<i64>();
- size += values.interval_month_day_nano_values.len() * size_of::<IntervalMonthDayNano>();
- size += values.decimal128_values.len() * size_of::<Decimal128>();
- size += values
- .list_values
- .iter()
- .map(|v| {
- v.items
- .iter()
- .map(|item| {
- item.value_data
- .as_ref()
- .map(Self::size_of_value_data)
- .unwrap_or(0)
- })
- .sum::<usize>()
- })
- .sum::<usize>();
- size += values
- .struct_values
- .iter()
- .map(|v| {
- v.items
- .iter()
- .map(|item| {
- item.value_data
- .as_ref()
- .map(Self::size_of_value_data)
- .unwrap_or(0)
- })
- .sum::<usize>()
- })
- .sum::<usize>();
-
- size
- }
-
- fn size_of_value_data(value: &ValueData) -> usize {
- match value {
- ValueData::I8Value(_) => size_of::<i8>(),
- ValueData::I16Value(_) => size_of::<i16>(),
- ValueData::I32Value(_) => size_of::<i32>(),
- ValueData::I64Value(_) => size_of::<i64>(),
- ValueData::U8Value(_) => size_of::<u8>(),
- ValueData::U16Value(_) => size_of::<u16>(),
- ValueData::U32Value(_) => size_of::<u32>(),
- ValueData::U64Value(_) => size_of::<u64>(),
- ValueData::F32Value(_) => size_of::<f32>(),
- ValueData::F64Value(_) => size_of::<f64>(),
- ValueData::BoolValue(_) => size_of::<bool>(),
- ValueData::BinaryValue(v) => v.len() * size_of::<u8>(),
- ValueData::StringValue(v) => v.len(),
- ValueData::DateValue(_) => size_of::<i32>(),
- ValueData::DatetimeValue(_) => size_of::<i64>(),
- ValueData::TimestampSecondValue(_) => size_of::<i64>(),
- ValueData::TimestampMillisecondValue(_) => size_of::<i64>(),
- ValueData::TimestampMicrosecondValue(_) => size_of::<i64>(),
- ValueData::TimestampNanosecondValue(_) => size_of::<i64>(),
- ValueData::TimeSecondValue(_) => size_of::<i64>(),
- ValueData::TimeMillisecondValue(_) => size_of::<i64>(),
- ValueData::TimeMicrosecondValue(_) => size_of::<i64>(),
- ValueData::TimeNanosecondValue(_) => size_of::<i64>(),
- ValueData::IntervalYearMonthValue(_) => size_of::<i32>(),
- ValueData::IntervalDayTimeValue(_) => size_of::<i64>(),
- ValueData::IntervalMonthDayNanoValue(_) => size_of::<IntervalMonthDayNano>(),
- ValueData::Decimal128Value(_) => size_of::<Decimal128>(),
- ValueData::ListValue(list_values) => list_values
- .items
- .iter()
- .map(|item| {
- item.value_data
- .as_ref()
- .map(Self::size_of_value_data)
- .unwrap_or(0)
- })
- .sum(),
- ValueData::StructValue(struct_values) => struct_values
- .items
- .iter()
- .map(|item| {
- item.value_data
- .as_ref()
- .map(Self::size_of_value_data)
- .unwrap_or(0)
- })
- .sum(),
- ValueData::JsonValue(v) => {
- fn calc(v: &JsonValue) -> usize {
- let Some(value) = v.value.as_ref() else {
- return 0;
- };
- match value {
- json_value::Value::Boolean(_) => size_of::<bool>(),
- json_value::Value::Int(_) => size_of::<i64>(),
- json_value::Value::Uint(_) => size_of::<u64>(),
- json_value::Value::Float(_) => size_of::<f64>(),
- json_value::Value::Str(s) => s.len(),
- json_value::Value::Array(array) => array.items.iter().map(calc).sum(),
- json_value::Value::Object(object) => object
- .entries
- .iter()
- .flat_map(|entry| {
- entry.value.as_ref().map(|v| entry.key.len() + calc(v))
- })
- .sum(),
- }
- }
- calc(v)
- }
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use api::v1::column::Values;
- use api::v1::greptime_request::Request;
- use api::v1::{Column, InsertRequest};
-
- use super::*;
-
- fn generate_request(size: usize) -> Request {
- let i8_values = vec![0; size / 4];
- Request::Inserts(InsertRequests {
- inserts: vec![InsertRequest {
- columns: vec![Column {
- values: Some(Values {
- i8_values,
- ..Default::default()
- }),
- ..Default::default()
- }],
- ..Default::default()
- }],
- })
- }
-
- #[tokio::test]
- async fn test_limiter() {
- let limiter_ref: LimiterRef = Arc::new(Limiter::new(1024));
- let tasks_count = 10;
- let request_data_size = 100;
- let mut handles = vec![];
-
- // Generate multiple requests to test the limiter.
- for _ in 0..tasks_count {
- let limiter = limiter_ref.clone();
- let handle = tokio::spawn(async move {
- let result = limiter
- .limit_request(&generate_request(request_data_size))
- .await;
- assert!(result.is_ok());
- });
- handles.push(handle);
- }
-
- // Wait for all threads to complete.
- for handle in handles {
- handle.await.unwrap();
- }
- }
-
- #[tokio::test]
- async fn test_in_flight_write_bytes() {
- let limiter_ref: LimiterRef = Arc::new(Limiter::new(1024));
- let req1 = generate_request(100);
- let result1 = limiter_ref
- .limit_request(&req1)
- .await
- .expect("failed to acquire permits");
- assert_eq!(limiter_ref.in_flight_write_bytes(), 100);
-
- let req2 = generate_request(200);
- let result2 = limiter_ref
- .limit_request(&req2)
- .await
- .expect("failed to acquire permits");
- assert_eq!(limiter_ref.in_flight_write_bytes(), 300);
-
- drop(result1);
- assert_eq!(limiter_ref.in_flight_write_bytes(), 200);
-
- drop(result2);
- assert_eq!(limiter_ref.in_flight_write_bytes(), 0);
- }
-}
diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs
index be14fb3cd2..a7f3d55f6b 100644
--- a/src/frontend/src/server.rs
+++ b/src/frontend/src/server.rs
@@ -40,6 +40,7 @@ use servers::otel_arrow::OtelArrowServiceHandler;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
+use servers::request_memory_limiter::ServerMemoryLimiter;
use servers::server::{Server, ServerHandlers};
use servers::tls::{ReloadableTlsServerConfig, maybe_watch_server_tls_config};
use snafu::ResultExt;
@@ -76,15 +77,25 @@ where
}
}
- pub fn grpc_server_builder(&self, opts: &GrpcOptions) -> Result<GrpcServerBuilder> {
+ pub fn grpc_server_builder(
+ &self,
+ opts: &GrpcOptions,
+ request_memory_limiter: ServerMemoryLimiter,
+ ) -> Result<GrpcServerBuilder> {
let builder = GrpcServerBuilder::new(opts.as_config(), common_runtime::global_runtime())
+ .with_memory_limiter(request_memory_limiter)
.with_tls_config(opts.tls.clone())
.context(error::InvalidTlsConfigSnafu)?;
Ok(builder)
}
- pub fn http_server_builder(&self, opts: &FrontendOptions) -> HttpServerBuilder {
+ pub fn http_server_builder(
+ &self,
+ opts: &FrontendOptions,
+ request_memory_limiter: ServerMemoryLimiter,
+ ) -> HttpServerBuilder {
let mut builder = HttpServerBuilder::new(opts.http.clone())
+ .with_memory_limiter(request_memory_limiter)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(self.instance.clone()));
let validator = self.plugins.get::();
@@ -169,11 +180,12 @@ where
meta_client: &Option<MetaClientOptions>,
name: Option<String>,
external: bool,
+ request_memory_limiter: ServerMemoryLimiter,
) -> Result<GrpcServer> {
let builder = if let Some(builder) = self.grpc_server_builder.take() {
builder
} else {
- self.grpc_server_builder(grpc)?
+ self.grpc_server_builder(grpc, request_memory_limiter)?
};
let user_provider = if external {
@@ -235,11 +247,16 @@ where
Ok(grpc_server)
}
- fn build_http_server(&mut self, opts: &FrontendOptions, toml: String) -> Result<HttpServer> {
+ fn build_http_server(
+ &mut self,
+ opts: &FrontendOptions,
+ toml: String,
+ request_memory_limiter: ServerMemoryLimiter,
+ ) -> Result<HttpServer> {
let builder = if let Some(builder) = self.http_server_builder.take() {
builder
} else {
- self.http_server_builder(opts)
+ self.http_server_builder(opts, request_memory_limiter)
};
let http_server = builder
@@ -257,6 +274,12 @@ where
let toml = opts.to_toml().context(TomlFormatSnafu)?;
let opts: FrontendOptions = opts.into();
+ // Create request memory limiter for all server protocols
+ let request_memory_limiter = ServerMemoryLimiter::new(
+ opts.max_in_flight_write_bytes.as_bytes(),
+ opts.write_bytes_exhausted_policy,
+ );
+
let handlers = ServerHandlers::default();
let user_provider = self.plugins.get::<UserProviderRef>();
@@ -264,7 +287,13 @@ where
{
// Always init GRPC server
let grpc_addr = parse_addr(&opts.grpc.bind_addr)?;
- let grpc_server = self.build_grpc_server(&opts.grpc, &opts.meta_client, None, true)?;
+ let grpc_server = self.build_grpc_server(
+ &opts.grpc,
+ &opts.meta_client,
+ None,
+ true,
+ request_memory_limiter.clone(),
+ )?;
handlers.insert((Box::new(grpc_server), grpc_addr));
}
@@ -276,6 +305,7 @@ where
&opts.meta_client,
Some("INTERNAL_GRPC_SERVER".to_string()),
false,
+ request_memory_limiter.clone(),
)?;
handlers.insert((Box::new(grpc_server), grpc_addr));
}
@@ -284,7 +314,8 @@ where
// Always init HTTP server
let http_options = &opts.http;
let http_addr = parse_addr(&http_options.addr)?;
- let http_server = self.build_http_server(&opts, toml)?;
+ let http_server =
+ self.build_http_server(&opts, toml, request_memory_limiter.clone())?;
handlers.insert((Box::new(http_server), http_addr));
}
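The frontend now constructs one `ServerMemoryLimiter` and hands clones of it to both the gRPC and HTTP builders, so all protocols draw from a single budget. A toy demonstration of that shared-budget behaviour, with a plain semaphore standing in for the limiter:

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

#[tokio::main]
async fn main() {
    // One budget shared by every protocol, as the frontend now does.
    let shared = Arc::new(Semaphore::new(1000));

    // An "HTTP" request reserves 700 bytes...
    let http_guard = shared.clone().acquire_many_owned(700).await.unwrap();

    // ...so a concurrent "gRPC" reservation of another 700 bytes cannot fit.
    assert!(shared.clone().try_acquire_many_owned(700).is_err());

    // When the HTTP request finishes, its guard drops and the budget is returned.
    drop(http_guard);
    assert!(shared.clone().try_acquire_many_owned(700).is_ok());
}
```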
diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml
index 40cac587d7..f4f046b4d0 100644
--- a/src/servers/Cargo.toml
+++ b/src/servers/Cargo.toml
@@ -41,6 +41,7 @@ common-frontend.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-mem-prof = { workspace = true, optional = true }
+common-memory-manager.workspace = true
common-meta.workspace = true
common-plugins.workspace = true
common-pprof = { workspace = true, optional = true }
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index c52ddb7b34..edbc0e2038 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -95,6 +95,13 @@ pub enum Error {
error: tonic::transport::Error,
},
+ #[snafu(display("Request memory limit exceeded"))]
+ MemoryLimitExceeded {
+ #[snafu(implicit)]
+ location: Location,
+ source: common_memory_manager::Error,
+ },
+
#[snafu(display("{} server is already started", server))]
AlreadyStarted {
server: String,
@@ -785,6 +792,8 @@ impl ErrorExt for Error {
Cancelled { .. } => StatusCode::Cancelled,
Suspended { .. } => StatusCode::Suspended,
+
+ MemoryLimitExceeded { .. } => StatusCode::RateLimited,
}
}
diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs
index ff27e80c69..dbb2975533 100644
--- a/src/servers/src/grpc.rs
+++ b/src/servers/src/grpc.rs
@@ -52,7 +52,6 @@ use crate::error::{AlreadyStartedSnafu, InternalSnafu, Result, StartGrpcSnafu, T
use crate::metrics::MetricsMiddlewareLayer;
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
-use crate::request_limiter::RequestMemoryLimiter;
use crate::server::Server;
use crate::tls::TlsOption;
@@ -69,8 +68,6 @@ pub struct GrpcOptions {
pub max_recv_message_size: ReadableSize,
/// Max gRPC sending(encoding) message size
pub max_send_message_size: ReadableSize,
- /// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
- pub max_total_message_memory: ReadableSize,
/// Compression mode in Arrow Flight service.
pub flight_compression: FlightCompression,
pub runtime_size: usize,
@@ -126,7 +123,6 @@ impl GrpcOptions {
GrpcServerConfig {
max_recv_message_size: self.max_recv_message_size.as_bytes() as usize,
max_send_message_size: self.max_send_message_size.as_bytes() as usize,
- max_total_message_memory: self.max_total_message_memory.as_bytes() as usize,
tls: self.tls.clone(),
max_connection_age: self.max_connection_age,
}
@@ -145,7 +141,6 @@ impl Default for GrpcOptions {
server_addr: String::new(),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
- max_total_message_memory: ReadableSize(0),
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
@@ -167,7 +162,6 @@ impl GrpcOptions {
server_addr: format!("127.0.0.1:{}", DEFAULT_INTERNAL_GRPC_ADDR_PORT),
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE,
- max_total_message_memory: ReadableSize(0),
flight_compression: FlightCompression::ArrowIpc,
runtime_size: 8,
tls: TlsOption::default(),
@@ -234,7 +228,6 @@ pub struct GrpcServer {
bind_addr: Option<SocketAddr>,
name: Option<String>,
config: GrpcServerConfig,
- memory_limiter: RequestMemoryLimiter,
}
/// Grpc Server configuration
@@ -244,8 +237,6 @@ pub struct GrpcServerConfig {
pub max_recv_message_size: usize,
// Max gRPC sending(encoding) message size
pub max_send_message_size: usize,
- /// Maximum total memory for all concurrent gRPC request messages. 0 disables the limit.
- pub max_total_message_memory: usize,
pub tls: TlsOption,
/// Maximum time that a channel may exist.
/// Useful when the server wants to control the reconnection of its clients.
@@ -258,7 +249,6 @@ impl Default for GrpcServerConfig {
Self {
max_recv_message_size: DEFAULT_MAX_GRPC_RECV_MESSAGE_SIZE.as_bytes() as usize,
max_send_message_size: DEFAULT_MAX_GRPC_SEND_MESSAGE_SIZE.as_bytes() as usize,
- max_total_message_memory: 0,
tls: TlsOption::default(),
max_connection_age: None,
}
@@ -298,11 +288,6 @@ impl GrpcServer {
}
Ok(())
}
-
- /// Get the memory limiter for monitoring current memory usage
- pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
- &self.memory_limiter
- }
}
pub struct HealthCheckHandler;
diff --git a/src/servers/src/grpc/builder.rs b/src/servers/src/grpc/builder.rs
index 129f07c3c5..d240cde3f1 100644
--- a/src/servers/src/grpc/builder.rs
+++ b/src/servers/src/grpc/builder.rs
@@ -46,7 +46,7 @@ use crate::grpc::{GrpcServer, GrpcServerConfig};
use crate::otel_arrow::{HeaderInterceptor, OtelArrowServiceHandler};
use crate::prometheus_handler::PrometheusHandlerRef;
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
-use crate::request_limiter::RequestMemoryLimiter;
+use crate::request_memory_limiter::ServerMemoryLimiter;
use crate::tls::TlsOption;
/// Add a gRPC service (`service`) to a `builder`([RoutesBuilder]).
@@ -92,12 +92,14 @@ pub struct GrpcServerBuilder {
HeaderInterceptor,
>,
>,
- memory_limiter: RequestMemoryLimiter,
+ memory_limiter: ServerMemoryLimiter,
}
impl GrpcServerBuilder {
pub fn new(config: GrpcServerConfig, runtime: Runtime) -> Self {
- let memory_limiter = RequestMemoryLimiter::new(config.max_total_message_memory);
+ // Create a default unlimited limiter (can be overridden with with_memory_limiter)
+ let memory_limiter = ServerMemoryLimiter::default();
+
Self {
name: None,
config,
@@ -109,6 +111,12 @@ impl GrpcServerBuilder {
}
}
+ /// Set a global memory limiter for all server protocols.
+ pub fn with_memory_limiter(mut self, limiter: ServerMemoryLimiter) -> Self {
+ self.memory_limiter = limiter;
+ self
+ }
+
pub fn config(&self) -> &GrpcServerConfig {
&self.config
}
@@ -117,7 +125,7 @@ impl GrpcServerBuilder {
&self.runtime
}
- pub fn memory_limiter(&self) -> &RequestMemoryLimiter {
+ pub fn memory_limiter(&self) -> &ServerMemoryLimiter {
&self.memory_limiter
}
@@ -238,7 +246,6 @@ impl GrpcServerBuilder {
bind_addr: None,
name: self.name,
config: self.config,
- memory_limiter: self.memory_limiter,
}
}
}
diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs
index 5d132c434e..79904806ab 100644
--- a/src/servers/src/grpc/database.rs
+++ b/src/servers/src/grpc/database.rs
@@ -26,8 +26,7 @@ use tonic::{Request, Response, Status, Streaming};
use crate::grpc::greptime_handler::GreptimeRequestHandler;
use crate::grpc::{TonicResult, cancellation};
use crate::hint_headers;
-use crate::metrics::{METRIC_GRPC_MEMORY_USAGE_BYTES, METRIC_GRPC_REQUESTS_REJECTED_TOTAL};
-use crate::request_limiter::RequestMemoryLimiter;
+use crate::request_memory_limiter::ServerMemoryLimiter;
pub(crate) struct DatabaseService {
handler: GreptimeRequestHandler,
@@ -52,25 +51,12 @@ impl GreptimeDatabase for DatabaseService {
remote_addr, hints
);
- let _guard = request
- .extensions()
- .get::<RequestMemoryLimiter>()
- .filter(|limiter| limiter.is_enabled())
- .and_then(|limiter| {
- let message_size = request.get_ref().encoded_len();
- limiter
- .try_acquire(message_size)
- .map(|guard| {
- guard.inspect(|g| {
- METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
- })
- })
- .inspect_err(|_| {
- METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
- })
- .transpose()
- })
- .transpose()?;
+ let _guard = if let Some(limiter) = request.extensions().get::<ServerMemoryLimiter>() {
+ let message_size = request.get_ref().encoded_len() as u64;
+ Some(limiter.acquire(message_size).await?)
+ } else {
+ None
+ };
let handler = self.handler.clone();
let request_future = async move {
@@ -119,7 +105,7 @@ impl GreptimeDatabase for DatabaseService {
remote_addr, hints
);
- let limiter = request.extensions().get::<RequestMemoryLimiter>().cloned();
+ let limiter = request.extensions().get::<ServerMemoryLimiter>().cloned();
let handler = self.handler.clone();
let request_future = async move {
@@ -129,24 +115,12 @@ impl GreptimeDatabase for DatabaseService {
while let Some(request) = stream.next().await {
let request = request?;
- let _guard = limiter
- .as_ref()
- .filter(|limiter| limiter.is_enabled())
- .and_then(|limiter| {
- let message_size = request.encoded_len();
- limiter
- .try_acquire(message_size)
- .map(|guard| {
- guard.inspect(|g| {
- METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
- })
- })
- .inspect_err(|_| {
- METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
- })
- .transpose()
- })
- .transpose()?;
+ let _guard = if let Some(limiter_ref) = &limiter {
+ let message_size = request.encoded_len() as u64;
+ Some(limiter_ref.acquire(message_size).await?)
+ } else {
+ None
+ };
let output = handler.handle_request(request, hints.clone()).await?;
match output.data {
OutputData::AffectedRows(rows) => affected_rows += rows,
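Both the unary and the streaming paths above now `acquire(...).await` per message, so an exhausted budget turns into backpressure rather than an immediate rejection; under a `wait(...)` policy that wait is presumably bounded by the configured timeout. A rough sketch of those assumed semantics with plain tokio primitives:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    let quota = Arc::new(Semaphore::new(100));

    // Another request currently holds the whole budget.
    let _held = quota.clone().acquire_many_owned(100).await.unwrap();

    // A new message waits for quota instead of being rejected outright; a
    // "wait(...)" policy caps how long it may wait before failing.
    let waited = timeout(
        Duration::from_millis(50),
        quota.clone().acquire_many_owned(10),
    )
    .await;
    assert!(waited.is_err()); // timed out: map this to a rate-limited error
}
```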
diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs
index a3835b14ff..02755fcfd0 100644
--- a/src/servers/src/grpc/flight.rs
+++ b/src/servers/src/grpc/flight.rs
@@ -29,6 +29,7 @@ use bytes;
use bytes::Bytes;
use common_grpc::flight::do_put::{DoPutMetadata, DoPutResponse};
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
+use common_memory_manager::MemoryGuard;
use common_query::{Output, OutputData};
use common_recordbatch::DfRecordBatch;
use common_telemetry::debug;
@@ -39,7 +40,7 @@ use futures::{Stream, future, ready};
use futures_util::{StreamExt, TryStreamExt};
use prost::Message;
use session::context::{QueryContext, QueryContextRef};
-use snafu::{ResultExt, ensure};
+use snafu::{IntoError, ResultExt, ensure};
use table::table_name::TableName;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
@@ -49,8 +50,8 @@ use crate::error::{InvalidParameterSnafu, Result, ToJsonSnafu};
pub use crate::grpc::flight::stream::FlightRecordBatchStream;
use crate::grpc::greptime_handler::{GreptimeRequestHandler, get_request_type};
use crate::grpc::{FlightCompression, TonicResult, context_auth};
-use crate::metrics::{METRIC_GRPC_MEMORY_USAGE_BYTES, METRIC_GRPC_REQUESTS_REJECTED_TOTAL};
-use crate::request_limiter::{RequestMemoryGuard, RequestMemoryLimiter};
+use crate::request_memory_limiter::ServerMemoryLimiter;
+use crate::request_memory_metrics::RequestMemoryMetrics;
use crate::{error, hint_headers};
pub type TonicStream<T> = Pin<Box<dyn Stream<Item = TonicResult<T>> + Send + 'static>>;
@@ -219,7 +220,7 @@ impl FlightCraft for GreptimeRequestHandler {
) -> TonicResult<Response<TonicStream<PutResult>>> {
let (headers, extensions, stream) = request.into_parts();
- let limiter = extensions.get::<RequestMemoryLimiter>().cloned();
+ let limiter = extensions.get::<ServerMemoryLimiter>().cloned();
let query_ctx = context_auth::create_query_context_from_grpc_metadata(&headers)?;
context_auth::check_auth(self.user_provider.clone(), &headers, query_ctx.clone()).await?;
@@ -260,7 +261,7 @@ pub struct PutRecordBatchRequest {
pub record_batch: DfRecordBatch,
pub schema_bytes: Bytes,
pub flight_data: FlightData,
- pub(crate) _guard: Option<RequestMemoryGuard>,
+ pub(crate) _guard: Option<MemoryGuard<RequestMemoryMetrics>>,
}
impl PutRecordBatchRequest {
@@ -270,28 +271,24 @@ impl PutRecordBatchRequest {
request_id: i64,
schema_bytes: Bytes,
flight_data: FlightData,
- limiter: Option<&RequestMemoryLimiter>,
+ limiter: Option<&ServerMemoryLimiter>,
) -> Result<Self> {
let memory_usage = flight_data.data_body.len()
+ flight_data.app_metadata.len()
+ flight_data.data_header.len();
- let _guard = limiter
- .filter(|limiter| limiter.is_enabled())
- .map(|limiter| {
- limiter
- .try_acquire(memory_usage)
- .map(|guard| {
- guard.inspect(|g| {
- METRIC_GRPC_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
- })
- })
- .inspect_err(|_| {
- METRIC_GRPC_REQUESTS_REJECTED_TOTAL.inc();
- })
- })
- .transpose()?
- .flatten();
+ let _guard = if let Some(limiter) = limiter {
+ let guard = limiter.try_acquire(memory_usage as u64).ok_or_else(|| {
+ let inner_err = common_memory_manager::Error::MemoryLimitExceeded {
+ requested_bytes: memory_usage as u64,
+ limit_bytes: limiter.limit_bytes(),
+ };
+ error::MemoryLimitExceededSnafu.into_error(inner_err)
+ })?;
+ Some(guard)
+ } else {
+ None
+ };
Ok(Self {
table_name,
@@ -308,7 +305,7 @@ pub struct PutRecordBatchRequestStream {
flight_data_stream: Streaming<FlightData>,
catalog: String,
schema_name: String,
- limiter: Option<RequestMemoryLimiter>,
+ limiter: Option<ServerMemoryLimiter>,
// Client now lazily sends schema data so we cannot eagerly wait for it.
// Instead, we need to decode while receiving record batches.
state: StreamState,
@@ -331,7 +328,7 @@ impl PutRecordBatchRequestStream {
flight_data_stream: Streaming<FlightData>,
catalog: String,
schema: String,
- limiter: Option<RequestMemoryLimiter>,
+ limiter: Option<ServerMemoryLimiter>,
) -> TonicResult<Self> {
Ok(Self {
flight_data_stream,
@@ -395,7 +392,6 @@ impl Stream for PutRecordBatchRequestStream {
match poll {
Some(Ok(flight_data)) => {
- // Clone limiter once to avoid borrowing issues
let limiter = self.limiter.clone();
match &mut self.state {
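The Flight `do_put` path is different: admission happens while polling a `Stream`, where the code cannot await the quota, so it uses the non-blocking `try_acquire` and maps exhaustion to a `MemoryLimitExceeded` error. A minimal sketch of that fail-fast reservation (illustrative only, with a plain semaphore):

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

// Reserve `bytes` without blocking; callers inside poll_next map a failure to an error.
fn try_reserve(quota: &Arc<Semaphore>, bytes: u32) -> Result<OwnedSemaphorePermit, String> {
    quota
        .clone()
        .try_acquire_many_owned(bytes)
        .map_err(|_| format!("memory limit exceeded while reserving {bytes} bytes"))
}

fn main() {
    let quota = Arc::new(Semaphore::new(1024));
    assert!(try_reserve(&quota, 512).is_ok()); // permit dropped right away, bytes returned
    assert!(try_reserve(&quota, 2048).is_err()); // larger than the whole budget: rejected
}
```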
diff --git a/src/servers/src/grpc/memory_limit.rs b/src/servers/src/grpc/memory_limit.rs
index a3dee9da57..722a2c68c0 100644
--- a/src/servers/src/grpc/memory_limit.rs
+++ b/src/servers/src/grpc/memory_limit.rs
@@ -18,15 +18,15 @@ use futures::future::BoxFuture;
use tonic::server::NamedService;
use tower::{Layer, Service};
-use crate::request_limiter::RequestMemoryLimiter;
+use crate::request_memory_limiter::ServerMemoryLimiter;
#[derive(Clone)]
pub struct MemoryLimiterExtensionLayer {
- limiter: RequestMemoryLimiter,
+ limiter: ServerMemoryLimiter,
}
impl MemoryLimiterExtensionLayer {
- pub fn new(limiter: RequestMemoryLimiter) -> Self {
+ pub fn new(limiter: ServerMemoryLimiter) -> Self {
Self { limiter }
}
}
@@ -45,7 +45,7 @@ impl Layer for MemoryLimiterExtensionLayer {
#[derive(Clone)]
pub struct MemoryLimiterExtensionService<S> {
inner: S,
- limiter: RequestMemoryLimiter,
+ limiter: ServerMemoryLimiter,
}
impl<S: NamedService> NamedService for MemoryLimiterExtensionService<S> {
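The layer only needs to clone the shared limiter into each request's extensions; handlers in `database.rs` and `flight.rs` then look it up by type. The extension mechanism in isolation, using the `http` crate and a hypothetical stand-in type:

```rust
use http::Request;

#[derive(Clone, Debug, PartialEq)]
struct SharedLimiter(u64); // stand-in for ServerMemoryLimiter

fn main() {
    // The tower layer inserts a clone of the limiter into each request's extensions.
    let mut req = Request::new(Vec::<u8>::new());
    req.extensions_mut().insert(SharedLimiter(1024));

    // Deep in the handler stack, the limiter is looked up by type, much like
    // `extensions.get::<ServerMemoryLimiter>()` in database.rs and flight.rs.
    let limiter = req.extensions().get::<SharedLimiter>().cloned();
    assert_eq!(limiter, Some(SharedLimiter(1024)));
}
```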
diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs
index 2d2b3a4320..b99e8136df 100644
--- a/src/servers/src/http.rs
+++ b/src/servers/src/http.rs
@@ -83,7 +83,7 @@ use crate::query_handler::{
OpenTelemetryProtocolHandlerRef, OpentsdbProtocolHandlerRef, PipelineHandlerRef,
PromStoreProtocolHandlerRef,
};
-use crate::request_limiter::RequestMemoryLimiter;
+use crate::request_memory_limiter::ServerMemoryLimiter;
use crate::server::Server;
pub mod authorize;
@@ -134,7 +134,7 @@ pub struct HttpServer {
router: StdMutex<Router>,
shutdown_tx: Mutex<Option<Sender<()>>>,
user_provider: Option<UserProviderRef>,
- memory_limiter: RequestMemoryLimiter,
+ memory_limiter: ServerMemoryLimiter,
// plugins
plugins: Plugins,
@@ -157,9 +157,6 @@ pub struct HttpOptions {
pub body_limit: ReadableSize,
- /// Maximum total memory for all concurrent HTTP request bodies. 0 disables the limit.
- pub max_total_body_memory: ReadableSize,
-
/// Validation mode while decoding Prometheus remote write requests.
pub prom_validation_mode: PromValidationMode,
@@ -204,7 +201,6 @@ impl Default for HttpOptions {
timeout: Duration::from_secs(0),
disable_dashboard: false,
body_limit: DEFAULT_BODY_LIMIT,
- max_total_body_memory: ReadableSize(0),
cors_allowed_origins: Vec::new(),
enable_cors: true,
prom_validation_mode: PromValidationMode::Strict,
@@ -539,12 +535,12 @@ pub struct GreptimeOptionsConfigState {
pub greptime_config_options: String,
}
-#[derive(Default)]
pub struct HttpServerBuilder {
options: HttpOptions,
plugins: Plugins,
user_provider: Option<UserProviderRef>,
router: Router,
+ memory_limiter: ServerMemoryLimiter,
}
impl HttpServerBuilder {
@@ -554,9 +550,16 @@ impl HttpServerBuilder {
plugins: Plugins::default(),
user_provider: None,
router: Router::new(),
+ memory_limiter: ServerMemoryLimiter::default(),
}
}
+ /// Set a global memory limiter for all server protocols.
+ pub fn with_memory_limiter(mut self, limiter: ServerMemoryLimiter) -> Self {
+ self.memory_limiter = limiter;
+ self
+ }
+
pub fn with_sql_handler(self, sql_handler: ServerSqlQueryHandlerRef) -> Self {
let sql_router = HttpServer::route_sql(ApiState { sql_handler });
@@ -750,8 +753,6 @@ impl HttpServerBuilder {
}
pub fn build(self) -> HttpServer {
- let memory_limiter =
- RequestMemoryLimiter::new(self.options.max_total_body_memory.as_bytes() as usize);
HttpServer {
options: self.options,
user_provider: self.user_provider,
@@ -759,7 +760,7 @@ impl HttpServerBuilder {
plugins: self.plugins,
router: StdMutex::new(self.router),
bind_addr: None,
- memory_limiter,
+ memory_limiter: self.memory_limiter,
}
}
}
diff --git a/src/servers/src/http/memory_limit.rs b/src/servers/src/http/memory_limit.rs
index 346b5d3409..50b4c68f03 100644
--- a/src/servers/src/http/memory_limit.rs
+++ b/src/servers/src/http/memory_limit.rs
@@ -19,11 +19,10 @@ use axum::middleware::Next;
use axum::response::{IntoResponse, Response};
use http::StatusCode;
-use crate::metrics::{METRIC_HTTP_MEMORY_USAGE_BYTES, METRIC_HTTP_REQUESTS_REJECTED_TOTAL};
-use crate::request_limiter::RequestMemoryLimiter;
+use crate::request_memory_limiter::ServerMemoryLimiter;
pub async fn memory_limit_middleware(
- State(limiter): State,
+ State(limiter): State,
req: Request,
next: Next,
) -> Response {
@@ -31,15 +30,12 @@ pub async fn memory_limit_middleware(
.headers()
.get(http::header::CONTENT_LENGTH)
.and_then(|v| v.to_str().ok())
- .and_then(|v| v.parse::<usize>().ok())
+ .and_then(|v| v.parse::<u64>().ok())
.unwrap_or(0);
- let _guard = match limiter.try_acquire(content_length) {
- Ok(guard) => guard.inspect(|g| {
- METRIC_HTTP_MEMORY_USAGE_BYTES.set(g.current_usage() as i64);
- }),
+ let _guard = match limiter.acquire(content_length).await {
+ Ok(guard) => guard,
Err(e) => {
- METRIC_HTTP_REQUESTS_REJECTED_TOTAL.inc();
return (
StatusCode::TOO_MANY_REQUESTS,
format!("Request body memory limit exceeded: {}", e),
diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs
index 40635cd036..8f77b853f9 100644
--- a/src/servers/src/lib.rs
+++ b/src/servers/src/lib.rs
@@ -50,7 +50,8 @@ pub mod prometheus_handler;
pub mod proto;
pub mod query_handler;
pub mod repeated_field;
-pub mod request_limiter;
+pub mod request_memory_limiter;
+pub mod request_memory_metrics;
mod row_writer;
pub mod server;
pub mod tls;
diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs
index 8662465f94..25a900ed3d 100644
--- a/src/servers/src/metrics.rs
+++ b/src/servers/src/metrics.rs
@@ -299,24 +299,24 @@ lazy_static! {
"servers handle bulk insert elapsed",
).unwrap();
- pub static ref METRIC_HTTP_MEMORY_USAGE_BYTES: IntGauge = register_int_gauge!(
- "greptime_servers_http_memory_usage_bytes",
- "current http request memory usage in bytes"
+ // Unified request memory metrics
+ /// Current memory in use by all concurrent requests (HTTP, gRPC, Flight).
+ pub static ref REQUEST_MEMORY_IN_USE: IntGauge = register_int_gauge!(
+ "greptime_servers_request_memory_in_use_bytes",
+ "bytes currently reserved for all concurrent request bodies and messages"
).unwrap();
- pub static ref METRIC_HTTP_REQUESTS_REJECTED_TOTAL: IntCounter = register_int_counter!(
- "greptime_servers_http_requests_rejected_total",
- "total number of http requests rejected due to memory limit"
+ /// Maximum configured memory for all concurrent requests.
+ pub static ref REQUEST_MEMORY_LIMIT: IntGauge = register_int_gauge!(
+ "greptime_servers_request_memory_limit_bytes",
+ "maximum bytes allowed for all concurrent request bodies and messages"
).unwrap();
- pub static ref METRIC_GRPC_MEMORY_USAGE_BYTES: IntGauge = register_int_gauge!(
- "greptime_servers_grpc_memory_usage_bytes",
- "current grpc request memory usage in bytes"
- ).unwrap();
-
- pub static ref METRIC_GRPC_REQUESTS_REJECTED_TOTAL: IntCounter = register_int_counter!(
- "greptime_servers_grpc_requests_rejected_total",
- "total number of grpc requests rejected due to memory limit"
+ /// Total number of rejected requests due to memory exhaustion.
+ pub static ref REQUEST_MEMORY_REJECTED: IntCounterVec = register_int_counter_vec!(
+ "greptime_servers_request_memory_rejected_total",
+ "number of requests rejected due to memory limit",
+ &["reason"]
).unwrap();
}
diff --git a/src/servers/src/request_limiter.rs b/src/servers/src/request_limiter.rs
deleted file mode 100644
index a93104581f..0000000000
--- a/src/servers/src/request_limiter.rs
+++ /dev/null
@@ -1,230 +0,0 @@
-// Copyright 2023 Greptime Team
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-//! Request memory limiter for controlling total memory usage of concurrent requests.
-
-use std::sync::Arc;
-use std::sync::atomic::{AtomicUsize, Ordering};
-
-use crate::error::{Result, TooManyConcurrentRequestsSnafu};
-
-/// Limiter for total memory usage of concurrent request bodies.
-///
-/// Tracks the total memory used by all concurrent request bodies
-/// and rejects new requests when the limit is reached.
-#[derive(Clone, Default)]
-pub struct RequestMemoryLimiter {
- inner: Option<Arc<LimiterInner>>,
-}
-
-struct LimiterInner {
- current_usage: AtomicUsize,
- max_memory: usize,
-}
-
-impl RequestMemoryLimiter {
- /// Create a new memory limiter.
- ///
- /// # Arguments
- /// * `max_memory` - Maximum total memory for all concurrent request bodies in bytes (0 = unlimited)
- pub fn new(max_memory: usize) -> Self {
- if max_memory == 0 {
- return Self { inner: None };
- }
-
- Self {
- inner: Some(Arc::new(LimiterInner {
- current_usage: AtomicUsize::new(0),
- max_memory,
- })),
- }
- }
-
- /// Try to acquire memory for a request of given size.
- ///
- /// Returns `Ok(RequestMemoryGuard)` if memory was acquired successfully.
- /// Returns `Err` if the memory limit would be exceeded.
- pub fn try_acquire(&self, request_size: usize) -> Result<Option<RequestMemoryGuard>> {
- let Some(inner) = self.inner.as_ref() else {
- return Ok(None);
- };
-
- let mut new_usage = 0;
- let result =
- inner
- .current_usage
- .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
- new_usage = current.saturating_add(request_size);
- if new_usage <= inner.max_memory {
- Some(new_usage)
- } else {
- None
- }
- });
-
- match result {
- Ok(_) => Ok(Some(RequestMemoryGuard {
- size: request_size,
- limiter: Arc::clone(inner),
- usage_snapshot: new_usage,
- })),
- Err(_current) => TooManyConcurrentRequestsSnafu {
- limit: inner.max_memory,
- request_size,
- }
- .fail(),
- }
- }
-
- /// Check if limiter is enabled
- pub fn is_enabled(&self) -> bool {
- self.inner.is_some()
- }
-
- /// Get current memory usage
- pub fn current_usage(&self) -> usize {
- self.inner
- .as_ref()
- .map(|inner| inner.current_usage.load(Ordering::Relaxed))
- .unwrap_or(0)
- }
-
- /// Get max memory limit
- pub fn max_memory(&self) -> usize {
- self.inner
- .as_ref()
- .map(|inner| inner.max_memory)
- .unwrap_or(0)
- }
-}
-
-/// RAII guard that releases memory when dropped
-pub struct RequestMemoryGuard {
- size: usize,
- limiter: Arc<LimiterInner>,
- usage_snapshot: usize,
-}
-
-impl RequestMemoryGuard {
- /// Returns the total memory usage snapshot at the time this guard was acquired.
- pub fn current_usage(&self) -> usize {
- self.usage_snapshot
- }
-}
-
-impl Drop for RequestMemoryGuard {
- fn drop(&mut self) {
- self.limiter
- .current_usage
- .fetch_sub(self.size, Ordering::Release);
- }
-}
-
-#[cfg(test)]
-mod tests {
- use tokio::sync::Barrier;
-
- use super::*;
-
- #[test]
- fn test_limiter_disabled() {
- let limiter = RequestMemoryLimiter::new(0);
- assert!(!limiter.is_enabled());
- assert!(limiter.try_acquire(1000000).unwrap().is_none());
- assert_eq!(limiter.current_usage(), 0);
- }
-
- #[test]
- fn test_limiter_basic() {
- let limiter = RequestMemoryLimiter::new(1000);
- assert!(limiter.is_enabled());
- assert_eq!(limiter.max_memory(), 1000);
- assert_eq!(limiter.current_usage(), 0);
-
- // Acquire 400 bytes
- let _guard1 = limiter.try_acquire(400).unwrap();
- assert_eq!(limiter.current_usage(), 400);
-
- // Acquire another 500 bytes
- let _guard2 = limiter.try_acquire(500).unwrap();
- assert_eq!(limiter.current_usage(), 900);
-
- // Try to acquire 200 bytes - should fail (900 + 200 > 1000)
- let result = limiter.try_acquire(200);
- assert!(result.is_err());
- assert_eq!(limiter.current_usage(), 900);
-
- // Drop first guard
- drop(_guard1);
- assert_eq!(limiter.current_usage(), 500);
-
- // Now we can acquire 200 bytes
- let _guard3 = limiter.try_acquire(200).unwrap();
- assert_eq!(limiter.current_usage(), 700);
- }
-
- #[test]
- fn test_limiter_exact_limit() {
- let limiter = RequestMemoryLimiter::new(1000);
-
- // Acquire exactly the limit
- let _guard = limiter.try_acquire(1000).unwrap();
- assert_eq!(limiter.current_usage(), 1000);
-
- // Try to acquire 1 more byte - should fail
- let result = limiter.try_acquire(1);
- assert!(result.is_err());
- }
-
- #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
- async fn test_limiter_concurrent() {
- let limiter = RequestMemoryLimiter::new(1000);
- let barrier = Arc::new(Barrier::new(11)); // 10 tasks + main
- let mut handles = vec![];
-
- // Spawn 10 tasks each trying to acquire 200 bytes
- for _ in 0..10 {
- let limiter_clone = limiter.clone();
- let barrier_clone = barrier.clone();
- let handle = tokio::spawn(async move {
- barrier_clone.wait().await;
- limiter_clone.try_acquire(200)
- });
- handles.push(handle);
- }
-
- // Let all tasks start together
- barrier.wait().await;
-
- let mut success_count = 0;
- let mut fail_count = 0;
- let mut guards = Vec::new();
-
- for handle in handles {
- match handle.await.unwrap() {
- Ok(Some(guard)) => {
- success_count += 1;
- guards.push(guard);
- }
- Err(_) => fail_count += 1,
- Ok(None) => unreachable!(),
- }
- }
-
- // Only 5 tasks should succeed (5 * 200 = 1000)
- assert_eq!(success_count, 5);
- assert_eq!(fail_count, 5);
- drop(guards);
- }
-}
diff --git a/src/servers/src/request_memory_limiter.rs b/src/servers/src/request_memory_limiter.rs
new file mode 100644
index 0000000000..8c3d106d9c
--- /dev/null
+++ b/src/servers/src/request_memory_limiter.rs
@@ -0,0 +1,76 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Unified memory limiter for all server request protocols.
+
+use std::sync::Arc;
+
+use common_memory_manager::{MemoryGuard, MemoryManager, OnExhaustedPolicy, PermitGranularity};
+use snafu::ResultExt;
+
+use crate::error::{MemoryLimitExceededSnafu, Result};
+use crate::request_memory_metrics::RequestMemoryMetrics;
+
+/// Unified memory limiter for all server request protocols.
+///
+/// Manages a global memory pool for HTTP requests, gRPC messages, and
+/// Arrow Flight batches without distinguishing between them.
+#[derive(Clone)]
+pub struct ServerMemoryLimiter {
+ manager: Arc<MemoryManager<RequestMemoryMetrics>>,
+ policy: OnExhaustedPolicy,
+}
+
+impl Default for ServerMemoryLimiter {
+ /// Creates a limiter with unlimited memory (0 bytes) and default policy.
+ fn default() -> Self {
+ Self::new(0, OnExhaustedPolicy::default())
+ }
+}
+
+impl ServerMemoryLimiter {
+ /// Creates a new unified memory limiter.
+ ///
+ /// # Arguments
+ ///
+ /// * `total_bytes` - Maximum total memory for all concurrent requests (0 = unlimited)
+ /// * `policy` - Policy when memory quota is exhausted
+ pub fn new(total_bytes: u64, policy: OnExhaustedPolicy) -> Self {
+ let manager = Arc::new(MemoryManager::with_granularity(
+ total_bytes,
+ PermitGranularity::Kilobyte,
+ RequestMemoryMetrics,
+ ));
+
+ Self { manager, policy }
+ }
+
+ /// Acquire memory for a request.
+ pub async fn acquire(&self, bytes: u64) -> Result<MemoryGuard<RequestMemoryMetrics>> {
+ self.manager
+ .acquire_with_policy(bytes, self.policy)
+ .await
+ .context(MemoryLimitExceededSnafu)
+ }
+
+ /// Try to acquire memory without waiting.
+ pub fn try_acquire(&self, bytes: u64) -> Option<MemoryGuard<RequestMemoryMetrics>> {
+ self.manager.try_acquire(bytes)
+ }
+
+ /// Returns total memory limit in bytes (0 if unlimited).
+ pub fn limit_bytes(&self) -> u64 {
+ self.manager.limit_bytes()
+ }
+}
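
A brief usage sketch of the limiter introduced above (illustrative only: the write path and helper names are hypothetical; only `ServerMemoryLimiter::new` and `acquire` come from this file):

```rust
use common_memory_manager::OnExhaustedPolicy;

use crate::error::Result;
use crate::request_memory_limiter::ServerMemoryLimiter;

// Hypothetical write path: reserve memory for the payload up front and hold
// the guard until the request has been fully processed.
async fn handle_write(limiter: &ServerMemoryLimiter, payload: &[u8]) -> Result<()> {
    // With OnExhaustedPolicy::Fail this errors immediately once the pool is
    // exhausted; with the default "wait" policy it waits up to the timeout.
    let _guard = limiter.acquire(payload.len() as u64).await?;

    // ... decode and apply the write while the reservation is held ...

    Ok(())
    // `_guard` drops here, returning the reserved bytes to the shared pool.
}

// A shared pool of 256 MiB for all protocols, failing fast when exhausted.
fn example_limiter() -> ServerMemoryLimiter {
    ServerMemoryLimiter::new(256 * 1024 * 1024, OnExhaustedPolicy::Fail)
}
```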
diff --git a/src/servers/src/request_memory_metrics.rs b/src/servers/src/request_memory_metrics.rs
new file mode 100644
index 0000000000..4298830f18
--- /dev/null
+++ b/src/servers/src/request_memory_metrics.rs
@@ -0,0 +1,40 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Unified metrics adapter for all server protocols.
+
+use common_memory_manager::MemoryMetrics;
+
+use crate::metrics::{REQUEST_MEMORY_IN_USE, REQUEST_MEMORY_LIMIT, REQUEST_MEMORY_REJECTED};
+
+/// Metrics adapter for unified request memory tracking.
+///
+/// This adapter tracks memory usage for all server protocols (HTTP, gRPC, Arrow Flight)
+/// without distinguishing between them. All requests contribute to the same set of metrics.
+#[derive(Clone, Copy, Debug, Default)]
+pub struct RequestMemoryMetrics;
+
+impl MemoryMetrics for RequestMemoryMetrics {
+ fn set_limit(&self, bytes: i64) {
+ REQUEST_MEMORY_LIMIT.set(bytes);
+ }
+
+ fn set_in_use(&self, bytes: i64) {
+ REQUEST_MEMORY_IN_USE.set(bytes);
+ }
+
+ fn inc_rejected(&self, reason: &str) {
+ REQUEST_MEMORY_REJECTED.with_label_values(&[reason]).inc();
+ }
+}
diff --git a/src/standalone/Cargo.toml b/src/standalone/Cargo.toml
index d94975f270..26d858272a 100644
--- a/src/standalone/Cargo.toml
+++ b/src/standalone/Cargo.toml
@@ -15,6 +15,7 @@ common-base.workspace = true
common-config.workspace = true
common-error.workspace = true
common-macro.workspace = true
+common-memory-manager.workspace = true
common-meta.workspace = true
common-options.workspace = true
common-procedure.workspace = true
diff --git a/src/standalone/src/options.rs b/src/standalone/src/options.rs
index 20aad773b1..35915742a7 100644
--- a/src/standalone/src/options.rs
+++ b/src/standalone/src/options.rs
@@ -14,6 +14,7 @@
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, KvBackendConfig};
+use common_memory_manager::OnExhaustedPolicy;
use common_options::memory::MemoryOptions;
use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions};
use common_wal::config::DatanodeWalConfig;
@@ -37,6 +38,12 @@ pub struct StandaloneOptions {
pub enable_telemetry: bool,
pub default_timezone: Option<String>,
pub default_column_prefix: Option<String>,
+ /// Maximum total memory for all concurrent write request bodies and messages (HTTP, gRPC, Flight).
+ /// Set to 0 to disable the limit. Default: "0" (unlimited)
+ pub max_in_flight_write_bytes: ReadableSize,
+ /// Policy when write bytes quota is exhausted.
+ /// Options: "wait" (default, 10s), "wait(<duration>)", "fail"
+ pub write_bytes_exhausted_policy: OnExhaustedPolicy,
pub http: HttpOptions,
pub grpc: GrpcOptions,
pub mysql: MysqlOptions,
@@ -57,7 +64,6 @@ pub struct StandaloneOptions {
pub tracing: TracingOptions,
pub init_regions_in_background: bool,
pub init_regions_parallelism: usize,
- pub max_in_flight_write_bytes: Option,
pub slow_query: SlowQueryOptions,
pub query: QueryOptions,
pub memory: MemoryOptions,
@@ -69,6 +75,8 @@ impl Default for StandaloneOptions {
enable_telemetry: true,
default_timezone: None,
default_column_prefix: None,
+ max_in_flight_write_bytes: ReadableSize(0),
+ write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
http: HttpOptions::default(),
grpc: GrpcOptions::default(),
mysql: MysqlOptions::default(),
@@ -91,7 +99,6 @@ impl Default for StandaloneOptions {
tracing: TracingOptions::default(),
init_regions_in_background: false,
init_regions_parallelism: 16,
- max_in_flight_write_bytes: None,
slow_query: SlowQueryOptions::default(),
query: QueryOptions::default(),
memory: MemoryOptions::default(),
@@ -120,6 +127,8 @@ impl StandaloneOptions {
let cloned_opts = self.clone();
FrontendOptions {
default_timezone: cloned_opts.default_timezone,
+ max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
+ write_bytes_exhausted_policy: cloned_opts.write_bytes_exhausted_policy,
http: cloned_opts.http,
grpc: cloned_opts.grpc,
mysql: cloned_opts.mysql,
@@ -131,7 +140,6 @@ impl StandaloneOptions {
meta_client: None,
logging: cloned_opts.logging,
user_provider: cloned_opts.user_provider,
- max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
slow_query: cloned_opts.slow_query,
..Default::default()
}
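
For illustration, the two relocated options as they might be set against this struct (example values, not defaults; deployments set `max_in_flight_write_bytes` and `write_bytes_exhausted_policy` in the TOML config instead):

```rust
use common_base::readable_size::ReadableSize;
use common_memory_manager::OnExhaustedPolicy;

// Example values: cap concurrent write payloads at 512 MiB and keep the
// default "wait" policy when the quota is exhausted.
fn example_options() -> StandaloneOptions {
    StandaloneOptions {
        max_in_flight_write_bytes: ReadableSize(512 * 1024 * 1024),
        write_bytes_exhausted_policy: OnExhaustedPolicy::default(),
        ..Default::default()
    }
}
```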
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index 0dbfededb3..36210046f6 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -30,6 +30,7 @@ common-error.workspace = true
common-event-recorder.workspace = true
common-frontend.workspace = true
common-grpc.workspace = true
+common-memory-manager.workspace = true
common-meta = { workspace = true, features = ["testing"] }
common-procedure.workspace = true
common-query.workspace = true
diff --git a/tests-integration/src/grpc/network.rs b/tests-integration/src/grpc/network.rs
index afe1ae05d5..427317fb05 100644
--- a/tests-integration/src/grpc/network.rs
+++ b/tests-integration/src/grpc/network.rs
@@ -143,6 +143,7 @@ mod tests {
"test_grpc_max_connection_age",
None,
Some(config),
+ None,
)
.await;
let addr = server.bind_addr().unwrap().to_string();
diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs
index 5a0619c1cc..c095e266b2 100644
--- a/tests-integration/src/test_util.rs
+++ b/tests-integration/src/test_util.rs
@@ -49,6 +49,7 @@ use servers::otel_arrow::OtelArrowServiceHandler;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
+use servers::request_memory_limiter::ServerMemoryLimiter;
use servers::server::Server;
use servers::tls::ReloadableTlsServerConfig;
use session::context::QueryContext;
@@ -438,6 +439,23 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
store_type: StorageType,
name: &str,
user_provider: Option<UserProviderRef>,
+) -> (Router, TestGuard) {
+ setup_test_http_app_with_frontend_and_custom_options(
+ store_type,
+ name,
+ user_provider,
+ None,
+ None,
+ )
+ .await
+}
+
+pub async fn setup_test_http_app_with_frontend_and_custom_options(
+ store_type: StorageType,
+ name: &str,
+ user_provider: Option<UserProviderRef>,
+ http_opts: Option<HttpOptions>,
+ memory_limiter: Option<ServerMemoryLimiter>,
) -> (Router, TestGuard) {
let plugins = Plugins::new();
if let Some(user_provider) = user_provider.clone() {
@@ -449,14 +467,12 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
create_test_table(instance.fe_instance(), "demo").await;
- let http_opts = HttpOptions {
+ let http_opts = http_opts.unwrap_or_else(|| HttpOptions {
addr: format!("127.0.0.1:{}", ports::get_port()),
..Default::default()
- };
+ });
- let mut http_server = HttpServerBuilder::new(http_opts);
-
- http_server = http_server
+ let mut http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(
instance.fe_instance().clone(),
))
@@ -471,6 +487,10 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
http_server = http_server.with_user_provider(user_provider);
}
+ if let Some(limiter) = memory_limiter {
+ http_server = http_server.with_memory_limiter(limiter);
+ }
+
let http_server = http_server.build();
let app = http_server.build(http_server.make_app()).unwrap();
@@ -561,7 +581,7 @@ pub async fn setup_grpc_server(
store_type: StorageType,
name: &str,
) -> (GreptimeDbStandalone, Arc) {
- setup_grpc_server_with(store_type, name, None, None).await
+ setup_grpc_server_with(store_type, name, None, None, None).await
}
pub async fn setup_grpc_server_with_user_provider(
@@ -569,7 +589,7 @@ pub async fn setup_grpc_server_with_user_provider(
name: &str,
user_provider: Option<UserProviderRef>,
) -> (GreptimeDbStandalone, Arc) {
- setup_grpc_server_with(store_type, name, user_provider, None).await
+ setup_grpc_server_with(store_type, name, user_provider, None, None).await
}
pub async fn setup_grpc_server_with(
@@ -577,6 +597,7 @@ pub async fn setup_grpc_server_with(
name: &str,
user_provider: Option<UserProviderRef>,
grpc_config: Option<GrpcServerConfig>,
+ memory_limiter: Option<ServerMemoryLimiter>,
) -> (GreptimeDbStandalone, Arc) {
let instance = setup_standalone_instance(name, store_type).await;
@@ -598,7 +619,13 @@ pub async fn setup_grpc_server_with(
let flight_handler = Arc::new(greptime_request_handler.clone());
let grpc_config = grpc_config.unwrap_or_default();
- let grpc_builder = GrpcServerBuilder::new(grpc_config.clone(), runtime)
+ let mut grpc_builder = GrpcServerBuilder::new(grpc_config.clone(), runtime);
+
+ if let Some(limiter) = memory_limiter {
+ grpc_builder = grpc_builder.with_memory_limiter(limiter);
+ }
+
+ let grpc_builder = grpc_builder
.database_handler(greptime_request_handler)
.flight_handler(flight_handler)
.prometheus_handler(fe_instance_ref.clone(), user_provider.clone())
diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs
index 447d5afe50..e31444b45c 100644
--- a/tests-integration/tests/grpc.rs
+++ b/tests-integration/tests/grpc.rs
@@ -25,6 +25,7 @@ use auth::user_provider_from_option;
use client::{Client, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, Database, OutputData};
use common_catalog::consts::MITO_ENGINE;
use common_grpc::channel_manager::ClientTlsOption;
+use common_memory_manager::OnExhaustedPolicy;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_runtime::Runtime;
@@ -38,6 +39,7 @@ use servers::http::prometheus::{
PromData, PromQueryResult, PromSeriesMatrix, PromSeriesVector, PrometheusJsonResponse,
PrometheusResponse,
};
+use servers::request_memory_limiter::ServerMemoryLimiter;
use servers::server::Server;
use servers::tls::{TlsMode, TlsOption};
use tests_integration::test_util::{
@@ -144,8 +146,14 @@ pub async fn test_grpc_message_size_ok(store_type: StorageType) {
max_send_message_size: 1024,
..Default::default()
};
- let (_db, fe_grpc_server) =
- setup_grpc_server_with(store_type, "test_grpc_message_size_ok", None, Some(config)).await;
+ let (_db, fe_grpc_server) = setup_grpc_server_with(
+ store_type,
+ "test_grpc_message_size_ok",
+ None,
+ Some(config),
+ None,
+ )
+ .await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let grpc_client = Client::with_urls(vec![addr]);
@@ -164,8 +172,14 @@ pub async fn test_grpc_zstd_compression(store_type: StorageType) {
max_send_message_size: 1024,
..Default::default()
};
- let (_db, fe_grpc_server) =
- setup_grpc_server_with(store_type, "test_grpc_zstd_compression", None, Some(config)).await;
+ let (_db, fe_grpc_server) = setup_grpc_server_with(
+ store_type,
+ "test_grpc_zstd_compression",
+ None,
+ Some(config),
+ None,
+ )
+ .await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let grpc_client = Client::with_urls(vec![addr]);
@@ -188,6 +202,7 @@ pub async fn test_grpc_message_size_limit_send(store_type: StorageType) {
"test_grpc_message_size_limit_send",
None,
Some(config),
+ None,
)
.await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
@@ -213,6 +228,7 @@ pub async fn test_grpc_message_size_limit_recv(store_type: StorageType) {
"test_grpc_message_size_limit_recv",
None,
Some(config),
+ None,
)
.await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
@@ -886,7 +902,7 @@ pub async fn test_grpc_timezone(store_type: StorageType) {
..Default::default()
};
let (_db, fe_grpc_server) =
- setup_grpc_server_with(store_type, "auto_create_table", None, Some(config)).await;
+ setup_grpc_server_with(store_type, "auto_create_table", None, Some(config), None).await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let grpc_client = Client::with_urls(vec![addr]);
@@ -959,12 +975,11 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
- max_total_message_memory: 1024 * 1024 * 1024,
tls,
max_connection_age: None,
};
let (_db, fe_grpc_server) =
- setup_grpc_server_with(store_type, "tls_create_table", None, Some(config)).await;
+ setup_grpc_server_with(store_type, "tls_create_table", None, Some(config), None).await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let mut client_tls = ClientTlsOption {
@@ -1003,7 +1018,6 @@ pub async fn test_grpc_tls_config(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024,
max_send_message_size: 1024,
- max_total_message_memory: 1024 * 1024 * 1024,
tls,
max_connection_age: None,
};
@@ -1021,12 +1035,23 @@ pub async fn test_grpc_memory_limit(store_type: StorageType) {
let config = GrpcServerConfig {
max_recv_message_size: 1024 * 1024,
max_send_message_size: 1024 * 1024,
- max_total_message_memory: 200,
tls: Default::default(),
max_connection_age: None,
};
- let (_db, fe_grpc_server) =
- setup_grpc_server_with(store_type, "test_grpc_memory_limit", None, Some(config)).await;
+
+ // Create memory limiter with 2KB limit and fail-fast policy.
+ // Note: MemoryManager uses 1KB granularity (PermitGranularity::Kilobyte),
+ // so 2KB = 2 permits. Small/medium requests should fit, large should fail.
+ let memory_limiter = ServerMemoryLimiter::new(2048, OnExhaustedPolicy::Fail);
+
+ let (_db, fe_grpc_server) = setup_grpc_server_with(
+ store_type,
+ "test_grpc_memory_limit",
+ None,
+ Some(config),
+ Some(memory_limiter),
+ )
+ .await;
let addr = fe_grpc_server.bind_addr().unwrap().to_string();
let grpc_client = Client::with_urls([&addr]);
@@ -1120,13 +1145,63 @@ pub async fn test_grpc_memory_limit(store_type: StorageType) {
.await;
assert!(result.is_ok());
- // Test that large request exceeds limit
- let large_rows: Vec<Row> = (0..100)
+ // Test that medium request in the 200-1024 byte range should also succeed
+ // (due to 1KB granularity alignment)
+ let medium_rows: Vec<Row> = (0..5)
.map(|i| Row {
values: vec![
Value {
value_data: Some(ValueData::StringValue(format!("host{}", i))),
},
+ Value {
+ value_data: Some(ValueData::TimestampMillisecondValue(2000 + i)),
+ },
+ Value {
+ value_data: Some(ValueData::F64Value(i as f64 * 2.5)),
+ },
+ ],
+ })
+ .collect();
+
+ let medium_row_insert = RowInsertRequest {
+ table_name: table_name.to_owned(),
+ rows: Some(api::v1::Rows {
+ schema: column_schemas
+ .iter()
+ .map(|c| api::v1::ColumnSchema {
+ column_name: c.name.clone(),
+ datatype: c.data_type,
+ semantic_type: c.semantic_type,
+ datatype_extension: None,
+ options: None,
+ })
+ .collect(),
+ rows: medium_rows,
+ }),
+ };
+
+ let result = db
+ .row_inserts(RowInsertRequests {
+ inserts: vec![medium_row_insert],
+ })
+ .await;
+ assert!(
+ result.is_ok(),
+ "Medium request (~500 bytes) should succeed within aligned 1KB limit"
+ );
+
+ // Test that large request exceeds limit (> 1KB aligned limit)
+ // Create a very large string to ensure we definitely exceed 1KB
+ // Use 100 rows with very long strings (>50 chars each) = definitely >5KB total
+ let large_rows: Vec<Row> = (0..100)
+ .map(|i| Row {
+ values: vec![
+ Value {
+ value_data: Some(ValueData::StringValue(format!(
+ "this_is_a_very_long_hostname_string_designed_to_make_the_request_exceed_memory_limit_row_number_{}",
+ i
+ ))),
+ },
Value {
value_data: Some(ValueData::TimestampMillisecondValue(1000 + i)),
},
@@ -1159,12 +1234,15 @@ pub async fn test_grpc_memory_limit(store_type: StorageType) {
inserts: vec![large_row_insert],
})
.await;
- assert!(result.is_err());
+ assert!(
+ result.is_err(),
+ "Large request should exceed 1KB limit and fail"
+ );
let err = result.unwrap_err();
let err_msg = err.to_string();
assert!(
- err_msg.contains("Too many concurrent"),
- "Expected memory limit error, got: {}",
+ err_msg.contains("Memory limit exceeded"),
+ "Expected 'Memory limit exceeded' error, got: {}",
err_msg
);
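
The byte thresholds asserted above follow from the kilobyte permit granularity the limiter is built with. A small sketch of the assumed rounding (an assumption about `PermitGranularity::Kilobyte`, not the manager's actual code):

```rust
// Assumed rounding: reservations are granted in whole 1 KiB permits, so a
// 2048-byte limit corresponds to a budget of 2 permits.
fn permits_for(bytes: u64) -> u64 {
    bytes.div_ceil(1024)
}

fn main() {
    assert_eq!(permits_for(500), 1); // small and medium payloads round up to one permit
    assert_eq!(permits_for(2048), 2); // exactly fills the 2 KiB test budget
    assert!(permits_for(10 * 1024) > 2); // the large payloads above overflow it and fail
}
```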
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 5c400776fd..bf3a9e8a96 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -32,6 +32,7 @@ use common_error::status_code::StatusCode as ErrorCode;
use common_frontend::slow_query_event::{
SLOW_QUERY_TABLE_NAME, SLOW_QUERY_TABLE_QUERY_COLUMN_NAME,
};
+use common_memory_manager::OnExhaustedPolicy;
use flate2::Compression;
use flate2::write::GzEncoder;
use log_query::{Context, Limit, LogQuery, TimeFilter};
@@ -55,6 +56,7 @@ use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::test_helpers::{TestClient, TestResponse};
use servers::prom_store::{self, mock_timeseries_new_label};
+use servers::request_memory_limiter::ServerMemoryLimiter;
use table::table_name::TableName;
use tests_integration::test_util::{
StorageType, setup_test_http_app, setup_test_http_app_with_frontend,
@@ -143,6 +145,7 @@ macro_rules! http_tests {
test_jaeger_query_api_for_trace_v1,
test_influxdb_write,
+ test_http_memory_limit,
);
)*
};
@@ -1379,6 +1382,8 @@ providers = []"#,
let expected_toml_str = format!(
r#"
enable_telemetry = true
+max_in_flight_write_bytes = "0KiB"
+write_bytes_exhausted_policy = "wait"
init_regions_in_background = false
init_regions_parallelism = 16
@@ -3319,7 +3324,7 @@ processors:
events = del(.events)
base_host = del(.host)
base_timestamp = del(.timestamp)
-
+
# Map each event to a complete row object
map_values(array!(events)) -> |event| {
{
@@ -7117,3 +7122,147 @@ fn compress_vec_with_gzip(data: Vec<u8>) -> Vec<u8> {
encoder.write_all(&data).unwrap();
encoder.finish().unwrap()
}
+
+pub async fn test_http_memory_limit(store_type: StorageType) {
+ use tests_integration::test_util::setup_test_http_app_with_frontend_and_custom_options;
+
+ common_telemetry::init_default_ut_logging();
+
+ let http_opts = servers::http::HttpOptions {
+ addr: format!("127.0.0.1:{}", common_test_util::ports::get_port()),
+ ..Default::default()
+ };
+
+ // Create memory limiter with 2KB limit and fail-fast policy.
+ // Note: MemoryManager uses 1KB granularity (PermitGranularity::Kilobyte),
+ // so 2KB = 2 permits. Small/medium requests should fit, large should fail.
+ let memory_limiter = ServerMemoryLimiter::new(2048, OnExhaustedPolicy::Fail);
+
+ let (app, mut guard) = setup_test_http_app_with_frontend_and_custom_options(
+ store_type,
+ "test_http_memory_limit",
+ None,
+ Some(http_opts),
+ Some(memory_limiter),
+ )
+ .await;
+
+ let client = TestClient::new(app).await;
+
+ // Create table first
+ let res = client
+ .get("/v1/sql?sql=CREATE TABLE test_mem_limit(host STRING, cpu DOUBLE, ts TIMESTAMP TIME INDEX)")
+ .send()
+ .await;
+ assert_eq!(res.status(), StatusCode::OK);
+
+ // Test 1: Small POST request should succeed (uses actual memory limiting path)
+ // Note: GET requests have no body, so they bypass body memory limiting entirely.
+ let small_insert = "INSERT INTO test_mem_limit VALUES ('host1', 1.0, 0)";
+ let res = client
+ .post("/v1/sql")
+ .header("Content-Type", "application/x-www-form-urlencoded")
+ .body(format!("sql={}", small_insert))
+ .send()
+ .await;
+ assert_eq!(res.status(), StatusCode::OK, "Small POST should succeed");
+
+ // Test 1B: Medium request in the 500-1024 byte range should also succeed
+ // (due to 1KB granularity alignment)
+ let medium_values: Vec<String> = (0..8)
+ .map(|i| format!("('host{}', {}.5, {})", i, i, i * 1000))
+ .collect();
+ let medium_insert = format!(
+ "INSERT INTO test_mem_limit VALUES {}",
+ medium_values.join(", ")
+ );
+
+ let res = client
+ .post("/v1/sql")
+ .header("Content-Type", "application/x-www-form-urlencoded")
+ .body(format!("sql={}", medium_insert))
+ .send()
+ .await;
+ assert_eq!(
+ res.status(),
+ StatusCode::OK,
+ "Medium request (~700 bytes) should succeed within aligned 1KB limit"
+ );
+
+ // Test 2: Large write request should be rejected (exceeds 2KB limit)
+ // Generate a large INSERT with many rows and long strings to definitely exceed 2KB
+ let large_values: Vec<String> = (0..100)
+ .map(|i| {
+ format!(
+ "('this_is_a_very_long_hostname_string_to_increase_body_size_row_{}', {}.5, {})",
+ i,
+ i,
+ i * 1000
+ )
+ })
+ .collect();
+ let large_insert = format!(
+ "INSERT INTO test_mem_limit VALUES {}",
+ large_values.join(", ")
+ );
+
+ let res = client
+ .post("/v1/sql")
+ .header("Content-Type", "application/x-www-form-urlencoded")
+ .body(format!("sql={}", large_insert))
+ .send()
+ .await;
+
+ assert_eq!(
+ res.status(),
+ StatusCode::TOO_MANY_REQUESTS,
+ "Large write should be rejected with 429"
+ );
+
+ let error_body = res.text().await;
+ assert!(
+ error_body.contains("Request body memory limit exceeded"),
+ "Error message should be 'Request body memory limit exceeded', got: {}",
+ error_body
+ );
+
+ // Test 3A: Small InfluxDB write should succeed
+ let small_influx = "test_influx,host=host1 cpu=1.5 1000000000";
+ let res = client
+ .post("/v1/influxdb/write?db=public")
+ .body(small_influx)
+ .send()
+ .await;
+ assert_eq!(
+ res.status(),
+ StatusCode::NO_CONTENT,
+ "Small InfluxDB write should succeed"
+ );
+
+ // Test 3B: Large InfluxDB write should be rejected
+ let large_influx_body = (0..100)
+ .map(|i| {
+ format!(
+ "test_influx,host=host{} cpu={}.5 {}",
+ i,
+ i,
+ (i as i64) * 1000000000
+ )
+ })
+ .collect::<Vec<String>>()
+ .join("\n");
+
+ let res = client
+ .post("/v1/influxdb/write?db=public")
+ .body(large_influx_body)
+ .send()
+ .await;
+
+ assert_eq!(
+ res.status(),
+ StatusCode::TOO_MANY_REQUESTS,
+ "Large InfluxDB write should be rejected"
+ );
+
+ guard.remove_all().await;
+}