refactor: expose flow batching mode constants to config (#6442)

* refactor: make flow batching mode constant to configs

Signed-off-by: discord9 <discord9@163.com>

* docs: config docs

Signed-off-by: discord9 <discord9@163.com>

* docs: update code comment

Signed-off-by: discord9 <discord9@163.com>

* test: fix test_config_api

Signed-off-by: discord9 <discord9@163.com>

* feat: more batch opts

Signed-off-by: discord9 <discord9@163.com>

* fix after rebase

Signed-off-by: discord9 <discord9@163.com>

* chore: per review

Signed-off-by: discord9 <discord9@163.com>

* per review experimental options

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-07-16 16:05:20 +08:00
committed by GitHub
parent fac8c3e62c
commit bbc9f3ea1e
14 changed files with 242 additions and 96 deletions

1
Cargo.lock generated
View File

@@ -4688,6 +4688,7 @@ dependencies = [
"get-size2",
"greptime-proto",
"http 1.1.0",
"humantime-serde",
"itertools 0.14.0",
"lazy_static",
"meta-client",

View File

@@ -562,6 +562,16 @@
| `node_id` | Integer | Unset | The flownode identifier and should be unique in the cluster. |
| `flow` | -- | -- | flow engine options. |
| `flow.num_workers` | Integer | `0` | The number of flow worker in flownode.<br/>Not setting(or set to 0) this value will use the number of CPU cores divided by 2. |
| `flow.batching_mode` | -- | -- | -- |
| `flow.batching_mode.query_timeout` | String | `600s` | The default batching engine query timeout is 10 minutes. |
| `flow.batching_mode.slow_query_threshold` | String | `60s` | will output a warn log for any query that runs for more that this threshold |
| `flow.batching_mode.experimental_min_refresh_duration` | String | `5s` | The minimum duration between two queries execution by batching mode task |
| `flow.batching_mode.grpc_conn_timeout` | String | `5s` | The gRPC connection timeout |
| `flow.batching_mode.experimental_grpc_max_retries` | Integer | `3` | The gRPC max retry number |
| `flow.batching_mode.experimental_frontend_scan_timeout` | String | `30s` | Flow wait for available frontend timeout,<br/>if failed to find available frontend after frontend_scan_timeout elapsed, return error<br/>which prevent flownode from starting |
| `flow.batching_mode.experimental_frontend_activity_timeout` | String | `60s` | Frontend activity timeout<br/>if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,<br/>it will be removed from the list that flownode use to connect |
| `flow.batching_mode.experimental_max_filter_num_per_query` | Integer | `20` | Maximum number of filters allowed in a single query |
| `flow.batching_mode.experimental_time_window_merge_threshold` | Integer | `3` | Time window merge distance |
| `grpc` | -- | -- | The gRPC server options. |
| `grpc.bind_addr` | String | `127.0.0.1:6800` | The address to bind the gRPC server. |
| `grpc.server_addr` | String | `127.0.0.1:6800` | The address advertised to the metasrv,<br/>and used for connections from outside the host |

View File

@@ -7,6 +7,29 @@ node_id = 14
## The number of flow worker in flownode.
## Not setting(or set to 0) this value will use the number of CPU cores divided by 2.
#+num_workers=0
[flow.batching_mode]
## The default batching engine query timeout is 10 minutes.
#+query_timeout="600s"
## will output a warn log for any query that runs for more that this threshold
#+slow_query_threshold="60s"
## The minimum duration between two queries execution by batching mode task
#+experimental_min_refresh_duration="5s"
## The gRPC connection timeout
#+grpc_conn_timeout="5s"
## The gRPC max retry number
#+experimental_grpc_max_retries=3
## Flow wait for available frontend timeout,
## if failed to find available frontend after frontend_scan_timeout elapsed, return error
## which prevent flownode from starting
#+experimental_frontend_scan_timeout="30s"
## Frontend activity timeout
## if frontend is down(not sending heartbeat) for more than frontend_activity_timeout,
## it will be removed from the list that flownode use to connect
#+experimental_frontend_activity_timeout="60s"
## Maximum number of filters allowed in a single query
#+experimental_max_filter_num_per_query=20
## Time window merge distance
#+experimental_time_window_merge_threshold=3
## The gRPC server options.
[grpc]

View File

@@ -374,6 +374,7 @@ impl StartCommand {
meta_client.clone(),
flow_auth_header,
opts.query.clone(),
opts.flow.batching_mode.clone(),
);
let frontend_client = Arc::new(frontend_client);
let flownode_builder = FlownodeBuilder::new(

View File

@@ -48,6 +48,7 @@ futures.workspace = true
get-size2 = "0.1.2"
greptime-proto.workspace = true
http.workspace = true
humantime-serde.workspace = true
itertools.workspace = true
lazy_static.workspace = true
meta-client.workspace = true

View File

@@ -50,6 +50,7 @@ use crate::adapter::refill::RefillTask;
use crate::adapter::table_source::ManagedTableSource;
use crate::adapter::util::relation_desc_to_column_schemas_with_fallback;
pub(crate) use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::batching_mode::BatchingModeOptions;
use crate::compute::ErrCollector;
use crate::df_optimizer::sql_to_flow_plan;
use crate::error::{EvalSnafu, ExternalSnafu, InternalSnafu, InvalidQuerySnafu, UnexpectedSnafu};
@@ -84,12 +85,14 @@ pub const AUTO_CREATED_UPDATE_AT_TS_COL: &str = "update_at";
#[serde(default)]
pub struct FlowConfig {
pub num_workers: usize,
pub batching_mode: BatchingModeOptions,
}
impl Default for FlowConfig {
fn default() -> Self {
Self {
num_workers: (common_config::utils::get_cpus() / 2).max(1),
batching_mode: BatchingModeOptions::default(),
}
}
}

View File

@@ -41,7 +41,6 @@ use tokio::sync::{Mutex, RwLock};
use crate::adapter::{CreateFlowArgs, StreamingEngine};
use crate::batching_mode::engine::BatchingEngine;
use crate::batching_mode::{FRONTEND_SCAN_TIMEOUT, MIN_REFRESH_DURATION};
use crate::engine::FlowEngine;
use crate::error::{
CreateFlowSnafu, ExternalSnafu, FlowNotFoundSnafu, FlowNotRecoveredSnafu,
@@ -440,13 +439,21 @@ struct ConsistentCheckTask {
impl ConsistentCheckTask {
async fn start_check_task(engine: &Arc<FlowDualEngine>) -> Result<Self, Error> {
let engine = engine.clone();
let min_refresh_duration = engine
.batching_engine()
.batch_opts
.experimental_min_refresh_duration;
let frontend_scan_timeout = engine
.batching_engine()
.batch_opts
.experimental_frontend_scan_timeout;
let (tx, mut rx) = tokio::sync::mpsc::channel(1);
let (trigger_tx, mut trigger_rx) =
tokio::sync::mpsc::channel::<(bool, bool, tokio::sync::oneshot::Sender<()>)>(10);
let handle = common_runtime::spawn_global(async move {
// first check if available frontend is found
if let Err(err) = engine
.wait_for_available_frontend(FRONTEND_SCAN_TIMEOUT)
.wait_for_available_frontend(frontend_scan_timeout)
.await
{
warn!("No frontend is available yet:\n {err:?}");
@@ -459,9 +466,9 @@ impl ConsistentCheckTask {
error!(
"Failed to recover flows:\n {err:?}, retry {} in {}s",
recover_retry,
MIN_REFRESH_DURATION.as_secs()
min_refresh_duration.as_secs()
);
tokio::time::sleep(MIN_REFRESH_DURATION).await;
tokio::time::sleep(min_refresh_duration).await;
}
engine.set_done_recovering();

View File

@@ -16,6 +16,8 @@
use std::time::Duration;
use serde::{Deserialize, Serialize};
pub(crate) mod engine;
pub(crate) mod frontend_client;
mod state;
@@ -23,27 +25,49 @@ mod task;
mod time_window;
mod utils;
/// TODO(discord9): make those constants configurable
/// The default batching engine query timeout is 10 minutes
pub const DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(10 * 60);
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct BatchingModeOptions {
/// The default batching engine query timeout is 10 minutes
#[serde(with = "humantime_serde")]
pub query_timeout: Duration,
/// will output a warn log for any query that runs for more that this threshold
#[serde(with = "humantime_serde")]
pub slow_query_threshold: Duration,
/// The minimum duration between two queries execution by batching mode task
#[serde(with = "humantime_serde")]
pub experimental_min_refresh_duration: Duration,
/// The gRPC connection timeout
#[serde(with = "humantime_serde")]
pub grpc_conn_timeout: Duration,
/// The gRPC max retry number
pub experimental_grpc_max_retries: u32,
/// Flow wait for available frontend timeout,
/// if failed to find available frontend after frontend_scan_timeout elapsed, return error
/// which prevent flownode from starting
#[serde(with = "humantime_serde")]
pub experimental_frontend_scan_timeout: Duration,
/// Frontend activity timeout
/// if frontend is down(not sending heartbeat) for more than frontend_activity_timeout, it will be removed from the list that flownode use to connect
#[serde(with = "humantime_serde")]
pub experimental_frontend_activity_timeout: Duration,
/// Maximum number of filters allowed in a single query
pub experimental_max_filter_num_per_query: usize,
/// Time window merge distance
pub experimental_time_window_merge_threshold: usize,
}
/// will output a warn log for any query that runs for more that 1 minutes, and also every 1 minutes when that query is still running
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
/// The minimum duration between two queries execution by batching mode task
pub const MIN_REFRESH_DURATION: Duration = Duration::new(5, 0);
/// Grpc connection timeout
const GRPC_CONN_TIMEOUT: Duration = Duration::from_secs(5);
/// Grpc max retry number
const GRPC_MAX_RETRIES: u32 = 3;
/// Flow wait for available frontend timeout,
/// if failed to find available frontend after FRONTEND_SCAN_TIMEOUT elapsed, return error
/// which should prevent flownode from starting
pub const FRONTEND_SCAN_TIMEOUT: Duration = Duration::from_secs(30);
/// Frontend activity timeout
/// if frontend is down(not sending heartbeat) for more than FRONTEND_ACTIVITY_TIMEOUT, it will be removed from the list that flownode use to connect
pub const FRONTEND_ACTIVITY_TIMEOUT: Duration = Duration::from_secs(60);
impl Default for BatchingModeOptions {
fn default() -> Self {
Self {
query_timeout: Duration::from_secs(10 * 60),
slow_query_threshold: Duration::from_secs(60),
experimental_min_refresh_duration: Duration::new(5, 0),
grpc_conn_timeout: Duration::from_secs(5),
experimental_grpc_max_retries: 3,
experimental_frontend_scan_timeout: Duration::from_secs(30),
experimental_frontend_activity_timeout: Duration::from_secs(60),
experimental_max_filter_num_per_query: 20,
experimental_time_window_merge_threshold: 3,
}
}
}

View File

@@ -34,9 +34,10 @@ use store_api::storage::{RegionId, TableId};
use tokio::sync::{oneshot, RwLock};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::task::{BatchingTask, TaskArgs};
use crate::batching_mode::time_window::{find_time_window_expr, TimeWindowExpr};
use crate::batching_mode::utils::sql_to_df_plan;
use crate::batching_mode::BatchingModeOptions;
use crate::engine::FlowEngine;
use crate::error::{
ExternalSnafu, FlowAlreadyExistSnafu, FlowNotFoundSnafu, TableNotFoundMetaSnafu,
@@ -57,6 +58,9 @@ pub struct BatchingEngine {
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
/// Batching mode options for control how batching mode query works
///
pub(crate) batch_opts: Arc<BatchingModeOptions>,
}
impl BatchingEngine {
@@ -66,6 +70,7 @@ impl BatchingEngine {
flow_metadata_manager: FlowMetadataManagerRef,
table_meta: TableMetadataManagerRef,
catalog_manager: CatalogManagerRef,
batch_opts: BatchingModeOptions,
) -> Self {
Self {
tasks: Default::default(),
@@ -75,6 +80,7 @@ impl BatchingEngine {
table_meta,
catalog_manager,
query_engine,
batch_opts: Arc::new(batch_opts),
}
}
@@ -424,18 +430,21 @@ impl BatchingEngine {
.unwrap_or("None".to_string())
);
let task = BatchingTask::try_new(
let task_args = TaskArgs {
flow_id,
&sql,
query: &sql,
plan,
phy_expr,
time_window_expr: phy_expr,
expire_after,
sink_table_name,
source_table_names,
query_ctx,
self.catalog_manager.clone(),
rx,
)?;
catalog_manager: self.catalog_manager.clone(),
shutdown_rx: rx,
batch_opts: self.batch_opts.clone(),
};
let task = BatchingTask::try_new(task_args)?;
let task_inner = task.clone();
let engine = self.query_engine.clone();

View File

@@ -38,10 +38,7 @@ use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::{QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, FRONTEND_ACTIVITY_TIMEOUT, GRPC_CONN_TIMEOUT,
GRPC_MAX_RETRIES,
};
use crate::batching_mode::BatchingModeOptions;
use crate::error::{ExternalSnafu, InvalidRequestSnafu, NoAvailableFrontendSnafu, UnexpectedSnafu};
use crate::{Error, FlowAuthHeader};
@@ -88,6 +85,7 @@ pub enum FrontendClient {
chnl_mgr: ChannelManager,
auth: Option<FlowAuthHeader>,
query: QueryOptions,
batch_opts: BatchingModeOptions,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -114,18 +112,20 @@ impl FrontendClient {
meta_client: Arc<MetaClient>,
auth: Option<FlowAuthHeader>,
query: QueryOptions,
batch_opts: BatchingModeOptions,
) -> Self {
common_telemetry::info!("Frontend client build with auth={:?}", auth);
Self::Distributed {
meta_client,
chnl_mgr: {
let cfg = ChannelConfig::new()
.connect_timeout(GRPC_CONN_TIMEOUT)
.timeout(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT);
.connect_timeout(batch_opts.grpc_conn_timeout)
.timeout(batch_opts.query_timeout);
ChannelManager::with_config(cfg)
},
auth,
query,
batch_opts,
}
}
@@ -209,6 +209,7 @@ impl FrontendClient {
chnl_mgr,
auth,
query: _,
batch_opts,
} = self
else {
return UnexpectedSnafu {
@@ -217,9 +218,9 @@ impl FrontendClient {
.fail();
};
let mut interval = tokio::time::interval(GRPC_CONN_TIMEOUT);
let mut interval = tokio::time::interval(batch_opts.grpc_conn_timeout);
interval.tick().await;
for retry in 0..GRPC_MAX_RETRIES {
for retry in 0..batch_opts.experimental_grpc_max_retries {
let mut frontends = self.scan_for_frontend().await?;
let now_in_ms = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
@@ -233,7 +234,10 @@ impl FrontendClient {
.iter()
// filter out frontend that have been down for more than 1 min
.filter(|(_, node_info)| {
node_info.last_activity_ts + FRONTEND_ACTIVITY_TIMEOUT.as_millis() as i64
node_info.last_activity_ts
+ batch_opts
.experimental_frontend_activity_timeout
.as_millis() as i64
> now_in_ms
})
{
@@ -263,7 +267,7 @@ impl FrontendClient {
}
NoAvailableFrontendSnafu {
timeout: GRPC_CONN_TIMEOUT,
timeout: batch_opts.grpc_conn_timeout,
context: "No available frontend found that is able to process query",
}
.fail()
@@ -346,7 +350,9 @@ impl FrontendClient {
peer_desc: &mut Option<PeerDesc>,
) -> Result<u32, Error> {
match self {
FrontendClient::Distributed { query, .. } => {
FrontendClient::Distributed {
query, batch_opts, ..
} => {
let db = self.get_random_active_frontend(catalog, schema).await?;
*peer_desc = Some(PeerDesc::Dist {
@@ -356,7 +362,7 @@ impl FrontendClient {
db.database
.handle_with_retry(
req.clone(),
GRPC_MAX_RETRIES,
batch_opts.experimental_grpc_max_retries,
&[(QUERY_PARALLELISM_HINT, &query.parallelism.to_string())],
)
.await

View File

@@ -28,7 +28,6 @@ use tokio::time::Instant;
use crate::batching_mode::task::BatchingTask;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_SIZE,
@@ -89,10 +88,12 @@ impl TaskState {
&self,
flow_id: FlowId,
time_window_size: &Option<Duration>,
min_refresh_duration: Duration,
max_timeout: Option<Duration>,
max_filter_num_per_query: usize,
) -> Instant {
// = last query duration, capped by [max(min_run_interval, time_window_size), max_timeout], note at most `max_timeout`
let lower = time_window_size.unwrap_or(MIN_REFRESH_DURATION);
let lower = time_window_size.unwrap_or(min_refresh_duration);
let next_duration = self.last_query_duration.max(lower);
let next_duration = if let Some(max_timeout) = max_timeout {
next_duration.min(max_timeout)
@@ -104,7 +105,7 @@ impl TaskState {
// compute how much time range can be handled in one query
let max_query_update_range = (*time_window_size)
.unwrap_or_default()
.mul_f64(DirtyTimeWindows::MAX_FILTER_NUM as f64);
.mul_f64(max_filter_num_per_query as f64);
// if dirty time range is more than one query can handle, execute immediately
// to faster clean up dirty time windows
if cur_dirty_window_size < max_query_update_range {
@@ -125,11 +126,36 @@ impl TaskState {
/// For keep recording of dirty time windows, which is time window that have new data inserted
/// since last query.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct DirtyTimeWindows {
/// windows's `start -> end` and non-overlapping
/// `end` is exclusive(and optional)
windows: BTreeMap<Timestamp, Option<Timestamp>>,
/// Maximum number of filters allowed in a single query
max_filter_num_per_query: usize,
/// Time window merge distance
///
time_window_merge_threshold: usize,
}
impl DirtyTimeWindows {
pub fn new(max_filter_num_per_query: usize, time_window_merge_threshold: usize) -> Self {
Self {
windows: BTreeMap::new(),
max_filter_num_per_query,
time_window_merge_threshold,
}
}
}
impl Default for DirtyTimeWindows {
fn default() -> Self {
Self {
windows: BTreeMap::new(),
max_filter_num_per_query: 20,
time_window_merge_threshold: 3,
}
}
}
impl DirtyTimeWindows {
@@ -138,9 +164,6 @@ impl DirtyTimeWindows {
/// TODO(discord9): make those configurable
pub const MERGE_DIST: i32 = 3;
/// Maximum number of filters allowed in a single query
pub const MAX_FILTER_NUM: usize = 20;
/// Add lower bounds to the dirty time windows. Upper bounds are ignored.
///
/// # Arguments
@@ -234,7 +257,7 @@ impl DirtyTimeWindows {
);
self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
if self.windows.len() > Self::MAX_FILTER_NUM {
if self.windows.len() > self.max_filter_num_per_query {
let first_time_window = self.windows.first_key_value();
let last_time_window = self.windows.last_key_value();
@@ -243,7 +266,7 @@ impl DirtyTimeWindows {
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
task_ctx.config.flow_id,
self.windows.len(),
Self::MAX_FILTER_NUM,
self.max_filter_num_per_query,
task_ctx.config.time_window_expr,
task_ctx.config.expire_after,
first_time_window,
@@ -254,7 +277,7 @@ impl DirtyTimeWindows {
warn!("Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. first_time_window={:?}, last_time_window={:?}",
flow_id,
self.windows.len(),
Self::MAX_FILTER_NUM,
self.max_filter_num_per_query,
first_time_window,
last_time_window
)
@@ -460,7 +483,7 @@ impl DirtyTimeWindows {
if lower_bound
.sub(&prev_upper)
.map(|dist| dist <= window_size * Self::MERGE_DIST)
.map(|dist| dist <= window_size * self.time_window_merge_threshold as i32)
.unwrap_or(false)
{
prev_tw.1 = Some(cur_upper);
@@ -508,18 +531,19 @@ mod test {
#[test]
fn test_merge_dirty_time_windows() {
let merge_dist = DirtyTimeWindows::default().time_window_merge_threshold;
let testcases = vec![
// just enough to merge
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
Timestamp::new_second((1 + merge_dist as i64) * 5 * 60),
],
(chrono::Duration::seconds(5 * 60), None),
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
(2 + merge_dist as i64) * 5 * 60,
)),
)]),
Some(
@@ -530,7 +554,7 @@ mod test {
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
],
(chrono::Duration::seconds(5 * 60), None),
BTreeMap::from([
@@ -539,9 +563,9 @@ mod test {
Some(Timestamp::new_second(5 * 60)),
),
(
Timestamp::new_second((2 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
Timestamp::new_second((2 + merge_dist as i64) * 5 * 60),
Some(Timestamp::new_second(
(3 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
(3 + merge_dist as i64) * 5 * 60,
)),
),
]),
@@ -553,13 +577,13 @@ mod test {
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
Timestamp::new_second((merge_dist as i64) * 5 * 60),
],
(chrono::Duration::seconds(5 * 60), None),
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(1 + DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60,
(1 + merge_dist as i64) * 5 * 60,
)),
)]),
Some(
@@ -570,14 +594,14 @@ mod test {
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 3 * 2),
Timestamp::new_second((merge_dist as i64) * 3),
Timestamp::new_second((merge_dist as i64) * 3 * 2),
],
(chrono::Duration::seconds(3), None),
BTreeMap::from([(
Timestamp::new_second(0),
Some(Timestamp::new_second(
(DirtyTimeWindows::MERGE_DIST as i64) * 7
(merge_dist as i64) * 7
)),
)]),
Some(
@@ -646,12 +670,12 @@ mod test {
(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
Timestamp::new_second((merge_dist as i64) * 5 * 60),
],
(
chrono::Duration::seconds(5 * 60),
Some(Timestamp::new_second(
(DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
(merge_dist as i64) * 6 * 60,
)),
),
BTreeMap::from([]),
@@ -674,7 +698,7 @@ mod test {
"ts",
expire_lower_bound,
window_size,
DirtyTimeWindows::MAX_FILTER_NUM,
dirty.max_filter_num_per_query,
0,
None,
)

View File

@@ -46,15 +46,13 @@ use tokio::time::Instant;
use crate::adapter::{AUTO_CREATED_PLACEHOLDER_TS_COL, AUTO_CREATED_UPDATE_AT_TS_COL};
use crate::batching_mode::frontend_client::FrontendClient;
use crate::batching_mode::state::{DirtyTimeWindows, TaskState};
use crate::batching_mode::state::TaskState;
use crate::batching_mode::time_window::TimeWindowExpr;
use crate::batching_mode::utils::{
get_table_info_df_schema, sql_to_df_plan, AddAutoColumnRewriter, AddFilterRewriter,
FindGroupByFinalName,
};
use crate::batching_mode::{
DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT, MIN_REFRESH_DURATION, SLOW_QUERY_THRESHOLD,
};
use crate::batching_mode::BatchingModeOptions;
use crate::df_optimizer::apply_df_optimizer;
use crate::error::{
ConvertColumnSchemaSnafu, DatafusionSnafu, ExternalSnafu, InvalidQuerySnafu,
@@ -81,6 +79,7 @@ pub struct TaskConfig {
pub source_table_names: HashSet<[String; 3]>,
catalog_manager: CatalogManagerRef,
query_type: QueryType,
batch_opts: Arc<BatchingModeOptions>,
}
fn determine_query_type(query: &str, query_ctx: &QueryContextRef) -> Result<QueryType, Error> {
@@ -116,19 +115,37 @@ pub struct BatchingTask {
pub state: Arc<RwLock<TaskState>>,
}
/// Arguments for creating batching task
pub struct TaskArgs<'a> {
pub flow_id: FlowId,
pub query: &'a str,
pub plan: LogicalPlan,
pub time_window_expr: Option<TimeWindowExpr>,
pub expire_after: Option<i64>,
pub sink_table_name: [String; 3],
pub source_table_names: Vec<[String; 3]>,
pub query_ctx: QueryContextRef,
pub catalog_manager: CatalogManagerRef,
pub shutdown_rx: oneshot::Receiver<()>,
pub batch_opts: Arc<BatchingModeOptions>,
}
impl BatchingTask {
#[allow(clippy::too_many_arguments)]
pub fn try_new(
flow_id: FlowId,
query: &str,
plan: LogicalPlan,
time_window_expr: Option<TimeWindowExpr>,
expire_after: Option<i64>,
sink_table_name: [String; 3],
source_table_names: Vec<[String; 3]>,
query_ctx: QueryContextRef,
catalog_manager: CatalogManagerRef,
shutdown_rx: oneshot::Receiver<()>,
TaskArgs {
flow_id,
query,
plan,
time_window_expr,
expire_after,
sink_table_name,
source_table_names,
query_ctx,
catalog_manager,
shutdown_rx,
batch_opts,
}: TaskArgs<'_>,
) -> Result<Self, Error> {
Ok(Self {
config: Arc::new(TaskConfig {
@@ -141,6 +158,7 @@ impl BatchingTask {
catalog_manager,
output_schema: plan.schema().clone(),
query_type: determine_query_type(query, &query_ctx)?,
batch_opts,
}),
state: Arc::new(RwLock::new(TaskState::new(query_ctx, shutdown_rx))),
})
@@ -386,7 +404,7 @@ impl BatchingTask {
}
// record slow query
if elapsed >= SLOW_QUERY_THRESHOLD {
if elapsed >= self.config.batch_opts.slow_query_threshold {
warn!(
"Flow {flow_id} on frontend {:?} executed for {:?} before complete, query: {}",
peer_desc, elapsed, &plan
@@ -439,12 +457,14 @@ impl BatchingTask {
.with_label_values(&[&flow_id_str])
.inc();
let min_refresh = self.config.batch_opts.experimental_min_refresh_duration;
let new_query = match self.gen_insert_plan(&engine, None).await {
Ok(new_query) => new_query,
Err(err) => {
common_telemetry::error!(err; "Failed to generate query for flow={}", self.config.flow_id);
// also sleep for a little while before try again to prevent flooding logs
tokio::time::sleep(MIN_REFRESH_DURATION).await;
tokio::time::sleep(min_refresh).await;
continue;
}
};
@@ -461,14 +481,18 @@ impl BatchingTask {
let sleep_until = {
let state = self.state.write().unwrap();
let time_window_size = self
.config
.time_window_expr
.as_ref()
.and_then(|t| *t.time_window_size());
state.get_next_start_query_time(
self.config.flow_id,
&self
.config
.time_window_expr
.as_ref()
.and_then(|t| *t.time_window_size()),
Some(DEFAULT_BATCHING_ENGINE_QUERY_TIMEOUT),
&time_window_size,
min_refresh,
Some(self.config.batch_opts.query_timeout),
self.config.batch_opts.experimental_max_filter_num_per_query,
)
};
tokio::time::sleep_until(sleep_until).await;
@@ -477,9 +501,9 @@ impl BatchingTask {
Ok(None) => {
debug!(
"Flow id = {:?} found no new data, sleep for {:?} then continue",
self.config.flow_id, MIN_REFRESH_DURATION
self.config.flow_id, min_refresh
);
tokio::time::sleep(MIN_REFRESH_DURATION).await;
tokio::time::sleep(min_refresh).await;
continue;
}
// TODO(discord9): this error should have better place to go, but for now just print error, also more context is needed
@@ -496,7 +520,7 @@ impl BatchingTask {
}
}
// also sleep for a little while before try again to prevent flooding logs
tokio::time::sleep(MIN_REFRESH_DURATION).await;
tokio::time::sleep(min_refresh).await;
}
}
}
@@ -604,7 +628,8 @@ impl BatchingTask {
&col_name,
Some(l),
window_size,
max_window_cnt.unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM),
max_window_cnt
.unwrap_or(self.config.batch_opts.experimental_max_filter_num_per_query),
self.config.flow_id,
Some(self),
)?;

View File

@@ -360,6 +360,7 @@ impl FlownodeBuilder {
self.flow_metadata_manager.clone(),
self.table_meta.clone(),
self.catalog_manager.clone(),
self.opts.flow.batching_mode.clone(),
));
let dual = FlowDualEngine::new(
manager.clone(),

View File

@@ -1140,6 +1140,17 @@ max_running_procedures = 128
[flow]
[flow.batching_mode]
query_timeout = "10m"
slow_query_threshold = "1m"
experimental_min_refresh_duration = "5s"
grpc_conn_timeout = "5s"
experimental_grpc_max_retries = 3
experimental_frontend_scan_timeout = "30s"
experimental_frontend_activity_timeout = "1m"
experimental_max_filter_num_per_query = 20
experimental_time_window_merge_threshold = 3
[logging]
max_log_files = 720
append_stdout = true