fix: timeout

Author: discord9
Date: 2025-02-26 13:05:55 +08:00
parent d5f9630641
commit c3bc69a784
3 changed files with 70 additions and 24 deletions

View File

@@ -34,6 +34,12 @@ lazy_static! {
&["flow_id"]
)
.unwrap();
pub static ref METRIC_FLOW_RULE_ENGINE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
"greptime_flow_rule_engine_slow_query",
"flow rule engine slow query",
&["flow_id", "sql", "peer"]
)
.unwrap();
pub static ref METRIC_FLOW_RUN_INTERVAL_MS: IntGauge =
register_int_gauge!("greptime_flow_run_interval_ms", "flow run interval in ms").unwrap();
pub static ref METRIC_FLOW_ROWS: IntCounterVec = register_int_counter_vec!(
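For context, a minimal self-contained sketch of how a prometheus HistogramVec with these labels is registered and observed; the metric name and helper function below are hypothetical and not part of this commit.

use std::time::Instant;

use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    static ref EXAMPLE_SLOW_QUERY: HistogramVec = register_histogram_vec!(
        "example_slow_query_seconds",
        "elapsed time of queries flagged as slow",
        &["flow_id", "sql", "peer"]
    )
    .unwrap();
}

fn record_slow_query(flow_id: u64, sql: &str, peer: &str, started: Instant) {
    // Observe wall-clock seconds under the (flow_id, sql, peer) label set.
    EXAMPLE_SLOW_QUERY
        .with_label_values(&[flow_id.to_string().as_str(), sql, peer])
        .observe(started.elapsed().as_secs_f64());
}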

View File

@@ -32,11 +32,18 @@ use tokio::time::Instant;
use super::frontend_client::FrontendClient;
use super::{df_plan_to_sql, AddFilterRewriter};
use crate::adapter::{CreateFlowArgs, FlowId};
use crate::error::{DatafusionSnafu, DatatypesSnafu, UnexpectedSnafu};
use crate::metrics::METRIC_FLOW_RULE_ENGINE_QUERY_TIME;
use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowAlreadyExistSnafu, UnexpectedSnafu};
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan};
use crate::Error;
/// TODO(discord9): make those constants configurable
/// The default rule engine query timeout is 5 minutes
pub const DEFAULT_RULE_ENGINE_QUERY_TIMEOUT: Duration = Duration::from_secs(5 * 60);
/// Output a warn log for any query that runs for more than 1 minute, and also every 1 minute while that query is still running
pub const SLOW_QUERY_THRESHOLD: Duration = Duration::from_secs(60);
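// A rough illustration (not part of this commit) of the "warn every minute while the
// query is still running" behaviour described above; it assumes tokio for timing and
// the tracing crate's warn! for logging.
async fn warn_while_slow<F, T>(query: F, threshold: std::time::Duration) -> T
where
    F: std::future::Future<Output = T>,
{
    tokio::pin!(query);
    let mut ticker = tokio::time::interval(threshold);
    ticker.tick().await; // the first tick fires immediately, skip it
    loop {
        tokio::select! {
            out = &mut query => return out,
            // fires at threshold, 2 * threshold, ... until the query finishes
            _ = ticker.tick() => tracing::warn!("query still running after {:?}", threshold),
        }
    }
}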
/// TODO(discord9): determine how to configure refresh rate
pub struct RecordingRuleEngine {
tasks: RwLock<BTreeMap<FlowId, RecordingRuleTask>>,
@@ -64,8 +71,8 @@ impl RecordingRuleEngine {
flow_id,
sink_table_name,
source_table_ids: _,
create_if_not_exists: _,
or_replace: _,
create_if_not_exists,
or_replace,
expire_after,
comment: _,
sql,
@@ -73,6 +80,26 @@ impl RecordingRuleEngine {
query_ctx,
} = args;
// or replace logic
{
let is_exist = self.tasks.read().await.contains_key(&flow_id);
match (create_if_not_exists, or_replace, is_exist) {
// if or_replace, ignore that the old flow exists
(_, true, true) => {
info!("Replacing flow with id={}", flow_id);
}
(false, false, true) => FlowAlreadyExistSnafu { id: flow_id }.fail()?,
// already exists and not replacing, return None
(true, false, true) => {
info!("Flow with id={} already exists, do nothing", flow_id);
return Ok(None);
}
// continue as normal
(_, _, false) => (),
}
}
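// For clarity, the same (create_if_not_exists, or_replace, exists) truth table restated
// as a standalone sketch; the enum and function are hypothetical, not part of this commit.
#[derive(Debug, PartialEq)]
enum CreateDecision {
    Replace,      // flow exists and or_replace is set: drop the old task, create anew
    DoNothing,    // flow exists and create_if_not_exists is set: keep the old task
    AlreadyExist, // flow exists and neither flag is set: report an error
    Create,       // flow does not exist: create as usual
}

fn decide(create_if_not_exists: bool, or_replace: bool, exists: bool) -> CreateDecision {
    match (create_if_not_exists, or_replace, exists) {
        (_, true, true) => CreateDecision::Replace,
        (true, false, true) => CreateDecision::DoNothing,
        (false, false, true) => CreateDecision::AlreadyExist,
        (_, _, false) => CreateDecision::Create,
    }
}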
let flow_type = flow_options.get(FlowType::FLOW_TYPE_KEY);
ensure!(
@@ -199,31 +226,43 @@ impl RecordingRuleTask {
let instant = Instant::now();
let flow_id = self.flow_id;
let db_client = frontend_client.get_database_client().await?;
let peer_addr = db_client.peer.addr;
debug!(
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {insert_into}",
self.expire_after, db_client.peer.addr
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {}",
self.expire_after, peer_addr, &insert_into
);
let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
.start_timer();
let res = frontend_client.sql(&insert_into).await;
let res = db_client.database.sql(&insert_into).await;
drop(timer);
let elapsed = instant.elapsed();
if res.is_ok() {
if let Ok(res1) = &res {
debug!(
"Flow {flow_id} executed, result: {res:?}, elapsed: {:?}",
"Flow {flow_id} executed, result: {res1:?}, elapsed: {:?}",
elapsed
);
} else {
} else if let Err(res) = &res {
warn!(
"Failed to execute Flow {flow_id}, result: {res:?}, elapsed: {:?} with query: {insert_into}",
elapsed
"Failed to execute Flow {flow_id} on frontend {}, result: {res:?}, elapsed: {:?} with query: {}",
peer_addr, elapsed, &insert_into
);
}
// record slow query
if elapsed >= SLOW_QUERY_THRESHOLD {
warn!(
"Flow {flow_id} on frontend {} executed for {:?} before complete, query: {}",
peer_addr, elapsed, &insert_into
);
METRIC_FLOW_RULE_ENGINE_SLOW_QUERY
.with_label_values(&[flow_id.to_string().as_str(), &insert_into, &peer_addr])
.observe(elapsed.as_secs_f64());
}
self.state
.write()
.await

View File

@@ -18,16 +18,26 @@ use std::sync::Arc;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use meta_client::client::MetaClient;
use snafu::ResultExt;
use crate::error::{ExternalSnafu, UnexpectedSnafu};
use crate::recording_rules::engine::DEFAULT_RULE_ENGINE_QUERY_TIMEOUT;
use crate::Error;
fn default_channel_mgr() -> ChannelManager {
let cfg = ChannelConfig::new().timeout(DEFAULT_RULE_ENGINE_QUERY_TIMEOUT);
ChannelManager::with_config(cfg)
}
fn client_from_urls(addrs: Vec<String>) -> Client {
Client::with_manager_and_urls(default_channel_mgr(), addrs)
}
/// A simple frontend client able to execute sql using grpc protocol
#[derive(Debug)]
pub enum FrontendClient {
@@ -65,7 +75,7 @@ impl FrontendClient {
addr: addr.clone(),
};
let client = Client::with_urls(vec![addr]);
let client = client_from_urls(vec![addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Self::Standalone {
database_client: DatabaseWithPeer::new(database, peer),
@@ -125,7 +135,7 @@ impl FrontendClient {
}
.fail()?
};
let client = Client::with_urls(vec![peer.addr.clone()]);
let client = client_from_urls(vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
}
@@ -137,13 +147,4 @@ impl FrontendClient {
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
}
}
pub async fn sql(&self, sql: &str) -> Result<Output, Error> {
let db = self.get_database_client().await?;
db.database
.sql(sql)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)
}
}
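With the FrontendClient::sql convenience helper removed, callers obtain the database client and issue the query themselves, as the engine code above now does. A hedged sketch of that calling convention, reusing only items already shown in this diff (the error mapping mirrors the deleted helper):

async fn run_sql(frontend_client: &FrontendClient, sql: &str) -> Result<Output, Error> {
    // The underlying channel was built with DEFAULT_RULE_ENGINE_QUERY_TIMEOUT, so the
    // call errors out instead of hanging once the 5 minute budget is exceeded.
    let db_client = frontend_client.get_database_client().await?;
    db_client
        .database
        .sql(sql)
        .await
        .map_err(BoxedError::new)
        .context(ExternalSnafu)
}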