From 23a0a54e1805888701af26c7cdac91ae1b1d58bb Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 13 Mar 2025 16:30:03 +0800 Subject: [PATCH] fix: convert timestamp unit too --- src/flow/src/recording_rules.rs | 17 ++++++++++++++++- src/flow/src/recording_rules/engine.rs | 2 +- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/flow/src/recording_rules.rs b/src/flow/src/recording_rules.rs index 228c30c61e..37ea838d53 100644 --- a/src/flow/src/recording_rules.rs +++ b/src/flow/src/recording_rules.rs @@ -591,7 +591,22 @@ fn eval_ts_to_ts( df_schema: &DFSchema, input_value: Timestamp, ) -> Result { - let ts_vector = match input_value.unit() { + let schema_ty = df_schema.field(0).data_type(); + let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty); + let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt { + ts.unit() + } else { + return UnexpectedSnafu { + reason: format!("Expect Timestamp, found {:?}", schema_cdt), + } + .fail(); + }; + let input_value = input_value + .convert_to(schema_unit) + .with_context(|| UnexpectedSnafu { + reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"), + })?; + let ts_vector = match schema_unit { TimeUnit::Second => { TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array() } diff --git a/src/flow/src/recording_rules/engine.rs b/src/flow/src/recording_rules/engine.rs index 3c5a3d12f4..2462c6ccc4 100644 --- a/src/flow/src/recording_rules/engine.rs +++ b/src/flow/src/recording_rules/engine.rs @@ -447,7 +447,7 @@ impl RecordingRuleTask { .gen_filter_exprs(&col_name, Some(l), window_size, self)? } _ => { - warn!( + debug!( "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.flow_id ); // since no time window lower/upper bound is found, just return the original query