diff --git a/src/flow/src/recording_rules.rs b/src/flow/src/recording_rules.rs index 228c30c61e..37ea838d53 100644 --- a/src/flow/src/recording_rules.rs +++ b/src/flow/src/recording_rules.rs @@ -591,7 +591,22 @@ fn eval_ts_to_ts( df_schema: &DFSchema, input_value: Timestamp, ) -> Result { - let ts_vector = match input_value.unit() { + let schema_ty = df_schema.field(0).data_type(); + let schema_cdt = ConcreteDataType::from_arrow_type(schema_ty); + let schema_unit = if let ConcreteDataType::Timestamp(ts) = schema_cdt { + ts.unit() + } else { + return UnexpectedSnafu { + reason: format!("Expect Timestamp, found {:?}", schema_cdt), + } + .fail(); + }; + let input_value = input_value + .convert_to(schema_unit) + .with_context(|| UnexpectedSnafu { + reason: format!("Failed to convert timestamp {input_value:?} to {schema_unit}"), + })?; + let ts_vector = match schema_unit { TimeUnit::Second => { TimestampSecondVector::from_vec(vec![input_value.value()]).to_arrow_array() } diff --git a/src/flow/src/recording_rules/engine.rs b/src/flow/src/recording_rules/engine.rs index 3c5a3d12f4..2462c6ccc4 100644 --- a/src/flow/src/recording_rules/engine.rs +++ b/src/flow/src/recording_rules/engine.rs @@ -447,7 +447,7 @@ impl RecordingRuleTask { .gen_filter_exprs(&col_name, Some(l), window_size, self)? } _ => { - warn!( + debug!( "Flow id = {:?}, can't get window size: precise_lower_bound={expire_time_window_bound:?}, using the same query", self.flow_id ); // since no time window lower/upper bound is found, just return the original query