feat: shorten possible wrong query range (#5849)

* feat: shorten possible wrong query range

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add unit test case

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2025-04-08 21:51:50 +08:00
committed by GitHub
parent c26e165887
commit 7ea04817bd

View File

@@ -505,7 +505,7 @@ impl RangeManipulateStream {
pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
// calculate the range
let ranges = self.calculate_range(&input)?;
let (ranges, (start, end)) = self.calculate_range(&input)?;
// ignore this if all ranges are empty
if ranges.iter().all(|(_, len)| *len == 0) {
return Ok(None);
@@ -537,7 +537,12 @@ impl RangeManipulateStream {
new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
}
// replace timestamp with the aligned one
new_columns[self.time_index] = self.aligned_ts_array.clone();
let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
Self::build_aligned_ts_array(start, end, self.interval)
} else {
self.aligned_ts_array.clone()
};
new_columns[self.time_index] = new_time_index;
RecordBatch::try_new(self.output_schema.clone(), new_columns)
.map(Some)
@@ -550,7 +555,14 @@ impl RangeManipulateStream {
))
}
fn calculate_range(&self, input: &RecordBatch) -> DataFusionResult<Vec<(u32, u32)>> {
/// Return values:
/// - A vector of tuples where each tuple contains the start index and length of the range.
/// - A tuple of the actual start/end timestamp used to calculate the range.
#[allow(clippy::type_complexity)]
fn calculate_range(
&self,
input: &RecordBatch,
) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
let ts_column = input
.column(self.time_index)
.as_any()
@@ -563,9 +575,20 @@ impl RangeManipulateStream {
let mut ranges = vec![];
// shorten the range to calculate
let first_ts = ts_column.value(0);
let first_ts_aligned = (first_ts / self.interval) * self.interval;
let last_ts = ts_column.value(ts_column.len() - 1);
let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
let start = self.start.max(first_ts_aligned);
let end = self.end.min(last_ts_aligned);
if start > end {
return Ok((vec![], (start, end)));
}
// calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
let mut range_start_index = 0usize;
for curr_ts in (self.start..=self.end).step_by(self.interval as _) {
for curr_ts in (start..=end).step_by(self.interval as _) {
let mut range_start = ts_column.len();
let mut range_end = 0;
let mut cursor = range_start_index;
@@ -590,7 +613,7 @@ impl RangeManipulateStream {
}
}
Ok(ranges)
Ok((ranges, (start, end)))
}
}
@@ -735,7 +758,10 @@ mod test {
ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
}",
);
do_normalize_test(0, 310_000, 30_000, 90_000, expected).await;
do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;
// a much larger query range (starting well before the data) must produce the same result
do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
}
#[tokio::test]