mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 07:12:54 +00:00
fix: allow empty expire when
This commit is contained in:
@@ -671,6 +671,13 @@ impl FlownodeManager {
|
||||
|
||||
// TODO(discord9): parse `expire_when`
|
||||
let expire_when = expire_when
|
||||
.and_then(|s| {
|
||||
if s.is_empty() || s.split_whitespace().join("").is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(s)
|
||||
}
|
||||
})
|
||||
.map(|d| {
|
||||
let d = d.as_ref();
|
||||
parse_fixed(d)
|
||||
|
||||
@@ -123,7 +123,7 @@ fn parse_item(input: &str) -> IResult<&str, Expr> {
|
||||
Ok((r, Expr::Col(name.to_string())))
|
||||
} else if let Ok((r, _now)) = parse_now(input) {
|
||||
Ok((r, Expr::Now))
|
||||
} else if let Ok((r, _num)) = parse_quality(input) {
|
||||
} else if let Ok((_r, _num)) = parse_quality(input) {
|
||||
todo!()
|
||||
} else {
|
||||
todo!()
|
||||
|
||||
@@ -239,6 +239,7 @@ impl<'s> Worker<'s> {
|
||||
expire_when: Option<repr::Duration>,
|
||||
create_if_not_exist: bool,
|
||||
) -> Result<Option<FlowId>, Error> {
|
||||
let _ = expire_when;
|
||||
if create_if_not_exist {
|
||||
// check if the task already exists
|
||||
if self.task_states.contains_key(&task_id) {
|
||||
|
||||
@@ -155,7 +155,7 @@ fn eval_mfp_core(
|
||||
) -> Vec<KeyValDiffRow> {
|
||||
let mut all_updates = Vec::new();
|
||||
for (mut row, _sys_time, diff) in input.into_iter() {
|
||||
// this updates is expected to be only zero to two rows
|
||||
// this updates is expected to be only zero, one or two rows
|
||||
let updates = mfp_plan.evaluate::<EvalError>(&mut row.inner, now, diff);
|
||||
// TODO(discord9): refactor error handling
|
||||
// Expect error in a single row to not interrupt the whole evaluation
|
||||
|
||||
Reference in New Issue
Block a user