Compare commits

...

1 Commits

Author SHA1 Message Date
discord9
8ca3d72e3f feat(exp): adjust_flow admin function 2025-06-05 13:52:08 +08:00
16 changed files with 324 additions and 154 deletions

2
Cargo.lock generated
View File

@@ -4876,7 +4876,7 @@ dependencies = [
[[package]] [[package]]
name = "greptime-proto" name = "greptime-proto"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=454c52634c3bac27de10bf0d85d5533eed1cf03f#454c52634c3bac27de10bf0d85d5533eed1cf03f" source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2dca1dc67862d7b410838aef81232274c019b3f6#2dca1dc67862d7b410838aef81232274c019b3f6"
dependencies = [ dependencies = [
"prost 0.13.5", "prost 0.13.5",
"serde", "serde",

View File

@@ -132,7 +132,7 @@ etcd-client = "0.14"
fst = "0.4.7" fst = "0.4.7"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "454c52634c3bac27de10bf0d85d5533eed1cf03f" } greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2dca1dc67862d7b410838aef81232274c019b3f6" }
hex = "0.4" hex = "0.4"
http = "1" http = "1"
humantime = "2.1" humantime = "2.1"

View File

@@ -0,0 +1,90 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn adjust_signature() -> Signature {
Signature::exact(
vec![
ConcreteDataType::string_datatype(), // flow name
ConcreteDataType::uint64_datatype(), // min_run_interval in seconds
ConcreteDataType::uint64_datatype(), // max filter number per query
],
Volatility::Immutable,
)
}
#[admin_fn(
name = AdjustFlowFunction,
display_name = adjust_flow,
sig_fn = adjust_signature,
ret = uint64
)]
pub(crate) async fn adjust_flow(
flow_service_handler: &FlowServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 3,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 3, have: {}",
params.len()
),
}
);
let (flow_name, min_run_interval, max_filter_num) = match (params[0], params[1], params[2]) {
(
ValueRef::String(flow_name),
ValueRef::UInt64(min_run_interval),
ValueRef::UInt64(max_filter_num),
) => (flow_name, min_run_interval, max_filter_num),
_ => {
return UnsupportedInputDataTypeSnafu {
function: "adjust_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
}
};
let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
let res = flow_service_handler
.adjust(
&catalog_name,
&flow_name,
min_run_interval,
max_filter_num as usize,
query_ctx.clone(),
)
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}

View File

@@ -26,6 +26,7 @@ use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction; use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction; use remove_region_follower::RemoveRegionFollowerFunction;
use crate::adjust_flow::AdjustFlowFunction;
use crate::flush_flow::FlushFlowFunction; use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry; use crate::function_registry::FunctionRegistry;
@@ -43,5 +44,6 @@ impl AdminFunction {
registry.register_async(Arc::new(FlushTableFunction)); registry.register_async(Arc::new(FlushTableFunction));
registry.register_async(Arc::new(CompactTableFunction)); registry.register_async(Arc::new(CompactTableFunction));
registry.register_async(Arc::new(FlushFlowFunction)); registry.register_async(Arc::new(FlushFlowFunction));
registry.register_async(Arc::new(AdjustFlowFunction));
} }
} }

View File

@@ -12,21 +12,19 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use common_error::ext::BoxedError;
use common_macro::admin_fn; use common_macro::admin_fn;
use common_query::error::{ use common_query::error::{
ExecuteSnafu, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, InvalidFuncArgsSnafu, MissingFlowServiceHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
UnsupportedInputDataTypeSnafu,
}; };
use common_query::prelude::Signature; use common_query::prelude::Signature;
use datafusion::logical_expr::Volatility; use datafusion::logical_expr::Volatility;
use datatypes::value::{Value, ValueRef}; use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::{ensure, ResultExt}; use snafu::ensure;
use sql::parser::ParserContext;
use store_api::storage::ConcreteDataType; use store_api::storage::ConcreteDataType;
use crate::handlers::FlowServiceHandlerRef; use crate::handlers::FlowServiceHandlerRef;
use crate::helper::parse_catalog_flow;
fn flush_signature() -> Signature { fn flush_signature() -> Signature {
Signature::uniform( Signature::uniform(
@@ -47,20 +45,6 @@ pub(crate) async fn flush_flow(
query_ctx: &QueryContextRef, query_ctx: &QueryContextRef,
params: &[ValueRef<'_>], params: &[ValueRef<'_>],
) -> Result<Value> { ) -> Result<Value> {
let (catalog_name, flow_name) = parse_flush_flow(params, query_ctx)?;
let res = flow_service_handler
.flush(&catalog_name, &flow_name, query_ctx.clone())
.await?;
let affected_rows = res.affected_rows;
Ok(Value::from(affected_rows))
}
fn parse_flush_flow(
params: &[ValueRef<'_>],
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
ensure!( ensure!(
params.len() == 1, params.len() == 1,
InvalidFuncArgsSnafu { InvalidFuncArgsSnafu {
@@ -70,7 +54,6 @@ fn parse_flush_flow(
), ),
} }
); );
let ValueRef::String(flow_name) = params[0] else { let ValueRef::String(flow_name) = params[0] else {
return UnsupportedInputDataTypeSnafu { return UnsupportedInputDataTypeSnafu {
function: "flush_flow", function: "flush_flow",
@@ -78,27 +61,14 @@ fn parse_flush_flow(
} }
.fail(); .fail();
}; };
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect()) let (catalog_name, flow_name) = parse_catalog_flow(flow_name, query_ctx)?;
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = match &obj_name.0[..] { let res = flow_service_handler
[flow_name] => ( .flush(&catalog_name, &flow_name, query_ctx.clone())
query_ctx.current_catalog().to_string(), .await?;
flow_name.value.clone(), let affected_rows = res.affected_rows;
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()), Ok(Value::from(affected_rows))
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
} }
#[cfg(test)] #[cfg(test)]
@@ -154,10 +124,7 @@ mod test {
("catalog.flow_name", ("catalog", "flow_name")), ("catalog.flow_name", ("catalog", "flow_name")),
]; ];
for (input, expected) in testcases.iter() { for (input, expected) in testcases.iter() {
let args = vec![*input]; let result = parse_catalog_flow(input, &QueryContext::arc()).unwrap();
let args = args.into_iter().map(ValueRef::String).collect::<Vec<_>>();
let result = parse_flush_flow(&args, &QueryContext::arc()).unwrap();
assert_eq!(*expected, (result.0.as_str(), result.1.as_str())); assert_eq!(*expected, (result.0.as_str(), result.1.as_str()));
} }
} }

View File

@@ -87,6 +87,15 @@ pub trait FlowServiceHandler: Send + Sync {
flow: &str, flow: &str,
ctx: QueryContextRef, ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>; ) -> Result<api::v1::flow::FlowResponse>;
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse>;
} }
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>; pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;

View File

@@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use common_query::error::{InvalidInputTypeSnafu, Result}; use common_error::ext::BoxedError;
use common_query::error::{ExecuteSnafu, InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility}; use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType; use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast; use datatypes::types::cast::cast;
use datatypes::value::ValueRef; use datatypes::value::ValueRef;
use session::context::QueryContextRef;
use snafu::ResultExt; use snafu::ResultExt;
use sql::parser::ParserContext;
/// Create a function signature with oneof signatures of interleaving two arguments. /// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature { pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -43,3 +46,30 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
}) })
.map(|v| v.as_u64()) .map(|v| v.as_u64())
} }
pub fn parse_catalog_flow(
flow_name: &str,
query_ctx: &QueryContextRef,
) -> Result<(String, String)> {
let obj_name = ParserContext::parse_table_name(flow_name, query_ctx.sql_dialect())
.map_err(BoxedError::new)
.context(ExecuteSnafu)?;
let (catalog_name, flow_name) = match &obj_name.0[..] {
[flow_name] => (
query_ctx.current_catalog().to_string(),
flow_name.value.clone(),
),
[catalog, flow_name] => (catalog.value.clone(), flow_name.value.clone()),
_ => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"expect flow name to be <catalog>.<flow-name> or <flow-name>, actual: {}",
obj_name
),
}
.fail()
}
};
Ok((catalog_name, flow_name))
}

View File

@@ -15,6 +15,7 @@
#![feature(let_chains)] #![feature(let_chains)]
#![feature(try_blocks)] #![feature(try_blocks)]
mod adjust_flow;
mod admin; mod admin;
mod flush_flow; mod flush_flow;
mod macros; mod macros;

View File

@@ -148,6 +148,17 @@ impl FunctionState {
) -> Result<api::v1::flow::FlowResponse> { ) -> Result<api::v1::flow::FlowResponse> {
todo!() todo!()
} }
async fn adjust(
&self,
_catalog: &str,
_flow: &str,
_min_run_interval_secs: u64,
_max_filter_num_per_query: usize,
_ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
todo!()
}
} }
Self { Self {

View File

@@ -61,6 +61,7 @@ prost.workspace = true
query.workspace = true query.workspace = true
rand.workspace = true rand.workspace = true
serde.workspace = true serde.workspace = true
serde_json.workspace = true
servers.workspace = true servers.workspace = true
session.workspace = true session.workspace = true
smallvec.workspace = true smallvec.workspace = true

View File

@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicBool;
use std::sync::Arc; use std::sync::Arc;
use api::v1::flow::{ use api::v1::flow::{
flow_request, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow, flow_request, AdjustFlow, CreateRequest, DropRequest, FlowRequest, FlowResponse, FlushFlow,
}; };
use api::v1::region::InsertRequests; use api::v1::region::InsertRequests;
use catalog::CatalogManager; use catalog::CatalogManager;
@@ -32,6 +32,7 @@ use common_telemetry::{error, info, trace, warn};
use datatypes::value::Value; use datatypes::value::Value;
use futures::TryStreamExt; use futures::TryStreamExt;
use itertools::Itertools; use itertools::Itertools;
use serde::{Deserialize, Serialize};
use session::context::QueryContextBuilder; use session::context::QueryContextBuilder;
use snafu::{ensure, IntoError, OptionExt, ResultExt}; use snafu::{ensure, IntoError, OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId}; use store_api::storage::{RegionId, TableId};
@@ -809,6 +810,25 @@ impl common_meta::node_manager::Flownode for FlowDualEngine {
..Default::default() ..Default::default()
}) })
} }
Some(flow_request::Body::Adjust(AdjustFlow { flow_id, options })) => {
#[derive(Debug, Serialize, Deserialize)]
struct Options {
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
}
let options: Options = serde_json::from_str(&options).with_context(|_| {
common_meta::error::DeserializeFromJsonSnafu { input: options }
})?;
self.batching_engine
.adjust_flow(
flow_id.unwrap().id as u64,
options.min_run_interval_secs,
options.max_filter_num_per_query,
)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(Default::default())
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(), other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
} }
} }
@@ -841,93 +861,6 @@ fn to_meta_err(
} }
} }
#[async_trait::async_trait]
impl common_meta::node_manager::Flownode for StreamingEngine {
async fn handle(&self, request: FlowRequest) -> MetaResult<FlowResponse> {
let query_ctx = request
.header
.and_then(|h| h.query_context)
.map(|ctx| ctx.into());
match request.body {
Some(flow_request::Body::Create(CreateRequest {
flow_id: Some(task_id),
source_table_ids,
sink_table_name: Some(sink_table_name),
create_if_not_exists,
expire_after,
comment,
sql,
flow_options,
or_replace,
})) => {
let source_table_ids = source_table_ids.into_iter().map(|id| id.id).collect_vec();
let sink_table_name = [
sink_table_name.catalog_name,
sink_table_name.schema_name,
sink_table_name.table_name,
];
let expire_after = expire_after.map(|e| e.value);
let args = CreateFlowArgs {
flow_id: task_id.id as u64,
sink_table_name,
source_table_ids,
create_if_not_exists,
or_replace,
expire_after,
comment: Some(comment),
sql: sql.clone(),
flow_options,
query_ctx,
};
let ret = self
.create_flow(args)
.await
.map_err(BoxedError::new)
.with_context(|_| CreateFlowSnafu { sql: sql.clone() })
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.inc();
Ok(FlowResponse {
affected_flows: ret
.map(|id| greptime_proto::v1::FlowId { id: id as u32 })
.into_iter()
.collect_vec(),
..Default::default()
})
}
Some(flow_request::Body::Drop(DropRequest {
flow_id: Some(flow_id),
})) => {
self.remove_flow(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
METRIC_FLOW_TASK_COUNT.dec();
Ok(Default::default())
}
Some(flow_request::Body::Flush(FlushFlow {
flow_id: Some(flow_id),
})) => {
let row = self
.flush_flow_inner(flow_id.id as u64)
.await
.map_err(to_meta_err(snafu::location!()))?;
Ok(FlowResponse {
affected_flows: vec![flow_id],
affected_rows: row as u64,
..Default::default()
})
}
other => common_meta::error::InvalidFlowRequestBodySnafu { body: other }.fail(),
}
}
async fn handle_inserts(&self, request: InsertRequests) -> MetaResult<FlowResponse> {
self.handle_inserts_inner(request)
.await
.map(|_| Default::default())
.map_err(to_meta_err(snafu::location!()))
}
}
impl FlowEngine for StreamingEngine { impl FlowEngine for StreamingEngine {
async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> { async fn create_flow(&self, args: CreateFlowArgs) -> Result<Option<FlowId>, Error> {
self.create_flow_inner(args).await self.create_flow_inner(args).await

View File

@@ -388,6 +388,20 @@ impl BatchingEngine {
pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool { pub async fn flow_exist_inner(&self, flow_id: FlowId) -> bool {
self.tasks.read().await.contains_key(&flow_id) self.tasks.read().await.contains_key(&flow_id)
} }
pub async fn adjust_flow(
&self,
flow_id: FlowId,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
) -> Result<(), Error> {
let task = self.tasks.read().await.get(&flow_id).cloned();
let task = task.with_context(|| FlowNotFoundSnafu { id: flow_id })?;
debug!("Adjusting flow {flow_id} with min_run_interval_secs={} and max_filter_num_per_query={}", min_run_interval_secs, max_filter_num_per_query);
task.adjust(min_run_interval_secs, max_filter_num_per_query);
Ok(())
}
} }
impl FlowEngine for BatchingEngine { impl FlowEngine for BatchingEngine {

View File

@@ -32,6 +32,7 @@ use crate::batching_mode::MIN_REFRESH_DURATION;
use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu}; use crate::error::{DatatypesSnafu, InternalSnafu, TimeSnafu, UnexpectedSnafu};
use crate::metrics::{ use crate::metrics::{
METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT, METRIC_FLOW_BATCHING_ENGINE_QUERY_TIME_RANGE, METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT,
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT,
}; };
use crate::{Error, FlowId}; use crate::{Error, FlowId};
@@ -52,6 +53,11 @@ pub struct TaskState {
pub(crate) shutdown_rx: oneshot::Receiver<()>, pub(crate) shutdown_rx: oneshot::Receiver<()>,
/// Task handle /// Task handle
pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>, pub(crate) task_handle: Option<tokio::task::JoinHandle<()>>,
/// min run interval in seconds
pub(crate) min_run_interval: Option<u64>,
/// max filter number per query
pub(crate) max_filter_num: Option<usize>,
} }
impl TaskState { impl TaskState {
pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self { pub fn new(query_ctx: QueryContextRef, shutdown_rx: oneshot::Receiver<()>) -> Self {
@@ -63,6 +69,8 @@ impl TaskState {
exec_state: ExecState::Idle, exec_state: ExecState::Idle,
shutdown_rx, shutdown_rx,
task_handle: None, task_handle: None,
min_run_interval: None,
max_filter_num: None,
} }
} }
@@ -87,20 +95,17 @@ impl TaskState {
pub fn get_next_start_query_time( pub fn get_next_start_query_time(
&self, &self,
flow_id: FlowId, flow_id: FlowId,
time_window_size: &Option<Duration>, _time_window_size: &Option<Duration>,
max_timeout: Option<Duration>, max_timeout: Option<Duration>,
) -> Instant { ) -> Instant {
let last_duration = max_timeout let next_duration = max_timeout
.unwrap_or(self.last_query_duration) .unwrap_or(self.last_query_duration)
.min(self.last_query_duration) .min(self.last_query_duration)
.max(MIN_REFRESH_DURATION); .max(
self.min_run_interval
let next_duration = time_window_size .map(|s| Duration::from_secs(s))
.map(|t| { .unwrap_or(MIN_REFRESH_DURATION),
let half = t / 2; );
half.max(last_duration)
})
.unwrap_or(last_duration);
// if have dirty time window, execute immediately to clean dirty time window // if have dirty time window, execute immediately to clean dirty time window
if self.dirty_time_windows.windows.is_empty() { if self.dirty_time_windows.windows.is_empty() {
@@ -246,6 +251,10 @@ impl DirtyTimeWindows {
.with_label_values(&[flow_id.to_string().as_str()]) .with_label_values(&[flow_id.to_string().as_str()])
.observe(first_nth.len() as f64); .observe(first_nth.len() as f64);
METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT
.with_label_values(&[flow_id.to_string().as_str()])
.observe(self.windows.len() as f64);
let full_time_range = first_nth let full_time_range = first_nth
.iter() .iter()
.fold(chrono::Duration::zero(), |acc, (start, end)| { .fold(chrono::Duration::zero(), |acc, (start, end)| {

View File

@@ -144,6 +144,12 @@ impl BatchingTask {
}) })
} }
pub fn adjust(&self, min_run_interval_secs: u64, max_filter_num_per_query: usize) {
let mut state = self.state.write().unwrap();
state.min_run_interval = Some(min_run_interval_secs);
state.max_filter_num = Some(max_filter_num_per_query);
}
/// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set) /// mark time window range (now - expire_after, now) as dirty (or (0, now) if expire_after not set)
/// ///
/// useful for flush_flow to flush dirty time windows range /// useful for flush_flow to flush dirty time windows range
@@ -580,19 +586,20 @@ impl BatchingTask {
), ),
})?; })?;
let expr = self let expr = {
.state let mut state = self.state.write().unwrap();
.write() let max_window_cnt = state
.unwrap() .max_filter_num
.dirty_time_windows .unwrap_or(DirtyTimeWindows::MAX_FILTER_NUM);
.gen_filter_exprs( state.dirty_time_windows.gen_filter_exprs(
&col_name, &col_name,
Some(l), Some(l),
window_size, window_size,
DirtyTimeWindows::MAX_FILTER_NUM, max_window_cnt,
self.config.flow_id, self.config.flow_id,
Some(self), Some(self),
)?; )?
};
debug!( debug!(
"Flow id={:?}, Generated filter expr: {:?}", "Flow id={:?}, Generated filter expr: {:?}",

View File

@@ -42,6 +42,14 @@ lazy_static! {
vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.] vec![60., 2. * 60., 3. * 60., 5. * 60., 10. * 60.]
) )
.unwrap(); .unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_STALLED_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!(
"greptime_flow_batching_engine_stalled_query_window_cnt",
"flow batching engine stalled query time window count",
&["flow_id"],
vec![0.0, 5., 10., 20., 40.]
)
.unwrap();
pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec = pub static ref METRIC_FLOW_BATCHING_ENGINE_QUERY_WINDOW_CNT: HistogramVec =
register_histogram_vec!( register_histogram_vec!(
"greptime_flow_batching_engine_query_window_cnt", "greptime_flow_batching_engine_query_window_cnt",

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use api::v1::flow::FlowRequestHeader; use api::v1::flow::{AdjustFlow, FlowRequestHeader};
use async_trait::async_trait; use async_trait::async_trait;
use common_error::ext::BoxedError; use common_error::ext::BoxedError;
use common_function::handlers::FlowServiceHandler; use common_function::handlers::FlowServiceHandler;
@@ -22,6 +22,7 @@ use common_query::error::Result;
use common_telemetry::tracing_context::TracingContext; use common_telemetry::tracing_context::TracingContext;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use serde_json::json;
use session::context::QueryContextRef; use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt}; use snafu::{OptionExt, ResultExt};
@@ -57,9 +58,96 @@ impl FlowServiceHandler for FlowServiceOperator {
) -> Result<api::v1::flow::FlowResponse> { ) -> Result<api::v1::flow::FlowResponse> {
self.flush_inner(catalog, flow, ctx).await self.flush_inner(catalog, flow, ctx).await
} }
async fn adjust(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
self.adjust_inner(
catalog,
flow,
min_run_interval_secs,
max_filter_num_per_query,
ctx,
)
.await
}
} }
impl FlowServiceOperator { impl FlowServiceOperator {
async fn adjust_inner(
&self,
catalog: &str,
flow: &str,
min_run_interval_secs: u64,
max_filter_num_per_query: usize,
ctx: QueryContextRef,
) -> Result<api::v1::flow::FlowResponse> {
let id = self
.flow_metadata_manager
.flow_name_manager()
.get(catalog, flow)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.context(common_meta::error::FlowNotFoundSnafu {
flow_name: format!("{}.{}", catalog, flow),
})
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
.flow_id();
let all_flownode_peers = self
.flow_metadata_manager
.flow_route_manager()
.routes(id)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?;
// order of flownodes doesn't matter here
let all_flow_nodes = FuturesUnordered::from_iter(
all_flownode_peers
.iter()
.map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
)
.collect::<Vec<_>>()
.await;
// TODO(discord9): use proper type for flow options
let options = json!({
"min_run_interval_secs": min_run_interval_secs,
"max_filter_num_per_query": max_filter_num_per_query,
});
for node in all_flow_nodes {
let _res = {
use api::v1::flow::{flow_request, FlowRequest};
let flush_req = FlowRequest {
header: Some(FlowRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
query_context: Some(
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
),
}),
body: Some(flow_request::Body::Adjust(AdjustFlow {
flow_id: Some(api::v1::FlowId { id }),
options: options.to_string(),
})),
};
node.handle(flush_req)
.await
.map_err(BoxedError::new)
.context(common_query::error::ExecuteSnafu)?
};
}
Ok(Default::default())
}
/// Flush the flownodes according to the flow id. /// Flush the flownodes according to the flow id.
async fn flush_inner( async fn flush_inner(
&self, &self,