From 199f03ae8d33e641b17f70576fff22ff699a3752 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 1 Jun 2026 22:04:47 +0800 Subject: [PATCH] feat(datanode): hold query permit for stream and expose limiter timeout Signed-off-by: evenyag --- config/config.md | 1 + config/datanode.example.toml | 3 ++ src/datanode/src/config.rs | 7 +++ src/datanode/src/datanode.rs | 3 +- src/datanode/src/region_server.rs | 84 ++++++++++++++++++++++++++----- 5 files changed, 84 insertions(+), 14 deletions(-) diff --git a/config/config.md b/config/config.md index 82297d484e..f6139227c1 100644 --- a/config/config.md +++ b/config/config.md @@ -451,6 +451,7 @@ | `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. | | `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. | | `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. | +| `concurrent_query_limiter_timeout` | String | `100ms` | Timeout to acquire a permit from the concurrent query limiter when `max_concurrent_queries` is reached. | | `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. | | `http` | -- | -- | The HTTP server options. | | `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. | diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 6effec4c87..673b9985b6 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -20,6 +20,9 @@ init_regions_parallelism = 16 ## The maximum concurrent queries allowed to be executed. Zero means unlimited. max_concurrent_queries = 0 +## Timeout to acquire a permit from the concurrent query limiter when `max_concurrent_queries` is reached. +concurrent_query_limiter_timeout = "100ms" + ## Enable telemetry to collect anonymous usage data. Enabled by default. #+ enable_telemetry = true diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 6f6815a869..a0011c71af 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -14,6 +14,8 @@ //! Datanode configurations +use std::time::Duration; + use common_base::readable_size::ReadableSize; use common_config::{Configurable, DEFAULT_DATA_HOME}; use common_options::memory::MemoryOptions; @@ -75,6 +77,10 @@ pub struct DatanodeOptions { pub wal: DatanodeWalConfig, pub storage: StorageConfig, pub max_concurrent_queries: usize, + /// Timeout to acquire a permit from the concurrent query limiter when + /// `max_concurrent_queries` is reached. Only effective when the limiter is enabled. + #[serde(with = "humantime_serde")] + pub concurrent_query_limiter_timeout: Duration, /// Options for different store engines. pub region_engine: Vec, pub logging: LoggingOptions, @@ -127,6 +133,7 @@ impl Default for DatanodeOptions { wal: DatanodeWalConfig::default(), storage: StorageConfig::default(), max_concurrent_queries: 0, + concurrent_query_limiter_timeout: Duration::from_millis(100), region_engine: vec![ RegionEngineConfig::Mito(MitoConfig::default()), RegionEngineConfig::File(FileEngineConfig::default()), diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index c848215d39..068f82cc87 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -426,8 +426,7 @@ impl DatanodeBuilder { event_listener, table_provider_factory, opts.max_concurrent_queries, - //TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter. - Duration::from_millis(100), + opts.concurrent_query_limiter_timeout, opts.grpc.flight_compression, ); diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index aa3ffbfe3a..974e19c81b 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -17,6 +17,7 @@ mod catalog; use std::collections::HashMap; use std::fmt::Debug; use std::ops::Deref; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -36,7 +37,8 @@ use common_error::status_code::StatusCode; use common_meta::datanode::TopicStatsReporter; use common_query::OutputData; use common_query::request::QueryRequest; -use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::adapter::RecordBatchMetrics; +use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use common_runtime::Runtime; use common_telemetry::tracing::{self, info_span}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; @@ -44,7 +46,10 @@ use common_telemetry::{debug, error, info, warn}; use dashmap::DashMap; use datafusion::datasource::TableProvider; use datafusion_common::tree_node::TreeNode; +use datatypes::schema::SchemaRef; use either::Either; +use futures::Stream; +use futures::task::{Context, Poll}; use futures_util::future::try_join_all; use metric_engine::engine::MetricEngine; use mito2::engine::{MITO_ENGINE_NAME, MitoEngine}; @@ -75,7 +80,7 @@ use store_api::region_request::{ RegionOpenRequest, RegionRequest, }; use store_api::storage::RegionId; -use tokio::sync::{Semaphore, SemaphorePermit}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::time::timeout; use tonic::{Request, Response, Result as TonicResult}; @@ -249,7 +254,7 @@ impl RegionServer { request: api::v1::region::QueryRequest, query_ctx: QueryContextRef, ) -> Result { - let _permit = if let Some(p) = &self.inner.parallelism { + let permit = if let Some(p) = &self.inner.parallelism { Some(p.acquire().await?) } else { None @@ -278,7 +283,8 @@ impl RegionServer { .await .context(DecodeLogicalPlanSnafu)?; - self.inner + let stream = self + .inner .handle_read( QueryRequest { header: request.header, @@ -287,12 +293,14 @@ impl RegionServer { }, query_ctx, ) - .await + .await?; + + Ok(maybe_guard_stream(stream, permit)) } #[tracing::instrument(skip_all)] pub async fn handle_read(&self, request: QueryRequest) -> Result { - let _permit = if let Some(p) = &self.inner.parallelism { + let permit = if let Some(p) = &self.inner.parallelism { Some(p.acquire().await?) } else { None @@ -313,9 +321,12 @@ impl RegionServer { .context(DataFusionSnafu)? .data; - self.inner + let stream = self + .inner .handle_read(QueryRequest { plan, ..request }, query_ctx) - .await + .await?; + + Ok(maybe_guard_stream(stream, permit)) } /// Returns all opened and reportable regions. @@ -879,7 +890,7 @@ struct RegionServerInner { } struct RegionServerParallelism { - semaphore: Semaphore, + semaphore: Arc, timeout: Duration, } @@ -892,19 +903,68 @@ impl RegionServerParallelism { return None; } Some(RegionServerParallelism { - semaphore: Semaphore::new(max_concurrent_queries), + semaphore: Arc::new(Semaphore::new(max_concurrent_queries)), timeout: concurrent_query_limiter_timeout, }) } - pub async fn acquire(&self) -> Result> { - timeout(self.timeout, self.semaphore.acquire()) + pub async fn acquire(&self) -> Result { + timeout(self.timeout, self.semaphore.clone().acquire_owned()) .await .context(ConcurrentQueryLimiterTimeoutSnafu)? .context(ConcurrentQueryLimiterClosedSnafu) } } +/// Wraps a record batch stream and holds a concurrency permit until the stream is +/// fully consumed (dropped), so `max_concurrent_queries` bounds the number of +/// in-flight read streams, not just query planning. +struct PermitGuardedStream { + inner: SendableRecordBatchStream, + _permit: OwnedSemaphorePermit, +} + +impl RecordBatchStream for PermitGuardedStream { + fn name(&self) -> &str { + self.inner.name() + } + + fn schema(&self) -> SchemaRef { + self.inner.schema() + } + + fn output_ordering(&self) -> Option<&[OrderOption]> { + self.inner.output_ordering() + } + + fn metrics(&self) -> Option { + self.inner.metrics() + } +} + +impl Stream for PermitGuardedStream { + type Item = common_recordbatch::error::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_next(cx) + } +} + +/// Wraps `stream` so it holds `permit` until fully consumed. Returns `stream` +/// unchanged when no permit was acquired (limiter disabled). +fn maybe_guard_stream( + stream: SendableRecordBatchStream, + permit: Option, +) -> SendableRecordBatchStream { + match permit { + Some(permit) => Box::pin(PermitGuardedStream { + inner: stream, + _permit: permit, + }), + None => stream, + } +} + enum CurrentEngine { Engine(RegionEngineRef), EarlyReturn(AffectedRows),