diff --git a/Cargo.lock b/Cargo.lock
index 9293aa58fe..cd574dae43 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5133,7 +5133,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=860dccde96c0f0a8e4007b7060d2fce4b62b23dc#860dccde96c0f0a8e4007b7060d2fce4b62b23dc"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=52083925a15d741c259800a9a54eba3467939180#52083925a15d741c259800a9a54eba3467939180"
 dependencies = [
  "prost 0.13.5",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 93f1ccb60b..c077ff0625 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -133,7 +133,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "860dccde96c0f0a8e4007b7060d2fce4b62b23dc" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "52083925a15d741c259800a9a54eba3467939180" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
diff --git a/src/common/function/src/adjust_flow.rs b/src/common/function/src/adjust_flow.rs
new file mode 100644
index 0000000000..00804177b3
--- /dev/null
+++ b/src/common/function/src/adjust_flow.rs
@@ -0,0 +1,90 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
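+
+//! Registers the `adjust_flow` admin function (see `admin.rs` below): it tunes
+//! a running flow's minimum run interval (in seconds) and the maximum number
+//! of dirty-window filters generated per query. Illustrative invocation,
+//! assuming the same `ADMIN` call form as `flush_flow` (the exact SQL syntax
+//! is an assumption, not part of this patch):
+//!
+//!     ADMIN adjust_flow('my_flow', 5, 100);
+//!
+//! i.e. wait at least 5 seconds between runs and emit at most 100 time-window
+//! filters per query.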
+
+use common_macro::admin_fn;
+use common_query::error::{
+    InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
+};
+use common_query::prelude::Signature;
+use datafusion::logical_expr::Volatility;
+use datatypes::value::{Value, ValueRef};
+use session::context::QueryContextRef;
+use snafu::ensure;
+use store_api::storage::ConcreteDataType;
+
+use crate::handlers::FlowServiceHandlerRef;
+use crate::helper::parse_catalog_flow;
+
+fn adjust_signature() -> Signature {
+    Signature::exact(
+        vec![
+            ConcreteDataType::string_datatype(), // flow name
+            ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
+            ConcreteDataType::uint64_datatype(), // max filter number per query
+        ],
+        Volatility::Immutable,
+    )
+}
+
+#[admin_fn(
+    name = AdjustFlowFunction,
+    display_name = adjust_flow,
+    sig_fn = adjust_signature,
+    ret = uint64
+)]
+pub(crate) async fn adjust_flow(
+    flow_service_handler: &FlowServiceHandlerRef,
+    query_ctx: &QueryContextRef,
+    params: &[ValueRef<'_>],
+) -> Result<Value> {
+    ensure!(
+        params.len() == 3,
+        InvalidFuncArgsSnafu {
+            err_msg: format!(
+                "The length of the args is not correct, expect 3, have: {}",
+                params.len()
+            ),
+        }
+    );
+
+    let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
+        (
+            ValueRef::String(flow_name),
+            ValueRef::UInt64(min_run_interval),
+            ValueRef::UInt64(max_filter_num),
+        ) => (flow_name, min_run_interval, max_filter_num),
+        _ => {
+            return UnsupportedInputDataTypeSnafu {
+                function: "adjust_flow",
+                datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
+            }
+            .fail();
+        }
+    };
+
+    let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
+
+    let res = flow_service_handler
+        .adjust(
+            &catalog_name,
+            &flow_name,
+            min_run_interval,
+            max_filter_num as usize,
+            query_ctx.clone(),
+        )
+        .await?;
+    let affected_rows = res.affected_rows;
+
+    Ok(Value::from(affected_rows))
+}
diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs
index c06b28e7d5..a91f90d417 100644
--- a/src/common/function/src/admin.rs
+++ b/src/common/function/src/admin.rs
@@ -26,6 +26,7 @@
 use flush_compact_table::{CompactTableFunction, FlushTableFunction};
 use migrate_region::MigrateRegionFunction;
 use remove_region_follower::RemoveRegionFollowerFunction;
 
+use crate::adjust_flow::AdjustFlowFunction;
 use crate::flush_flow::FlushFlowFunction;
 use crate::function_registry::FunctionRegistry;
@@ -43,5 +44,6 @@ impl AdminFunction {
         registry.register_async(Arc::new(FlushTableFunction));
         registry.register_async(Arc::new(CompactTableFunction));
         registry.register_async(Arc::new(FlushFlowFunction));
+        registry.register_async(Arc::new(AdjustFlowFunction));
     }
 }
diff --git a/src/common/function/src/flush_flow.rs b/src/common/function/src/flush_flow.rs
index 63fd49ac93..e00a904b67 100644
--- a/src/common/function/src/flush_flow.rs
+++ b/src/common/function/src/flush_flow.rs
@@ -12,21 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
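+// NOTE: flow-name parsing is shared with the new `adjust_flow` function; both
+// resolve `<catalog>.<flow_name>` (or a bare `<flow_name>`) through
+// `helper::parse_catalog_flow`.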
 
-use common_error::ext::BoxedError;
 use common_macro::admin_fn;
 use common_query::error::{
-    ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result,
-    UnsupportedInputDataTypeSnafu,
+    InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
 };
 use common_query::prelude::Signature;
 use datafusion::logical_expr::Volatility;
 use datatypes::value::{Value, ValueRef};
 use session::context::QueryContextRef;
-use snafu::{ensure, ResultExt};
-use sql::parser::ParserContext;
+use snafu::ensure;
 use store_api::storage::ConcreteDataType;
 
 use crate::handlers::FlowServiceHandlerRef;
+use crate::helper::parse_catalog_flow;
 
 fn flush_signature() -> Signature {
     Signature::uniform(
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
     query_ctx: &QueryContextRef,
     params: &[ValueRef<'_>],
 ) -> Result<Value> {
-    let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
-
-    let res = flow_service_handler
-        .flush(&catalog_name, &flow_name, query_ctx.clone())
-        .await?;
-    let affected_rows = res.affected_rows;
-
-    Ok(Value::from(affected_rows))
-}
-
-fn parse_flush_flow(
-    params: &[ValueRef<'_>],
-    query_ctx: &QueryContextRef,
-) -> Result<(String, String)> {
     ensure!(
         params.len() == 1,
         InvalidFuncArgsSnafu {
@@ -70,7 +54,6 @@
             ),
         }
     );
-
     let ValueRef::String(flow_name) = params[0] else {
         return UnsupportedInputDataTypeSnafu {
             function: "flush_flow",
         }
         .fail();
     };
-    let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
-        .map_err(BoxedError::new)
-        .context(ExecuteSnafu)?;
+    let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
 
-    let (catalog_name, flow_name) = match &obj_name.0[..] {
-        [flow_name] => (
-            query_ctx.current_catalog().to_string(),
-            flow_name.value.clone(),
-        ),
-        [catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
-        _ => {
-            return InvalidFuncArgsSnafu {
-                err_msg: format!(
-                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {}",
-                    obj_name
-                ),
-            }
-            .fail()
-        }
-    };
-    Ok((catalog_name, flow_name))
+    let res = flow_service_handler
+        .flush(&catalog_name, &flow_name, query_ctx.clone())
+        .await?;
+    let affected_rows = res.affected_rows;
+
+    Ok(Value::from(affected_rows))
 }
 
 #[cfg(test)]
@@ -154,10 +124,7 @@ mod test {
             ("catalog.flow_name", ("catalog", "flow_name")),
         ];
         for (input, expected) in testcases.iter() {
-            let args = vec![*input];
-            let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
-
-            let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
+            let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
             assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
         }
     }
diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs
index bcb6ce5460..ebf1f67c12 100644
--- a/src/common/function/src/handlers.rs
+++ b/src/common/function/src/handlers.rs
@@ -87,6 +87,15 @@
         flow: &str,
         ctx: QueryContextRef,
     ) -> Result<FlowResponse>;
+
+    async fn adjust(
+        &self,
+        catalog: &str,
+        flow: &str,
+        min_run_interval_secs: u64,
+        max_filter_num_per_query: usize,
+        ctx: QueryContextRef,
+    ) -> Result<FlowResponse>;
 }
 
 pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
diff --git a/src/common/function/src/helper.rs b/src/common/function/src/helper.rs
index e4a1cd1af8..30b3140d46 100644
--- a/src/common/function/src/helper.rs
+++ b/src/common/function/src/helper.rs
@@ -12,12 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_query::error::{InvalidInputTypeSnafu, Result};
+use common_error::ext::BoxedError;
+use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
 use common_query::prelude::{Signature, TypeSignature, Volatility};
 use datatypes::prelude::ConcreteDataType;
 use datatypes::types::cast::cast;
 use datatypes::value::ValueRef;
+use session::context::QueryContextRef;
 use snafu::ResultExt;
+use sql::parser::ParserContext;
 
 /// Create a function signature with oneof signatures of interleaving two arguments.
 pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
     })
     .map(|v| v.as_u64())
 }
+
+pub fn parse_catalog_flow(
+    flow_name: &str,
+    query_ctx: &QueryContextRef,
+) -> Result<(String, String)> {
+    let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
+        .map_err(BoxedError::new)
+        .context(ExecuteSnafu)?;
+
+    let (catalog_name, flow_name) = match &obj_name.0[..] {
+        [flow_name] => (
+            query_ctx.current_catalog().to_string(),
+            flow_name.value.clone(),
+        ),
+        [catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
+        _ => {
+            return InvalidFuncArgsSnafu {
+                err_msg: format!(
+                    "expect flow name to be <catalog>.<flow_name> or <flow_name>, actual: {}",
+                    obj_name
+                ),
+            }
+            .fail()
+        }
+    };
+    Ok((catalog_name, flow_name))
+}
diff --git a/src/common/function/src/lib.rs b/src/common/function/src/lib.rs
index 95b8b6a3b1..90b3876f90 100644
--- a/src/common/function/src/lib.rs
+++ b/src/common/function/src/lib.rs
@@ -15,6 +15,7 @@
 #![feature(let_chains)]
 #![feature(try_blocks)]
 
+mod adjust_flow;
 mod admin;
 mod flush_flow;
 mod macros;
diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs
index 211f7e1438..448ac3c75d 100644
--- a/src/common/function/src/state.rs
+++ b/src/common/function/src/state.rs
@@ -148,6 +148,17 @@
         ) -> Result<FlowResponse> {
             todo!()
         }
+
+        async fn adjust(
+            &self,
+            _catalog: &str,
+            _flow: &str,
+            _min_run_interval_secs: u64,
+            _max_filter_num_per_query: usize,
+            _ctx: QueryContextRef,
+        ) -> Result<FlowResponse> {
+            todo!()
+        }
     }
 
     Self {
diff --git a/src/flow/Cargo.toml b/src/flow/Cargo.toml
index 82377868fb..e20891269c 100644
--- a/src/flow/Cargo.toml
+++ b/src/flow/Cargo.toml
@@ -61,6 +61,7 @@ prost.workspace = true
 query.workspace = true
 rand.workspace = true
 serde.workspace = true
+serde_json.workspace = true
 servers.workspace = true
 session.workspace = true
 smallvec.workspace = true
diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs
index eba72c26e9..bacc342d15 100644
--- a/src/flow/src/adapter/flownode_impl.rs
+++ b/src/flow/src/adapter/flownode_impl.rs
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
 use std::sync::Arc;
 
 use api::v1::flow::{
-    flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
+    flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
 };
 use api::v1::region::InsertRequests;
 use catalog::CatalogManager;
@@ -33,6 +33,7 @@ use datatypes::value::Value;
 use futures::TryStreamExt;
 use greptime_proto::v1::flow::DirtyWindowRequest;
 use itertools::Itertools;
+use serde::{Deserialize, Serialize};
 use session::context::QueryContextBuilder;
 use snafu::{ensure, IntoError, OptionExt, ResultExt};
 use store_api::storage::{RegionId, TableId};
@@ -823,6 +824,25 @@
                 ..Default::default()
             })
         }
+        Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
+            #[derive(Debug, Serialize, Deserialize)]
+            struct Options {
+                min_run_interval_secs: u64,
+                max_filter_num_per_query: usize,
+            }
+            let options: Options = serde_json::from_str(&options).with_context(|_| {
+                common_meta::error::DeserializeFromJsonSnafu { input: options }
+            })?;
+            self.batching_engine
+                .adjust_flow(
+                    flow_id.unwrap().id as u64,
+                    options.min_run_interval_secs,
+                    options.max_filter_num_per_query,
+                )
+                .await
+                .map_err(to_meta_err(snafu::location!()))?;
+            Ok(Default::default())
+        }
         other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
     }
 }
diff --git a/src/flow/src/batching_mode/engine.rs b/src/flow/src/batching_mode/engine.rs
index 61d8633886..02ad7fde8c 100644
--- a/src/flow/src/batching_mode/engine.rs
+++ b/src/flow/src/batching_mode/engine.rs
@@ -505,6 +505,20 @@
     pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
         self.tasks.read().await.contains_key(&flow_id)
     }
+
+    pub async fn adjust_flow(
+        &self,
+        flow_id: FlowId,
+        min_run_interval_secs: u64,
+        max_filter_num_per_query: usize,
+    ) -> Result<(), Error> {
+        let task = self.tasks.read().await.get(&flow_id).cloned();
+        let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
+        debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
+        task.adjust(min_run_interval_secs, max_filter_num_per_query);
+
+        Ok(())
+    }
 }
 
 impl FlowEngine for BatchingEngine {
diff --git a/src/flow/src/batching_mode/state.rs b/src/flow/src/batching_mode/state.rs
index fd7520835c..261a0995e1 100644
--- a/src/flow/src/batching_mode/state.rs
+++ b/src/flow/src/batching_mode/state.rs
@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
 use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
 use crate::metrics::{
     METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
+    METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
 };
 use crate::{Error, FlowId};
@@ -52,6 +53,11 @@ pub struct TaskState {
     pub(crate) shutdown_rx: oneshot::Receiver<()>,
     /// Task handle
     pub(crate) task_handle: Option<JoinHandle<()>>,
+
+    /// min run interval in seconds
+    pub(crate) min_run_interval: Option<u64>,
+    /// max filter number per query
+    pub(crate) max_filter_num: Option<usize>,
 }
 impl TaskState {
     pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -63,6 +69,8 @@
             exec_state: ExecState::Idle,
             shutdown_rx,
             task_handle: None,
+            min_run_interval: None,
+            max_filter_num: None,
         }
     }
@@ -86,24 +94,21 @@
     /// TODO: Make this behavior configurable.
     pub fn get_next_start_query_time(
         &self,
-        flow_id: FlowId,
-        time_window_size: &Option<Duration>,
+        _flow_id: FlowId,
+        _time_window_size: &Option<Duration>,
         max_timeout: Option<Duration>,
     ) -> Instant {
-        let last_duration = max_timeout
+        let next_duration = max_timeout
            .unwrap_or(self.last_query_duration)
            .min(self.last_query_duration)
-            .max(MIN_REFRESH_DURATION);
-
-        let next_duration = time_window_size
-            .map(|t| {
-                let half = t / 2;
-                half.max(last_duration)
-            })
-            .unwrap_or(last_duration);
+            .max(
+                self.min_run_interval
+                    .map(Duration::from_secs)
+                    .unwrap_or(MIN_REFRESH_DURATION),
+            );
 
         // if have dirty time window, execute immediately to clean dirty time window
-        if self.dirty_time_windows.windows.is_empty() {
+        /*if self.dirty_time_windows.windows.is_empty() {
             self.last_update_time + next_duration
         } else {
             debug!(
@@ -113,7 +118,10 @@
                 self.dirty_time_windows.windows
            );
            Instant::now()
-        }
+        }*/
+
+        // wait for next duration anyway
+        self.last_update_time + next_duration
    }
 }
@@ -258,6 +266,10 @@
             .with_label_values(&[flow_id.to_string().as_str()])
             .observe(to_be_query.len() as f64);
 
+        METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
+            .with_label_values(&[flow_id.to_string().as_str()])
+            .observe(self.windows.len() as f64);
+
         let full_time_range = to_be_query
             .iter()
             .fold(chrono::Duration::zero(), |acc, (start, end)| {
diff --git a/src/flow/src/batching_mode/task.rs b/src/flow/src/batching_mode/task.rs
index 61e99f644e..942c7e0669 100644
--- a/src/flow/src/batching_mode/task.rs
+++ b/src/flow/src/batching_mode/task.rs
@@ -146,6 +146,12 @@
         })
     }
 
+    pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
+        let mut state = self.state.write().unwrap();
+        state.min_run_interval = Some(min_run_interval_secs);
+        state.max_filter_num = Some(max_filter_num_per_query);
+    }
+
     /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
     ///
     /// useful for flush_flow to flush dirty time windows range
@@ -592,19 +598,20 @@
             ),
         })?;
 
-        let expr = self
-            .state
-            .write()
-            .unwrap()
-            .dirty_time_windows
-            .gen_filter_exprs(
+        let expr = {
+            let mut state = self.state.write().unwrap();
+            let max_window_cnt = state
+                .max_filter_num
+                .unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
+            state.dirty_time_windows.gen_filter_exprs(
                 &col_name,
                 Some(l),
                 window_size,
-                DirtyTimeWindows::MAX_FILTER_NUM,
+                max_window_cnt,
                 self.config.flow_id,
                 Some(self),
-            )?;
+            )?
+        };
 
         debug!(
             "Flow id={:?}, Generated filter expr: {:?}",
diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs
index 2d629bab6a..6a3b45d588 100644
--- a/src/flow/src/metrics.rs
+++ b/src/flow/src/metrics.rs
@@ -42,6 +42,14 @@
         vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
     )
     .unwrap();
+    pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
+        register_histogram_vec!(
+            "greptime_flow_batching_engine_stalled_query_window_cnt",
+            "flow batching engine stalled query time window count",
+            &["flow_id"],
+            vec![0.0, 5., 10., 20., 40.]
+        )
+        .unwrap();
     pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
         register_histogram_vec!(
             "greptime_flow_batching_engine_query_window_cnt",
diff --git a/src/operator/src/flow.rs b/src/operator/src/flow.rs
index d21f7b2d9c..c687461585 100644
--- a/src/operator/src/flow.rs
+++ b/src/operator/src/flow.rs
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use api::v1::flow::FlowRequestHeader;
+use api::v1::flow::{AdjustFlow, FlowRequestHeader};
 use async_trait::async_trait;
 use common_error::ext::BoxedError;
 use common_function::handlers::FlowServiceHandler;
@@ -22,6 +22,7 @@
 use common_query::error::Result;
 use common_telemetry::tracing_context::TracingContext;
 use futures::stream::FuturesUnordered;
 use futures::StreamExt;
+use serde_json::json;
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
 
@@ -57,9 +58,96 @@
     ) -> Result<api::v1::flow::FlowResponse> {
         self.flush_inner(catalog, flow, ctx).await
     }
+
+    async fn adjust(
+        &self,
+        catalog: &str,
+        flow: &str,
+        min_run_interval_secs: u64,
+        max_filter_num_per_query: usize,
+        ctx: QueryContextRef,
+    ) -> Result<api::v1::flow::FlowResponse> {
+        self.adjust_inner(
+            catalog,
+            flow,
+            min_run_interval_secs,
+            max_filter_num_per_query,
+            ctx,
+        )
+        .await
+    }
 }
 
 impl FlowServiceOperator {
+    async fn adjust_inner(
+        &self,
+        catalog: &str,
+        flow: &str,
+        min_run_interval_secs: u64,
+        max_filter_num_per_query: usize,
+        ctx: QueryContextRef,
+    ) -> Result<api::v1::flow::FlowResponse> {
+        let id = self
+            .flow_metadata_manager
+            .flow_name_manager()
+            .get(catalog, flow)
+            .await
+            .map_err(BoxedError::new)
+            .context(common_query::error::ExecuteSnafu)?
+            .context(common_meta::error::FlowNotFoundSnafu {
+                flow_name: format!("{}.{}", catalog, flow),
+            })
+            .map_err(BoxedError::new)
+            .context(common_query::error::ExecuteSnafu)?
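+            // numeric flow id resolved from (catalog, flow); used below for
+            // route lookup and in the AdjustFlow request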
+            .flow_id();
+
+        let all_flownode_peers = self
+            .flow_metadata_manager
+            .flow_route_manager()
+            .routes(id)
+            .await
+            .map_err(BoxedError::new)
+            .context(common_query::error::ExecuteSnafu)?;
+
+        // order of flownodes doesn't matter here
+        let all_flow_nodes = FuturesUnordered::from_iter(
+            all_flownode_peers
+                .iter()
+                .map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
+        )
+        .collect::<Vec<_>>()
+        .await;
+
+        // TODO(discord9): use proper type for flow options
+        let options = json!({
+            "min_run_interval_secs": min_run_interval_secs,
+            "max_filter_num_per_query": max_filter_num_per_query,
+        });
+
+        for node in all_flow_nodes {
+            let _res = {
+                use api::v1::flow::{flow_request, FlowRequest};
+                let flush_req = FlowRequest {
+                    header: Some(FlowRequestHeader {
+                        tracing_context: TracingContext::from_current_span().to_w3c(),
+                        query_context: Some(
+                            common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
+                        ),
+                    }),
+                    body: Some(flow_request::Body::Adjust(AdjustFlow {
+                        flow_id: Some(api::v1::FlowId { id }),
+                        options: options.to_string(),
+                    })),
+                };
+                node.handle(flush_req)
+                    .await
+                    .map_err(BoxedError::new)
+                    .context(common_query::error::ExecuteSnafu)?
+            };
+        }
+        Ok(Default::default())
+    }
+
     /// Flush the flownodes according to the flow id.
     async fn flush_inner(
         &self,