From d20727f335e7304dbd4c7474ccbb6fbb2fe89fd0 Mon Sep 17 00:00:00 2001 From: discord9 Date: Mon, 22 Dec 2025 18:15:02 +0800 Subject: [PATCH] test: better fuzz Signed-off-by: discord9 --- src/query/src/part_sort.rs | 185 +++++++++++++++++++++++++++---------- 1 file changed, 137 insertions(+), 48 deletions(-) diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 6484f4e102..1829045e55 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -50,7 +50,7 @@ use snafu::location; use store_api::region_engine::PartitionRange; use crate::error::Result; -use crate::window_sort::check_partition_range_monotonicity; +use crate::window_sort::{WindowedSortExec, check_partition_range_monotonicity}; use crate::{array_iter_helper, downcast_ts_array}; /// Get the primary end of a `PartitionRange` based on sort direction. @@ -1078,7 +1078,6 @@ mod test { TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, }; - use arrow::json::ArrayWriter; use arrow_schema::{DataType, Field, Schema, SortOptions, TimeUnit}; use common_time::Timestamp; use datafusion_physical_expr::expressions::Column; @@ -1088,10 +1087,41 @@ mod test { use super::*; use crate::test_util::{MockInputExec, new_ts_array}; - #[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"] + macro_rules! extract_ts_values_helper { + ($t:ty, $unit:expr, $array:expr) => { + $array + .as_any() + .downcast_ref::>() + .unwrap() + .values() + .iter() + .map(|&v| v as i64) + .collect::>() + }; + } + + /// Helper function to extract timestamp values from timestamp arrays as Vec + fn extract_ts_values(array: &ArrayRef) -> Vec { + downcast_ts_array!( + array.data_type() => (extract_ts_values_helper, array), + _ => panic!("Expected timestamp array, got {:?}", array.data_type()), + ) + } + + /// TODO(discord9): move this to fuzz test folder #[tokio::test] - async fn fuzzy_test() { - let test_cnt = 100; + async fn fuzzy_test_driver() { + let mut rng = fastrand::Rng::new(); + + loop { + let seed = rng.u64(..); + fuzzy_test(seed, true).await; + fuzzy_test(seed, false).await; + } + } + + async fn fuzzy_test(seed: u64, chain_windowed_sort: bool) { + let test_cnt = 1; // bound for total count of PartitionRange let part_cnt_bound = 100; // bound for timestamp range size and offset for each PartitionRange @@ -1102,7 +1132,7 @@ mod test { let batch_size_bound = 100; let mut rng = fastrand::Rng::new(); - rng.seed(1337); + rng.seed(seed); let mut test_cases = Vec::new(); @@ -1227,13 +1257,8 @@ mod test { let expected_output = if let Some(limit) = limit { let mut accumulated = Vec::new(); - let mut seen = 0usize; for mut range_values in output_data { - seen += range_values.len(); accumulated.append(&mut range_values); - if seen >= limit { - break; - } } if accumulated.is_empty() { @@ -1255,16 +1280,32 @@ mod test { ) } } else { - let batches = output_data - .into_iter() - .map(|a| { - DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)]).unwrap() - }) - .collect_vec(); - if batches.is_empty() { - None + if chain_windowed_sort { + // if chain_windowed_sort is true, we expect a single output batch sorted across all ranges + let mut sorted = output_data.into_iter().flatten().collect_vec(); + if descending { + sorted.sort_by(|a, b| b.cmp(a)); + } else { + sorted.sort(); + } + + Some( + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, sorted)]) + .unwrap(), + ) } else { - Some(concat_batches(&schema, &batches).unwrap()) + let batches = output_data + .into_iter() + .map(|a| { + DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)]) + .unwrap() + }) + .collect_vec(); + if batches.is_empty() { + None + } else { + Some(concat_batches(&schema, &batches).unwrap()) + } } }; @@ -1280,6 +1321,11 @@ mod test { } for (case_id, _unit, input_ranged_data, schema, opt, limit, expected_output) in test_cases { + // Wrap test execution to catch panics and print seed only on failure + println!( + "Fuzzy test with seed {}, chain_ws={}", + seed, chain_windowed_sort + ); run_test( case_id, input_ranged_data, @@ -1288,6 +1334,7 @@ mod test { limit, expected_output, None, + chain_windowed_sort, ) .await; } @@ -1447,6 +1494,7 @@ mod test { limit, expected_output, None, + false, ) .await; } @@ -1461,6 +1509,7 @@ mod test { limit: Option, expected_output: Option, expected_polled_rows: Option, + chain_windowed_sort: bool, ) { if let (Some(limit), Some(rb)) = (limit, &expected_output) { assert!( @@ -1480,7 +1529,7 @@ mod test { let mock_input = Arc::new(MockInputExec::new(data_partition, schema.clone())); - let exec = PartSortExec::try_new( + let part_sort_exec = PartSortExec::try_new( PhysicalSortExpr { expr: Arc::new(Column::new("ts", 0)), options: opt, @@ -1491,10 +1540,27 @@ mod test { ) .unwrap(); + let exec: Arc = if chain_windowed_sort { + Arc::new( + WindowedSortExec::try_new( + PhysicalSortExpr { + expr: Arc::new(Column::new("ts", 0)), + options: opt, + }, + limit, + vec![ranges], + Arc::new(part_sort_exec), + ) + .unwrap(), + ) + } else { + Arc::new(part_sort_exec) + }; + let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap(); let real_output = exec_stream.map(|r| r.unwrap()).collect::>().await; - if limit.is_some() { + if limit.is_some() && !chain_windowed_sort { assert!( real_output.len() <= 1, "case_{case_id} expects a single output batch when limit is set, got {}", @@ -1517,37 +1583,36 @@ mod test { (None, None) => {} (Some(actual), Some(expected)) => { if actual != expected { - let mut actual_json: Vec = Vec::new(); - let mut writer = ArrayWriter::new(&mut actual_json); - writer.write(&actual).unwrap(); - writer.finish().unwrap(); - - let mut expected_json: Vec = Vec::new(); - let mut writer = ArrayWriter::new(&mut expected_json); - writer.write(&expected).unwrap(); - writer.finish().unwrap(); + // Extract timestamp values as Vec for comparison + let actual_ts_values = extract_ts_values(actual.column(0)); + let expected_ts_values = extract_ts_values(expected.column(0)); panic!( - "case_{} failed (limit {limit:?}), opt: {:?},\nreal_output: {}\nexpected: {}", - case_id, - opt, - String::from_utf8_lossy(&actual_json), - String::from_utf8_lossy(&expected_json), + "case_{} failed (limit {limit:?}), opt: {:?},\nactual_ts_values: {:?}\nexpected_ts_values: {:?}", + case_id, opt, actual_ts_values, expected_ts_values, ); } } - (None, Some(expected)) => panic!( - "case_{} failed (limit {limit:?}), opt: {:?},\nreal output is empty, expected {} rows", - case_id, - opt, - expected.num_rows() - ), - (Some(actual), None) => panic!( - "case_{} failed (limit {limit:?}), opt: {:?},\nreal output has {} rows, expected empty", - case_id, - opt, - actual.num_rows() - ), + (None, Some(expected)) => { + if expected.num_rows() != 0 { + panic!( + "case_{} failed (limit {limit:?}), opt: {:?},\nreal output is empty, expected {} rows", + case_id, + opt, + expected.num_rows() + ) + } + } + (Some(actual), None) => { + if actual.num_rows() != 0 { + panic!( + "case_{} failed (limit {limit:?}), opt: {:?},\nreal output has {} rows, expected empty", + case_id, + opt, + actual.num_rows() + ) + } + } } } @@ -1598,6 +1663,7 @@ mod test { Some(3), expected_output, None, + false, ) .await; @@ -1656,6 +1722,7 @@ mod test { Some(2), expected_output, None, + false, ) .await; @@ -1693,6 +1760,7 @@ mod test { Some(2), expected_output, None, + false, ) .await; } @@ -1794,6 +1862,7 @@ mod test { Some(2), expected_output, Some(10), + false, ) .await; } @@ -1868,6 +1937,7 @@ mod test { Some(4), expected_output, None, + false, ) .await; } @@ -1962,6 +2032,7 @@ mod test { Some(4), expected_output, None, + false, ) .await; } @@ -2036,6 +2107,7 @@ mod test { Some(4), expected_output, Some(9), // Pull both batches since all rows fall within the first range + false, ) .await; } @@ -2115,6 +2187,7 @@ mod test { Some(4), expected_output, Some(9), // Pull both batches to detect boundary + false, ) .await; } @@ -2214,6 +2287,7 @@ mod test { Some(4), expected_output, Some(11), // Pull first two batches to detect boundary + false, ) .await; } @@ -2322,6 +2396,7 @@ mod test { Some(4), expected_output, Some(11), // Read first 4 ranges to confirm threshold boundary + false, ) .await; } @@ -2411,6 +2486,7 @@ mod test { Some(3), expected_output, Some(8), // Must read both batches to detect group boundary + false, ) .await; @@ -2481,6 +2557,7 @@ mod test { Some(3), expected_output, Some(8), // Must read both batches to detect group boundary + false, ) .await; } @@ -2551,6 +2628,7 @@ mod test { Some(4), expected_output, Some(9), // Must read all batches since no early stop is possible + false, ) .await; } @@ -2617,6 +2695,7 @@ mod test { Some(3), expected_output, Some(7), // Must read both batches to detect boundary + false, ) .await; @@ -2672,6 +2751,7 @@ mod test { Some(3), expected_output, Some(7), // Must read both batches since 20 is not < 20 + false, ) .await; } @@ -2763,6 +2843,7 @@ mod test { Some(2), expected_output, Some(7), // Must read until finding actual data + false, ) .await; @@ -2843,6 +2924,7 @@ mod test { Some(2), expected_output, Some(7), // Must read to detect early stop condition + false, ) .await; } @@ -2939,6 +3021,7 @@ mod test { Some(2), expected_output, Some(7), // Must read until finding actual data and detecting early stop + false, ) .await; } @@ -3051,6 +3134,7 @@ mod test { Some(3), expected_output, Some(12), // Read through all groups to detect boundaries correctly + false, ) .await; } @@ -3168,6 +3252,7 @@ mod test { Some(4), expected_output, Some(13), // Read through all groups to detect boundaries correctly + false, ) .await; } @@ -3251,6 +3336,7 @@ mod test { Some(3), expected_output, Some(9), // Read through both groups to detect boundaries correctly + false, ) .await; } @@ -3350,6 +3436,7 @@ mod test { Some(4), expected_output, Some(10), // Read through all groups because it can't detect boundaries due to data distribution + false, ) .await; } @@ -3420,6 +3507,7 @@ mod test { Some(3), expected_output_case1, Some(10), // Should read through first&second group and detect boundary + false, ) .await; @@ -3443,6 +3531,7 @@ mod test { Some(7), expected_output_case2, Some(10), // Should read through both groups + false, ) .await; }