diff --git a/src/flow/src/compute/render/reduce.rs b/src/flow/src/compute/render/reduce.rs index f0003fe258..27287f0d77 100644 --- a/src/flow/src/compute/render/reduce.rs +++ b/src/flow/src/compute/render/reduce.rs @@ -395,17 +395,8 @@ fn reduce_accum_subgraph( None => continue, } }; - let (accums, old_ts, _) = arrange.get(now, &key).unwrap_or_default(); - if !accums.is_empty() { - info!("Get old_ts as: {old_ts}"); - // first explictly remove old key(because arrange use pk as only pk, but - // database use pk+ts as true pk for dedup, so we need to manualy remove old values first) - let mut del_key_val = key.clone(); - // adding null as values so the output length are correct - del_key_val.extend(null_vals.clone()); + let (accums, _, _) = arrange.get(now, &key).unwrap_or_default(); - all_outputs.push((del_key_val, now, -1)); - } let accums = accums.inner; // deser accums from offsets diff --git a/src/flow/src/transform.rs b/src/flow/src/transform.rs index 95eae6a52c..4c2a9678d4 100644 --- a/src/flow/src/transform.rs +++ b/src/flow/src/transform.rs @@ -143,13 +143,18 @@ mod test { pub fn create_test_ctx() -> FlowNodeContext { let gid = GlobalId::User(0); - let name = ["".to_string(), "".to_string(), "numbers".to_string()]; + let name = [ + "greptime".to_string(), + "public".to_string(), + "numbers".to_string(), + ]; let schema = RelationType::new(vec![ColumnType::new(CDT::uint32_datatype(), false)]); let mut tri_map = TriMap::new(); tri_map.insert(Some(name.clone()), Some(0), gid); FlowNodeContext { schema: HashMap::from([(gid, schema)]), table_repr: tri_map, + query_context: Some(QueryContext::with("greptime", "public")), ..Default::default() } }