feat: update rate limiter to use a semaphore that blocks instead of returning an error (#6853)

* feat: update rate limiter to use a semaphore that blocks instead of returning an error

Signed-off-by: Ning Sun <sunning@greptime.com>

* fix: remove unused error

Signed-off-by: Ning Sun <sunning@greptime.com>

---------

Signed-off-by: Ning Sun <sunning@greptime.com>
This commit is contained in:
Ning Sun
2025-08-31 02:17:08 +08:00
committed by GitHub
parent 575093f85f
commit 4daf5adce5
10 changed files with 113 additions and 157 deletions

View File

@@ -337,12 +337,6 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("In-flight write bytes exceeded the maximum limit"))]
InFlightWriteBytesExceeded {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode logical plan from substrait"))]
SubstraitDecodeLogicalPlan {
#[snafu(implicit)]
@@ -369,6 +363,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to acquire more permits from limiter"))]
AcquireLimiter {
#[snafu(source)]
error: tokio::sync::AcquireError,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -444,13 +446,13 @@ impl ErrorExt for Error {
Error::TableOperation { source, .. } => source.status_code(),
Error::InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
Error::DataFusion { error, .. } => datafusion_status_code::<Self>(error, None),
Error::Cancelled { .. } => StatusCode::Cancelled,
Error::StatementTimeout { .. } => StatusCode::Cancelled,
Error::AcquireLimiter { .. } => StatusCode::Internal,
}
}

View File

@@ -207,7 +207,7 @@ impl FrontendBuilder {
.options
.max_in_flight_write_bytes
.map(|max_in_flight_write_bytes| {
Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes()))
Arc::new(Limiter::new(max_in_flight_write_bytes.as_bytes() as usize))
});
Ok(Instance {

View File

@@ -43,9 +43,9 @@ use table::table_name::TableName;
use table::TableRef;
use crate::error::{
CatalogSnafu, Error, ExternalSnafu, InFlightWriteBytesExceededSnafu,
IncompleteGrpcRequestSnafu, NotSupportedSnafu, PermissionSnafu, PlanStatementSnafu, Result,
SubstraitDecodeLogicalPlanSnafu, TableNotFoundSnafu, TableOperationSnafu,
CatalogSnafu, Error, ExternalSnafu, IncompleteGrpcRequestSnafu, NotSupportedSnafu,
PermissionSnafu, PlanStatementSnafu, Result, SubstraitDecodeLogicalPlanSnafu,
TableNotFoundSnafu, TableOperationSnafu,
};
use crate::instance::{attach_timer, Instance};
use crate::metrics::{
@@ -68,11 +68,7 @@ impl GrpcQueryHandler for Instance {
.context(PermissionSnafu)?;
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_request(&request);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
Some(limiter.limit_request(&request).await?)
} else {
None
};

View File

@@ -16,7 +16,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use servers::error::{AuthSnafu, Error, InFlightWriteBytesExceededSnafu};
use servers::error::{AuthSnafu, Error, OtherSnafu};
use servers::influxdb::InfluxdbRequest;
use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
use servers::query_handler::InfluxdbLineProtocolHandler;
@@ -47,11 +47,13 @@ impl InfluxdbLineProtocolHandler for Instance {
.await?;
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
Some(
limiter
.limit_row_inserts(&requests)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};

View File

@@ -23,8 +23,8 @@ use datatypes::timestamp::TimestampNanosecond;
use pipeline::pipeline_operator::PipelineOperator;
use pipeline::{Pipeline, PipelineInfo, PipelineVersion};
use servers::error::{
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, InFlightWriteBytesExceededSnafu,
PipelineSnafu, Result as ServerResult,
AuthSnafu, Error as ServerError, ExecuteGrpcRequestSnafu, OtherSnafu, PipelineSnafu,
Result as ServerResult,
};
use servers::interceptor::{LogIngestInterceptor, LogIngestInterceptorRef};
use servers::query_handler::PipelineHandler;
@@ -125,11 +125,13 @@ impl Instance {
ctx: QueryContextRef,
) -> ServerResult<Output> {
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&log);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
Some(
limiter
.limit_row_inserts(&log)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};
@@ -147,11 +149,13 @@ impl Instance {
ctx: QueryContextRef,
) -> ServerResult<Output> {
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&rows);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
Some(
limiter
.limit_row_inserts(&rows)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};

View File

@@ -16,8 +16,7 @@ use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use servers::error as server_error;
use servers::error::{AuthSnafu, InFlightWriteBytesExceededSnafu};
use servers::error::{self as server_error, AuthSnafu, ExecuteGrpcQuerySnafu, OtherSnafu};
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
@@ -43,11 +42,13 @@ impl OpentsdbProtocolHandler for Instance {
let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
Some(
limiter
.limit_row_inserts(&requests)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};
@@ -57,7 +58,7 @@ impl OpentsdbProtocolHandler for Instance {
.handle_row_inserts(requests, ctx, true, true)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;
.context(ExecuteGrpcQuerySnafu)?;
Ok(match output.data {
common_query::OutputData::AffectedRows(rows) => rows,

View File

@@ -24,7 +24,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
use pipeline::{GreptimePipelineParams, PipelineWay};
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, OtherSnafu, Result as ServerResult};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
@@ -84,11 +84,13 @@ impl OpenTelemetryProtocolHandler for Instance {
};
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&requests);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
Some(
limiter
.limit_row_inserts(&requests)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};
@@ -190,11 +192,13 @@ impl OpenTelemetryProtocolHandler for Instance {
.await?;
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_ctx_req(&opt_req);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
Some(
limiter
.limit_ctx_req(&opt_req)
.await
.map_err(BoxedError::new)
.context(OtherSnafu)?,
)
} else {
None
};

View File

@@ -30,7 +30,7 @@ use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
@@ -176,11 +176,13 @@ impl PromStoreProtocolHandler for Instance {
interceptor_ref.pre_write(&request, ctx.clone())?;
let _guard = if let Some(limiter) = &self.limiter {
let result = limiter.limit_row_inserts(&request);
if result.is_none() {
return InFlightWriteBytesExceededSnafu.fail();
}
result
Some(
limiter
.limit_row_inserts(&request)
.await
.map_err(BoxedError::new)
.context(error::OtherSnafu)?,
)
} else {
None
};

View File

@@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use api::v1::column::Values;
@@ -21,61 +20,30 @@ use api::v1::value::ValueData;
use api::v1::{
Decimal128, InsertRequests, IntervalMonthDayNano, RowInsertRequest, RowInsertRequests,
};
use common_telemetry::{debug, warn};
use pipeline::ContextReq;
use snafu::ResultExt;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use crate::error::{AcquireLimiterSnafu, Result};
pub(crate) type LimiterRef = Arc<Limiter>;
/// A frontend request limiter that controls the total size of in-flight write requests.
/// A frontend request limiter that controls the total size of in-flight write
/// requests.
pub(crate) struct Limiter {
// The maximum number of bytes that can be in flight.
max_in_flight_write_bytes: u64,
// The current in-flight write bytes.
in_flight_write_bytes: Arc<AtomicU64>,
}
/// A counter for the in-flight write bytes.
pub(crate) struct InFlightWriteBytesCounter {
// The current in-flight write bytes.
in_flight_write_bytes: Arc<AtomicU64>,
// The write bytes that are being processed.
processing_write_bytes: u64,
}
impl InFlightWriteBytesCounter {
/// Creates a new InFlightWriteBytesCounter. It will decrease the in-flight write bytes when dropped.
pub fn new(in_flight_write_bytes: Arc<AtomicU64>, processing_write_bytes: u64) -> Self {
debug!(
"processing write bytes: {}, current in-flight write bytes: {}",
processing_write_bytes,
in_flight_write_bytes.load(Ordering::Relaxed)
);
Self {
in_flight_write_bytes,
processing_write_bytes,
}
}
}
impl Drop for InFlightWriteBytesCounter {
// When the request is finished, the in-flight write bytes should be decreased.
fn drop(&mut self) {
self.in_flight_write_bytes
.fetch_sub(self.processing_write_bytes, Ordering::Relaxed);
}
max_in_flight_write_bytes: usize,
byte_counter: Arc<Semaphore>,
}
impl Limiter {
pub fn new(max_in_flight_write_bytes: u64) -> Self {
pub fn new(max_in_flight_write_bytes: usize) -> Self {
Self {
byte_counter: Arc::new(Semaphore::new(max_in_flight_write_bytes)),
max_in_flight_write_bytes,
in_flight_write_bytes: Arc::new(AtomicU64::new(0)),
}
}
pub fn limit_request(&self, request: &Request) -> Option<InFlightWriteBytesCounter> {
pub async fn limit_request(&self, request: &Request) -> Result<OwnedSemaphorePermit> {
let size = match request {
Request::Inserts(requests) => self.insert_requests_data_size(requests),
Request::RowInserts(requests) => {
@@ -83,56 +51,35 @@ impl Limiter {
}
_ => 0,
};
self.limit_in_flight_write_bytes(size as u64)
self.limit_in_flight_write_bytes(size).await
}
pub fn limit_row_inserts(
pub async fn limit_row_inserts(
&self,
requests: &RowInsertRequests,
) -> Option<InFlightWriteBytesCounter> {
) -> Result<OwnedSemaphorePermit> {
let size = self.rows_insert_requests_data_size(requests.inserts.iter());
self.limit_in_flight_write_bytes(size as u64)
self.limit_in_flight_write_bytes(size).await
}
pub fn limit_ctx_req(&self, opt_req: &ContextReq) -> Option<InFlightWriteBytesCounter> {
pub async fn limit_ctx_req(&self, opt_req: &ContextReq) -> Result<OwnedSemaphorePermit> {
let size = self.rows_insert_requests_data_size(opt_req.ref_all_req());
self.limit_in_flight_write_bytes(size as u64)
self.limit_in_flight_write_bytes(size).await
}
/// Returns None if the in-flight write bytes exceed the maximum limit.
/// Otherwise, returns Some(InFlightWriteBytesCounter) and the in-flight write bytes will be increased.
pub fn limit_in_flight_write_bytes(&self, bytes: u64) -> Option<InFlightWriteBytesCounter> {
let result = self.in_flight_write_bytes.fetch_update(
Ordering::Relaxed,
Ordering::Relaxed,
|current| {
if current + bytes > self.max_in_flight_write_bytes {
warn!(
"in-flight write bytes exceed the maximum limit {}, request with {} bytes will be limited",
self.max_in_flight_write_bytes,
bytes
);
return None;
}
Some(current + bytes)
},
);
match result {
// Update the in-flight write bytes successfully.
Ok(_) => Some(InFlightWriteBytesCounter::new(
self.in_flight_write_bytes.clone(),
bytes,
)),
// It means the in-flight write bytes exceed the maximum limit.
Err(_) => None,
}
/// Waits until `bytes` of in-flight write budget can be reserved, then
/// reserves them.
///
/// The reservation is represented by the returned [`OwnedSemaphorePermit`];
/// the permits are handed back to the semaphore when that value is dropped.
///
/// The number of permits requested is capped at `max_in_flight_write_bytes`
/// and at `u32::MAX` (the semaphore API takes a `u32` count):
/// - a plain `bytes as u32` cast would silently wrap for sizes above
///   `u32::MAX`, reserving far fewer permits than the request needs;
/// - a request for more permits than the semaphore was created with cannot
///   be satisfied by releases alone (nothing visible in this file adds
///   permits), so it would wait indefinitely.
///
/// An oversized request therefore waits for, and then holds, the whole budget.
///
/// # Errors
///
/// Returns the `AcquireLimiter` error when acquiring from the semaphore fails.
pub async fn limit_in_flight_write_bytes(&self, bytes: usize) -> Result<OwnedSemaphorePermit> {
    // Never ask for more than the semaphore can ever provide.
    let capped = bytes.min(self.max_in_flight_write_bytes);
    // Lossless: the value is clamped into the `u32` range before the cast.
    let permits = capped.min(u32::MAX as usize) as u32;
    self.byte_counter
        .clone()
        .acquire_many_owned(permits)
        .await
        .context(AcquireLimiterSnafu)
}
/// Returns the current in-flight write bytes.
#[allow(dead_code)]
pub fn in_flight_write_bytes(&self) -> u64 {
self.in_flight_write_bytes.load(Ordering::Relaxed)
pub fn in_flight_write_bytes(&self) -> usize {
self.max_in_flight_write_bytes - self.byte_counter.available_permits()
}
fn insert_requests_data_size(&self, request: &InsertRequests) -> usize {
@@ -270,8 +217,10 @@ mod tests {
for _ in 0..tasks_count {
let limiter = limiter_ref.clone();
let handle = tokio::spawn(async move {
let result = limiter.limit_request(&generate_request(request_data_size));
assert!(result.is_some());
let result = limiter
.limit_request(&generate_request(request_data_size))
.await;
assert!(result.is_ok());
});
handles.push(handle);
}
@@ -282,23 +231,27 @@ mod tests {
}
}
#[test]
fn test_in_flight_write_bytes() {
#[tokio::test]
async fn test_in_flight_write_bytes() {
let limiter_ref: LimiterRef = Arc::new(Limiter::new(1024));
let req1 = generate_request(100);
let result1 = limiter_ref.limit_request(&req1);
assert!(result1.is_some());
let result1 = limiter_ref
.limit_request(&req1)
.await
.expect("failed to acquire permits");
assert_eq!(limiter_ref.in_flight_write_bytes(), 100);
let req2 = generate_request(200);
let result2 = limiter_ref.limit_request(&req2);
assert!(result2.is_some());
let result2 = limiter_ref
.limit_request(&req2)
.await
.expect("failed to acquire permits");
assert_eq!(limiter_ref.in_flight_write_bytes(), 300);
drop(result1.unwrap());
drop(result1);
assert_eq!(limiter_ref.in_flight_write_bytes(), 200);
drop(result2.unwrap());
drop(result2);
assert_eq!(limiter_ref.in_flight_write_bytes(), 0);
}
}

View File

@@ -594,12 +594,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("In-flight write bytes exceeded the maximum limit"))]
InFlightWriteBytesExceeded {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Invalid elasticsearch input, reason: {}", reason))]
InvalidElasticsearchInput {
reason: String,
@@ -759,8 +753,6 @@ impl ErrorExt for Error {
ConvertSqlValue { source, .. } => source.status_code(),
InFlightWriteBytesExceeded { .. } => StatusCode::RateLimited,
DurationOverflow { .. } => StatusCode::InvalidArguments,
HandleOtelArrowRequest { .. } => StatusCode::Internal,