feat: parse expire when

This commit is contained in:
discord9
2024-05-08 20:39:46 +08:00
parent 46d0b3cd64
commit 8f6462c0b0
3 changed files with 116 additions and 23 deletions

View File

@@ -50,6 +50,7 @@ use tokio::task::LocalSet;
use crate::adapter::error::{
EvalSnafu, ExternalSnafu, TableNotFoundMetaSnafu, TableNotFoundSnafu, UnexpectedSnafu,
};
use crate::adapter::parse_expr::{parse_duration, parse_fixed};
use crate::adapter::util::column_schemas_to_proto;
use crate::adapter::worker::{create_worker, Worker, WorkerHandle};
use crate::compute::{Context, DataflowState, ErrCollector};
@@ -507,7 +508,7 @@ impl FlownodeManager {
(wout_ts, with_ts)
};
let proto_schema_wout_ts = column_schemas_to_proto(schema_wout_ts, &primary_keys)?;
let _proto_schema_wout_ts = column_schemas_to_proto(schema_wout_ts, &primary_keys)?;
let proto_schema_with_ts = column_schemas_to_proto(with_ts, &primary_keys)?;
@@ -669,7 +670,15 @@ impl FlownodeManager {
node_ctx.assign_table_schema(&sink_table_name, flow_plan.typ.clone())?;
// TODO(discord9): parse `expire_when`
let _ = expire_when;
let expire_when = expire_when
.map(|d| {
let d = d.as_ref();
parse_fixed(d)
.map(|(_, n)| n)
.map_err(|err| err.to_string())
})
.transpose()
.map_err(|err| UnexpectedSnafu { reason: err }.build())?;
let _ = comment;
let _ = flow_options;
@@ -695,6 +704,7 @@ impl FlownodeManager {
sink_sender,
&source_ids,
source_senders,
expire_when,
create_if_not_exist,
)
.await?;

View File

@@ -12,18 +12,74 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! parse expr like "ts <= now() - interval '5 min'"
//! parse expr like "ts <= now() - interval '5 m'"
use nom::branch::alt;
use nom::bytes::complete::{tag, tag_no_case};
use nom::character::complete::{alphanumeric1, digit0, multispace0};
use nom::character::complete::{alphanumeric1, digit0, multispace0, multispace1};
use nom::combinator::peek;
use nom::sequence::tuple;
use nom::IResult;
use nom::{multi, IResult};
use crate::expr::ScalarExpr;
use crate::repr;
#[test]
fn test_parse_duration() {
let input = "1 h 5 m 42 second";
let (remain, ttl) = parse_duration(input).unwrap();
assert_eq!(remain, "");
assert_eq!(ttl, (3600 + 5 * 60 + 42) * 1000);
}
#[test]
fn test_parse_fixed() {
let input = "timestamp < now() - INTERVAL '5m 42s'";
let (remain, ttl) = parse_fixed(input).unwrap();
assert_eq!(remain, "");
assert_eq!(ttl, (5 * 60 + 42) * 1000);
}
pub fn parse_fixed(input: &str) -> IResult<&str, i64> {
let (r, _) = tuple((
multispace0,
tag_no_case("timestamp"),
multispace0,
tag("<"),
multispace0,
tag_no_case("now()"),
multispace0,
tag("-"),
multispace0,
tag_no_case("interval"),
multispace0,
))(input)?;
tuple((tag("'"), parse_duration, tag("'")))(r).map(|(r, (_, ttl, _))| (r, ttl))
}
/// parse duration and return ttl, currently only support time part of psql interval type
pub fn parse_duration(input: &str) -> IResult<&str, i64> {
let mut intervals = vec![];
let mut remain = input;
while peek(parse_quality)(remain).is_ok() {
let (r, number) = parse_quality(remain)?;
let (r, unit) = parse_time_unit(r)?;
intervals.push((number, unit));
remain = r;
}
let mut total = 0;
for (number, unit) in intervals {
let number = match unit {
TimeUnit::Second => number,
TimeUnit::Minute => number * 60,
TimeUnit::Hour => number * 60 * 60,
};
total += number;
}
total *= 1000;
Ok((remain, total))
}
enum Expr {
Col(String),
Now,
@@ -68,7 +124,6 @@ fn parse_item(input: &str) -> IResult<&str, Expr> {
} else if let Ok((r, _now)) = parse_now(input) {
Ok((r, Expr::Now))
} else if let Ok((r, _num)) = parse_quality(input) {
let _ = parse_unit(r)?;
todo!()
} else {
todo!()
@@ -112,7 +167,7 @@ fn parse_cmp(input: &str) -> IResult<&str, &str> {
}
/// parse a number with optional sign
fn parse_quality(input: &str) -> IResult<&str, isize> {
fn parse_quality(input: &str) -> IResult<&str, repr::Duration> {
tuple((
multispace0,
alt((tag("+"), tag("-"), tag(""))),
@@ -121,7 +176,7 @@ fn parse_quality(input: &str) -> IResult<&str, isize> {
))(input)
.map(|(r, (_, sign, name, _))| (r, sign, name))
.and_then(|(r, sign, name)| {
let num = name.parse::<isize>().map_err(|_| {
let num = name.parse::<repr::Duration>().map_err(|_| {
nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
})?;
let num = match sign {
@@ -133,37 +188,57 @@ fn parse_quality(input: &str) -> IResult<&str, isize> {
})
}
#[derive(Debug, Clone)]
enum TimeUnit {
Second,
Minute,
Hour,
}
#[derive(Debug, Clone)]
enum DateUnit {
Day,
Month,
Year,
}
fn parse_unit(input: &str) -> IResult<&str, &str> {
tuple((
multispace0,
fn parse_time_unit(input: &str) -> IResult<&str, TimeUnit> {
fn to_second(input: &str) -> IResult<&str, TimeUnit> {
alt((
tag_no_case("second"),
tag_no_case("seconds"),
tag_no_case("S"),
))(input)
.map(move |(r, _)| (r, TimeUnit::Second))
}
fn to_minute(input: &str) -> IResult<&str, TimeUnit> {
alt((
tag_no_case("minute"),
tag_no_case("minutes"),
tag_no_case("m"),
tag_no_case("hour"),
tag_no_case("hours"),
tag_no_case("h"),
tag_no_case("day"),
tag_no_case("days"),
tag_no_case("d"),
tag_no_case("month"),
tag_no_case("months"),
tag_no_case("m"),
tag_no_case("year"),
tag_no_case("years"),
tag_no_case("y"),
))(input)
.map(move |(r, _)| (r, TimeUnit::Minute))
}
fn to_hour(input: &str) -> IResult<&str, TimeUnit> {
alt((tag_no_case("hour"), tag_no_case("hours"), tag_no_case("h")))(input)
.map(move |(r, _)| (r, TimeUnit::Hour))
}
tuple((
multispace0,
alt((
to_second, to_minute,
to_hour, /*
tag_no_case("day"),
tag_no_case("days"),
tag_no_case("d"),
tag_no_case("month"),
tag_no_case("months"),
tag_no_case("m"),
tag_no_case("year"),
tag_no_case("years"),
tag_no_case("y"),
*/
)),
multispace0,
))(input)

View File

@@ -125,6 +125,7 @@ impl WorkerHandle {
sink_sender: mpsc::UnboundedSender<DiffRow>,
source_ids: &[GlobalId],
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
) -> Result<Option<FlowId>, Error> {
let req = Request::Create {
@@ -134,6 +135,7 @@ impl WorkerHandle {
sink_sender,
source_ids: source_ids.to_vec(),
src_recvs,
expire_when,
create_if_not_exist,
};
@@ -233,6 +235,8 @@ impl<'s> Worker<'s> {
sink_sender: mpsc::UnboundedSender<DiffRow>,
source_ids: &[GlobalId],
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
// TODO(discord9): set expire duration for all arrangment and compare to sys timestamp instead
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
) -> Result<Option<FlowId>, Error> {
if create_if_not_exist {
@@ -298,6 +302,7 @@ impl<'s> Worker<'s> {
sink_sender,
source_ids,
src_recvs,
expire_when,
create_if_not_exist,
} => {
let task_create_result = self.create_flow(
@@ -307,6 +312,7 @@ impl<'s> Worker<'s> {
sink_sender,
&source_ids,
src_recvs,
expire_when,
create_if_not_exist,
);
Some((
@@ -343,6 +349,7 @@ enum Request {
sink_sender: mpsc::UnboundedSender<DiffRow>,
source_ids: Vec<GlobalId>,
src_recvs: Vec<broadcast::Receiver<DiffRow>>,
expire_when: Option<repr::Duration>,
create_if_not_exist: bool,
},
Remove {
@@ -484,6 +491,7 @@ mod test {
sink_tx,
&src_ids,
vec![rx],
None,
true,
)
.await