Compare commits

..

12 Commits

Author SHA1 Message Date
discord9
7f7b974e8a fix: conn pool leak & placeholder feature so ci can compile 2025-04-10 15:01:07 +08:00
discord9
4875ace0d0 fix: placeholder feature so ci can compile 2025-04-08 14:37:55 +08:00
discord9
a847d96649 fix: time window filter expr use OR 2025-04-07 16:50:17 +08:00
discord9
23a0a54e18 fix: convert timestamp unit too 2025-04-07 16:50:17 +08:00
discord9
78eb8b53f6 fix: quote&more info when time window too many
chore: even more warning

fix: filter first warn later
2025-04-07 16:50:17 +08:00
discord9
2455f39e8e fix: subquery&cte time window expr 2025-04-07 16:46:53 +08:00
discord9
7fe0074202 refactor: even finer&limit time window num 2025-04-07 16:46:53 +08:00
discord9
e16bc203d0 feat: basic time window aware 2025-04-07 16:46:53 +08:00
discord9
9a3c26bb0a metrics: better bucket&longer timeout 2025-04-07 16:46:53 +08:00
discord9
e1ff398c32 fix: timeout 2025-04-07 16:46:53 +08:00
discord9
780e3000de fix: heartbeat&expire_after unit 2025-04-07 16:46:53 +08:00
discord9
2b5ddf8427 feat: time window in df plan
WIP

test: found out time window expr

chore: pub

tests: also unparsed

tests: rm dup code

feat: frontend client for recording rule

fix: bound edgecase

WIP

WIP

feat: rule engine

feat: add init options& tmp rerounte to rule

fix: dist client get

fix: also not handle mirror write in flownode

chore: clippy
2025-04-07 16:46:47 +08:00
6 changed files with 222 additions and 71 deletions

View File

@@ -445,10 +445,16 @@ impl Pool {
async fn recycle_channel_in_loop(pool: Arc<Pool>, interval_secs: u64) {
let mut interval = tokio::time::interval(Duration::from_secs(interval_secs));
// use weak ref here to prevent pool being leaked
let pool_weak = Arc::downgrade(&pool);
loop {
let _ = interval.tick().await;
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
if let Some(pool) = pool_weak.upgrade() {
pool.retain_channel(|_, c| c.access.swap(0, Ordering::Relaxed) != 0)
} else {
// no one is using this pool, so we can also let go
break;
}
}
}

View File

@@ -60,15 +60,13 @@ use crate::Error;
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
expr: PhysicalExprRef,
phy_expr: PhysicalExprRef,
column_name: String,
logical_expr: Expr,
df_schema: DFSchema,
}
impl TimeWindowExpr {
pub fn new(expr: PhysicalExprRef, column_name: String) -> Self {
Self { expr, column_name }
}
pub fn from_expr(expr: &Expr, column_name: &str, df_schema: &DFSchema) -> Result<Self, Error> {
let phy_planner = DefaultPhysicalPlanner::default();
@@ -80,11 +78,24 @@ impl TimeWindowExpr {
),
})?;
Ok(Self {
expr: phy_expr,
phy_expr,
column_name: column_name.to_string(),
logical_expr: expr.clone(),
df_schema: df_schema.clone(),
})
}
pub fn eval(
&self,
current: Timestamp,
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
let lower_bound =
find_expr_time_window_lower_bound(&self.logical_expr, &self.df_schema, current)?;
let upper_bound =
find_expr_time_window_upper_bound(&self.logical_expr, &self.df_schema, current)?;
Ok((lower_bound, upper_bound))
}
/// Find timestamps from rows using time window expr
pub async fn handle_rows(
&self,
@@ -131,12 +142,15 @@ impl TimeWindowExpr {
),
})?;
let eval_res = self.expr.evaluate(&rb).with_context(|_| DatafusionSnafu {
context: format!(
"Failed to evaluate physical expression {:?} on {rb:?}",
self.expr
),
})?;
let eval_res = self
.phy_expr
.evaluate(&rb)
.with_context(|_| DatafusionSnafu {
context: format!(
"Failed to evaluate physical expression {:?} on {rb:?}",
self.phy_expr
),
})?;
let res = columnar_to_ts_vector(&eval_res)?;
@@ -447,6 +461,7 @@ pub async fn find_plan_time_window_bound(
///
/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// of current time window given the current timestamp
///
/// if return None, meaning this time window have no lower bound
@@ -576,7 +591,22 @@ fn eval_ts_to_ts(
df_schema: &DFSchema,
input_value: Timestamp,
) -> Result<Timestamp, Error> {
let ts_vector = match input_value.unit() {
let schema_ty = df_schema.field(0).data_type();
let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty);
let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt {
ts.unit()
} else {
return UnexpectedSnafu {
reason: format!("Expect Timestamp, found {:?}", schema_cdt),
}
.fail();
};
let input_value = input_value
.convert_to(schema_unit)
.with_context(|| UnexpectedSnafu {
reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"),
})?;
let ts_vector = match schema_unit {
TimeUnit::Second => {
TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array()
}
@@ -656,7 +686,19 @@ impl TreeNodeRewriter for AddFilterRewriter {
}
fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
let unparser = Unparser::default();
/// A dialect that forces all identifiers to be quoted
struct ForceQuoteIdentifiers;
impl datafusion::sql::unparser::dialect::Dialect for ForceQuoteIdentifiers {
fn identifier_quote_style(&self, identifier: &str) -> Option<char> {
if identifier.to_lowercase() != identifier {
Some('"')
} else {
None
}
}
}
let unparser = Unparser::new(&ForceQuoteIdentifiers);
// first make all column qualified
let sql = unparser
.plan_to_sql(plan)
.with_context(|_e| DatafusionSnafu {
@@ -675,6 +717,22 @@ mod test {
use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter};
use crate::test_utils::create_test_query_engine;
#[tokio::test]
async fn test_sql_plan_convert() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let old = r#"SELECT "NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#;
let new = sql_to_df_plan(ctx.clone(), query_engine.clone(), old, false)
.await
.unwrap();
let new_sql = df_plan_to_sql(&new).unwrap();
assert_eq!(
r#"SELECT "UPPERCASE_NUMBERS_WITH_TS"."NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#,
new_sql
);
}
#[tokio::test]
async fn test_add_filter() {
let testcases = vec![
@@ -723,7 +781,7 @@ mod test {
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"
r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#
),
// complex time window index
(

View File

@@ -45,7 +45,7 @@ use crate::error::{
TimeSnafu, UnexpectedSnafu,
};
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
use crate::recording_rules::{find_plan_time_window_bound, find_time_window_expr, sql_to_df_plan};
use crate::recording_rules::{find_time_window_expr, sql_to_df_plan};
use crate::Error;
/// TODO(discord9): make those constants configurable
@@ -214,6 +214,8 @@ impl RecordingRuleEngine {
.map(|expr| TimeWindowExpr::from_expr(&expr, &column_name, &df_schema))
.transpose()?;
info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr);
let task = RecordingRuleTask::new(
flow_id,
&sql,
@@ -268,11 +270,11 @@ impl RecordingRuleEngine {
#[derive(Debug, Clone)]
pub struct RecordingRuleTask {
flow_id: FlowId,
pub flow_id: FlowId,
query: String,
time_window_expr: Option<TimeWindowExpr>,
pub time_window_expr: Option<TimeWindowExpr>,
/// in seconds
expire_after: Option<i64>,
pub expire_after: Option<i64>,
sink_table_name: [String; 3],
source_table_names: HashSet<[String; 3]>,
state: Arc<RwLock<RecordingRuleState>>,
@@ -376,6 +378,8 @@ impl RecordingRuleTask {
.write()
.await
.after_query_exec(elapsed, res.is_ok());
// drop the result to free client-related resources
drop(res);
let sleep_until = {
let mut state = self.state.write().await;
@@ -410,30 +414,40 @@ impl RecordingRuleTask {
let low_bound = Timestamp::new_second(low_bound as i64);
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
// TODO(discord9): find more time window!
let (col_name, lower, upper) =
find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
.await?;
// TODO(discord9): use time window expr to get the precise expire lower bound
let expire_time_window_bound = self
.time_window_expr
.as_ref()
.map(|expr| expr.eval(low_bound))
.transpose()?;
let new_sql = {
let expr = {
match (lower, upper) {
(Some(l), Some(u)) => {
match expire_time_window_bound {
Some((Some(l), Some(u))) => {
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let col_name = self
.time_window_expr
.as_ref()
.map(|expr| expr.column_name.clone())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Flow id={:?}, Failed to get column name from time window expr",
self.flow_id
),
})?;
self.state
.write()
.await
.dirty_time_windows
.gen_filter_exprs(&col_name, lower, window_size)?
.gen_filter_exprs(&col_name, Some(l), window_size, self)?
}
_ => {
warn!(
"Flow id = {:?}, can't get window size: lower={lower:?}, upper={upper:?}, using the same query", self.flow_id
debug!(
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.flow_id
);
// since no time window lower/upper bound is found, just return the original query
return Ok(Some(self.query.clone()));
@@ -530,19 +544,28 @@ impl DirtyTimeWindows {
col_name: &str,
expire_lower_bound: Option<Timestamp>,
window_size: chrono::Duration,
task_ctx: &RecordingRuleTask,
) -> Result<Option<datafusion_expr::Expr>, Error> {
debug!(
"expire_lower_bound: {:?}, window_size: {:?}",
expire_lower_bound.map(|t| t.to_iso8601_string()),
window_size
);
self.merge_dirty_time_windows(window_size)?;
self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
if self.windows.len() > Self::MAX_FILTER_NUM {
let first_time_window = self.windows.first_key_value();
let last_time_window = self.windows.last_key_value();
warn!(
"Too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong",
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
task_ctx.flow_id,
self.windows.len(),
Self::MAX_FILTER_NUM
Self::MAX_FILTER_NUM,
task_ctx.time_window_expr,
task_ctx.expire_after,
first_time_window,
last_time_window,
task_ctx.query
);
}
@@ -570,18 +593,6 @@ impl DirtyTimeWindows {
start.to_iso8601_string(),
end.map(|t| t.to_iso8601_string())
);
let start = if let Some(expire) = expire_lower_bound {
start.max(expire)
} else {
start
};
// filter out expired time window
if let Some(end) = end {
if end <= start {
continue;
}
}
use datafusion_expr::{col, lit};
let lower = to_df_literal(start)?;
@@ -600,11 +611,22 @@ impl DirtyTimeWindows {
}
/// Merge time windows that overlaps or get too close
pub fn merge_dirty_time_windows(&mut self, window_size: chrono::Duration) -> Result<(), Error> {
pub fn merge_dirty_time_windows(
&mut self,
window_size: chrono::Duration,
expire_lower_bound: Option<Timestamp>,
) -> Result<(), Error> {
let mut new_windows = BTreeMap::new();
let mut prev_tw = None;
for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
// filter out expired time window
if let Some(expire_lower_bound) = expire_lower_bound {
if lower_bound <= expire_lower_bound {
continue;
}
}
let Some(prev_tw) = &mut prev_tw else {
prev_tw = Some((lower_bound, upper_bound));
continue;
@@ -705,7 +727,7 @@ mod test {
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
@@ -728,7 +750,7 @@ mod test {
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
@@ -757,7 +779,7 @@ mod test {
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
@@ -769,5 +791,25 @@ mod test {
),]),
dirty.windows
);
// expired
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(
chrono::Duration::seconds(5 * 60),
Some(Timestamp::new_second(
(DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
)),
)
.unwrap();
// just enough to merge
assert_eq!(BTreeMap::from([]), dirty.windows);
}
}

View File

@@ -43,6 +43,7 @@ fn client_from_urls(addrs: Vec<String>) -> Client {
pub enum FrontendClient {
Distributed {
meta_client: Arc<MetaClient>,
channel_mgr: ChannelManager,
},
Standalone {
/// for the sake of simplicity still use grpc even in standalone mode
@@ -66,7 +67,10 @@ impl DatabaseWithPeer {
impl FrontendClient {
pub fn from_meta_client(meta_client: Arc<MetaClient>) -> Self {
Self::Distributed { meta_client }
Self::Distributed {
meta_client,
channel_mgr: default_channel_mgr(),
}
}
pub fn from_static_grpc_addr(addr: String) -> Self {
@@ -75,7 +79,8 @@ impl FrontendClient {
addr: addr.clone(),
};
let client = client_from_urls(vec![addr]);
let mgr = default_channel_mgr();
let client = Client::with_manager_and_urls(mgr.clone(), vec![addr]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Self::Standalone {
database_client: DatabaseWithPeer::new(database, peer),
@@ -119,32 +124,40 @@ impl FrontendClient {
if let Self::Standalone { database_client } = self {
return Ok(database_client.clone());
}
let frontends = self.scan_for_frontend().await?;
let mut last_activity_ts = i64::MIN;
let mut peer = None;
for (_key, val) in frontends.iter() {
if val.last_activity_ts > last_activity_ts {
last_activity_ts = val.last_activity_ts;
peer = Some(val.peer.clone());
match &self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed {
meta_client: _,
channel_mgr,
} => {
let frontends = self.scan_for_frontend().await?;
let mut last_activity_ts = i64::MIN;
let mut peer = None;
for (_key, val) in frontends.iter() {
if val.last_activity_ts > last_activity_ts {
last_activity_ts = val.last_activity_ts;
peer = Some(val.peer.clone());
}
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client =
Client::with_manager_and_urls(channel_mgr.clone(), vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
}
}
let Some(peer) = peer else {
UnexpectedSnafu {
reason: format!("No frontend available: {:?}", frontends),
}
.fail()?
};
let client = client_from_urls(vec![peer.addr.clone()]);
let database = Database::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, client);
Ok(DatabaseWithPeer::new(database, peer))
}
/// Get a database client, and possibly update it before returning.
pub async fn get_database_client(&self) -> Result<DatabaseWithPeer, Error> {
match self {
Self::Standalone { database_client } => Ok(database_client.clone()),
Self::Distributed { meta_client: _ } => self.get_last_active_frontend().await,
Self::Distributed { meta_client: _, .. } => self.get_last_active_frontend().await,
}
}
}

View File

@@ -115,6 +115,37 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let schema = vec![
datatypes::schema::ColumnSchema::new("NUMBER", CDT::uint32_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
];
let mut columns = vec![];
let numbers = (1..=10).collect_vec();
let column: VectorRef = Arc::new(<u32 as Scalar>::VectorType::from_vec(numbers));
columns.push(column);
let ts = (1..=10).collect_vec();
let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10);
ts.into_iter()
.map(|v| builder.push(Some(TimestampMillisecond::new(v))))
.count();
let column: VectorRef = builder.to_vector_cloned();
columns.push(column);
let schema = Arc::new(Schema::new(schema));
let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap();
let table = MemTable::table("UPPERCASE_NUMBERS_WITH_TS", recordbatch);
let req_with_ts = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "UPPERCASE_NUMBERS_WITH_TS".to_string(),
table_id: 1025,
table,
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();

View File

@@ -7,6 +7,7 @@ license.workspace = true
[features]
mock = []
pg_kvbackend = ["dep:tokio-postgres", "common-meta/pg_kvbackend"]
mysql_kvbackend = [] # placeholder features so CI can compile
[lints]
workspace = true