From f353a200c84d1e1c746f9987f030ea7e81eb7728 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 24 Feb 2025 21:23:40 +0800 Subject: [PATCH] fix: heartbeat&expire_after unit --- src/flow/src/heartbeat.rs | 12 +- src/flow/src/recording_rules.rs | 45 ++++-- src/flow/src/recording_rules/engine.rs | 75 ++++------ .../src/recording_rules/frontend_client.rs | 128 +++++++----------- src/flow/src/server.rs | 1 + 5 files changed, 117 insertions(+), 144 deletions(-) diff --git a/src/flow/src/heartbeat.rs b/src/flow/src/heartbeat.rs index 45786a4d80..ae98e015bc 100644 --- a/src/flow/src/heartbeat.rs +++ b/src/flow/src/heartbeat.rs @@ -14,7 +14,6 @@ //! Send heartbeat from flownode to metasrv -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use api::v1::meta::{HeartbeatRequest, Peer}; @@ -25,7 +24,7 @@ use common_meta::heartbeat::handler::{ use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_meta::key::flow::flow_state::FlowStat; -use common_telemetry::{debug, error, info, warn}; +use common_telemetry::{debug, error, info}; use greptime_proto::v1::meta::NodeInfo; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; use servers::addrs; @@ -66,7 +65,6 @@ pub struct HeartbeatTask { report_interval: Duration, retry_interval: Duration, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, - running: Arc, query_stat_size: Option, } @@ -89,7 +87,6 @@ impl HeartbeatTask { report_interval: heartbeat_opts.interval, retry_interval: heartbeat_opts.retry_interval, resp_handler_executor, - running: Arc::new(AtomicBool::new(false)), query_stat_size: None, } } @@ -130,13 +127,6 @@ impl HeartbeatTask { pub fn shutdown(&self) { info!("Close heartbeat task for flownode"); - if self - .running - .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire) - .is_err() - { - warn!("Call close heartbeat task multiple times"); - } } fn new_heartbeat_request( diff --git a/src/flow/src/recording_rules.rs b/src/flow/src/recording_rules.rs index 943f9b80d0..51b9211e23 100644 --- a/src/flow/src/recording_rules.rs +++ b/src/flow/src/recording_rules.rs @@ -294,7 +294,7 @@ fn find_expr_time_window_lower_bound( reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"), }); - let output_unit = lower_bound + let input_time_unit = lower_bound .context(UnexpectedSnafu { reason: "should have lower bound", })? @@ -310,9 +310,10 @@ fn find_expr_time_window_lower_bound( reason: "should have upper bound", })? .value(); + while low < high { let mid = (low + high) / 2; - let mid_probe = common_time::Timestamp::new(mid, output_unit); + let mid_probe = common_time::Timestamp::new(mid, input_time_unit); let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?; match mid_time_window.cmp(&cur_time_window) { @@ -325,7 +326,7 @@ fn find_expr_time_window_lower_bound( } } - let final_lower_bound_for_time_window = common_time::Timestamp::new(low, output_unit); + let final_lower_bound_for_time_window = common_time::Timestamp::new(low, input_time_unit); Ok(Some(final_lower_bound_for_time_window)) } @@ -434,18 +435,20 @@ fn find_expr_time_window_upper_bound( fn eval_ts_to_ts( phy: &PhysicalExprRef, df_schema: &DFSchema, - value: Timestamp, + input_value: Timestamp, ) -> Result { - let ts_vector = match value.unit() { - TimeUnit::Second => TimestampSecondVector::from_vec(vec![value.value()]).to_arrow_array(), + let ts_vector = match input_value.unit() { + TimeUnit::Second => { + TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array() + } TimeUnit::Millisecond => { - TimestampMillisecondVector::from_vec(vec![value.value()]).to_arrow_array() + TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array() } TimeUnit::Microsecond => { - TimestampMicrosecondVector::from_vec(vec![value.value()]).to_arrow_array() + TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array() } TimeUnit::Nanosecond => { - TimestampNanosecondVector::from_vec(vec![value.value()]).to_arrow_array() + TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array() } }; @@ -617,6 +620,28 @@ mod test { let ctx = QueryContext::arc(); let testcases = [ + // same alias is not same column + ( + "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;", + Timestamp::new(1740394109, TimeUnit::Second), + ( + "ts".to_string(), + Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)), + Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)), + ), + "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts" + ), + // complex time window index + ( + "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;", + Timestamp::new(1740394109, TimeUnit::Second), + ( + "ts".to_string(), + Some(Timestamp::new(1740394080, TimeUnit::Second)), + Some(Timestamp::new(1740394140, TimeUnit::Second)), + ), + "SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')" + ), // no time index ( "SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;", @@ -690,7 +715,7 @@ mod test { find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone()) .await .unwrap(); - assert_eq!(real, expected); + assert_eq!(expected, real); let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false) .await diff --git a/src/flow/src/recording_rules/engine.rs b/src/flow/src/recording_rules/engine.rs index 61ad064021..3bfc43e39a 100644 --- a/src/flow/src/recording_rules/engine.rs +++ b/src/flow/src/recording_rules/engine.rs @@ -17,14 +17,14 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use common_meta::ddl::create_flow::FlowType; -use common_telemetry::info; use common_telemetry::tracing::warn; +use common_telemetry::{debug, info}; use common_time::Timestamp; use datafusion_common::tree_node::TreeNode; use datatypes::value::Value; use query::QueryEngineRef; use session::context::QueryContextRef; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{ensure, ResultExt}; use tokio::sync::oneshot::error::TryRecvError; use tokio::sync::{oneshot, RwLock}; use tokio::time::Instant; @@ -32,7 +32,7 @@ use tokio::time::Instant; use super::frontend_client::FrontendClient; use super::{df_plan_to_sql, AddFilterRewriter}; use crate::adapter::{CreateFlowArgs, FlowId}; -use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowNotFoundSnafu, UnexpectedSnafu}; +use crate::error::{DatafusionSnafu, DatatypesSnafu, UnexpectedSnafu}; use crate::metrics::METRIC_FLOW_RULE_ENGINE_QUERY_TIME; use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan}; use crate::Error; @@ -138,45 +138,13 @@ impl RecordingRuleEngine { } Ok(()) } - - async fn gen_and_exec_query_for_flow( - &self, - flow_id: FlowId, - need_upper_bound: bool, - ) -> Result<(), Error> { - let (new_query, state_ref) = { - let tasks = self.tasks.read().await; - let task = tasks - .get(&flow_id) - .context(FlowNotFoundSnafu { id: flow_id })?; - let new_query = task - .gen_query_with_time_window(self.engine.clone(), need_upper_bound) - .await?; - task.state.write().await.exec_state = ExecState::Executing; - (new_query, task.state.clone()) - }; - - let instant = Instant::now(); - info!("Executing flow {flow_id} with query {new_query}"); - let res = self.frontend_client.sql(&new_query).await; - let elapsed = instant.elapsed(); - info!( - "Flow {flow_id} executed, result: {res:?}, elapsed: {:?}", - elapsed - ); - { - let mut state = state_ref.write().await; - state.set_next_sleep(elapsed, res.is_ok()); - } - Ok(()) - } } #[derive(Debug, Clone)] pub struct RecordingRuleTask { flow_id: FlowId, query: String, - /// in millisecond + /// in seconds expire_after: Option, sink_table_name: [String; 3], state: Arc>, @@ -230,7 +198,11 @@ impl RecordingRuleTask { let instant = Instant::now(); let flow_id = self.flow_id; - info!("Executing flow {flow_id} with query {insert_into}"); + let db_client = frontend_client.get_database_client().await?; + debug!( + "Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {insert_into}", + self.expire_after, db_client.peer.addr + ); let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME .with_label_values(&[flow_id.to_string().as_str()]) @@ -240,14 +212,23 @@ impl RecordingRuleTask { drop(timer); let elapsed = instant.elapsed(); - info!( - "Flow {flow_id} executed, result: {res:?}, elapsed: {:?}", - elapsed - ); - { - let mut state = self.state.write().await; - state.set_next_sleep(elapsed, res.is_ok()); + if res.is_ok() { + debug!( + "Flow {flow_id} executed, result: {res:?}, elapsed: {:?}", + elapsed + ); + } else { + warn!( + "Failed to execute Flow {flow_id}, result: {res:?}, elapsed: {:?} with query: {insert_into}", + elapsed + ); } + + self.state + .write() + .await + .after_query_exec(elapsed, res.is_ok()); + let sleep_until = { let mut state = self.state.write().await; match state.shutdown_rx.try_recv() { @@ -276,13 +257,13 @@ impl RecordingRuleTask { .expect("Time went backwards"); let low_bound = self .expire_after - .map(|e| since_the_epoch.as_millis() - e as u128); + .map(|e| since_the_epoch.as_secs() - e as u64); let Some(low_bound) = low_bound else { return Ok(self.query.clone()); }; - let low_bound = Timestamp::new_millisecond(low_bound as i64); + let low_bound = Timestamp::new_second(low_bound as i64); let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?; @@ -363,7 +344,7 @@ impl RecordingRuleState { /// called after last query is done /// `is_succ` indicate whether the last query is successful - pub fn set_next_sleep(&mut self, elapsed: Duration, _is_succ: bool) { + pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) { self.exec_state = ExecState::Idle; self.last_query_duration = elapsed; self.last_update_time = Instant::now(); diff --git a/src/flow/src/recording_rules/frontend_client.rs b/src/flow/src/recording_rules/frontend_client.rs index 3af09d515a..7e82df7860 100644 --- a/src/flow/src/recording_rules/frontend_client.rs +++ b/src/flow/src/recording_rules/frontend_client.rs @@ -14,7 +14,6 @@ //! Frontend client to run flow as recording rule which is time-window-aware normal query triggered every tick set by user -use std::collections::BTreeMap; use std::sync::Arc; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -24,8 +23,7 @@ use common_meta::peer::Peer; use common_meta::rpc::store::RangeRequest; use common_query::Output; use meta_client::client::MetaClient; -use snafu::{OptionExt, ResultExt}; -use tokio::sync::Mutex; +use snafu::ResultExt; use crate::error::{ExternalSnafu, UnexpectedSnafu}; use crate::Error; @@ -35,60 +33,50 @@ use crate::Error; pub enum FrontendClient { Distributed { meta_client: Arc, - /// list of frontend node and connection to it - database_clients: Mutex, }, Standalone { /// for the sake of simplicity still use grpc even in standalone mode /// notice the client here should all be lazy, so that can wait after frontend is booted then make conn /// TODO(discord9): not use grpc under standalone mode - database_client: Database, + database_client: DatabaseWithPeer, }, } -#[derive(Debug, Default)] -pub struct RoundRobinClients { - clients: BTreeMap, - next: usize, +#[derive(Debug, Clone)] +pub struct DatabaseWithPeer { + pub database: Database, + pub peer: Peer, } -impl RoundRobinClients { - fn get_next_client(&mut self) -> Option { - if self.clients.is_empty() { - return None; - } - let idx = self.next % self.clients.len(); - self.next = (self.next + 1) % self.clients.len(); - Some(self.clients.iter().nth(idx).unwrap().1 .1.clone()) +impl DatabaseWithPeer { + fn new(database: Database, peer: Peer) -> Self { + Self { database, peer } } } impl FrontendClient { pub fn from_meta_client(meta_client: Arc) -> Self { - Self::Distributed { - meta_client, - database_clients: Default::default(), - } + Self::Distributed { meta_client } } pub fn from_static_grpc_addr(addr: String) -> Self { + let peer = Peer { + id: 0, + addr: addr.clone(), + }; + let client = Client::with_urls(vec![addr]); let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); Self::Standalone { - database_client: database, + database_client: DatabaseWithPeer::new(database, peer), } } } impl FrontendClient { - async fn update_frontend_addr(&self) -> Result<(), Error> { - // TODO(discord9): better error handling - let Self::Distributed { - meta_client, - database_clients, - } = self - else { - return Ok(()); + async fn scan_for_frontend(&self) -> Result, Error> { + let Self::Distributed { meta_client, .. } = self else { + return Ok(vec![]); }; let cluster_client = meta_client .cluster_client() @@ -97,14 +85,13 @@ impl FrontendClient { let cluster_id = meta_client.id().0; let prefix = NodeInfoKey::key_prefix_with_role(cluster_id, Role::Frontend); let req = RangeRequest::new().with_prefix(prefix); - let res = cluster_client + let resp = cluster_client .range(req) .await .map_err(BoxedError::new) .context(ExternalSnafu)?; - - let mut clients = database_clients.lock().await; - for kv in res.kvs { + let mut res = Vec::with_capacity(resp.kvs.len()); + for kv in resp.kvs { let key = NodeInfoKey::try_from(kv.key) .map_err(BoxedError::new) .context(ExternalSnafu)?; @@ -112,60 +99,49 @@ impl FrontendClient { let val = NodeInfo::try_from(kv.value) .map_err(BoxedError::new) .context(ExternalSnafu)?; + res.push((key, val)); + } + Ok(res) + } - if key.role != Role::Frontend { - return UnexpectedSnafu { - reason: format!( - "Unexpected role(should be frontend) in key: {:?}, val: {:?}", - key, val - ), - } - .fail(); - } - if let Some((peer, database)) = clients.clients.get_mut(&key.node_id) { - if val.peer != *peer { - // update only if not the same - *peer = val.peer.clone(); - let client = Client::with_urls(vec![peer.addr.clone()]); - *database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - } - } else { - let peer = val.peer; - let client = Client::with_urls(vec![peer.addr.clone()]); - let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); - clients - .clients - .insert(key.node_id, (peer.clone(), database.clone())); + /// Get the database with max `last_activity_ts` + async fn get_last_active_frontend(&self) -> Result { + if let Self::Standalone { database_client } = self { + return Ok(database_client.clone()); + } + + let frontends = self.scan_for_frontend().await?; + let mut last_activity_ts = i64::MIN; + let mut peer = None; + for (_key, val) in frontends.iter() { + if val.last_activity_ts > last_activity_ts { + last_activity_ts = val.last_activity_ts; + peer = Some(val.peer.clone()); } } - drop(clients); - - Ok(()) + let Some(peer) = peer else { + UnexpectedSnafu { + reason: format!("No frontend available: {:?}", frontends), + } + .fail()? + }; + let client = Client::with_urls(vec![peer.addr.clone()]); + let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client); + Ok(DatabaseWithPeer::new(database, peer)) } /// Get a database client, and possibly update it before returning. - async fn get_database_client(&self) -> Result { + pub async fn get_database_client(&self) -> Result { match self { Self::Standalone { database_client } => Ok(database_client.clone()), - Self::Distributed { - meta_client: _, - database_clients, - } => { - self.update_frontend_addr().await?; - database_clients - .lock() - .await - .get_next_client() - .context(UnexpectedSnafu { - reason: "Can't get any database client", - }) - } + Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await, } } pub async fn sql(&self, sql: &str) -> Result { let db = self.get_database_client().await?; - db.sql(sql) + db.database + .sql(sql) .await .map_err(BoxedError::new) .context(ExternalSnafu) diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 8810689855..dcdc5643bd 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -246,6 +246,7 @@ impl FlownodeInstance { self.server.shutdown().await.context(ShutdownServerSnafu)?; if let Some(task) = &self.heartbeat_task { + info!("Close heartbeat task for flownode"); task.shutdown(); }