diff --git a/config/config.md b/config/config.md
index 235b1f86fd..641eee4b58 100644
--- a/config/config.md
+++ b/config/config.md
@@ -17,6 +17,7 @@
| `default_timezone` | String | Unset | The default timezone of the server. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
+| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -335,6 +336,7 @@
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
+| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
| `rpc_addr` | String | Unset | Deprecated, use `grpc.addr` instead. |
| `rpc_hostname` | String | Unset | Deprecated, use `grpc.hostname` instead. |
| `rpc_runtime_size` | Integer | Unset | Deprecated, use `grpc.runtime_size` instead. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 4388c4420f..e4a3dca6d3 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -19,6 +19,9 @@ enable_telemetry = true
## Parallelism of initializing regions.
init_regions_parallelism = 16
+## The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
+max_concurrent_queries = 0
+
## Deprecated, use `grpc.addr` instead.
## @toml2docs:none-default
rpc_addr = "127.0.0.1:3001"
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 040e1e62c2..1cd75e6414 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -15,6 +15,9 @@ init_regions_in_background = false
## Parallelism of initializing regions.
init_regions_parallelism = 16
+## The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
+max_concurrent_queries = 0
+
## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.
diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs
index 7cb18131ed..70be2513b2 100644
--- a/src/datanode/src/config.rs
+++ b/src/datanode/src/config.rs
@@ -305,6 +305,7 @@ pub struct DatanodeOptions {
pub meta_client: Option<MetaClientOptions>,
pub wal: DatanodeWalConfig,
pub storage: StorageConfig,
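+ /// The maximum number of concurrent queries allowed to be executed. 0 means unlimited.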
+ pub max_concurrent_queries: usize,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub logging: LoggingOptions,
@@ -339,6 +340,7 @@ impl Default for DatanodeOptions {
meta_client: None,
wal: DatanodeWalConfig::default(),
storage: StorageConfig::default(),
+ max_concurrent_queries: 0,
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
RegionEngineConfig::File(FileEngineConfig::default()),
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index 149aa44ebe..f5d7bd9fc6 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -314,7 +314,7 @@ impl DatanodeBuilder {
&self,
event_listener: RegionServerEventListenerRef,
) -> Result<RegionServer> {
- let opts = &self.opts;
+ let opts: &DatanodeOptions = &self.opts;
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
@@ -334,6 +334,9 @@ impl DatanodeBuilder {
common_runtime::global_runtime(),
event_listener,
table_provider_factory,
+ opts.max_concurrent_queries,
+ // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
+ Duration::from_millis(100),
);
let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 5717d0a403..0b36245924 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -22,6 +22,7 @@ use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
+use tokio::time::error::Elapsed;
/// Business error of datanode.
#[derive(Snafu)]
@@ -347,6 +348,22 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
+
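+ /// The concurrency limiter's semaphore was closed while waiting for a permit.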
+ #[snafu(display("Failed to acquire permit, source closed"))]
+ ConcurrentQueryLimiterClosed {
+ #[snafu(source)]
+ error: tokio::sync::AcquireError,
+ #[snafu(implicit)]
+ location: Location,
+ },
+
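+ /// Timed out while waiting for a query permit from the concurrency limiter.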
+ #[snafu(display("Failed to acquire permit under timeouts"))]
+ ConcurrentQueryLimiterTimeout {
+ #[snafu(source)]
+ error: Elapsed,
+ #[snafu(implicit)]
+ location: Location,
+ },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -411,6 +428,9 @@ impl ErrorExt for Error {
FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
+ ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
+ StatusCode::RegionBusy
+ }
}
}
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 54e1cdbafd..4fcdfb86af 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
+use std::time::Duration;
use api::region::RegionResponse;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1};
@@ -58,10 +59,13 @@ use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
+use tokio::sync::{Semaphore, SemaphorePermit};
+use tokio::time::timeout;
use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
- self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
+ self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
+ ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
@@ -90,6 +94,8 @@ impl RegionServer {
runtime,
event_listener,
Arc::new(DummyTableProviderFactory),
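+ // No concurrency limit by default (0 means unlimited), so the timeout value is never used.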
+ 0,
+ Duration::from_millis(0),
)
}
@@ -98,6 +104,8 @@ impl RegionServer {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
+ max_concurrent_queries: usize,
+ concurrent_query_limiter_timeout: Duration,
) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(
@@ -105,6 +113,10 @@ impl RegionServer {
runtime,
event_listener,
table_provider_factory,
+ RegionServerParallelism::from_opts(
+ max_concurrent_queries,
+ concurrent_query_limiter_timeout,
+ ),
)),
}
}
@@ -167,6 +179,11 @@ impl RegionServer {
&self,
request: api::v1::region::QueryRequest,
) -> Result<SendableRecordBatchStream> {
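+ // Acquire a permit from the concurrency limiter, if one is configured, before serving the query.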
+ let _permit = if let Some(p) = &self.inner.parallelism {
+ Some(p.acquire().await?)
+ } else {
+ None
+ };
let region_id = RegionId::from_u64(request.region_id);
let provider = self.table_provider(region_id).await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
@@ -200,6 +217,11 @@ impl RegionServer {
#[tracing::instrument(skip_all)]
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
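+ // Acquire a permit from the concurrency limiter, if one is configured, before serving the query.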
+ let _permit = if let Some(p) = &self.inner.parallelism {
+ Some(p.acquire().await?)
+ } else {
+ None
+ };
let provider = self.table_provider(request.region_id).await?;
struct RegionDataSourceInjector {
@@ -450,6 +472,36 @@ struct RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
+ // The maximum number of queries allowed to execute at the same time.
+ // Acts as the last line of defense on the datanode against query overload.
+ parallelism: Option<RegionServerParallelism>,
+}
+
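+/// Limits the number of concurrently executing queries using a semaphore with an acquire timeout.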
+struct RegionServerParallelism {
+ semaphore: Semaphore,
+ timeout: Duration,
+}
+
+impl RegionServerParallelism {
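+ /// Builds a limiter from the options; returns `None` when `max_concurrent_queries` is 0 (unlimited).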
+ pub fn from_opts(
+ max_concurrent_queries: usize,
+ concurrent_query_limiter_timeout: Duration,
+ ) -> Option<Self> {
+ if max_concurrent_queries == 0 {
+ return None;
+ }
+ Some(RegionServerParallelism {
+ semaphore: Semaphore::new(max_concurrent_queries),
+ timeout: concurrent_query_limiter_timeout,
+ })
+ }
+
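+ /// Waits for a query permit, failing if the timeout elapses or the semaphore is closed.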
+ pub async fn acquire(&self) -> Result<SemaphorePermit> {
+ timeout(self.timeout, self.semaphore.acquire())
+ .await
+ .context(ConcurrentQueryLimiterTimeoutSnafu)?
+ .context(ConcurrentQueryLimiterClosedSnafu)
+ }
}
enum CurrentEngine {
@@ -478,6 +530,7 @@ impl RegionServerInner {
runtime: Runtime,
event_listener: RegionServerEventListenerRef,
table_provider_factory: TableProviderFactoryRef,
+ parallelism: Option<RegionServerParallelism>,
) -> Self {
Self {
engines: RwLock::new(HashMap::new()),
@@ -486,6 +539,7 @@ impl RegionServerInner {
runtime,
event_listener,
table_provider_factory,
+ parallelism,
}
}
@@ -1284,4 +1338,23 @@ mod tests {
assert(result);
}
}
+
+ #[tokio::test]
+ async fn test_region_server_parallelism() {
+ let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
+ let first_query = p.acquire().await;
+ assert!(first_query.is_ok());
+ let second_query = p.acquire().await;
+ assert!(second_query.is_ok());
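+ // Both permits are now held, so the next acquire should time out.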
+ let third_query = p.acquire().await;
+ assert!(third_query.is_err());
+ let err = third_query.unwrap_err();
+ assert_eq!(
+ err.output_msg(),
+ "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
+ );
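+ // Dropping a held permit frees a slot, allowing another query to be admitted.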
+ drop(first_query);
+ let fourth_query = p.acquire().await;
+ assert!(fourth_query.is_ok());
+ }
}