fix: quote&more info when time window too many

chore: even more warning

fix: filter first warn later
This commit is contained in:
discord9
2025-03-13 14:15:29 +08:00
parent 07c0f1d546
commit f04f16cec3
3 changed files with 161 additions and 47 deletions

View File

@@ -60,15 +60,13 @@ use crate::Error;
#[derive(Debug, Clone)]
pub struct TimeWindowExpr {
expr: PhysicalExprRef,
phy_expr: PhysicalExprRef,
column_name: String,
logical_expr: Expr,
df_schema: DFSchema,
}
impl TimeWindowExpr {
pub fn new(expr: PhysicalExprRef, column_name: String) -> Self {
Self { expr, column_name }
}
pub fn from_expr(expr: &Expr, column_name: &str, df_schema: &DFSchema) -> Result<Self, Error> {
let phy_planner = DefaultPhysicalPlanner::default();
@@ -80,11 +78,24 @@ impl TimeWindowExpr {
),
})?;
Ok(Self {
expr: phy_expr,
phy_expr,
column_name: column_name.to_string(),
logical_expr: expr.clone(),
df_schema: df_schema.clone(),
})
}
pub fn eval(
&self,
current: Timestamp,
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
let lower_bound =
find_expr_time_window_lower_bound(&self.logical_expr, &self.df_schema, current)?;
let upper_bound =
find_expr_time_window_upper_bound(&self.logical_expr, &self.df_schema, current)?;
Ok((lower_bound, upper_bound))
}
/// Find timestamps from rows using time window expr
pub async fn handle_rows(
&self,
@@ -131,12 +142,15 @@ impl TimeWindowExpr {
),
})?;
let eval_res = self.expr.evaluate(&rb).with_context(|_| DatafusionSnafu {
context: format!(
"Failed to evaluate physical expression {:?} on {rb:?}",
self.expr
),
})?;
let eval_res = self
.phy_expr
.evaluate(&rb)
.with_context(|_| DatafusionSnafu {
context: format!(
"Failed to evaluate physical expression {:?} on {rb:?}",
self.phy_expr
),
})?;
let res = columnar_to_ts_vector(&eval_res)?;
@@ -447,6 +461,7 @@ pub async fn find_plan_time_window_bound(
///
/// i.e. for `current="2021-07-01 00:01:01.000"` and `expr=date_bin(INTERVAL '5 minutes', ts) as time_window` and `ts_col=ts`,
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// return `Some("2021-07-01 00:00:00.000")` since it's the lower bound
/// of current time window given the current timestamp
///
/// if return None, meaning this time window have no lower bound
@@ -656,7 +671,19 @@ impl TreeNodeRewriter for AddFilterRewriter {
}
fn df_plan_to_sql(plan: &LogicalPlan) -> Result<String, Error> {
let unparser = Unparser::default();
/// A dialect that forces all identifiers to be quoted
struct ForceQuoteIdentifiers;
impl datafusion::sql::unparser::dialect::Dialect for ForceQuoteIdentifiers {
fn identifier_quote_style(&self, identifier: &str) -> Option<char> {
if identifier.to_lowercase() != identifier {
Some('"')
} else {
None
}
}
}
let unparser = Unparser::new(&ForceQuoteIdentifiers);
// first make all column qualified
let sql = unparser
.plan_to_sql(plan)
.with_context(|_e| DatafusionSnafu {
@@ -675,6 +702,22 @@ mod test {
use crate::recording_rules::{df_plan_to_sql, AddFilterRewriter};
use crate::test_utils::create_test_query_engine;
#[tokio::test]
async fn test_sql_plan_convert() {
let query_engine = create_test_query_engine();
let ctx = QueryContext::arc();
let old = r#"SELECT "NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#;
let new = sql_to_df_plan(ctx.clone(), query_engine.clone(), old, false)
.await
.unwrap();
let new_sql = df_plan_to_sql(&new).unwrap();
assert_eq!(
r#"SELECT "UPPERCASE_NUMBERS_WITH_TS"."NUMBER" FROM "UPPERCASE_NUMBERS_WITH_TS""#,
new_sql
);
}
#[tokio::test]
async fn test_add_filter() {
let testcases = vec![
@@ -723,7 +766,7 @@ mod test {
Some(Timestamp::new(1740394109000, TimeUnit::Millisecond)),
Some(Timestamp::new(1740394109001, TimeUnit::Millisecond)),
),
"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"
r#"SELECT arrow_cast(date_bin(INTERVAL '1 MINS', numbers_with_ts.ts), 'Timestamp(Second, None)') AS ts FROM numbers_with_ts WHERE ((ts >= CAST('2025-02-24 10:48:29' AS TIMESTAMP)) AND (ts <= CAST('2025-02-24 10:48:29.001' AS TIMESTAMP))) GROUP BY numbers_with_ts.ts"#
),
// complex time window index
(

View File

@@ -45,7 +45,7 @@ use crate::error::{
TimeSnafu, UnexpectedSnafu,
};
use crate::metrics::{METRIC_FLOW_RULE_ENGINE_QUERY_TIME, METRIC_FLOW_RULE_ENGINE_SLOW_QUERY};
use crate::recording_rules::{find_plan_time_window_bound, find_time_window_expr, sql_to_df_plan};
use crate::recording_rules::{find_time_window_expr, sql_to_df_plan};
use crate::Error;
/// TODO(discord9): make those constants configurable
@@ -214,6 +214,8 @@ impl RecordingRuleEngine {
.map(|expr| TimeWindowExpr::from_expr(&expr, &column_name, &df_schema))
.transpose()?;
info!("Flow id={}, found time window expr={:?}", flow_id, phy_expr);
let task = RecordingRuleTask::new(
flow_id,
&sql,
@@ -268,11 +270,11 @@ impl RecordingRuleEngine {
#[derive(Debug, Clone)]
pub struct RecordingRuleTask {
flow_id: FlowId,
pub flow_id: FlowId,
query: String,
time_window_expr: Option<TimeWindowExpr>,
pub time_window_expr: Option<TimeWindowExpr>,
/// in seconds
expire_after: Option<i64>,
pub expire_after: Option<i64>,
sink_table_name: [String; 3],
source_table_names: HashSet<[String; 3]>,
state: Arc<RwLock<RecordingRuleState>>,
@@ -413,30 +415,40 @@ impl RecordingRuleTask {
let low_bound = Timestamp::new_second(low_bound as i64);
let plan = sql_to_df_plan(query_ctx.clone(), engine.clone(), &self.query, true).await?;
// TODO(discord9): find more time window!
let (col_name, lower, upper) =
find_plan_time_window_bound(&plan, low_bound, query_ctx.clone(), engine.clone())
.await?;
// TODO(discord9): use time window expr to get the precise expire lower bound
let expire_time_window_bound = self
.time_window_expr
.as_ref()
.map(|expr| expr.eval(low_bound))
.transpose()?;
let new_sql = {
let expr = {
match (lower, upper) {
(Some(l), Some(u)) => {
match expire_time_window_bound {
Some((Some(l), Some(u))) => {
let window_size = u.sub(&l).with_context(|| UnexpectedSnafu {
reason: format!("Can't get window size from {u:?} - {l:?}"),
})?;
let col_name = self
.time_window_expr
.as_ref()
.map(|expr| expr.column_name.clone())
.with_context(|| UnexpectedSnafu {
reason: format!(
"Flow id={:?}, Failed to get column name from time window expr",
self.flow_id
),
})?;
self.state
.write()
.await
.dirty_time_windows
.gen_filter_exprs(&col_name, lower, window_size)?
.gen_filter_exprs(&col_name, Some(l), window_size, self)?
}
_ => {
warn!(
"Flow id = {:?}, can't get window size: lower={lower:?}, upper={upper:?}, using the same query", self.flow_id
"Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.flow_id
);
// since no time window lower/upper bound is found, just return the original query
return Ok(Some(self.query.clone()));
@@ -533,19 +545,28 @@ impl DirtyTimeWindows {
col_name: &str,
expire_lower_bound: Option<Timestamp>,
window_size: chrono::Duration,
task_ctx: &RecordingRuleTask,
) -> Result<Option<datafusion_expr::Expr>, Error> {
debug!(
"expire_lower_bound: {:?}, window_size: {:?}",
expire_lower_bound.map(|t| t.to_iso8601_string()),
window_size
);
self.merge_dirty_time_windows(window_size)?;
self.merge_dirty_time_windows(window_size, expire_lower_bound)?;
if self.windows.len() > Self::MAX_FILTER_NUM {
let first_time_window = self.windows.first_key_value();
let last_time_window = self.windows.last_key_value();
warn!(
"Too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong",
"Flow id = {:?}, too many time windows: {}, only the first {} are taken for this query, the group by expression might be wrong. Time window expr={:?}, expire_after={:?}, first_time_window={:?}, last_time_window={:?}, the original query: {:?}",
task_ctx.flow_id,
self.windows.len(),
Self::MAX_FILTER_NUM
Self::MAX_FILTER_NUM,
task_ctx.time_window_expr,
task_ctx.expire_after,
first_time_window,
last_time_window,
task_ctx.query
);
}
@@ -573,18 +594,6 @@ impl DirtyTimeWindows {
start.to_iso8601_string(),
end.map(|t| t.to_iso8601_string())
);
let start = if let Some(expire) = expire_lower_bound {
start.max(expire)
} else {
start
};
// filter out expired time window
if let Some(end) = end {
if end <= start {
continue;
}
}
use datafusion_expr::{col, lit};
let lower = to_df_literal(start)?;
@@ -603,11 +612,22 @@ impl DirtyTimeWindows {
}
/// Merge time windows that overlaps or get too close
pub fn merge_dirty_time_windows(&mut self, window_size: chrono::Duration) -> Result<(), Error> {
pub fn merge_dirty_time_windows(
&mut self,
window_size: chrono::Duration,
expire_lower_bound: Option<Timestamp>,
) -> Result<(), Error> {
let mut new_windows = BTreeMap::new();
let mut prev_tw = None;
for (lower_bound, upper_bound) in std::mem::take(&mut self.windows) {
// filter out expired time window
if let Some(expire_lower_bound) = expire_lower_bound {
if lower_bound <= expire_lower_bound {
continue;
}
}
let Some(prev_tw) = &mut prev_tw else {
prev_tw = Some((lower_bound, upper_bound));
continue;
@@ -708,7 +728,7 @@ mod test {
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
@@ -731,7 +751,7 @@ mod test {
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
@@ -760,7 +780,7 @@ mod test {
.into_iter(),
);
dirty
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60))
.merge_dirty_time_windows(chrono::Duration::seconds(5 * 60), None)
.unwrap();
// just enough to merge
assert_eq!(
@@ -772,5 +792,25 @@ mod test {
),]),
dirty.windows
);
// expired
let mut dirty = DirtyTimeWindows::default();
dirty.add_lower_bounds(
vec![
Timestamp::new_second(0),
Timestamp::new_second((DirtyTimeWindows::MERGE_DIST as i64) * 5 * 60),
]
.into_iter(),
);
dirty
.merge_dirty_time_windows(
chrono::Duration::seconds(5 * 60),
Some(Timestamp::new_second(
(DirtyTimeWindows::MERGE_DIST as i64) * 6 * 60,
)),
)
.unwrap();
// just enough to merge
assert_eq!(BTreeMap::from([]), dirty.windows);
}
}

View File

@@ -115,6 +115,37 @@ pub fn create_test_query_engine() -> Arc<dyn QueryEngine> {
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let schema = vec![
datatypes::schema::ColumnSchema::new("NUMBER", CDT::uint32_datatype(), false),
datatypes::schema::ColumnSchema::new("ts", CDT::timestamp_millisecond_datatype(), false)
.with_time_index(true),
];
let mut columns = vec![];
let numbers = (1..=10).collect_vec();
let column: VectorRef = Arc::new(<u32 as Scalar>::VectorType::from_vec(numbers));
columns.push(column);
let ts = (1..=10).collect_vec();
let mut builder = TimestampMillisecondVectorBuilder::with_capacity(10);
ts.into_iter()
.map(|v| builder.push(Some(TimestampMillisecond::new(v))))
.count();
let column: VectorRef = builder.to_vector_cloned();
columns.push(column);
let schema = Arc::new(Schema::new(schema));
let recordbatch = common_recordbatch::RecordBatch::new(schema, columns).unwrap();
let table = MemTable::table("UPPERCASE_NUMBERS_WITH_TS", recordbatch);
let req_with_ts = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: "UPPERCASE_NUMBERS_WITH_TS".to_string(),
table_id: 1025,
table,
};
catalog_list.register_table_sync(req_with_ts).unwrap();
let factory = query::QueryEngineFactory::new(catalog_list, None, None, None, None, false);
let engine = factory.query_engine();