From 9fda415b0dcdd6105f171faea8b3a040ee43be1c Mon Sep 17 00:00:00 2001 From: discord9 Date: Thu, 23 May 2024 21:02:07 +0800 Subject: [PATCH] fix: ts { let ts = get_ts_as_millisecond(arg)?; - let start_time = start_time.map(|t| t.val()).unwrap_or(0); + let start_time = start_time.map(|t| t.val()); let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond - let window_start = start_time + (ts - start_time) / window_size * window_size; + let window_start = get_window_start(ts, window_size, start_time); let ret = Timestamp::new_millisecond(window_start); Ok(Value::from(ret)) @@ -290,9 +290,9 @@ impl UnaryFunc { start_time, } => { let ts = get_ts_as_millisecond(arg)?; - let start_time = start_time.map(|t| t.val()).unwrap_or(0); + let start_time = start_time.map(|t| t.val()); let window_size = (window_size.to_nanosecond() / 1_000_000) as repr::Duration; // nanosecond to millisecond - let window_start = start_time + (ts - start_time) / window_size * window_size; + let window_start = get_window_start(ts, window_size, start_time); let window_end = window_start + window_size; let ret = Timestamp::new_millisecond(window_end); @@ -302,6 +302,31 @@ impl UnaryFunc { } } +fn get_window_start( + ts: repr::Timestamp, + window_size: repr::Duration, + start_time: Option, +) -> repr::Timestamp { + let start_time = start_time.unwrap_or(0); + // left close right open + if ts >= start_time { + start_time + (ts - start_time) / window_size * window_size + } else { + start_time + (ts - start_time) / window_size * window_size + - (start_time - ts) % window_size * window_size + } +} + +#[test] +fn test_get_window_start() { + assert_eq!(get_window_start(1, 2, None), 0); + assert_eq!(get_window_start(2, 2, None), 2); + assert_eq!(get_window_start(0, 2, None), 0); + + assert_eq!(get_window_start(-1, 2, None), -2); + assert_eq!(get_window_start(-2, 2, None), -2); +} + fn get_ts_as_millisecond(arg: Value) -> Result { let ts = if let Some(ts) = arg.as_timestamp() { ts.convert_to(TimeUnit::Millisecond)