diff --git a/config/config.md b/config/config.md
index 82297d484e..f6139227c1 100644
--- a/config/config.md
+++ b/config/config.md
@@ -451,6 +451,7 @@
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.
By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. |
+| `concurrent_query_limiter_timeout` | String | `100ms` | Timeout to acquire a permit from the concurrent query limiter when `max_concurrent_queries` is reached. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
| `http` | -- | -- | The HTTP server options. |
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 6effec4c87..673b9985b6 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -20,6 +20,9 @@ init_regions_parallelism = 16
## The maximum concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0
+## Timeout to acquire a permit from the concurrent query limiter when `max_concurrent_queries` is reached.
+concurrent_query_limiter_timeout = "100ms"
+
## Enable telemetry to collect anonymous usage data. Enabled by default.
#+ enable_telemetry = true
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 6f6815a869..a0011c71af 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -14,6 +14,8 @@
//! Datanode configurations
+use std::time::Duration;
+
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_options::memory::MemoryOptions;
@@ -75,6 +77,10 @@ pub struct DatanodeOptions {
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
pub max_concurrent_queries: usize,
+ /// Timeout to acquire a permit from the concurrent query limiter when
+ /// `max_concurrent_queries` is reached. Only effective when the limiter is enabled.
+ #[serde(with = "humantime_serde")]
+ pub concurrent_query_limiter_timeout: Duration,
/// Options for different store engines.
pub region_engine: Vec,
pub logging: LoggingOptions,
@@ -127,6 +133,7 @@ impl Default for DatanodeOptions {
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
max_concurrent_queries: 0,
+ concurrent_query_limiter_timeout: Duration::from_millis(100),
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index c848215d39..068f82cc87 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -426,8 +426,7 @@ impl DatanodeBuilder {
event_listener,
table_provider_factory,
opts.max_concurrent_queries,
- //TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
- Duration::from_millis(100),
+ opts.concurrent_query_limiter_timeout,
opts.grpc.flight_compression,
);
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index aa3ffbfe3a..974e19c81b 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -17,6 +17,7 @@ mod catalog;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
+use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;
@@ -36,7 +37,8 @@ use common_error::status_code::StatusCode;
use common_meta::datanode::TopicStatsReporter;
use common_query::OutputData;
use common_query::request::QueryRequest;
-use common_recordbatch::SendableRecordBatchStream;
+use common_recordbatch::adapter::RecordBatchMetrics;
+use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
@@ -44,7 +46,10 @@ use common_telemetry::{debug, error, info, warn};
use dashmap::DashMap;
use datafusion::datasource::TableProvider;
use datafusion_common::tree_node::TreeNode;
+use datatypes::schema::SchemaRef;
use either::Either;
+use futures::Stream;
+use futures::task::{Context, Poll};
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
@@ -75,7 +80,7 @@ use store_api::region_request::{
RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
-use tokio::sync::{Semaphore, SemaphorePermit};
+use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::time::timeout;
use tonic::{Request, Response, Result as TonicResult};
@@ -249,7 +254,7 @@ impl RegionServer {
request: api::v1::region::QueryRequest,
query_ctx: QueryContextRef,
) -> Result {
- let _permit = if let Some(p) = &self.inner.parallelism {
+ let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
} else {
None
@@ -278,7 +283,8 @@ impl RegionServer {
.await
.context(DecodeLogicalPlanSnafu)?;
- self.inner
+ let stream = self
+ .inner
.handle_read(
QueryRequest {
header: request.header,
@@ -287,12 +293,14 @@ impl RegionServer {
},
query_ctx,
)
- .await
+ .await?;
+
+ Ok(maybe_guard_stream(stream, permit))
}
#[tracing::instrument(skip_all)]
pub async fn handle_read(&self, request: QueryRequest) -> Result {
- let _permit = if let Some(p) = &self.inner.parallelism {
+ let permit = if let Some(p) = &self.inner.parallelism {
Some(p.acquire().await?)
} else {
None
@@ -313,9 +321,12 @@ impl RegionServer {
.context(DataFusionSnafu)?
.data;
- self.inner
+ let stream = self
+ .inner
.handle_read(QueryRequest { plan, ..request }, query_ctx)
- .await
+ .await?;
+
+ Ok(maybe_guard_stream(stream, permit))
}
/// Returns all opened and reportable regions.
@@ -879,7 +890,7 @@ struct RegionServerInner {
}
struct RegionServerParallelism {
- semaphore: Semaphore,
+ semaphore: Arc,
timeout: Duration,
}
@@ -892,19 +903,68 @@ impl RegionServerParallelism {
return None;
}
Some(RegionServerParallelism {
- semaphore: Semaphore::new(max_concurrent_queries),
+ semaphore: Arc::new(Semaphore::new(max_concurrent_queries)),
timeout: concurrent_query_limiter_timeout,
})
}
- pub async fn acquire(&self) -> Result> {
- timeout(self.timeout, self.semaphore.acquire())
+ pub async fn acquire(&self) -> Result {
+ timeout(self.timeout, self.semaphore.clone().acquire_owned())
.await
.context(ConcurrentQueryLimiterTimeoutSnafu)?
.context(ConcurrentQueryLimiterClosedSnafu)
}
}
+/// Wraps a record batch stream and holds a concurrency permit until the stream is
+/// fully consumed (dropped), so `max_concurrent_queries` bounds the number of
+/// in-flight read streams, not just query planning.
+struct PermitGuardedStream {
+ inner: SendableRecordBatchStream,
+ _permit: OwnedSemaphorePermit,
+}
+
+impl RecordBatchStream for PermitGuardedStream {
+ fn name(&self) -> &str {
+ self.inner.name()
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.inner.schema()
+ }
+
+ fn output_ordering(&self) -> Option<&[OrderOption]> {
+ self.inner.output_ordering()
+ }
+
+ fn metrics(&self) -> Option {
+ self.inner.metrics()
+ }
+}
+
+impl Stream for PermitGuardedStream {
+ type Item = common_recordbatch::error::Result;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll