mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 06:30:05 +00:00
Compare commits
3 Commits
docs/vecto
...
flow/faste
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
67a60646b4 | ||
|
|
1c3bde7e4e | ||
|
|
e045a0dbdf |
@@ -14,7 +14,7 @@
|
||||
|
||||
//! Batching mode engine
|
||||
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::flow::{DirtyWindowRequests, FlowResponse};
|
||||
@@ -142,7 +142,7 @@ impl BatchingEngine {
|
||||
|
||||
let handle: JoinHandle<Result<(), Error>> = tokio::spawn(async move {
|
||||
let src_table_names = &task.config.source_table_names;
|
||||
let mut all_dirty_windows = vec![];
|
||||
let mut all_dirty_windows = HashSet::new();
|
||||
for src_table_name in src_table_names {
|
||||
if let Some((timestamps, unit)) = group_by_table_name.get(src_table_name) {
|
||||
let Some(expr) = &task.config.time_window_expr else {
|
||||
@@ -155,7 +155,7 @@ impl BatchingEngine {
|
||||
.context(UnexpectedSnafu {
|
||||
reason: "Failed to eval start value",
|
||||
})?;
|
||||
all_dirty_windows.push(align_start);
|
||||
all_dirty_windows.insert(align_start);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,7 +50,8 @@ use snafu::{ensure, OptionExt, ResultExt};
|
||||
|
||||
use crate::adapter::util::from_proto_to_data_type;
|
||||
use crate::error::{
|
||||
ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, UnexpectedSnafu,
|
||||
ArrowSnafu, DatafusionSnafu, DatatypesSnafu, ExternalSnafu, PlanSnafu, TimeSnafu,
|
||||
UnexpectedSnafu,
|
||||
};
|
||||
use crate::expr::error::DataTypeSnafu;
|
||||
use crate::Error;
|
||||
@@ -74,6 +75,7 @@ pub struct TimeWindowExpr {
|
||||
logical_expr: Expr,
|
||||
df_schema: DFSchema,
|
||||
eval_time_window_size: Option<std::time::Duration>,
|
||||
eval_time_original: Option<Timestamp>,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for TimeWindowExpr {
|
||||
@@ -106,10 +108,11 @@ impl TimeWindowExpr {
|
||||
logical_expr: expr.clone(),
|
||||
df_schema: df_schema.clone(),
|
||||
eval_time_window_size: None,
|
||||
eval_time_original: None,
|
||||
};
|
||||
let test_ts = DEFAULT_TEST_TIMESTAMP;
|
||||
let (l, u) = zelf.eval(test_ts)?;
|
||||
let time_window_size = match (l, u) {
|
||||
let (lower, upper) = zelf.eval(test_ts)?;
|
||||
let time_window_size = match (lower, upper) {
|
||||
(Some(l), Some(u)) => u.sub(&l).map(|r| r.to_std()).transpose().map_err(|_| {
|
||||
UnexpectedSnafu {
|
||||
reason: format!(
|
||||
@@ -121,13 +124,59 @@ impl TimeWindowExpr {
|
||||
_ => None,
|
||||
};
|
||||
zelf.eval_time_window_size = time_window_size;
|
||||
zelf.eval_time_original = lower;
|
||||
|
||||
Ok(zelf)
|
||||
}
|
||||
|
||||
/// TODO(discord9): add `eval_batch` too
|
||||
pub fn eval(
|
||||
&self,
|
||||
current: Timestamp,
|
||||
) -> Result<(Option<Timestamp>, Option<Timestamp>), Error> {
|
||||
fn compute_distance(time_diff_ns: i64, stride_ns: i64) -> i64 {
|
||||
if stride_ns == 0 {
|
||||
return time_diff_ns;
|
||||
}
|
||||
// a - (a % n) impl ceil to nearest n * stride
|
||||
let time_delta = time_diff_ns - (time_diff_ns % stride_ns);
|
||||
|
||||
if time_diff_ns < 0 && time_delta != time_diff_ns {
|
||||
// The origin is later than the source timestamp, round down to the previous bin
|
||||
|
||||
time_delta - stride_ns
|
||||
} else {
|
||||
time_delta
|
||||
}
|
||||
}
|
||||
|
||||
// FAST PATH: if we have eval_time_original and eval_time_window_size,
|
||||
// we can compute the bounds directly
|
||||
if let (Some(original), Some(window_size)) =
|
||||
(self.eval_time_original, self.eval_time_window_size)
|
||||
{
|
||||
// date_bin align current to lower bound
|
||||
let time_diff_ns = current.sub(&original).and_then(|s|s.num_nanoseconds()).with_context(||UnexpectedSnafu {
|
||||
reason: format!(
|
||||
"Failed to compute time difference between current {current:?} and original {original:?}"
|
||||
),
|
||||
})?;
|
||||
|
||||
let window_size_ns = window_size.as_nanos() as i64;
|
||||
|
||||
let distance_ns = compute_distance(time_diff_ns, window_size_ns);
|
||||
|
||||
let lower_bound = if distance_ns >= 0 {
|
||||
original.add_duration(std::time::Duration::from_nanos(distance_ns as u64))
|
||||
} else {
|
||||
original.sub_duration(std::time::Duration::from_nanos((-distance_ns) as u64))
|
||||
}
|
||||
.context(TimeSnafu)?;
|
||||
let upper_bound = lower_bound.add_duration(window_size).context(TimeSnafu)?;
|
||||
|
||||
return Ok((Some(lower_bound), Some(upper_bound)));
|
||||
}
|
||||
|
||||
let lower_bound =
|
||||
calc_expr_time_window_lower_bound(&self.phy_expr, &self.df_schema, current)?;
|
||||
let upper_bound =
|
||||
|
||||
Reference in New Issue
Block a user