test: better fuzz

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2025-12-22 18:15:02 +08:00
parent 2391ab1941
commit d20727f335

View File

@@ -50,7 +50,7 @@ use snafu::location;
use store_api::region_engine::PartitionRange;
use crate::error::Result;
use crate::window_sort::check_partition_range_monotonicity;
use crate::window_sort::{WindowedSortExec, check_partition_range_monotonicity};
use crate::{array_iter_helper, downcast_ts_array};
/// Get the primary end of a `PartitionRange` based on sort direction.
@@ -1078,7 +1078,6 @@ mod test {
TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
TimestampSecondArray,
};
use arrow::json::ArrayWriter;
use arrow_schema::{DataType, Field, Schema, SortOptions, TimeUnit};
use common_time::Timestamp;
use datafusion_physical_expr::expressions::Column;
@@ -1088,10 +1087,41 @@ mod test {
use super::*;
use crate::test_util::{MockInputExec, new_ts_array};
#[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"]
macro_rules! extract_ts_values_helper {
($t:ty, $unit:expr, $array:expr) => {
$array
.as_any()
.downcast_ref::<arrow::array::PrimitiveArray<$t>>()
.unwrap()
.values()
.iter()
.map(|&v| v as i64)
.collect::<Vec<_>>()
};
}
/// Helper function to extract timestamp values from timestamp arrays as Vec<i64>
fn extract_ts_values(array: &ArrayRef) -> Vec<i64> {
downcast_ts_array!(
array.data_type() => (extract_ts_values_helper, array),
_ => panic!("Expected timestamp array, got {:?}", array.data_type()),
)
}
/// TODO(discord9): move this to fuzz test folder
#[tokio::test]
async fn fuzzy_test() {
let test_cnt = 100;
async fn fuzzy_test_driver() {
let mut rng = fastrand::Rng::new();
loop {
let seed = rng.u64(..);
fuzzy_test(seed, true).await;
fuzzy_test(seed, false).await;
}
}
async fn fuzzy_test(seed: u64, chain_windowed_sort: bool) {
let test_cnt = 1;
// bound for total count of PartitionRange
let part_cnt_bound = 100;
// bound for timestamp range size and offset for each PartitionRange
@@ -1102,7 +1132,7 @@ mod test {
let batch_size_bound = 100;
let mut rng = fastrand::Rng::new();
rng.seed(1337);
rng.seed(seed);
let mut test_cases = Vec::new();
@@ -1227,13 +1257,8 @@ mod test {
let expected_output = if let Some(limit) = limit {
let mut accumulated = Vec::new();
let mut seen = 0usize;
for mut range_values in output_data {
seen += range_values.len();
accumulated.append(&mut range_values);
if seen >= limit {
break;
}
}
if accumulated.is_empty() {
@@ -1255,16 +1280,32 @@ mod test {
)
}
} else {
let batches = output_data
.into_iter()
.map(|a| {
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)]).unwrap()
})
.collect_vec();
if batches.is_empty() {
None
if chain_windowed_sort {
// if chain_windowed_sort is true, we expect a single output batch sorted across all ranges
let mut sorted = output_data.into_iter().flatten().collect_vec();
if descending {
sorted.sort_by(|a, b| b.cmp(a));
} else {
sorted.sort();
}
Some(
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, sorted)])
.unwrap(),
)
} else {
Some(concat_batches(&schema, &batches).unwrap())
let batches = output_data
.into_iter()
.map(|a| {
DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)])
.unwrap()
})
.collect_vec();
if batches.is_empty() {
None
} else {
Some(concat_batches(&schema, &batches).unwrap())
}
}
};
@@ -1280,6 +1321,11 @@ mod test {
}
for (case_id, _unit, input_ranged_data, schema, opt, limit, expected_output) in test_cases {
// Wrap test execution to catch panics and print seed only on failure
println!(
"Fuzzy test with seed {}, chain_ws={}",
seed, chain_windowed_sort
);
run_test(
case_id,
input_ranged_data,
@@ -1288,6 +1334,7 @@ mod test {
limit,
expected_output,
None,
chain_windowed_sort,
)
.await;
}
@@ -1447,6 +1494,7 @@ mod test {
limit,
expected_output,
None,
false,
)
.await;
}
@@ -1461,6 +1509,7 @@ mod test {
limit: Option<usize>,
expected_output: Option<DfRecordBatch>,
expected_polled_rows: Option<usize>,
chain_windowed_sort: bool,
) {
if let (Some(limit), Some(rb)) = (limit, &expected_output) {
assert!(
@@ -1480,7 +1529,7 @@ mod test {
let mock_input = Arc::new(MockInputExec::new(data_partition, schema.clone()));
let exec = PartSortExec::try_new(
let part_sort_exec = PartSortExec::try_new(
PhysicalSortExpr {
expr: Arc::new(Column::new("ts", 0)),
options: opt,
@@ -1491,10 +1540,27 @@ mod test {
)
.unwrap();
let exec: Arc<dyn ExecutionPlan> = if chain_windowed_sort {
Arc::new(
WindowedSortExec::try_new(
PhysicalSortExpr {
expr: Arc::new(Column::new("ts", 0)),
options: opt,
},
limit,
vec![ranges],
Arc::new(part_sort_exec),
)
.unwrap(),
)
} else {
Arc::new(part_sort_exec)
};
let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
let real_output = exec_stream.map(|r| r.unwrap()).collect::<Vec<_>>().await;
if limit.is_some() {
if limit.is_some() && !chain_windowed_sort {
assert!(
real_output.len() <= 1,
"case_{case_id} expects a single output batch when limit is set, got {}",
@@ -1517,37 +1583,36 @@ mod test {
(None, None) => {}
(Some(actual), Some(expected)) => {
if actual != expected {
let mut actual_json: Vec<u8> = Vec::new();
let mut writer = ArrayWriter::new(&mut actual_json);
writer.write(&actual).unwrap();
writer.finish().unwrap();
let mut expected_json: Vec<u8> = Vec::new();
let mut writer = ArrayWriter::new(&mut expected_json);
writer.write(&expected).unwrap();
writer.finish().unwrap();
// Extract timestamp values as Vec<i64> for comparison
let actual_ts_values = extract_ts_values(actual.column(0));
let expected_ts_values = extract_ts_values(expected.column(0));
panic!(
"case_{} failed (limit {limit:?}), opt: {:?},\nreal_output: {}\nexpected: {}",
case_id,
opt,
String::from_utf8_lossy(&actual_json),
String::from_utf8_lossy(&expected_json),
"case_{} failed (limit {limit:?}), opt: {:?},\nactual_ts_values: {:?}\nexpected_ts_values: {:?}",
case_id, opt, actual_ts_values, expected_ts_values,
);
}
}
(None, Some(expected)) => panic!(
"case_{} failed (limit {limit:?}), opt: {:?},\nreal output is empty, expected {} rows",
case_id,
opt,
expected.num_rows()
),
(Some(actual), None) => panic!(
"case_{} failed (limit {limit:?}), opt: {:?},\nreal output has {} rows, expected empty",
case_id,
opt,
actual.num_rows()
),
(None, Some(expected)) => {
if expected.num_rows() != 0 {
panic!(
"case_{} failed (limit {limit:?}), opt: {:?},\nreal output is empty, expected {} rows",
case_id,
opt,
expected.num_rows()
)
}
}
(Some(actual), None) => {
if actual.num_rows() != 0 {
panic!(
"case_{} failed (limit {limit:?}), opt: {:?},\nreal output has {} rows, expected empty",
case_id,
opt,
actual.num_rows()
)
}
}
}
}
@@ -1598,6 +1663,7 @@ mod test {
Some(3),
expected_output,
None,
false,
)
.await;
@@ -1656,6 +1722,7 @@ mod test {
Some(2),
expected_output,
None,
false,
)
.await;
@@ -1693,6 +1760,7 @@ mod test {
Some(2),
expected_output,
None,
false,
)
.await;
}
@@ -1794,6 +1862,7 @@ mod test {
Some(2),
expected_output,
Some(10),
false,
)
.await;
}
@@ -1868,6 +1937,7 @@ mod test {
Some(4),
expected_output,
None,
false,
)
.await;
}
@@ -1962,6 +2032,7 @@ mod test {
Some(4),
expected_output,
None,
false,
)
.await;
}
@@ -2036,6 +2107,7 @@ mod test {
Some(4),
expected_output,
Some(9), // Pull both batches since all rows fall within the first range
false,
)
.await;
}
@@ -2115,6 +2187,7 @@ mod test {
Some(4),
expected_output,
Some(9), // Pull both batches to detect boundary
false,
)
.await;
}
@@ -2214,6 +2287,7 @@ mod test {
Some(4),
expected_output,
Some(11), // Pull first two batches to detect boundary
false,
)
.await;
}
@@ -2322,6 +2396,7 @@ mod test {
Some(4),
expected_output,
Some(11), // Read first 4 ranges to confirm threshold boundary
false,
)
.await;
}
@@ -2411,6 +2486,7 @@ mod test {
Some(3),
expected_output,
Some(8), // Must read both batches to detect group boundary
false,
)
.await;
@@ -2481,6 +2557,7 @@ mod test {
Some(3),
expected_output,
Some(8), // Must read both batches to detect group boundary
false,
)
.await;
}
@@ -2551,6 +2628,7 @@ mod test {
Some(4),
expected_output,
Some(9), // Must read all batches since no early stop is possible
false,
)
.await;
}
@@ -2617,6 +2695,7 @@ mod test {
Some(3),
expected_output,
Some(7), // Must read both batches to detect boundary
false,
)
.await;
@@ -2672,6 +2751,7 @@ mod test {
Some(3),
expected_output,
Some(7), // Must read both batches since 20 is not < 20
false,
)
.await;
}
@@ -2763,6 +2843,7 @@ mod test {
Some(2),
expected_output,
Some(7), // Must read until finding actual data
false,
)
.await;
@@ -2843,6 +2924,7 @@ mod test {
Some(2),
expected_output,
Some(7), // Must read to detect early stop condition
false,
)
.await;
}
@@ -2939,6 +3021,7 @@ mod test {
Some(2),
expected_output,
Some(7), // Must read until finding actual data and detecting early stop
false,
)
.await;
}
@@ -3051,6 +3134,7 @@ mod test {
Some(3),
expected_output,
Some(12), // Read through all groups to detect boundaries correctly
false,
)
.await;
}
@@ -3168,6 +3252,7 @@ mod test {
Some(4),
expected_output,
Some(13), // Read through all groups to detect boundaries correctly
false,
)
.await;
}
@@ -3251,6 +3336,7 @@ mod test {
Some(3),
expected_output,
Some(9), // Read through both groups to detect boundaries correctly
false,
)
.await;
}
@@ -3350,6 +3436,7 @@ mod test {
Some(4),
expected_output,
Some(10), // Read through all groups because it can't detect boundaries due to data distribution
false,
)
.await;
}
@@ -3420,6 +3507,7 @@ mod test {
Some(3),
expected_output_case1,
Some(10), // Should read through first&second group and detect boundary
false,
)
.await;
@@ -3443,6 +3531,7 @@ mod test {
Some(7),
expected_output_case2,
Some(10), // Should read through both groups
false,
)
.await;
}