mirror of https://github.com/GreptimeTeam/greptimedb.git
feat: protect datanode with concurrency limit. (#4699)
Add a concurrency limit in the region server to protect the datanode from query overload.
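In essence the change adds an admission-control gate in front of query handling: a `tokio` semaphore sized to `max_concurrent_queries`, wrapped in a timeout so that excess queries fail fast instead of queueing indefinitely. Below is a minimal, self-contained sketch of that pattern with illustrative names only (assuming the `tokio` crate with the `rt`, `macros`, `sync`, and `time` features); it is not the PR's own code. The hunks that follow show the same idea wired through the datanode config, `DatanodeOptions`, the builder, the error type, and `RegionServer`.

```rust
// Minimal sketch of the admission-control pattern applied by this change:
// a semaphore bounds in-flight queries, and a timeout turns waiting into rejection.
use std::time::Duration;

use tokio::sync::Semaphore;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // Allow at most 2 queries in flight; wait up to 100 ms for a slot.
    let limiter = Semaphore::new(2);
    let wait = Duration::from_millis(100);

    let _a = timeout(wait, limiter.acquire())
        .await
        .expect("within deadline")
        .expect("semaphore open");
    let _b = timeout(wait, limiter.acquire())
        .await
        .expect("within deadline")
        .expect("semaphore open");

    // A third concurrent query cannot get a permit before the deadline,
    // so it is rejected instead of piling up on the datanode.
    let third = timeout(wait, limiter.acquire()).await;
    assert!(third.is_err());
}
```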
@@ -17,6 +17,7 @@
| `default_timezone` | String | Unset | The default timezone of the server. |
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
| `runtime` | -- | -- | The runtime options. |
| `runtime.global_rt_size` | Integer | `8` | The number of threads to execute the runtime for global read operations. |
| `runtime.compact_rt_size` | Integer | `4` | The number of threads to execute the runtime for global write operations. |
@@ -335,6 +336,7 @@
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. |
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
| `max_concurrent_queries` | Integer | `0` | The maximum number of concurrent queries allowed to be executed. Zero means unlimited. |
| `rpc_addr` | String | Unset | Deprecated, use `grpc.addr` instead. |
| `rpc_hostname` | String | Unset | Deprecated, use `grpc.hostname` instead. |
| `rpc_runtime_size` | Integer | Unset | Deprecated, use `grpc.runtime_size` instead. |
@@ -19,6 +19,9 @@ enable_telemetry = true
## Parallelism of initializing regions.
init_regions_parallelism = 16

## The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0

## Deprecated, use `grpc.addr` instead.
## @toml2docs:none-default
rpc_addr = "127.0.0.1:3001"
@@ -15,6 +15,9 @@ init_regions_in_background = false
## Parallelism of initializing regions.
init_regions_parallelism = 16

## The maximum number of concurrent queries allowed to be executed. Zero means unlimited.
max_concurrent_queries = 0

## The runtime options.
#+ [runtime]
## The number of threads to execute the runtime for global read operations.
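The table entries and example TOML above boil down to a single knob, `max_concurrent_queries`, whose default of `0` leaves the limiter disabled. The sketch below shows how such a key can be picked up into an options struct via a serde default; it uses a simplified, hypothetical `LimiterOptions` stand-in (assuming the `serde` and `toml` crates), not the real `DatanodeOptions`.

```rust
// Hypothetical stand-in struct showing the TOML key -> options field mapping,
// with a serde default of 0 meaning "limiter disabled".
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(default)]
struct LimiterOptions {
    max_concurrent_queries: usize,
}

impl Default for LimiterOptions {
    fn default() -> Self {
        // 0 means unlimited: no limiter is installed.
        Self {
            max_concurrent_queries: 0,
        }
    }
}

fn main() {
    let set: LimiterOptions = toml::from_str("max_concurrent_queries = 64").unwrap();
    assert_eq!(set.max_concurrent_queries, 64);

    // Omitting the key falls back to the default, i.e. unlimited.
    let unset: LimiterOptions = toml::from_str("").unwrap();
    assert_eq!(unset.max_concurrent_queries, 0);
}
```

In the real options struct the field simply defaults to `0`, matching the documented "zero means unlimited" behavior shown in the `DatanodeOptions` hunks below.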
@@ -305,6 +305,7 @@ pub struct DatanodeOptions {
    pub meta_client: Option<MetaClientOptions>,
    pub wal: DatanodeWalConfig,
    pub storage: StorageConfig,
    pub max_concurrent_queries: usize,
    /// Options for different store engines.
    pub region_engine: Vec<RegionEngineConfig>,
    pub logging: LoggingOptions,
@@ -339,6 +340,7 @@ impl Default for DatanodeOptions {
            meta_client: None,
            wal: DatanodeWalConfig::default(),
            storage: StorageConfig::default(),
            max_concurrent_queries: 0,
            region_engine: vec![
                RegionEngineConfig::Mito(MitoConfig::default()),
                RegionEngineConfig::File(FileEngineConfig::default()),
@@ -314,7 +314,7 @@ impl DatanodeBuilder {
        &self,
        event_listener: RegionServerEventListenerRef,
    ) -> Result<RegionServer> {
-        let opts = &self.opts;
+        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // query engine in datanode only executes plan with resolved table source.
@@ -334,6 +334,9 @@ impl DatanodeBuilder {
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
            Duration::from_millis(100),
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
@@ -22,6 +22,7 @@ use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
use store_api::storage::RegionId;
use table::error::Error as TableError;
use tokio::time::error::Elapsed;

/// Business error of datanode.
#[derive(Snafu)]
@@ -347,6 +348,22 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to acquire permit, source closed"))]
    ConcurrentQueryLimiterClosed {
        #[snafu(source)]
        error: tokio::sync::AcquireError,
        #[snafu(implicit)]
        location: Location,
    },

    #[snafu(display("Failed to acquire permit under timeouts"))]
    ConcurrentQueryLimiterTimeout {
        #[snafu(source)]
        error: Elapsed,
        #[snafu(implicit)]
        location: Location,
    },
}

pub type Result<T> = std::result::Result<T, Error>;
@@ -411,6 +428,9 @@ impl ErrorExt for Error {

            FindLogicalRegions { source, .. } => source.status_code(),
            BuildMitoEngine { source, .. } => source.status_code(),
            ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
                StatusCode::RegionBusy
            }
        }
    }
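Two variants are needed because the acquire path can fail in two layers: `tokio::time::timeout` may expire (`Elapsed`), and the inner `Semaphore::acquire` may fail if the semaphore has been closed (`AcquireError`). The following is a small standalone illustration of that nesting, using illustrative names rather than the datanode's snafu variants; the double `.context(...)` in `RegionServerParallelism::acquire` further below maps exactly these two layers onto the new error variants.

```rust
// timeout(...) wraps the semaphore's own result, so acquiring a permit yields a
// nested Result: the outer Err is the elapsed deadline, the inner Err a closed semaphore.
use std::time::Duration;

use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::time::timeout;

#[derive(Debug)]
enum AcquireFailure {
    TimedOut,
    LimiterClosed,
}

async fn acquire_with_deadline(
    sem: &Semaphore,
    wait: Duration,
) -> Result<SemaphorePermit<'_>, AcquireFailure> {
    timeout(wait, sem.acquire())
        .await
        .map_err(|_elapsed| AcquireFailure::TimedOut)? // outer layer: tokio::time::error::Elapsed
        .map_err(|_closed| AcquireFailure::LimiterClosed) // inner layer: tokio::sync::AcquireError
}

#[tokio::main]
async fn main() {
    let sem = Semaphore::new(1);
    let held = sem.acquire().await.unwrap();

    // The only permit is taken, so a second acquire hits the deadline.
    let res = acquire_with_deadline(&sem, Duration::from_millis(10)).await;
    assert!(matches!(res, Err(AcquireFailure::TimedOut)));

    // Closing the semaphore makes further acquires fail with the other variant.
    drop(held);
    sem.close();
    let res = acquire_with_deadline(&sem, Duration::from_millis(10)).await;
    assert!(matches!(res, Err(AcquireFailure::LimiterClosed)));
}
```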
@@ -16,6 +16,7 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use std::time::Duration;

use api::region::RegionResponse;
use api::v1::region::{region_request, RegionResponse as RegionResponseV1};
@@ -58,10 +59,13 @@ use store_api::region_request::{
    AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
use tokio::time::timeout;
use tonic::{Request, Response, Result as TonicResult};

use crate::error::{
-    self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
+    self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
+    ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
    HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
    RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
@@ -90,6 +94,8 @@ impl RegionServer {
            runtime,
            event_listener,
            Arc::new(DummyTableProviderFactory),
            0,
            Duration::from_millis(0),
        )
    }

@@ -98,6 +104,8 @@ impl RegionServer {
        runtime: Runtime,
        event_listener: RegionServerEventListenerRef,
        table_provider_factory: TableProviderFactoryRef,
        max_concurrent_queries: usize,
        concurrent_query_limiter_timeout: Duration,
    ) -> Self {
        Self {
            inner: Arc::new(RegionServerInner::new(
@@ -105,6 +113,10 @@ impl RegionServer {
                runtime,
                event_listener,
                table_provider_factory,
                RegionServerParallelism::from_opts(
                    max_concurrent_queries,
                    concurrent_query_limiter_timeout,
                ),
            )),
        }
    }
@@ -167,6 +179,11 @@ impl RegionServer {
        &self,
        request: api::v1::region::QueryRequest,
    ) -> Result<SendableRecordBatchStream> {
        // When a concurrency limit is configured, acquire a permit (or fail after the
        // timeout) before serving the query.
        let _permit = if let Some(p) = &self.inner.parallelism {
            Some(p.acquire().await?)
        } else {
            None
        };
        let region_id = RegionId::from_u64(request.region_id);
        let provider = self.table_provider(region_id).await?;
        let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
@@ -200,6 +217,11 @@ impl RegionServer {

    #[tracing::instrument(skip_all)]
    pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
        // Same admission check as above: without a configured limit, `parallelism` is
        // None and the query proceeds unthrottled.
        let _permit = if let Some(p) = &self.inner.parallelism {
            Some(p.acquire().await?)
        } else {
            None
        };
        let provider = self.table_provider(request.region_id).await?;

        struct RegionDataSourceInjector {
@@ -450,6 +472,36 @@ struct RegionServerInner {
    runtime: Runtime,
    event_listener: RegionServerEventListenerRef,
    table_provider_factory: TableProviderFactoryRef,
    // The number of queries allowed to be executed at the same time.
    // Acts as the last line of defense on the datanode to prevent query overload.
    parallelism: Option<RegionServerParallelism>,
}

struct RegionServerParallelism {
    semaphore: Semaphore,
    timeout: Duration,
}

impl RegionServerParallelism {
    pub fn from_opts(
        max_concurrent_queries: usize,
        concurrent_query_limiter_timeout: Duration,
    ) -> Option<Self> {
        // A limit of zero means unlimited, so no limiter is constructed.
        if max_concurrent_queries == 0 {
            return None;
        }
        Some(RegionServerParallelism {
            semaphore: Semaphore::new(max_concurrent_queries),
            timeout: concurrent_query_limiter_timeout,
        })
    }

    pub async fn acquire(&self) -> Result<SemaphorePermit> {
        timeout(self.timeout, self.semaphore.acquire())
            .await
            .context(ConcurrentQueryLimiterTimeoutSnafu)?
            .context(ConcurrentQueryLimiterClosedSnafu)
    }
}

enum CurrentEngine {
@@ -478,6 +530,7 @@ impl RegionServerInner {
        runtime: Runtime,
        event_listener: RegionServerEventListenerRef,
        table_provider_factory: TableProviderFactoryRef,
        parallelism: Option<RegionServerParallelism>,
    ) -> Self {
        Self {
            engines: RwLock::new(HashMap::new()),
@@ -486,6 +539,7 @@ impl RegionServerInner {
            runtime,
            event_listener,
            table_provider_factory,
            parallelism,
        }
    }

@@ -1284,4 +1338,23 @@ mod tests {
            assert(result);
        }
    }

    #[tokio::test]
    async fn test_region_server_parallelism() {
        // With a limit of 2, a third concurrent acquire times out; releasing a permit
        // frees a slot for the next query.
        let p = RegionServerParallelism::from_opts(2, Duration::from_millis(1)).unwrap();
        let first_query = p.acquire().await;
        assert!(first_query.is_ok());
        let second_query = p.acquire().await;
        assert!(second_query.is_ok());
        let third_query = p.acquire().await;
        assert!(third_query.is_err());
        let err = third_query.unwrap_err();
        assert_eq!(
            err.output_msg(),
            "Failed to acquire permit under timeouts: deadline has elapsed".to_string()
        );
        drop(first_query);
        let fourth_query = p.acquire().await;
        assert!(fourth_query.is_ok());
    }
}