fix(WIP): choose

This commit is contained in:
discord9
2024-05-20 17:41:23 +08:00
parent e580ba63ec
commit d1ce436442
3 changed files with 135 additions and 7 deletions

View File

@@ -653,21 +653,20 @@ impl FlownodeManager {
///
/// TODO(discord9): better way to do it, and not expose flow tick even to other flow to avoid
/// TSO coord mess
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FlowTickManager {
start: Instant,
}
impl std::fmt::Debug for FlowTickManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FlowTickManager").finish()
}
start_timestamp: repr::Timestamp,
}
impl FlowTickManager {
pub fn new() -> Self {
FlowTickManager {
start: Instant::now(),
start_timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_millis() as repr::Timestamp,
}
}

View File

@@ -435,6 +435,133 @@ mod test {
use crate::repr::{self, ColumnType, RelationType};
use crate::transform::test::{create_test_ctx, create_test_query_engine, sql_to_substrait};
#[tokio::test]
async fn test_tumble_compsite() {
let engine = create_test_query_engine();
let sql =
"SELECT number, avg(number) FROM numbers_with_ts GROUP BY tumble(ts, '1 hour'), number";
let plan = sql_to_substrait(engine.clone(), sql).await;
let mut ctx = create_test_ctx();
let flow_plan = TypedPlan::from_substrait_plan(&mut ctx, &plan).unwrap();
let aggr_exprs = vec![
AggregateExpr {
func: AggregateFunc::SumUInt32,
expr: ScalarExpr::Column(0),
distinct: false,
},
AggregateExpr {
func: AggregateFunc::Count,
expr: ScalarExpr::Column(0),
distinct: false,
},
];
let avg_expr = ScalarExpr::If {
cond: Box::new(ScalarExpr::Column(4).call_binary(
ScalarExpr::Literal(Value::from(0i64), CDT::int64_datatype()),
BinaryFunc::NotEq,
)),
then: Box::new(ScalarExpr::Column(3).call_binary(
ScalarExpr::Column(4).call_unary(UnaryFunc::Cast(CDT::uint64_datatype())),
BinaryFunc::DivUInt64,
)),
els: Box::new(ScalarExpr::Literal(Value::Null, CDT::uint64_datatype())),
};
let expected = TypedPlan {
typ: RelationType::new(vec![
ColumnType::new(CDT::uint32_datatype(), false), // number
ColumnType::new(CDT::uint64_datatype(), true), // sum(number)
ColumnType::new(CDT::datetime_datatype(), false), // window start
ColumnType::new(CDT::datetime_datatype(), false), // window end
]),
// TODO(discord9): mfp indirectly ref to key columns
/*
.with_key(vec![1])
.with_time_index(Some(0)),*/
plan: Plan::Mfp {
input: Box::new(
Plan::Reduce {
input: Box::new(
Plan::Get {
id: crate::expr::Id::Global(GlobalId::User(1)),
}
.with_types(RelationType::new(vec![
ColumnType::new(ConcreteDataType::uint32_datatype(), false),
ColumnType::new(ConcreteDataType::datetime_datatype(), false),
])),
),
key_val_plan: KeyValPlan {
key_plan: MapFilterProject::new(2)
.map(vec![
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowFloor {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
start_time: None,
},
),
ScalarExpr::Column(1).call_unary(
UnaryFunc::TumbleWindowCeiling {
window_size: Interval::from_month_day_nano(
0,
0,
3_600_000_000_000,
),
start_time: None,
},
),
ScalarExpr::Column(0),
])
.unwrap()
.project(vec![2, 3, 4])
.unwrap()
.into_safe(),
val_plan: MapFilterProject::new(2)
.project(vec![0, 1])
.unwrap()
.into_safe(),
},
reduce_plan: ReducePlan::Accumulable(AccumulablePlan {
full_aggrs: aggr_exprs.clone(),
simple_aggrs: vec![
AggrWithIndex::new(aggr_exprs[0].clone(), 0, 0),
AggrWithIndex::new(aggr_exprs[1].clone(), 0, 1),
],
distinct_aggrs: vec![],
}),
}
.with_types(
RelationType::new(vec![
ColumnType::new(CDT::datetime_datatype(), false), // window start(time index)
ColumnType::new(CDT::datetime_datatype(), false), // window end(pk)
ColumnType::new(CDT::uint32_datatype(), false), // number(pk)
ColumnType::new(CDT::uint64_datatype(), true), // avg.sum(number)
ColumnType::new(CDT::uint64_datatype(), true), // avg.count(number)
])
.with_key(vec![1, 2])
.with_time_index(Some(0)),
),
),
mfp: MapFilterProject::new(5)
.map(vec![
avg_expr,
ScalarExpr::Column(2),
ScalarExpr::Column(5),
ScalarExpr::Column(0),
ScalarExpr::Column(1),
])
.unwrap()
.project(vec![6, 7, 8, 9])
.unwrap(),
},
};
assert_eq!(flow_plan, expected);
}
#[tokio::test]
async fn test_tumble_parse_optional() {
let engine = create_test_query_engine();

View File

@@ -208,7 +208,9 @@ impl Arrangement {
for ((key, val), update_ts, diff) in updates {
// check if the key is expired
if let Some(s) = &mut self.expire_state {
dbg!(now, &key, &s);
if let Some(expired_by) = s.update_event_ts(now, &key)? {
dbg!(expired_by, &key);
max_expired_by = max_expired_by.max(Some(expired_by));
continue;
}