From 8f6462c0b04bf254b84eb74f574fb10a49bf3c0b Mon Sep 17 00:00:00 2001 From: discord9 Date: Wed, 8 May 2024 20:39:46 +0800 Subject: [PATCH] feat: parse expire when --- src/flow/src/adapter.rs | 14 +++- src/flow/src/adapter/parse_expr.rs | 117 +++++++++++++++++++++++------ src/flow/src/adapter/worker.rs | 8 ++ 3 files changed, 116 insertions(+), 23 deletions(-) diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs index 13e6135818..1aee36f8ed 100644 --- a/src/flow/src/adapter.rs +++ b/src/flow/src/adapter.rs @@ -50,6 +50,7 @@ use tokio::task::LocalSet; use crate::adapter::error::{ EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu, }; +use crate::adapter::parse_expr::{parse_duration, parse_fixed}; use crate::adapter::util::column_schemas_to_proto; use crate::adapter::worker::{create_worker, Worker, WorkerHandle}; use crate::compute::{Context, DataflowState, ErrCollector}; @@ -507,7 +508,7 @@ impl FlownodeManager { (wout_ts, with_ts) }; - let proto_schema_wout_ts = column_schemas_to_proto(schema_wout_ts, &primary_keys)?; + let _proto_schema_wout_ts = column_schemas_to_proto(schema_wout_ts, &primary_keys)?; let proto_schema_with_ts = column_schemas_to_proto(with_ts, &primary_keys)?; @@ -669,7 +670,15 @@ impl FlownodeManager { node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?; // TODO(discord9): parse `expire_when` - let _ = expire_when; + let expire_when = expire_when + .map(|d| { + let d = d.as_ref(); + parse_fixed(d) + .map(|(_, n)| n) + .map_err(|err| err.to_string()) + }) + .transpose() + .map_err(|err| UnexpectedSnafu { reason: err }.build())?; let _ = comment; let _ = flow_options; @@ -695,6 +704,7 @@ impl FlownodeManager { sink_sender, &source_ids, source_senders, + expire_when, create_if_not_exist, ) .await?; diff --git a/src/flow/src/adapter/parse_expr.rs b/src/flow/src/adapter/parse_expr.rs index efbf3b54c5..818e5150ff 100644 --- a/src/flow/src/adapter/parse_expr.rs +++ b/src/flow/src/adapter/parse_expr.rs @@ -12,18 +12,74 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! parse expr like "ts <= now() - interval '5 min'" +//! parse expr like "ts <= now() - interval '5 m'" use nom::branch::alt; use nom::bytes::complete::{tag, tag_no_case}; -use nom::character::complete::{alphanumeric1, digit0, multispace0}; +use nom::character::complete::{alphanumeric1, digit0, multispace0, multispace1}; use nom::combinator::peek; use nom::sequence::tuple; -use nom::IResult; +use nom::{multi, IResult}; use crate::expr::ScalarExpr; use crate::repr; +#[test] +fn test_parse_duration() { + let input = "1 h 5 m 42 second"; + let (remain, ttl) = parse_duration(input).unwrap(); + assert_eq!(remain, ""); + assert_eq!(ttl, (3600 + 5 * 60 + 42) * 1000); +} + +#[test] +fn test_parse_fixed() { + let input = "timestamp < now() - INTERVAL '5m 42s'"; + let (remain, ttl) = parse_fixed(input).unwrap(); + assert_eq!(remain, ""); + assert_eq!(ttl, (5 * 60 + 42) * 1000); +} + +pub fn parse_fixed(input: &str) -> IResult<&str, i64> { + let (r, _) = tuple(( + multispace0, + tag_no_case("timestamp"), + multispace0, + tag("<"), + multispace0, + tag_no_case("now()"), + multispace0, + tag("-"), + multispace0, + tag_no_case("interval"), + multispace0, + ))(input)?; + tuple((tag("'"), parse_duration, tag("'")))(r).map(|(r, (_, ttl, _))| (r, ttl)) +} + +/// parse duration and return ttl, currently only support time part of psql interval type +pub fn parse_duration(input: &str) -> IResult<&str, i64> { + let mut intervals = vec![]; + let mut remain = input; + while peek(parse_quality)(remain).is_ok() { + let (r, number) = parse_quality(remain)?; + let (r, unit) = parse_time_unit(r)?; + intervals.push((number, unit)); + remain = r; + } + let mut total = 0; + for (number, unit) in intervals { + let number = match unit { + TimeUnit::Second => number, + TimeUnit::Minute => number * 60, + TimeUnit::Hour => number * 60 * 60, + }; + total += number; + } + total *= 1000; + Ok((remain, total)) +} + enum Expr { Col(String), Now, @@ -68,7 +124,6 @@ fn parse_item(input: &str) -> IResult<&str, Expr> { } else if let Ok((r, _now)) = parse_now(input) { Ok((r, Expr::Now)) } else if let Ok((r, _num)) = parse_quality(input) { - let _ = parse_unit(r)?; todo!() } else { todo!() @@ -112,7 +167,7 @@ fn parse_cmp(input: &str) -> IResult<&str, &str> { } /// parse a number with optional sign -fn parse_quality(input: &str) -> IResult<&str, isize> { +fn parse_quality(input: &str) -> IResult<&str, repr::Duration> { tuple(( multispace0, alt((tag("+"), tag("-"), tag(""))), @@ -121,7 +176,7 @@ fn parse_quality(input: &str) -> IResult<&str, isize> { ))(input) .map(|(r, (_, sign, name, _))| (r, sign, name)) .and_then(|(r, sign, name)| { - let num = name.parse::().map_err(|_| { + let num = name.parse::().map_err(|_| { nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit)) })?; let num = match sign { @@ -133,37 +188,57 @@ fn parse_quality(input: &str) -> IResult<&str, isize> { }) } +#[derive(Debug, Clone)] enum TimeUnit { Second, Minute, Hour, +} + +#[derive(Debug, Clone)] +enum DateUnit { Day, Month, Year, } -fn parse_unit(input: &str) -> IResult<&str, &str> { - tuple(( - multispace0, +fn parse_time_unit(input: &str) -> IResult<&str, TimeUnit> { + fn to_second(input: &str) -> IResult<&str, TimeUnit> { alt(( tag_no_case("second"), tag_no_case("seconds"), tag_no_case("S"), + ))(input) + .map(move |(r, _)| (r, TimeUnit::Second)) + } + fn to_minute(input: &str) -> IResult<&str, TimeUnit> { + alt(( tag_no_case("minute"), tag_no_case("minutes"), tag_no_case("m"), - tag_no_case("hour"), - tag_no_case("hours"), - tag_no_case("h"), - tag_no_case("day"), - tag_no_case("days"), - tag_no_case("d"), - tag_no_case("month"), - tag_no_case("months"), - tag_no_case("m"), - tag_no_case("year"), - tag_no_case("years"), - tag_no_case("y"), + ))(input) + .map(move |(r, _)| (r, TimeUnit::Minute)) + } + fn to_hour(input: &str) -> IResult<&str, TimeUnit> { + alt((tag_no_case("hour"), tag_no_case("hours"), tag_no_case("h")))(input) + .map(move |(r, _)| (r, TimeUnit::Hour)) + } + + tuple(( + multispace0, + alt(( + to_second, to_minute, + to_hour, /* + tag_no_case("day"), + tag_no_case("days"), + tag_no_case("d"), + tag_no_case("month"), + tag_no_case("months"), + tag_no_case("m"), + tag_no_case("year"), + tag_no_case("years"), + tag_no_case("y"), + */ )), multispace0, ))(input) diff --git a/src/flow/src/adapter/worker.rs b/src/flow/src/adapter/worker.rs index d91fbd2205..124e96ec0a 100644 --- a/src/flow/src/adapter/worker.rs +++ b/src/flow/src/adapter/worker.rs @@ -125,6 +125,7 @@ impl WorkerHandle { sink_sender: mpsc::UnboundedSender, source_ids: &[GlobalId], src_recvs: Vec>, + expire_when: Option, create_if_not_exist: bool, ) -> Result, Error> { let req = Request::Create { @@ -134,6 +135,7 @@ impl WorkerHandle { sink_sender, source_ids: source_ids.to_vec(), src_recvs, + expire_when, create_if_not_exist, }; @@ -233,6 +235,8 @@ impl<'s> Worker<'s> { sink_sender: mpsc::UnboundedSender, source_ids: &[GlobalId], src_recvs: Vec>, + // TODO(discord9): set expire duration for all arrangment and compare to sys timestamp instead + expire_when: Option, create_if_not_exist: bool, ) -> Result, Error> { if create_if_not_exist { @@ -298,6 +302,7 @@ impl<'s> Worker<'s> { sink_sender, source_ids, src_recvs, + expire_when, create_if_not_exist, } => { let task_create_result = self.create_flow( @@ -307,6 +312,7 @@ impl<'s> Worker<'s> { sink_sender, &source_ids, src_recvs, + expire_when, create_if_not_exist, ); Some(( @@ -343,6 +349,7 @@ enum Request { sink_sender: mpsc::UnboundedSender, source_ids: Vec, src_recvs: Vec>, + expire_when: Option, create_if_not_exist: bool, }, Remove { @@ -484,6 +491,7 @@ mod test { sink_tx, &src_ids, vec![rx], + None, true, ) .await