From c3bc69a784301e243331aef4e4542559a7dc31b4 Mon Sep 17 00:00:00 2001
From: discord9
Date: Wed, 26 Feb 2025 13:05:55 +0800
Subject: [PATCH] fix: timeout

---
 src/flow/src/metrics.rs                        |  6 ++
 src/flow/src/recording_rules/engine.rs         | 63 +++++++++++++++----
 .../src/recording_rules/frontend_client.rs     | 25 ++++----
 3 files changed, 70 insertions(+), 24 deletions(-)

diff --git a/src/flow/src/metrics.rs b/src/flow/src/metrics.rs
index b20cc9acb4..21a02683c9 100644
--- a/src/flow/src/metrics.rs
+++ b/src/flow/src/metrics.rs
@@ -34,6 +34,12 @@ lazy_static! {
         &["flow_id"]
     )
     .unwrap();
+    pub static ref METRIC_FLOW_RULE_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
+        "greptime_flow_rule_engine_slow_query",
+        "flow rule engine slow query",
+        &["flow_id", "sql", "peer"]
+    )
+    .unwrap();
     pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
         register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
     pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
diff --git a/src/flow/src/recording_rules/engine.rs b/src/flow/src/recording_rules/engine.rs
index 3bfc43e39a..d3c0fa0d40 100644
--- a/src/flow/src/recording_rules/engine.rs
+++ b/src/flow/src/recording_rules/engine.rs
@@ -32,11 +32,18 @@ use tokio::time::Instant;
 use super::frontend_client::FrontendClient;
 use super::{df_plan_to_sql, AddFilterRewriter};
 use crate::adapter::{CreateFlowArgs, FlowId};
-use crate::error::{DatafusionSnafu, DatatypesSnafu, UnexpectedSnafu};
-use crate::metrics::METRIC_FLOW_RULE_ENGINE_QUERY_TIME;
+use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowAlreadyExistSnafu, UnexpectedSnafu};
+use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
 use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan};
 use crate::Error;
 
+/// TODO(discord9): make these constants configurable
+/// The default rule engine query timeout is 5 minutes.
+pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
+
+/// A warn log is emitted for any query that runs longer than 1 minute, and again every minute while that query is still running.
+pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
+
 /// TODO(discord9): determine how to configure refresh rate
 pub struct RecordingRuleEngine {
     tasks: RwLock>,
@@ -64,8 +71,8 @@ impl RecordingRuleEngine {
             flow_id,
             sink_table_name,
             source_table_ids: _,
-            create_if_not_exists: _,
-            or_replace: _,
+            create_if_not_exists,
+            or_replace,
             expire_after,
             comment: _,
             sql,
@@ -73,6 +80,26 @@ impl RecordingRuleEngine {
            query_ctx,
        } = args;
 
+        // handle create_if_not_exists and or_replace semantics
+        {
+            let is_exist = self.tasks.read().await.contains_key(&flow_id);
+            match (create_if_not_exists, or_replace, is_exist) {
+                // if `or_replace` is set, ignore that the old flow exists
+                (_, true, true) => {
+                    info!("Replacing flow with id={}", flow_id);
+                }
+                (false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
+                // already exists and `create_if_not_exists` is set without replace: return None
+                (true, false, true) => {
+                    info!("Flow with id={} already exists, do nothing", flow_id);
+                    return Ok(None);
+                }
+
+                // continue as normal
+                (_, _, false) => (),
+            }
+        }
+
         let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
 
         ensure!(
@@ -199,31 +226,43 @@ impl RecordingRuleTask {
         let instant = Instant::now();
         let flow_id = self.flow_id;
         let db_client = frontend_client.get_database_client().await?;
+        let peer_addr = db_client.peer.addr;
         debug!(
-            "Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {insert_into}",
-            self.expire_after, db_client.peer.addr
+            "Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
+            self.expire_after, peer_addr, &insert_into
         );
 
         let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME
             .with_label_values(&[flow_id.to_string().as_str()])
             .start_timer();
 
-        let res = frontend_client.sql(&insert_into).await;
+        let res = db_client.database.sql(&insert_into).await;
         drop(timer);
 
         let elapsed = instant.elapsed();
-        if res.is_ok() {
+        if let Ok(res1) = &res {
             debug!(
-                "Flow {flow_id} executed, result: {res:?}, elapsed: {:?}",
+                "Flow {flow_id} executed, result: {res1:?}, elapsed: {:?}",
                 elapsed
             );
-        } else {
+        } else if let Err(res) = &res {
             warn!(
-                "Failed to execute Flow {flow_id}, result: {res:?}, elapsed: {:?} with query: {insert_into}",
-                elapsed
+                "Failed to execute Flow {flow_id} on frontend {}, result: {res:?}, elapsed: {:?} with query: {}",
+                peer_addr, elapsed, &insert_into
             );
         }
 
+        // record slow query
+        if elapsed >= SLOW_QUERY_THRESHOLD {
+            warn!(
+                "Flow {flow_id} on frontend {} executed for {:?} before completing, query: {}",
+                peer_addr, elapsed, &insert_into
+            );
+            METRIC_FLOW_RULE_ENGINE_SLOW_QUERY
+                .with_label_values(&[flow_id.to_string().as_str(), &insert_into, &peer_addr])
+                .observe(elapsed.as_secs_f64());
+        }
+
         self.state
             .write()
             .await
diff --git a/src/flow/src/recording_rules/frontend_client.rs b/src/flow/src/recording_rules/frontend_client.rs
index 7e82df7860..c709bc3858 100644
--- a/src/flow/src/recording_rules/frontend_client.rs
+++ b/src/flow/src/recording_rules/frontend_client.rs
@@ -18,16 +18,26 @@ use std::sync::Arc;
 
 use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::ext::BoxedError;
+use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
 use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
 use common_meta::peer::Peer;
 use common_meta::rpc::store::RangeRequest;
-use common_query::Output;
 use meta_client::client::MetaClient;
 use snafu::ResultExt;
 
 use crate::error::{ExternalSnafu, UnexpectedSnafu};
+use crate::recording_rules::engine::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
 use crate::Error;
 
+fn default_channel_mgr() -> ChannelManager {
+    let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT);
+    ChannelManager::with_config(cfg)
+}
+
+fn client_from_urls(addrs: Vec<String>) -> Client {
+    Client::with_manager_and_urls(default_channel_mgr(), addrs)
+}
+
 /// A simple frontend client able to execute sql using grpc protocol
 #[derive(Debug)]
 pub enum FrontendClient {
@@ -65,7 +75,7 @@ impl FrontendClient {
             addr: addr.clone(),
         };
 
-        let client = Client::with_urls(vec![addr]);
+        let client = client_from_urls(vec![addr]);
         let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
         Self::Standalone {
             database_client: DatabaseWithPeer::new(database, peer),
@@ -125,7 +135,7 @@ impl FrontendClient {
             }
             .fail()?
         };
-        let client = Client::with_urls(vec![peer.addr.clone()]);
+        let client = client_from_urls(vec![peer.addr.clone()]);
         let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
         Ok(DatabaseWithPeer::new(database, peer))
     }
@@ -137,13 +147,4 @@ impl FrontendClient {
             Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
         }
     }
-
-    pub async fn sql(&self, sql: &str) -> Result<Output, Error> {
-        let db = self.get_database_client().await?;
-        db.database
-            .sql(sql)
-            .await
-            .map_err(BoxedError::new)
-            .context(ExternalSnafu)
-    }
 }
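
Below is a minimal usage sketch (not part of the patch) of the pattern this change establishes: build the gRPC client through a ChannelManager whose ChannelConfig carries the 5-minute timeout, run the query through Database::sql directly (the FrontendClient::sql helper is removed above), and surface anything slower than the 1-minute threshold. It reuses only APIs that already appear in the diff; the names run_rule_query, QUERY_TIMEOUT, SLOW_QUERY and the simplified String error type are illustrative assumptions, not code from the repository.

use std::time::{Duration, Instant};

use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};

/// Mirrors DEFAULT_RULE_ENGINE_QUERY_TIMEOUT from the patch (5 minutes).
const QUERY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Mirrors SLOW_QUERY_THRESHOLD from the patch (1 minute).
const SLOW_QUERY: Duration = Duration::from_secs(60);

/// Illustrative helper: run one rule-engine statement against a frontend peer.
async fn run_rule_query(frontend_addr: String, insert_into: &str) -> Result<(), String> {
    // Every channel created through this manager inherits the timeout, so a
    // hanging frontend can no longer stall the rule engine indefinitely.
    let cfg = ChannelConfig::new().timeout(QUERY_TIMEOUT);
    let client = Client::with_manager_and_urls(ChannelManager::with_config(cfg), vec![frontend_addr]);
    let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);

    // The engine now calls Database::sql directly and times the call itself.
    let started = Instant::now();
    let _output = database.sql(insert_into).await.map_err(|e| e.to_string())?;

    // Anything slower than the threshold is worth surfacing, which the patch
    // does with a warn! log and the slow-query histogram.
    if started.elapsed() >= SLOW_QUERY {
        eprintln!("slow rule-engine query ({:?}): {insert_into}", started.elapsed());
    }
    Ok(())
}

In the engine itself the slow path additionally feeds the greptime_flow_rule_engine_slow_query histogram, labeled with the flow id, the SQL text, and the frontend peer address.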