fix: heartbeat & expire_after unit

commit f353a200c8 (parent 0deb670ae7)
Author: discord9
Date: 2025-02-24 21:23:40 +08:00

5 changed files with 117 additions and 144 deletions

View File

@@ -14,7 +14,6 @@
//! Send heartbeat from flownode to metasrv
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use api::v1::meta::{HeartbeatRequest, Peer};
@@ -25,7 +24,7 @@ use common_meta::heartbeat::handler::{
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_meta::key::flow::flow_state::FlowStat;
use common_telemetry::{debug, error, info, warn};
use common_telemetry::{debug, error, info};
use greptime_proto::v1::meta::NodeInfo;
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use servers::addrs;
@@ -66,7 +65,6 @@ pub struct HeartbeatTask {
report_interval: Duration,
retry_interval: Duration,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
running: Arc<AtomicBool>,
query_stat_size: Option<SizeReportSender>,
}
@@ -89,7 +87,6 @@ impl HeartbeatTask {
report_interval: heartbeat_opts.interval,
retry_interval: heartbeat_opts.retry_interval,
resp_handler_executor,
running: Arc::new(AtomicBool::new(false)),
query_stat_size: None,
}
}
@@ -130,13 +127,6 @@ impl HeartbeatTask {
pub fn shutdown(&self) {
info!("Close heartbeat task for flownode");
if self
.running
.compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
warn!("Call close heartbeat task multiple times");
}
}
fn new_heartbeat_request(
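
For context on the guard removed above: the old `shutdown` used `AtomicBool::compare_exchange` so that only the first caller performed the shutdown, and repeated calls hit the `warn!` branch. A minimal standalone sketch of that idiom (hypothetical `Task` type, not the actual flownode code):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for the removed `running` guard: the flag is
/// `true` while the task runs, and only the first `shutdown` call flips it.
struct Task {
    running: Arc<AtomicBool>,
}

impl Task {
    /// Returns `true` only for the call that actually performed the shutdown.
    fn shutdown(&self) -> bool {
        // compare_exchange(current, new, ..) succeeds only if the value
        // still equals `current`, so repeated or concurrent calls get `Err`.
        self.running
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

fn main() {
    let task = Task {
        running: Arc::new(AtomicBool::new(true)),
    };
    assert!(task.shutdown()); // first call wins
    assert!(!task.shutdown()); // later calls are no-ops
}
```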

View File

@@ -294,7 +294,7 @@ fn find_expr_time_window_lower_bound(
reason: format!(" unit mismatch for time window expression {expr:?}, found {lower_bound:?} and {upper_bound:?}"),
});
let output_unit = lower_bound
let input_time_unit = lower_bound
.context(UnexpectedSnafu {
reason: "should have lower bound",
})?
@@ -310,9 +310,10 @@ fn find_expr_time_window_lower_bound(
reason: "should have upper bound",
})?
.value();
while low < high {
let mid = (low + high) / 2;
let mid_probe = common_time::Timestamp::new(mid, output_unit);
let mid_probe = common_time::Timestamp::new(mid, input_time_unit);
let mid_time_window = eval_ts_to_ts(&phy_expr, df_schema, mid_probe)?;
match mid_time_window.cmp(&cur_time_window) {
@@ -325,7 +326,7 @@ fn find_expr_time_window_lower_bound(
}
}
let final_lower_bound_for_time_window = common_time::Timestamp::new(low, output_unit);
let final_lower_bound_for_time_window = common_time::Timestamp::new(low, input_time_unit);
Ok(Some(final_lower_bound_for_time_window))
}
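
The rename matters because the binary-search probe must be built in the unit recorded for the input timestamp (`input_time_unit`), not a misnamed `output_unit`. A self-contained sketch of the search shape on plain integers, assuming a 1-minute `date_bin`-style window function (all names hypothetical):

```rust
/// Toy time-window function: floors a timestamp (in seconds) to a
/// 60-second window, mimicking `date_bin(INTERVAL '1 MINS', ts)`.
fn window_of(ts: i64) -> i64 {
    ts.div_euclid(60) * 60
}

/// Binary-search the smallest `ts` in [low, high) whose window equals
/// `cur_window` — the same shape as `find_expr_time_window_lower_bound`,
/// where each probe must be constructed in the input's time unit.
fn lower_bound(mut low: i64, mut high: i64, cur_window: i64) -> i64 {
    while low < high {
        let mid = (low + high) / 2;
        match window_of(mid).cmp(&cur_window) {
            std::cmp::Ordering::Less => low = mid + 1,
            _ => high = mid,
        }
    }
    low
}

fn main() {
    let ts = 1_740_394_109; // seconds, as in the new test cases
    let w = window_of(ts);
    // Searching anywhere below `ts` converges on the window start.
    assert_eq!(lower_bound(0, ts + 1, w), 1_740_394_080);
}
```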
@@ -434,18 +435,20 @@ fn find_expr_time_window_upper_bound(
fn eval_ts_to_ts(
phy: &PhysicalExprRef,
df_schema: &DFSchema,
value: Timestamp,
input_value: Timestamp,
) -> Result<Timestamp, Error> {
let ts_vector = match value.unit() {
TimeUnit::Second => TimestampSecondVector::from_vec(vec![value.value()]).to_arrow_array(),
let ts_vector = match input_value.unit() {
TimeUnit::Second => {
TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Millisecond => {
TimestampMillisecondVector::from_vec(vec![value.value()]).to_arrow_array()
TimestampMillisecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Microsecond => {
TimestampMicrosecondVector::from_vec(vec![value.value()]).to_arrow_array()
TimestampMicrosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
TimeUnit::Nanosecond => {
TimestampNanosecondVector::from_vec(vec![value.value()]).to_arrow_array()
TimestampNanosecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
};
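
The `value` → `input_value` rename makes explicit that the one-element vector is built in the input timestamp's own unit; any unit change happens later, when the expression itself (e.g. the `arrow_cast` in the new tests) converts it. A toy version of the second/millisecond scaling the first new test pair relies on (hypothetical enum, not the `common_time` type):

```rust
/// Hypothetical, minimal mirror of the unit dispatch above.
#[derive(Clone, Copy)]
enum TimeUnit {
    Second,
    Millisecond,
}

/// Normalize a raw timestamp value to milliseconds based on its unit.
fn to_millis(value: i64, unit: TimeUnit) -> i64 {
    match unit {
        TimeUnit::Second => value * 1_000,
        TimeUnit::Millisecond => value,
    }
}

fn main() {
    // The first new test case feeds 1740394109 (seconds) and expects
    // millisecond-precision bounds around 1740394109000.
    assert_eq!(to_millis(1_740_394_109, TimeUnit::Second), 1_740_394_109_000);
    assert_eq!(to_millis(5, TimeUnit::Millisecond), 5);
}
```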
@@ -617,6 +620,28 @@ mod test {
let ctx = QueryContext::arc();
let testcases = [
// same alias is not the same column
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts GROUP BY ts;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"
),
// complex time window index
(
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts GROUP BY time_window;",
Timestamp::new(1740394109, TimeUnit::Second),
(
"ts".to_string(),
Some(Timestamp::new(1740394080, TimeUnit::Second)),
Some(Timestamp::new(1740394140, TimeUnit::Second)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS time_window FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:00' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:49:00' AS TIMESTAMP))) GROUP BY arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)')"
),
// no time index
(
"SELECT date_bin('5 minutes', ts) FROM numbers_with_ts;",
@@ -690,7 +715,7 @@ mod test {
find_plan_time_window_bound(&plan, current, ctx.clone(), query_engine.clone())
.await
.unwrap();
assert_eq!(real, expected);
assert_eq!(expected, real);
let plan = sql_to_df_plan(ctx.clone(), query_engine.clone(), sql, false)
.await

View File

@@ -17,14 +17,14 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use common_meta::ddl::create_flow::FlowType;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use common_telemetry::{debug, info};
use common_time::Timestamp;
use datafusion_common::tree_node::TreeNode;
use datatypes::value::Value;
use query::QueryEngineRef;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use snafu::{ensure, ResultExt};
use tokio::sync::oneshot::error::TryRecvError;
use tokio::sync::{oneshot, RwLock};
use tokio::time::Instant;
@@ -32,7 +32,7 @@ use tokio::time::Instant;
use super::frontend_client::FrontendClient;
use super::{df_plan_to_sql, AddFilterRewriter};
use crate::adapter::{CreateFlowArgs, FlowId};
use crate::error::{DatafusionSnafu, DatatypesSnafu, FlowNotFoundSnafu, UnexpectedSnafu};
use crate::error::{DatafusionSnafu, DatatypesSnafu, UnexpectedSnafu};
use crate::metrics::METRIC_FLOW_RULE_ENGINE_QUERY_TIME;
use crate::recording_rules::{find_plan_time_window_bound, sql_to_df_plan};
use crate::Error;
@@ -138,45 +138,13 @@ impl RecordingRuleEngine {
}
Ok(())
}
async fn gen_and_exec_query_for_flow(
&self,
flow_id: FlowId,
need_upper_bound: bool,
) -> Result<(), Error> {
let (new_query, state_ref) = {
let tasks = self.tasks.read().await;
let task = tasks
.get(&flow_id)
.context(FlowNotFoundSnafu { id: flow_id })?;
let new_query = task
.gen_query_with_time_window(self.engine.clone(), need_upper_bound)
.await?;
task.state.write().await.exec_state = ExecState::Executing;
(new_query, task.state.clone())
};
let instant = Instant::now();
info!("Executing flow {flow_id} with query {new_query}");
let res = self.frontend_client.sql(&new_query).await;
let elapsed = instant.elapsed();
info!(
"Flow {flow_id} executed, result: {res:?}, elapsed: {:?}",
elapsed
);
{
let mut state = state_ref.write().await;
state.set_next_sleep(elapsed, res.is_ok());
}
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct RecordingRuleTask {
flow_id: FlowId,
query: String,
/// in millisecond
/// in seconds
expire_after: Option<i64>,
sink_table_name: [String; 3],
state: Arc<RwLock<RecordingRuleState>>,
@@ -230,7 +198,11 @@ impl RecordingRuleTask {
let instant = Instant::now();
let flow_id = self.flow_id;
info!("Executing flow {flow_id} with query {insert_into}");
let db_client = frontend_client.get_database_client().await?;
debug!(
"Executing flow {flow_id}(expire_after={:?} secs) on {:?} with query {insert_into}",
self.expire_after, db_client.peer.addr
);
let timer = METRIC_FLOW_RULE_ENGINE_QUERY_TIME
.with_label_values(&[flow_id.to_string().as_str()])
@@ -240,14 +212,23 @@ impl RecordingRuleTask {
drop(timer);
let elapsed = instant.elapsed();
info!(
"Flow {flow_id} executed, result: {res:?}, elapsed: {:?}",
elapsed
);
{
let mut state = self.state.write().await;
state.set_next_sleep(elapsed, res.is_ok());
if res.is_ok() {
debug!(
"Flow {flow_id} executed, result: {res:?}, elapsed: {:?}",
elapsed
);
} else {
warn!(
"Failed to execute Flow {flow_id}, result: {res:?}, elapsed: {:?} with query: {insert_into}",
elapsed
);
}
self.state
.write()
.await
.after_query_exec(elapsed, res.is_ok());
let sleep_until = {
let mut state = self.state.write().await;
match state.shutdown_rx.try_recv() {
@@ -276,13 +257,13 @@ impl RecordingRuleTask {
.expect("Time went backwards");
let low_bound = self
.expire_after
.map(|e| since_the_epoch.as_millis() - e as u128);
.map(|e| since_the_epoch.as_secs() - e as u64);
let Some(low_bound) = low_bound else {
return Ok(self.query.clone());
};
let low_bound = Timestamp::new_millisecond(low_bound as i64);
let low_bound = Timestamp::new_second(low_bound as i64);
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
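
This is the core of the commit's `expire_after` fix: the field is stored in seconds (per the doc-comment change above), so the lower bound must be computed with `as_secs()` and wrapped as a second-precision timestamp. The old code subtracted a seconds value from `as_millis()`, producing a bound roughly 1000x too large. A sketch of the corrected arithmetic on plain integers (hypothetical helper, not the actual task method):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// `expire_after` is in seconds; the lower bound is "now - expire_after",
/// also in seconds. `None` means the query has no lower bound.
fn expire_lower_bound(expire_after: Option<i64>) -> Option<i64> {
    let since_the_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    // The bug: `since_the_epoch.as_millis() - e` mixed milliseconds with
    // a seconds-valued `e`; both sides must be in seconds.
    expire_after.map(|e| since_the_epoch.as_secs() as i64 - e)
}

fn main() {
    // A flow that only considers rows younger than one hour:
    if let Some(low) = expire_lower_bound(Some(3600)) {
        println!("filter: ts >= {low} (unix seconds)");
    }
}
```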
@@ -363,7 +344,7 @@ impl RecordingRuleState {
/// called after last query is done
/// `is_succ` indicates whether the last query was successful
pub fn set_next_sleep(&mut self, elapsed: Duration, _is_succ: bool) {
pub fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
self.exec_state = ExecState::Idle;
self.last_query_duration = elapsed;
self.last_update_time = Instant::now();
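
The rename from `set_next_sleep` to `after_query_exec` matches what the method now does: record the timing and return to idle, with the actual sleep decision made by the caller (the `sleep_until` block above). Reconstructed from the hunk, the touched state roughly looks like this (fields trimmed to what the diff shows):

```rust
use std::time::{Duration, Instant};

enum ExecState {
    Idle,
    Executing,
}

/// Trimmed to the fields the hunk touches.
struct RecordingRuleState {
    exec_state: ExecState,
    last_query_duration: Duration,
    last_update_time: Instant,
}

impl RecordingRuleState {
    /// Called after each query, successful or not: record timing, go idle.
    fn after_query_exec(&mut self, elapsed: Duration, _is_succ: bool) {
        self.exec_state = ExecState::Idle;
        self.last_query_duration = elapsed;
        self.last_update_time = Instant::now();
    }
}

fn main() {
    let mut state = RecordingRuleState {
        exec_state: ExecState::Executing,
        last_query_duration: Duration::ZERO,
        last_update_time: Instant::now(),
    };
    state.after_query_exec(Duration::from_millis(42), true);
    assert!(matches!(state.exec_state, ExecState::Idle));
}
```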

View File

@@ -14,7 +14,6 @@
//! Frontend client to run flows as recording rules: time-window-aware regular queries triggered on every tick set by the user
use std::collections::BTreeMap;
use std::sync::Arc;
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
@@ -24,8 +23,7 @@ use common_meta::peer::Peer;
use common_meta::rpc::store::RangeRequest;
use common_query::Output;
use meta_client::client::MetaClient;
use snafu::{OptionExt, ResultExt};
use tokio::sync::Mutex;
use snafu::ResultExt;
use crate::error::{ExternalSnafu, UnexpectedSnafu};
use crate::Error;
@@ -35,60 +33,50 @@ use crate::Error;
pub enum FrontendClient {
Distributed {
meta_client: Arc<MetaClient>,
/// list of frontend node and connection to it
database_clients: Mutex<RoundRobinClients>,
},
Standalone {
/// for the sake of simplicity, still use grpc even in standalone mode
/// note: the client here should be lazy, so that it can wait until the frontend is booted and then make the connection
/// TODO(discord9): not use grpc under standalone mode
database_client: Database,
database_client: DatabaseWithPeer,
},
}
#[derive(Debug, Default)]
pub struct RoundRobinClients {
clients: BTreeMap<u64, (Peer, Database)>,
next: usize,
#[derive(Debug, Clone)]
pub struct DatabaseWithPeer {
pub database: Database,
pub peer: Peer,
}
impl RoundRobinClients {
fn get_next_client(&mut self) -> Option<Database> {
if self.clients.is_empty() {
return None;
}
let idx = self.next % self.clients.len();
self.next = (self.next + 1) % self.clients.len();
Some(self.clients.iter().nth(idx).unwrap().1 .1.clone())
impl DatabaseWithPeer {
fn new(database: Database, peer: Peer) -> Self {
Self { database, peer }
}
}
impl FrontendClient {
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed {
meta_client,
database_clients: Default::default(),
}
Self::Distributed { meta_client }
}
pub fn from_static_grpc_addr(addr: String) -> Self {
let peer = Peer {
id: 0,
addr: addr.clone(),
};
let client = Client::with_urls(vec![addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Self::Standalone {
database_client: database,
database_client: DatabaseWithPeer::new(database, peer),
}
}
}
impl FrontendClient {
async fn update_frontend_addr(&self) -> Result<(), Error> {
// TODO(discord9): better error handling
let Self::Distributed {
meta_client,
database_clients,
} = self
else {
return Ok(());
async fn scan_for_frontend(&self) -> Result<Vec<(NodeInfoKey, NodeInfo)>, Error> {
let Self::Distributed { meta_client, .. } = self else {
return Ok(vec![]);
};
let cluster_client = meta_client
.cluster_client()
@@ -97,14 +85,13 @@ impl FrontendClient {
let cluster_id = meta_client.id().0;
let prefix = NodeInfoKey::key_prefix_with_role(cluster_id, Role::Frontend);
let req = RangeRequest::new().with_prefix(prefix);
let res = cluster_client
let resp = cluster_client
.range(req)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let mut clients = database_clients.lock().await;
for kv in res.kvs {
let mut res = Vec::with_capacity(resp.kvs.len());
for kv in resp.kvs {
let key = NodeInfoKey::try_from(kv.key)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
@@ -112,60 +99,49 @@ impl FrontendClient {
let val = NodeInfo::try_from(kv.value)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
res.push((key, val));
}
Ok(res)
}
if key.role != Role::Frontend {
return UnexpectedSnafu {
reason: format!(
"Unexpected role(should be frontend) in key: {:?}, val: {:?}",
key, val
),
}
.fail();
}
if let Some((peer, database)) = clients.clients.get_mut(&key.node_id) {
if val.peer != *peer {
// update only if not the same
*peer = val.peer.clone();
let client = Client::with_urls(vec![peer.addr.clone()]);
*database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
}
} else {
let peer = val.peer;
let client = Client::with_urls(vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
clients
.clients
.insert(key.node_id, (peer.clone(), database.clone()));
/// Get the database with max `last_activity_ts`
async fn get_last_active_frontend(&self) -> Result<DatabaseWithPeer, Error> {
if let Self::Standalone { database_client } = self {
return Ok(database_client.clone());
}
let frontends = self.scan_for_frontend().await?;
let mut last_activity_ts = i64::MIN;
let mut peer = None;
for (_key, val) in frontends.iter() {
if val.last_activity_ts > last_activity_ts {
last_activity_ts = val.last_activity_ts;
peer = Some(val.peer.clone());
}
}
drop(clients);
Ok(())
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client = Client::with_urls(vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
}
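
Frontend selection thus changes from round-robin over the removed `RoundRobinClients` cache to scanning metasrv and picking the frontend with the greatest `last_activity_ts`. The selection loop above is equivalent to an iterator `max_by_key`, as in this hedged sketch (hypothetical `NodeInfo` mirror, not the proto type):

```rust
/// Hypothetical mirror of the fields the selection reads.
struct NodeInfo {
    peer_addr: String,
    last_activity_ts: i64,
}

/// Pick the frontend that heartbeated most recently; `None` when the
/// scan returned no frontends (the error case above).
fn pick_last_active(frontends: &[NodeInfo]) -> Option<&NodeInfo> {
    frontends.iter().max_by_key(|n| n.last_activity_ts)
}

fn main() {
    let frontends = vec![
        NodeInfo { peer_addr: "10.0.0.1:4001".into(), last_activity_ts: 100 },
        NodeInfo { peer_addr: "10.0.0.2:4001".into(), last_activity_ts: 250 },
    ];
    let chosen = pick_last_active(&frontends).expect("No frontend available");
    assert_eq!(chosen.peer_addr, "10.0.0.2:4001");
}
```

One subtlety: on equal timestamps `max_by_key` keeps the last candidate while the loop above keeps the first; for heartbeat timestamps either choice is acceptable.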
/// Get a database client, and possibly update it before returning.
async fn get_database_client(&self) -> Result<Database, Error> {
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
match self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed {
meta_client: _,
database_clients,
} => {
self.update_frontend_addr().await?;
database_clients
.lock()
.await
.get_next_client()
.context(UnexpectedSnafu {
reason: "Can't get any database client",
})
}
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
}
}
pub async fn sql(&self, sql: &str) -> Result<Output, Error> {
let db = self.get_database_client().await?;
db.sql(sql)
db.database
.sql(sql)
.await
.map_err(BoxedError::new)
.context(ExternalSnafu)

View File

@@ -246,6 +246,7 @@ impl FlownodeInstance {
self.server.shutdown().await.context(ShutdownServerSnafu)?;
if let Some(task) = &self.heartbeat_task {
info!("Close heartbeat task for flownode");
task.shutdown();
}
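
Finally, the wiring that motivated the heartbeat half of the commit: `FlownodeInstance::shutdown` previously only logged inside the `if let` and never invoked the task's shutdown path. A minimal sketch of the fixed control flow (stub types, not the real instance):

```rust
struct HeartbeatTask;

impl HeartbeatTask {
    fn shutdown(&self) {
        println!("Close heartbeat task for flownode");
    }
}

struct FlownodeInstance {
    heartbeat_task: Option<HeartbeatTask>,
}

impl FlownodeInstance {
    fn shutdown(&self) {
        // The commit adds the actual `task.shutdown()` call here;
        // previously only a log line ran inside this branch.
        if let Some(task) = &self.heartbeat_task {
            task.shutdown();
        }
    }
}

fn main() {
    let instance = FlownodeInstance {
        heartbeat_task: Some(HeartbeatTask),
    };
    instance.shutdown();
}
```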