From 138a2aba7fe2193f2b2ffcc3ca1b45e247a24831 Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 9 May 2024 11:04:35 +0800 Subject: [PATCH] fix: allow empty expire when --- src/flow/src/adapter.rs | 7 +++++++ src/flow/src/adapter/parse_expr.rs | 2 +- src/flow/src/adapter/worker.rs | 1 + src/flow/src/compute/render/map.rs | 2 +- 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 1aee36f8ed..94ad7f8baa 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -671,6 +671,13 @@ impl FlownodeManager { // TODO(discord9): parse `expire_when` let expire_when = expire_when + .and_then(|s| { + if s.is_empty() || s.split_whitespace().join("").is_empty() { + None + } else { + Some(s) + } + }) .map(|d| { let d = d.as_ref(); parse_fixed(d) diff --git a/src/flow/src/adapter/parse_expr.rs b/src/flow/src/adapter/parse_expr.rs index 818e5150ff..acd2880584 100644 --- a/src/flow/src/adapter/parse_expr.rs +++ b/src/flow/src/adapter/parse_expr.rs @@ -123,7 +123,7 @@ fn parse_item(input: &str) -> IResult<&str, Expr> { Ok((r, Expr::Col(name.to_string()))) } else if let Ok((r, _now)) = parse_now(input) { Ok((r, Expr::Now)) - } else if let Ok((r, _num)) = parse_quality(input) { + } else if let Ok((_r, _num)) = parse_quality(input) { todo!() } else { todo!() diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index 124e96ec0a..0fbd9e6b68 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -239,6 +239,7 @@ impl<'s> Worker<'s> { expire_when: Option, create_if_not_exist: bool, ) -> Result, Error> { + let _ = expire_when; if create_if_not_exist { // check if the task already exists if self.task_states.contains_key(&task_id) { diff --git a/src/flow/src/compute/render/map.rs b/src/flow/src/compute/render/map.rs index 8be2b570ed..f94125c4a1 100644 --- a/src/flow/src/compute/render/map.rs +++ b/src/flow/src/compute/render/map.rs @@ -155,7 +155,7 @@ fn eval_mfp_core( ) -> Vec { let mut all_updates = Vec::new(); for (mut row, _sys_time, diff) in input.into_iter() { - // this updates is expected to be only zero to two rows + // this updates is expected to be only zero, one or two rows let updates = mfp_plan.evaluate::(&mut row.inner, now, diff); // TODO(discord9): refactor error handling // Expect error in a single row to not interrupt the whole evaluation