From 4daf5adce5ad4cd6b3547253fbe7958a2ed33029 Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Sun, 31 Aug 2025 02:17:08 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20update=20rate=20limiter=20to=20use=20se?= =?UTF-8?q?maphore=20that=20will=20block=20without=20re=E2=80=A6=20(#6853)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: update rate limiter to use semaphore that will block without return error Signed-off-by: Ning Sun * fix: remove unused error Signed-off-by: Ning Sun --------- Signed-off-by: Ning Sun --- src/frontend/src/error.rs | 18 ++-- src/frontend/src/instance/builder.rs | 2 +- src/frontend/src/instance/grpc.rs | 12 +-- src/frontend/src/instance/influxdb.rs | 14 +-- src/frontend/src/instance/log_handler.rs | 28 ++--- src/frontend/src/instance/opentsdb.rs | 17 +-- src/frontend/src/instance/otlp.rs | 26 +++-- src/frontend/src/instance/prom_store.rs | 14 +-- src/frontend/src/limiter.rs | 131 ++++++++--------------- src/servers/src/error.rs | 8 -- 10 files changed, 113 insertions(+), 157 deletions(-) diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 6dac8e9898..286e633eeb 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -337,12 +337,6 @@ pub enum Error { source: BoxedError, }, - #[snafu(display("In-flight write bytes exceeded the maximum limit"))] - InFlightWriteBytesExceeded { - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to decode logical plan from substrait"))] SubstraitDecodeLogicalPlan { #[snafu(implicit)] @@ -369,6 +363,14 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to acquire more permits from limiter"))] + AcquireLimiter { + #[snafu(source)] + error: tokio::sync::AcquireError, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -444,13 +446,13 @@ impl ErrorExt for Error { Error::TableOperation { source, .. } => source.status_code(), - Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited, - Error::DataFusion { error, .. } => datafusion_status_code::(error, None), Error::Cancelled { .. } => StatusCode::Cancelled, Error::StatementTimeout { .. } => StatusCode::Cancelled, + + Error::AcquireLimiter { .. } => StatusCode::Internal, } } diff --git a/src/frontend/src/instance/builder.rs b/src/frontend/src/instance/builder.rs index 2bc959c403..cb4923763c 100644 --- a/src/frontend/src/instance/builder.rs +++ b/src/frontend/src/instance/builder.rs @@ -207,7 +207,7 @@ impl FrontendBuilder { .options .max_in_flight_write_bytes .map(|max_in_flight_write_bytes| { - Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes())) + Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes() as usize)) }); Ok(Instance { diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 5a269267d0..8b6983f52b 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -43,9 +43,9 @@ use table::table_name::TableName; use table::TableRef; use crate::error::{ - CatalogSnafu, Error, ExternalSnafu, InFlightWriteBytesExceededSnafu, - IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result, - SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu, + CatalogSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu, + PermissionSnafu, PlanStatementSnafu, Result, SubstraitDecodeLogicalPlanSnafu, + TableNotFoundSnafu, TableOperationSnafu, }; use crate::instance::{attach_timer, Instance}; use crate::metrics::{ @@ -68,11 +68,7 @@ impl GrpcQueryHandler for Instance { .context(PermissionSnafu)?; let _guard = if let Some(limiter) = &self.limiter { - let result = limiter.limit_request(&request); - if result.is_none() { - return InFlightWriteBytesExceededSnafu.fail(); - } - result + Some(limiter.limit_request(&request).await?) } else { None }; diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 864c88e89e..01c526aba9 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -16,7 +16,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::Output; use common_error::ext::BoxedError; -use servers::error::{AuthSnafu, Error, InFlightWriteBytesExceededSnafu}; +use servers::error::{AuthSnafu, Error, OtherSnafu}; use servers::influxdb::InfluxdbRequest; use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef}; use servers::query_handler::InfluxdbLineProtocolHandler; @@ -47,11 +47,13 @@ impl InfluxdbLineProtocolHandler for Instance { .await?; let _guard = if let Some(limiter) = &self.limiter { - let result = limiter.limit_row_inserts(&requests); - if result.is_none() { - return InFlightWriteBytesExceededSnafu.fail(); - } - result + Some( + limiter + .limit_row_inserts(&requests) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?, + ) } else { None }; diff --git a/src/frontend/src/instance/log_handler.rs b/src/frontend/src/instance/log_handler.rs index 179d5e098f..946f121c37 100644 --- a/src/frontend/src/instance/log_handler.rs +++ b/src/frontend/src/instance/log_handler.rs @@ -23,8 +23,8 @@ use datatypes::timestamp::TimestampNanosecond; use pipeline::pipeline_operator::PipelineOperator; use pipeline::{Pipeline, PipelineInfo, PipelineVersion}; use servers::error::{ - AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, InFlightWriteBytesExceededSnafu, - PipelineSnafu, Result as ServerResult, + AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, OtherSnafu, PipelineSnafu, + Result as ServerResult, }; use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef}; use servers::query_handler::PipelineHandler; @@ -125,11 +125,13 @@ impl Instance { ctx: QueryContextRef, ) -> ServerResult { let _guard = if let Some(limiter) = &self.limiter { - let result = limiter.limit_row_inserts(&log); - if result.is_none() { - return InFlightWriteBytesExceededSnafu.fail(); - } - result + Some( + limiter + .limit_row_inserts(&log) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?, + ) } else { None }; @@ -147,11 +149,13 @@ impl Instance { ctx: QueryContextRef, ) -> ServerResult { let _guard = if let Some(limiter) = &self.limiter { - let result = limiter.limit_row_inserts(&rows); - if result.is_none() { - return InFlightWriteBytesExceededSnafu.fail(); - } - result + Some( + limiter + .limit_row_inserts(&rows) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?, + ) } else { None }; diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index fbf8e67a6e..d7deb840cb 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -16,8 +16,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_error::ext::BoxedError; use common_telemetry::tracing; -use servers::error as server_error; -use servers::error::{AuthSnafu, InFlightWriteBytesExceededSnafu}; +use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu}; use servers::opentsdb::codec::DataPoint; use servers::opentsdb::data_point_to_grpc_row_insert_requests; use servers::query_handler::OpentsdbProtocolHandler; @@ -43,11 +42,13 @@ impl OpentsdbProtocolHandler for Instance { let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?; let _guard = if let Some(limiter) = &self.limiter { - let result = limiter.limit_row_inserts(&requests); - if result.is_none() { - return InFlightWriteBytesExceededSnafu.fail(); - } - result + Some( + limiter + .limit_row_inserts(&requests) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?, + ) } else { None }; @@ -57,7 +58,7 @@ impl OpentsdbProtocolHandler for Instance { .handle_row_inserts(requests, ctx, true, true) .await .map_err(BoxedError::new) - .context(servers::error::ExecuteGrpcQuerySnafu)?; + .context(ExecuteGrpcQuerySnafu)?; Ok(match output.data { common_query::OutputData::AffectedRows(rows) => rows, diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index b11a034429..7e4a867730 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -24,7 +24,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest; use pipeline::{GreptimePipelineParams, PipelineWay}; -use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult}; +use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; @@ -84,11 +84,13 @@ impl OpenTelemetryProtocolHandler for Instance { }; let _guard = if let Some(limiter) = &self.limiter { - let result = limiter.limit_row_inserts(&requests); - if result.is_none() { - return InFlightWriteBytesExceededSnafu.fail(); - } - result + Some( + limiter + .limit_row_inserts(&requests) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?, + ) } else { None }; @@ -190,11 +192,13 @@ impl OpenTelemetryProtocolHandler for Instance { .await?; let _guard = if let Some(limiter) = &self.limiter { - let result = limiter.limit_ctx_req(&opt_req); - if result.is_none() { - return InFlightWriteBytesExceededSnafu.fail(); - } - result + Some( + limiter + .limit_ctx_req(&opt_req) + .await + .map_err(BoxedError::new) + .context(OtherSnafu)?, + ) } else { None }; diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 13a02ff476..3cf5a200fd 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -30,7 +30,7 @@ use common_telemetry::{debug, tracing}; use operator::insert::InserterRef; use operator::statement::StatementExecutor; use prost::Message; -use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult}; +use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef}; @@ -176,11 +176,13 @@ impl PromStoreProtocolHandler for Instance { interceptor_ref.pre_write(&request, ctx.clone())?; let _guard = if let Some(limiter) = &self.limiter { - let result = limiter.limit_row_inserts(&request); - if result.is_none() { - return InFlightWriteBytesExceededSnafu.fail(); - } - result + Some( + limiter + .limit_row_inserts(&request) + .await + .map_err(BoxedError::new) + .context(error::OtherSnafu)?, + ) } else { None }; diff --git a/src/frontend/src/limiter.rs b/src/frontend/src/limiter.rs index 3c09192cbe..7b5eb7bf59 100644 --- a/src/frontend/src/limiter.rs +++ b/src/frontend/src/limiter.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use api::v1::column::Values; @@ -21,61 +20,30 @@ use api::v1::value::ValueData; use api::v1::{ Decimal128, InsertRequests, IntervalMonthDayNano, RowInsertRequest, RowInsertRequests, }; -use common_telemetry::{debug, warn}; use pipeline::ContextReq; +use snafu::ResultExt; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; + +use crate::error::{AcquireLimiterSnafu, Result}; pub(crate) type LimiterRef = Arc; -/// A frontend request limiter that controls the total size of in-flight write requests. +/// A frontend request limiter that controls the total size of in-flight write +/// requests. pub(crate) struct Limiter { - // The maximum number of bytes that can be in flight. - max_in_flight_write_bytes: u64, - - // The current in-flight write bytes. - in_flight_write_bytes: Arc, -} - -/// A counter for the in-flight write bytes. -pub(crate) struct InFlightWriteBytesCounter { - // The current in-flight write bytes. - in_flight_write_bytes: Arc, - - // The write bytes that are being processed. - processing_write_bytes: u64, -} - -impl InFlightWriteBytesCounter { - /// Creates a new InFlightWriteBytesCounter. It will decrease the in-flight write bytes when dropped. - pub fn new(in_flight_write_bytes: Arc, processing_write_bytes: u64) -> Self { - debug!( - "processing write bytes: {}, current in-flight write bytes: {}", - processing_write_bytes, - in_flight_write_bytes.load(Ordering::Relaxed) - ); - Self { - in_flight_write_bytes, - processing_write_bytes, - } - } -} - -impl Drop for InFlightWriteBytesCounter { - // When the request is finished, the in-flight write bytes should be decreased. - fn drop(&mut self) { - self.in_flight_write_bytes - .fetch_sub(self.processing_write_bytes, Ordering::Relaxed); - } + max_in_flight_write_bytes: usize, + byte_counter: Arc, } impl Limiter { - pub fn new(max_in_flight_write_bytes: u64) -> Self { + pub fn new(max_in_flight_write_bytes: usize) -> Self { Self { + byte_counter: Arc::new(Semaphore::new(max_in_flight_write_bytes)), max_in_flight_write_bytes, - in_flight_write_bytes: Arc::new(AtomicU64::new(0)), } } - pub fn limit_request(&self, request: &Request) -> Option { + pub async fn limit_request(&self, request: &Request) -> Result { let size = match request { Request::Inserts(requests) => self.insert_requests_data_size(requests), Request::RowInserts(requests) => { @@ -83,56 +51,35 @@ impl Limiter { } _ => 0, }; - self.limit_in_flight_write_bytes(size as u64) + self.limit_in_flight_write_bytes(size).await } - pub fn limit_row_inserts( + pub async fn limit_row_inserts( &self, requests: &RowInsertRequests, - ) -> Option { + ) -> Result { let size = self.rows_insert_requests_data_size(requests.inserts.iter()); - self.limit_in_flight_write_bytes(size as u64) + self.limit_in_flight_write_bytes(size).await } - pub fn limit_ctx_req(&self, opt_req: &ContextReq) -> Option { + pub async fn limit_ctx_req(&self, opt_req: &ContextReq) -> Result { let size = self.rows_insert_requests_data_size(opt_req.ref_all_req()); - self.limit_in_flight_write_bytes(size as u64) + self.limit_in_flight_write_bytes(size).await } - /// Returns None if the in-flight write bytes exceed the maximum limit. - /// Otherwise, returns Some(InFlightWriteBytesCounter) and the in-flight write bytes will be increased. - pub fn limit_in_flight_write_bytes(&self, bytes: u64) -> Option { - let result = self.in_flight_write_bytes.fetch_update( - Ordering::Relaxed, - Ordering::Relaxed, - |current| { - if current + bytes > self.max_in_flight_write_bytes { - warn!( - "in-flight write bytes exceed the maximum limit {}, request with {} bytes will be limited", - self.max_in_flight_write_bytes, - bytes - ); - return None; - } - Some(current + bytes) - }, - ); - - match result { - // Update the in-flight write bytes successfully. - Ok(_) => Some(InFlightWriteBytesCounter::new( - self.in_flight_write_bytes.clone(), - bytes, - )), - // It means the in-flight write bytes exceed the maximum limit. - Err(_) => None, - } + /// Await until more inflight bytes are available + pub async fn limit_in_flight_write_bytes(&self, bytes: usize) -> Result { + self.byte_counter + .clone() + .acquire_many_owned(bytes as u32) + .await + .context(AcquireLimiterSnafu) } /// Returns the current in-flight write bytes. #[allow(dead_code)] - pub fn in_flight_write_bytes(&self) -> u64 { - self.in_flight_write_bytes.load(Ordering::Relaxed) + pub fn in_flight_write_bytes(&self) -> usize { + self.max_in_flight_write_bytes - self.byte_counter.available_permits() } fn insert_requests_data_size(&self, request: &InsertRequests) -> usize { @@ -270,8 +217,10 @@ mod tests { for _ in 0..tasks_count { let limiter = limiter_ref.clone(); let handle = tokio::spawn(async move { - let result = limiter.limit_request(&generate_request(request_data_size)); - assert!(result.is_some()); + let result = limiter + .limit_request(&generate_request(request_data_size)) + .await; + assert!(result.is_ok()); }); handles.push(handle); } @@ -282,23 +231,27 @@ mod tests { } } - #[test] - fn test_in_flight_write_bytes() { + #[tokio::test] + async fn test_in_flight_write_bytes() { let limiter_ref: LimiterRef = Arc::new(Limiter::new(1024)); let req1 = generate_request(100); - let result1 = limiter_ref.limit_request(&req1); - assert!(result1.is_some()); + let result1 = limiter_ref + .limit_request(&req1) + .await + .expect("failed to acquire permits"); assert_eq!(limiter_ref.in_flight_write_bytes(), 100); let req2 = generate_request(200); - let result2 = limiter_ref.limit_request(&req2); - assert!(result2.is_some()); + let result2 = limiter_ref + .limit_request(&req2) + .await + .expect("failed to acquire permits"); assert_eq!(limiter_ref.in_flight_write_bytes(), 300); - drop(result1.unwrap()); + drop(result1); assert_eq!(limiter_ref.in_flight_write_bytes(), 200); - drop(result2.unwrap()); + drop(result2); assert_eq!(limiter_ref.in_flight_write_bytes(), 0); } } diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 6eacef618f..7d470a531e 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -594,12 +594,6 @@ pub enum Error { location: Location, }, - #[snafu(display("In-flight write bytes exceeded the maximum limit"))] - InFlightWriteBytesExceeded { - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Invalid elasticsearch input, reason: {}", reason))] InvalidElasticsearchInput { reason: String, @@ -759,8 +753,6 @@ impl ErrorExt for Error { ConvertSqlValue { source, .. } => source.status_code(), - InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited, - DurationOverflow { .. } => StatusCode::InvalidArguments, HandleOtelArrowRequest { .. } => StatusCode::Internal,