mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-06-02 21:30:38 +00:00
feat(datanode): hold query permit for stream and expose limiter timeout
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -451,6 +451,7 @@
|
||||
| `init_regions_in_background` | Bool | `false` | Initialize all regions in the background during the startup.<br/>By default, it provides services after all regions have been initialized. |
|
||||
| `init_regions_parallelism` | Integer | `16` | Parallelism of initializing regions. |
|
||||
| `max_concurrent_queries` | Integer | `0` | The maximum concurrent queries allowed to be executed. Zero means unlimited. |
|
||||
| `concurrent_query_limiter_timeout` | String | `100ms` | Timeout to acquire a permit from the concurrent query limiter when `max_concurrent_queries` is reached. |
|
||||
| `enable_telemetry` | Bool | `true` | Enable telemetry to collect anonymous usage data. Enabled by default. |
|
||||
| `http` | -- | -- | The HTTP server options. |
|
||||
| `http.addr` | String | `127.0.0.1:4000` | The address to bind the HTTP server. |
|
||||
|
||||
@@ -20,6 +20,9 @@ init_regions_parallelism = 16
|
||||
## The maximum concurrent queries allowed to be executed. Zero means unlimited.
|
||||
max_concurrent_queries = 0
|
||||
|
||||
## Timeout to acquire a permit from the concurrent query limiter when `max_concurrent_queries` is reached.
|
||||
concurrent_query_limiter_timeout = "100ms"
|
||||
|
||||
## Enable telemetry to collect anonymous usage data. Enabled by default.
|
||||
#+ enable_telemetry = true
|
||||
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
|
||||
//! Datanode configurations
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_config::{Configurable, DEFAULT_DATA_HOME};
|
||||
use common_options::memory::MemoryOptions;
|
||||
@@ -75,6 +77,10 @@ pub struct DatanodeOptions {
|
||||
pub wal: DatanodeWalConfig,
|
||||
pub storage: StorageConfig,
|
||||
pub max_concurrent_queries: usize,
|
||||
/// Timeout to acquire a permit from the concurrent query limiter when
|
||||
/// `max_concurrent_queries` is reached. Only effective when the limiter is enabled.
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub concurrent_query_limiter_timeout: Duration,
|
||||
/// Options for different store engines.
|
||||
pub region_engine: Vec<RegionEngineConfig>,
|
||||
pub logging: LoggingOptions,
|
||||
@@ -127,6 +133,7 @@ impl Default for DatanodeOptions {
|
||||
wal: DatanodeWalConfig::default(),
|
||||
storage: StorageConfig::default(),
|
||||
max_concurrent_queries: 0,
|
||||
concurrent_query_limiter_timeout: Duration::from_millis(100),
|
||||
region_engine: vec![
|
||||
RegionEngineConfig::Mito(MitoConfig::default()),
|
||||
RegionEngineConfig::File(FileEngineConfig::default()),
|
||||
|
||||
@@ -426,8 +426,7 @@ impl DatanodeBuilder {
|
||||
event_listener,
|
||||
table_provider_factory,
|
||||
opts.max_concurrent_queries,
|
||||
//TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
|
||||
Duration::from_millis(100),
|
||||
opts.concurrent_query_limiter_timeout,
|
||||
opts.grpc.flight_compression,
|
||||
);
|
||||
|
||||
|
||||
@@ -17,6 +17,7 @@ mod catalog;
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Debug;
|
||||
use std::ops::Deref;
|
||||
use std::pin::Pin;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::Duration;
|
||||
@@ -36,7 +37,8 @@ use common_error::status_code::StatusCode;
|
||||
use common_meta::datanode::TopicStatsReporter;
|
||||
use common_query::OutputData;
|
||||
use common_query::request::QueryRequest;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_recordbatch::adapter::RecordBatchMetrics;
|
||||
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
|
||||
use common_runtime::Runtime;
|
||||
use common_telemetry::tracing::{self, info_span};
|
||||
use common_telemetry::tracing_context::{FutureExt, TracingContext};
|
||||
@@ -44,7 +46,10 @@ use common_telemetry::{debug, error, info, warn};
|
||||
use dashmap::DashMap;
|
||||
use datafusion::datasource::TableProvider;
|
||||
use datafusion_common::tree_node::TreeNode;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use either::Either;
|
||||
use futures::Stream;
|
||||
use futures::task::{Context, Poll};
|
||||
use futures_util::future::try_join_all;
|
||||
use metric_engine::engine::MetricEngine;
|
||||
use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
|
||||
@@ -75,7 +80,7 @@ use store_api::region_request::{
|
||||
RegionOpenRequest, RegionRequest,
|
||||
};
|
||||
use store_api::storage::RegionId;
|
||||
use tokio::sync::{Semaphore, SemaphorePermit};
|
||||
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
|
||||
use tokio::time::timeout;
|
||||
use tonic::{Request, Response, Result as TonicResult};
|
||||
|
||||
@@ -249,7 +254,7 @@ impl RegionServer {
|
||||
request: api::v1::region::QueryRequest,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
let _permit = if let Some(p) = &self.inner.parallelism {
|
||||
let permit = if let Some(p) = &self.inner.parallelism {
|
||||
Some(p.acquire().await?)
|
||||
} else {
|
||||
None
|
||||
@@ -278,7 +283,8 @@ impl RegionServer {
|
||||
.await
|
||||
.context(DecodeLogicalPlanSnafu)?;
|
||||
|
||||
self.inner
|
||||
let stream = self
|
||||
.inner
|
||||
.handle_read(
|
||||
QueryRequest {
|
||||
header: request.header,
|
||||
@@ -287,12 +293,14 @@ impl RegionServer {
|
||||
},
|
||||
query_ctx,
|
||||
)
|
||||
.await
|
||||
.await?;
|
||||
|
||||
Ok(maybe_guard_stream(stream, permit))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
|
||||
let _permit = if let Some(p) = &self.inner.parallelism {
|
||||
let permit = if let Some(p) = &self.inner.parallelism {
|
||||
Some(p.acquire().await?)
|
||||
} else {
|
||||
None
|
||||
@@ -313,9 +321,12 @@ impl RegionServer {
|
||||
.context(DataFusionSnafu)?
|
||||
.data;
|
||||
|
||||
self.inner
|
||||
let stream = self
|
||||
.inner
|
||||
.handle_read(QueryRequest { plan, ..request }, query_ctx)
|
||||
.await
|
||||
.await?;
|
||||
|
||||
Ok(maybe_guard_stream(stream, permit))
|
||||
}
|
||||
|
||||
/// Returns all opened and reportable regions.
|
||||
@@ -879,7 +890,7 @@ struct RegionServerInner {
|
||||
}
|
||||
|
||||
struct RegionServerParallelism {
|
||||
semaphore: Semaphore,
|
||||
semaphore: Arc<Semaphore>,
|
||||
timeout: Duration,
|
||||
}
|
||||
|
||||
@@ -892,19 +903,68 @@ impl RegionServerParallelism {
|
||||
return None;
|
||||
}
|
||||
Some(RegionServerParallelism {
|
||||
semaphore: Semaphore::new(max_concurrent_queries),
|
||||
semaphore: Arc::new(Semaphore::new(max_concurrent_queries)),
|
||||
timeout: concurrent_query_limiter_timeout,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn acquire(&self) -> Result<SemaphorePermit<'_>> {
|
||||
timeout(self.timeout, self.semaphore.acquire())
|
||||
pub async fn acquire(&self) -> Result<OwnedSemaphorePermit> {
|
||||
timeout(self.timeout, self.semaphore.clone().acquire_owned())
|
||||
.await
|
||||
.context(ConcurrentQueryLimiterTimeoutSnafu)?
|
||||
.context(ConcurrentQueryLimiterClosedSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps a record batch stream and holds a concurrency permit until the stream is
|
||||
/// fully consumed (dropped), so `max_concurrent_queries` bounds the number of
|
||||
/// in-flight read streams, not just query planning.
|
||||
struct PermitGuardedStream {
|
||||
inner: SendableRecordBatchStream,
|
||||
_permit: OwnedSemaphorePermit,
|
||||
}
|
||||
|
||||
impl RecordBatchStream for PermitGuardedStream {
|
||||
fn name(&self) -> &str {
|
||||
self.inner.name()
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.inner.schema()
|
||||
}
|
||||
|
||||
fn output_ordering(&self) -> Option<&[OrderOption]> {
|
||||
self.inner.output_ordering()
|
||||
}
|
||||
|
||||
fn metrics(&self) -> Option<RecordBatchMetrics> {
|
||||
self.inner.metrics()
|
||||
}
|
||||
}
|
||||
|
||||
impl Stream for PermitGuardedStream {
|
||||
type Item = common_recordbatch::error::Result<RecordBatch>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||
Pin::new(&mut self.inner).poll_next(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Wraps `stream` so it holds `permit` until fully consumed. Returns `stream`
|
||||
/// unchanged when no permit was acquired (limiter disabled).
|
||||
fn maybe_guard_stream(
|
||||
stream: SendableRecordBatchStream,
|
||||
permit: Option<OwnedSemaphorePermit>,
|
||||
) -> SendableRecordBatchStream {
|
||||
match permit {
|
||||
Some(permit) => Box::pin(PermitGuardedStream {
|
||||
inner: stream,
|
||||
_permit: permit,
|
||||
}),
|
||||
None => stream,
|
||||
}
|
||||
}
|
||||
|
||||
enum CurrentEngine {
|
||||
Engine(RegionEngineRef),
|
||||
EarlyReturn(AffectedRows),
|
||||
|
||||
Reference in New Issue
Block a user