mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-27 08:29:59 +00:00
Compare commits
3 Commits
feat/impl-
...
flow/faste
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
67a60646b4 | ||
|
|
1c3bde7e4e | ||
|
|
e045a0dbdf |
@@ -14,7 +14,7 @@
|
|||||||
|
|
||||||
//! Batching mode engine
|
//! Batching mode engine
|
||||||
|
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
|
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
|
||||||
@@ -142,7 +142,7 @@ impl BatchingEngine {
|
|||||||
|
|
||||||
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
|
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
|
||||||
let src_table_names = &task.config.source_table_names;
|
let src_table_names = &task.config.source_table_names;
|
||||||
let mut all_dirty_windows = vec![];
|
let mut all_dirty_windows = HashSet::new();
|
||||||
for src_table_name in src_table_names {
|
for src_table_name in src_table_names {
|
||||||
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
|
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
|
||||||
let Some(expr) = &task.config.time_window_expr else {
|
let Some(expr) = &task.config.time_window_expr else {
|
||||||
@@ -155,7 +155,7 @@ impl BatchingEngine {
|
|||||||
.context(UnexpectedSnafu {
|
.context(UnexpectedSnafu {
|
||||||
reason: "Failed to eval start value",
|
reason: "Failed to eval start value",
|
||||||
})?;
|
})?;
|
||||||
all_dirty_windows.push(align_start);
|
all_dirty_windows.insert(align_start);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,7 +50,8 @@ use snafu::{ensure, OptionExt, ResultExt};
|
|||||||
|
|
||||||
use crate::adapter::util::from_proto_to_data_type;
|
use crate::adapter::util::from_proto_to_data_type;
|
||||||
use crate::error::{
|
use crate::error::{
|
||||||
ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, UnexpectedSnafu,
|
ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, TimeSnafu,
|
||||||
|
UnexpectedSnafu,
|
||||||
};
|
};
|
||||||
use crate::expr::error::DataTypeSnafu;
|
use crate::expr::error::DataTypeSnafu;
|
||||||
use crate::Error;
|
use crate::Error;
|
||||||
@@ -74,6 +75,7 @@ pub struct TimeWindowExpr {
|
|||||||
logical_expr: Expr,
|
logical_expr: Expr,
|
||||||
df_schema: DFSchema,
|
df_schema: DFSchema,
|
||||||
eval_time_window_size: Option<std::time::Duration>,
|
eval_time_window_size: Option<std::time::Duration>,
|
||||||
|
eval_time_original: Option<Timestamp>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for TimeWindowExpr {
|
impl std::fmt::Display for TimeWindowExpr {
|
||||||
@@ -106,10 +108,11 @@ impl TimeWindowExpr {
|
|||||||
logical_expr: expr.clone(),
|
logical_expr: expr.clone(),
|
||||||
df_schema: df_schema.clone(),
|
df_schema: df_schema.clone(),
|
||||||
eval_time_window_size: None,
|
eval_time_window_size: None,
|
||||||
|
eval_time_original: None,
|
||||||
};
|
};
|
||||||
let test_ts = DEFAULT_TEST_TIMESTAMP;
|
let test_ts = DEFAULT_TEST_TIMESTAMP;
|
||||||
let (l, u) = zelf.eval(test_ts)?;
|
let (lower, upper) = zelf.eval(test_ts)?;
|
||||||
let time_window_size = match (l, u) {
|
let time_window_size = match (lower, upper) {
|
||||||
(Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
|
(Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
|
||||||
UnexpectedSnafu {
|
UnexpectedSnafu {
|
||||||
reason: format!(
|
reason: format!(
|
||||||
@@ -121,13 +124,59 @@ impl TimeWindowExpr {
|
|||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
zelf.eval_time_window_size = time_window_size;
|
zelf.eval_time_window_size = time_window_size;
|
||||||
|
zelf.eval_time_original = lower;
|
||||||
|
|
||||||
Ok(zelf)
|
Ok(zelf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// TODO(discord9): add `eval_batch` too
|
||||||
pub fn eval(
|
pub fn eval(
|
||||||
&self,
|
&self,
|
||||||
current: Timestamp,
|
current: Timestamp,
|
||||||
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
|
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
|
||||||
|
/// Snaps `time_diff_ns` (an offset from the origin, in nanoseconds) down to a
/// multiple of `stride_ns`.
///
/// For a positive `stride_ns` the result is the largest multiple of the stride
/// that is less than or equal to `time_diff_ns`, i.e. floor alignment, also
/// for negative offsets. A `stride_ns` of zero returns `time_diff_ns`
/// unchanged, which also avoids a remainder-by-zero.
fn compute_distance(time_diff_ns: i64, stride_ns: i64) -> i64 {
    if stride_ns == 0 {
        return time_diff_ns;
    }

    // `%` keeps the sign of the dividend, so subtracting the remainder
    // truncates toward zero rather than toward negative infinity.
    let remainder = time_diff_ns % stride_ns;
    let truncated = time_diff_ns - remainder;

    match (time_diff_ns < 0, remainder != 0) {
        // The origin is later than the source timestamp and the offset is not
        // already on a bin edge: step back one stride to reach the previous bin.
        (true, true) => truncated - stride_ns,
        _ => truncated,
    }
}
|
||||||
|
|
||||||
|
// FAST PATH: if we have eval_time_original and eval_time_window_size,
|
||||||
|
// we can compute the bounds directly
|
||||||
|
if let (Some(original), Some(window_size)) =
|
||||||
|
(self.eval_time_original, self.eval_time_window_size)
|
||||||
|
{
|
||||||
|
// date_bin align current to lower bound
|
||||||
|
let time_diff_ns = current.sub(&original).and_then(|s|s.num_nanoseconds()).with_context(||UnexpectedSnafu {
|
||||||
|
reason: format!(
|
||||||
|
"Failed to compute time difference between current {current:?} and original {original:?}"
|
||||||
|
),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let window_size_ns = window_size.as_nanos() as i64;
|
||||||
|
|
||||||
|
let distance_ns = compute_distance(time_diff_ns, window_size_ns);
|
||||||
|
|
||||||
|
let lower_bound = if distance_ns >= 0 {
|
||||||
|
original.add_duration(std::time::Duration::from_nanos(distance_ns as u64))
|
||||||
|
} else {
|
||||||
|
original.sub_duration(std::time::Duration::from_nanos((-distance_ns) as u64))
|
||||||
|
}
|
||||||
|
.context(TimeSnafu)?;
|
||||||
|
let upper_bound = lower_bound.add_duration(window_size).context(TimeSnafu)?;
|
||||||
|
|
||||||
|
return Ok((Some(lower_bound), Some(upper_bound)));
|
||||||
|
}
|
||||||
|
|
||||||
let lower_bound =
|
let lower_bound =
|
||||||
calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
|
calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
|
||||||
let upper_bound =
|
let upper_bound =
|
||||||
|
|||||||
Reference in New Issue
Block a user