fix: ts<start time correct window

This commit is contained in:
discord9
2024-05-23 21:02:07 +08:00
parent 5eaf9816b9
commit 9fda415b0d

View File

@@ -278,9 +278,9 @@ impl UnaryFunc {
start_time,
} => {
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
let start_time = start_time.map(|t| t.val());
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let window_start = start_time + (ts - start_time) / window_size * window_size;
let window_start = get_window_start(ts, window_size, start_time);
let ret = Timestamp::new_millisecond(window_start);
Ok(Value::from(ret))
@@ -290,9 +290,9 @@ impl UnaryFunc {
start_time,
} => {
let ts = get_ts_as_millisecond(arg)?;
let start_time = start_time.map(|t| t.val()).unwrap_or(0);
let start_time = start_time.map(|t| t.val());
let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond
let window_start = start_time + (ts - start_time) / window_size * window_size;
let window_start = get_window_start(ts, window_size, start_time);
let window_end = window_start + window_size;
let ret = Timestamp::new_millisecond(window_end);
@@ -302,6 +302,31 @@ impl UnaryFunc {
}
}
fn get_window_start(
ts: repr::Timestamp,
window_size: repr::Duration,
start_time: Option<repr::Timestamp>,
) -> repr::Timestamp {
let start_time = start_time.unwrap_or(0);
// left close right open
if ts >= start_time {
start_time + (ts - start_time) / window_size * window_size
} else {
start_time + (ts - start_time) / window_size * window_size
- (start_time - ts) % window_size * window_size
}
}
#[test]
fn test_get_window_start() {
assert_eq!(get_window_start(1, 2, None), 0);
assert_eq!(get_window_start(2, 2, None), 2);
assert_eq!(get_window_start(0, 2, None), 0);
assert_eq!(get_window_start(-1, 2, None), -2);
assert_eq!(get_window_start(-2, 2, None), -2);
}
fn get_ts_as_millisecond(arg: Value) -> Result<repr::Timestamp, EvalError> {
let ts = if let Some(ts) = arg.as_timestamp() {
ts.convert_to(TimeUnit::Millisecond)