feat: flow query parallel=1&query faster with many windows&min one time window (#6324)

* feat: flow query parallel=1&query faster when
windows too many&min one time window

Signed-off-by: discord9 <discord9@163.com>

* chore: default flow query parallelism=1

Signed-off-by: discord9 <discord9@163.com>

* refactor: use query options in flownode per review

Signed-off-by: discord9 <discord9@163.com>

* docs: update comment

Signed-off-by: discord9 <discord9@163.com>

* chore: fix test

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* chore: make config docs

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-06-30 10:17:01 +08:00
committed by GitHub
parent bc42d35c2a
commit 616e76941a
11 changed files with 113 additions and 37 deletions

View File

@@ -598,3 +598,5 @@
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
| `query` | -- | -- | -- |
| `query.parallelism` | Integer | `1` | Parallelism of the query engine for queries sent by flownode.<br/>Defaults to 1, so it won't use too much CPU or memory. |

View File

@@ -108,3 +108,8 @@ default_ratio = 1.0
## The tokio console address.
## @toml2docs:none-default
#+ tokio_console_addr = "127.0.0.1"
[query]
## Parallelism of the query engine for queries sent by flownode.
## Defaults to 1, so it won't use too much CPU or memory.
parallelism = 1

View File

@@ -196,12 +196,22 @@ impl Database {
/// Retry if connection fails, max_retries is the max number of retries, so the total wait time
/// is `max_retries * GRPC_CONN_TIMEOUT`
pub async fn handle_with_retry(&self, request: Request, max_retries: u32) -> Result<u32> {
pub async fn handle_with_retry(
&self,
request: Request,
max_retries: u32,
hints: &[(&str, &str)],
) -> Result<u32> {
let mut client = make_database_client(&self.client)?.inner;
let mut retries = 0;
let request = self.to_rpc_request(request);
loop {
let raw_response = client.handle(request.clone()).await;
let mut tonic_request = tonic::Request::new(request.clone());
let metadata = tonic_request.metadata_mut();
Self::put_hints(metadata, hints)?;
let raw_response = client.handle(tonic_request).await;
match (raw_response, retries < max_retries) {
(Ok(resp), _) => return from_grpc_response(resp.into_inner()),
(Err(err), true) => {

View File

@@ -371,8 +371,11 @@ impl StartCommand {
let flow_metadata_manager = Arc::new(FlowMetadataManager::new(cached_meta_backend.clone()));
let flow_auth_header = get_flow_auth_options(&opts).context(StartFlownodeSnafu)?;
let frontend_client =
FrontendClient::from_meta_client(meta_client.clone(), flow_auth_header);
let frontend_client = FrontendClient::from_meta_client(
meta_client.clone(),
flow_auth_header,
opts.query.clone(),
);
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(
opts.clone(),

View File

@@ -564,7 +564,7 @@ impl StartCommand {
// standalone mode does not open a grpc connection; it only gets a handler to the
// frontend grpc client without actually making a connection
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler();
FrontendClient::from_empty_grpc_handler(opts.query.clone());
let frontend_client = Arc::new(frontend_client);
let flow_builder = FlownodeBuilder::new(
flownode_options,

View File

@@ -30,6 +30,7 @@ use meta_srv::metasrv::MetasrvOptions;
use meta_srv::selector::SelectorType;
use metric_engine::config::EngineConfig as MetricEngineConfig;
use mito2::config::MitoConfig;
use query::options::QueryOptions;
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
@@ -216,12 +217,15 @@ fn test_load_flownode_example_config() {
dir: format!("{}/{}", DEFAULT_DATA_HOME, DEFAULT_LOGGING_DIR),
level: Some("info".to_string()),
otlp_endpoint: Some(DEFAULT_OTLP_HTTP_ENDPOINT.to_string()),
otlp_export_protocol: Some(common_telemetry::logging::OtlpExportProtocol::Http),
tracing_sample_ratio: Some(Default::default()),
..Default::default()
},
tracing: Default::default(),
heartbeat: Default::default(),
query: Default::default(),
// flownode deliberately uses a lower query parallelism
// to avoid overwhelming the frontend with too many queries
query: QueryOptions { parallelism: 1 },
meta_client: Some(MetaClientOptions {
metasrv_addrs: vec!["127.0.0.1:3002".to_string()],
timeout: Duration::from_secs(3),

View File

@@ -121,7 +121,9 @@ impl Default for FlownodeOptions {
logging: LoggingOptions::default(),
tracing: TracingOptions::default(),
heartbeat: HeartbeatOptions::default(),
query: QueryOptions::default(),
// flownode's query parallelism is set to 1 to throttle flow queries so
// that they won't use too much CPU or memory
query: QueryOptions { parallelism: 1 },
user_provider: None,
}
}

View File

@@ -14,6 +14,7 @@
//! Frontend client to run flow as batching task which is time-window-aware normal query triggered every tick set by user
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::time::SystemTime;
@@ -29,6 +30,8 @@ use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use common_telemetry::warn;
use meta_client::client::MetaClient;
use query::datafusion::QUERY_PARALLELISM_HINT;
use query::options::QueryOptions;
use rand::rng;
use rand::seq::SliceRandom;
use servers::query_handler::grpc::GrpcQueryHandler;
@@ -84,27 +87,34 @@ pub enum FrontendClient {
meta_client: Arc<MetaClient>,
chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>,
query: QueryOptions,
},
Standalone {
/// For the sake of simplicity, still use grpc even in standalone mode.
/// Note that the client here should be lazy, so that it can wait until the frontend has booted before making the connection.
database_client: HandlerMutable,
query: QueryOptions,
},
}
impl FrontendClient {
/// Create a new empty frontend client, with a `HandlerMutable` to set the grpc handler later
pub fn from_empty_grpc_handler() -> (Self, HandlerMutable) {
pub fn from_empty_grpc_handler(query: QueryOptions) -> (Self, HandlerMutable) {
let handler = Arc::new(std::sync::Mutex::new(None));
(
Self::Standalone {
database_client: handler.clone(),
query,
},
handler,
)
}
pub fn from_meta_client(meta_client: Arc<MetaClient>, auth: Option<FlowAuthHeader>) -> Self {
pub fn from_meta_client(
meta_client: Arc<MetaClient>,
auth: Option<FlowAuthHeader>,
query: QueryOptions,
) -> Self {
common_telemetry::info!("Frontend client build with auth={:?}", auth);
Self::Distributed {
meta_client,
@@ -115,12 +125,17 @@ impl FrontendClient {
ChannelManager::with_config(cfg)
},
auth,
query,
}
}
pub fn from_grpc_handler(grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>) -> Self {
pub fn from_grpc_handler(
grpc_handler: Weak<dyn GrpcQueryHandlerWithBoxedError>,
query: QueryOptions,
) -> Self {
Self::Standalone {
database_client: Arc::new(std::sync::Mutex::new(Some(grpc_handler))),
query,
}
}
}
@@ -193,6 +208,7 @@ impl FrontendClient {
meta_client: _,
chnl_mgr,
auth,
query: _,
} = self
else {
return UnexpectedSnafu {
@@ -281,7 +297,9 @@ impl FrontendClient {
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
FrontendClient::Standalone { database_client } => {
FrontendClient::Standalone {
database_client, ..
} => {
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
@@ -328,7 +346,7 @@ impl FrontendClient {
peer_desc: &mut Option<PeerDesc>,
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { .. } => {
FrontendClient::Distributed { query, .. } => {
let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
@@ -336,16 +354,27 @@ impl FrontendClient {
});
db.database
.handle_with_retry(req.clone(), GRPC_MAX_RETRIES)
.handle_with_retry(
req.clone(),
GRPC_MAX_RETRIES,
&[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
)
.await
.with_context(|_| InvalidRequestSnafu {
context: format!("Failed to handle request at {:?}: {:?}", db.peer, req),
})
}
FrontendClient::Standalone { database_client } => {
FrontendClient::Standalone {
database_client,
query,
} => {
let ctx = QueryContextBuilder::default()
.current_catalog(catalog.to_string())
.current_schema(schema.to_string())
.extensions(HashMap::from([(
QUERY_PARALLELISM_HINT.to_string(),
query.parallelism.to_string(),
)]))
.build();
let ctx = Arc::new(ctx);
{

View File

@@ -76,38 +76,43 @@ impl TaskState {
/// Compute the next query delay based on the time window size or the last query duration.
/// Aiming to avoid too frequent queries. But also not too long delay.
/// The delay is computed as follows:
/// - If `time_window_size` is set, the delay is half the time window size, constrained to be
/// at least `last_query_duration` and at most `max_timeout`.
/// - If `time_window_size` is not set, the delay defaults to `last_query_duration`, constrained
/// to be at least `MIN_REFRESH_DURATION` and at most `max_timeout`.
///
/// If there are dirty time windows, the function returns an immediate execution time to clean them.
/// TODO: Make this behavior configurable.
/// The next wait time is the last query duration, clamped to be at least
/// `time_window_size` (or `MIN_REFRESH_DURATION` when no time window is set)
/// and at most `max_timeout` (when one is given).
///
/// If the current dirty time range is longer than what one query can handle,
/// execute immediately to clean up the dirty time windows faster.
///
pub fn get_next_start_query_time(
&self,
flow_id: FlowId,
time_window_size: &Option<Duration>,
max_timeout: Option<Duration>,
) -> Instant {
let last_duration = max_timeout
.unwrap_or(self.last_query_duration)
.min(self.last_query_duration)
.max(MIN_REFRESH_DURATION);
// = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
let lower = time_window_size.unwrap_or(MIN_REFRESH_DURATION);
let next_duration = self.last_query_duration.max(lower);
let next_duration = if let Some(max_timeout) = max_timeout {
next_duration.min(max_timeout)
} else {
next_duration
};
let next_duration = time_window_size
.map(|t| {
let half = t / 2;
half.max(last_duration)
})
.unwrap_or(last_duration);
// if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() {
let cur_dirty_window_size = self.dirty_time_windows.window_size();
// compute how much time range can be handled in one query
let max_query_update_range = (*time_window_size)
.unwrap_or_default()
.mul_f64(DirtyTimeWindows::MAX_FILTER_NUM as f64);
// if dirty time range is more than one query can handle, execute immediately
// to faster clean up dirty time windows
if cur_dirty_window_size < max_query_update_range {
self.last_update_time + next_duration
} else {
// if dirty time windows can't be clean up in one query, execute immediately to faster
// clean up dirty time windows
debug!(
"Flow id = {}, still have {} dirty time window({:?}), execute immediately",
"Flow id = {}, still have too many {} dirty time window({:?}), execute immediately",
flow_id,
self.dirty_time_windows.windows.len(),
self.dirty_time_windows.windows
@@ -147,6 +152,18 @@ impl DirtyTimeWindows {
}
}
/// Sums the lengths of all dirty windows that have a known end.
///
/// Windows whose end is `None` are skipped, as are windows for which
/// `end.sub(start)` yields `None`. A span that `to_std()` cannot convert
/// contributes a zero duration.
pub fn window_size(&self) -> Duration {
    self.windows
        .iter()
        // keep only windows with an end and a computable `end - start`
        .filter_map(|(start, end)| end.as_ref()?.sub(start))
        .map(|span| span.to_std().unwrap_or_default())
        .fold(Duration::ZERO, |total, span| total + span)
}
/// Records a dirty window in `self.windows`, keyed by `start`, with an
/// optional `end`.
///
/// Whatever `insert` returns for that key is intentionally ignored.
pub fn add_window(&mut self, start: Timestamp, end: Option<Timestamp>) {
    let _ = self.windows.insert(start, end);
}

View File

@@ -62,6 +62,10 @@ use crate::planner::{DfLogicalPlanner, LogicalPlanner};
use crate::query_engine::{DescribeResult, QueryEngineContext, QueryEngineState};
use crate::{metrics, QueryEngine};
/// Query parallelism hint key.
/// This hint can be set in the query context to control the parallelism of the query execution.
pub const QUERY_PARALLELISM_HINT: &str = "query_parallelism";
pub struct DatafusionQueryEngine {
state: Arc<QueryEngineState>,
plugins: Plugins,
@@ -480,7 +484,7 @@ impl QueryEngine for DatafusionQueryEngine {
state.config_mut().set_extension(query_ctx.clone());
// note that hints in "x-greptime-hints" are automatically parsed
// and set to query context's extension, so we can get them from query context.
if let Some(parallelism) = query_ctx.extension("query_parallelism") {
if let Some(parallelism) = query_ctx.extension(QUERY_PARALLELISM_HINT) {
if let Ok(n) = parallelism.parse::<u64>() {
if n > 0 {
let new_cfg = state.config().clone().with_target_partitions(n as usize);

View File

@@ -172,7 +172,7 @@ impl GreptimeDbStandaloneBuilder {
);
let (frontend_client, frontend_instance_handler) =
FrontendClient::from_empty_grpc_handler();
FrontendClient::from_empty_grpc_handler(opts.query.clone());
let flow_builder = FlownodeBuilder::new(
Default::default(),
plugins.clone(),