diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs
index e4e527a477..7eb82a1e60 100644
--- a/src/query/src/lib.rs
+++ b/src/query/src/lib.rs
@@ -27,6 +27,7 @@ pub mod executor;
 pub mod metrics;
 mod optimizer;
 pub mod parser;
+mod part_sort;
 pub mod physical_wrapper;
 pub mod plan;
 pub mod planner;
@@ -36,8 +37,10 @@ mod range_select;
 pub mod region_query;
 pub mod sql;
 pub mod stats;
-mod window_sort;
+pub(crate) mod window_sort;
+#[cfg(test)]
+pub(crate) mod test_util;
 
 #[cfg(test)]
 mod tests;
diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs
new file mode 100644
index 0000000000..965dfa9c5a
--- /dev/null
+++ b/src/query/src/part_sort.rs
@@ -0,0 +1,545 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::compute::{concat, take_record_batch};
+use arrow_schema::SchemaRef;
+use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
+use datafusion::common::arrow::compute::sort_to_indices;
+use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
+use datafusion::execution::{RecordBatchStream, TaskContext};
+use datafusion::physical_plan::coalesce_batches::concat_batches;
+use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
+};
+use datafusion_common::{internal_err, DataFusionError};
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::Stream;
+use itertools::Itertools;
+use snafu::location;
+
+use crate::error::Result;
+
+/// Sort input within given PartitionRange
+///
+/// Input is assumed to be segmented by empty RecordBatch, which indicates a new `PartitionRange` is starting
+///
+/// and this operator will sort each partition independently within the partition.
+#[derive(Debug, Clone)]
+pub struct PartSortExec {
+    /// Physical sort expressions (that is, sort by timestamp)
+    expression: PhysicalSortExpr,
+    input: Arc<dyn ExecutionPlan>,
+    /// Execution metrics
+    metrics: ExecutionPlanMetricsSet,
+    properties: PlanProperties,
+}
+
+impl PartSortExec {
+    pub fn try_new(expression: PhysicalSortExpr, input: Arc<dyn ExecutionPlan>) -> Result<Self> {
+        let metrics = ExecutionPlanMetricsSet::new();
+        let properties = PlanProperties::new(
+            input.equivalence_properties().clone(),
+            input.output_partitioning().clone(),
+            input.execution_mode(),
+        );
+
+        Ok(Self {
+            expression,
+            input,
+            metrics,
+            properties,
+        })
+    }
+
+    pub fn to_stream(
+        &self,
+        context: Arc<TaskContext>,
+        partition: usize,
+    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
+        let input_stream: DfSendableRecordBatchStream =
+            self.input.execute(partition, context.clone())?;
+
+        let df_stream = Box::pin(PartSortStream::new(context, self, input_stream)) as _;
+
+        Ok(df_stream)
+    }
+}
+
+impl DisplayAs for PartSortExec {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "PartSortExec {}", self.expression)
+    }
+}
+
+impl ExecutionPlan for PartSortExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.input.schema()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![&self.input]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        let new_input = if let Some(first) = children.first() {
+            first
+        } else {
+            internal_err!("No children found")?
+        };
+        Ok(Arc::new(Self::try_new(
+            self.expression.clone(),
+            new_input.clone(),
+        )?))
+    }
+
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
+        self.to_stream(context, partition)
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+}
+
+struct PartSortStream {
+    /// Memory pool for this stream
+    reservation: MemoryReservation,
+    buffer: Vec<DfRecordBatch>,
+    expression: PhysicalSortExpr,
+    produced: usize,
+    input: DfSendableRecordBatchStream,
+    input_complete: bool,
+    schema: SchemaRef,
+}
+
+impl PartSortStream {
+    fn new(
+        context: Arc<TaskContext>,
+        sort: &PartSortExec,
+        input: DfSendableRecordBatchStream,
+    ) -> Self {
+        Self {
+            reservation: MemoryConsumer::new("PartSortStream".to_string())
+                .register(&context.runtime_env().memory_pool),
+            buffer: Vec::new(),
+            expression: sort.expression.clone(),
+            produced: 0,
+            input,
+            input_complete: false,
+            schema: sort.input.schema(),
+        }
+    }
+}
+
+impl PartSortStream {
+    /// Sort and clear the buffer and return the sorted record batch
+    ///
+    /// This function returns `None` if the buffered batches contain no rows.
+    fn sort_buffer(&mut self) -> datafusion_common::Result<Option<DfRecordBatch>> {
+        if self.buffer.iter().map(|r| r.num_rows()).sum::<usize>() == 0 {
+            return Ok(None);
+        }
+        let mut sort_columns = Vec::with_capacity(self.buffer.len());
+        let mut opt = None;
+        for batch in self.buffer.iter() {
+            let sort_column = self.expression.evaluate_to_sort_column(batch)?;
+            opt = opt.or(sort_column.options);
+            sort_columns.push(sort_column.values);
+        }
+
+        let sort_column =
+            concat(&sort_columns.iter().map(|a| a.as_ref()).collect_vec()).map_err(|e| {
+                DataFusionError::ArrowError(
+                    e,
+                    Some(format!("Fail to concat sort columns at {}", location!())),
+                )
+            })?;
+
+        let indices = sort_to_indices(&sort_column, opt, None).map_err(|e| {
+            DataFusionError::ArrowError(
+                e,
+                Some(format!("Fail to sort to indices at {}", location!())),
+            )
+        })?;
+
+        // reserve memory for the concat input and sorted output
+        let total_mem: usize = self.buffer.iter().map(|r| r.get_array_memory_size()).sum();
+        self.reservation.try_grow(total_mem * 2)?;
+
+        let full_input = concat_batches(
+            &self.schema,
+            &self.buffer,
+            self.buffer.iter().map(|r| r.num_rows()).sum(),
+        )
+        .map_err(|e| {
+            DataFusionError::ArrowError(
+                e,
+                Some(format!(
+                    "Fail to concat input batches when sorting at {}",
+                    location!()
+                )),
+            )
+        })?;
+
+        let sorted = take_record_batch(&full_input, &indices).map_err(|e| {
+            DataFusionError::ArrowError(
+                e,
+                Some(format!(
+                    "Fail to take result record batch when sorting at {}",
+                    location!()
+                )),
+            )
+        })?;
+
+        // only clear the buffer after sorting, for easier debugging
+        self.buffer.clear();
+        self.produced += sorted.num_rows();
+        drop(full_input);
+        // release the reservation for both the buffer and `full_input`
+        self.reservation.shrink(2 * total_mem);
+        Ok(Some(sorted))
+    }
+
+    pub fn poll_next_inner(
+        mut self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
+        loop {
+            if self.input_complete {
+                if self.buffer.is_empty() {
+                    return Poll::Ready(None);
+                } else {
+                    return Poll::Ready(self.sort_buffer().transpose());
+                }
+            }
+            let res = self.input.as_mut().poll_next(cx);
+            match res {
+                Poll::Ready(Some(Ok(batch))) => {
+                    if batch.num_rows() == 0 {
+                        // mark end of current PartitionRange
+                        return Poll::Ready(self.sort_buffer().transpose());
+                    }
+                    self.buffer.push(batch);
+                    // keep polling until a boundary (an empty RecordBatch) is reached
+                    continue;
+                }
+                // input stream has ended, sort the buffer and return
+                Poll::Ready(None) => {
+                    self.input_complete = true;
+                    continue;
+                }
+                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
+                Poll::Pending => return Poll::Pending,
+            }
+        }
+    }
+}
+
+impl Stream for PartSortStream {
+    type Item = datafusion_common::Result<DfRecordBatch>;
+
+    fn poll_next(
+        self: Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
+        self.poll_next_inner(cx)
+    }
+}
+
+impl RecordBatchStream for PartSortStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use arrow::json::ArrayWriter;
+    use arrow_schema::{DataType, Field, Schema, SortOptions, TimeUnit};
+    use common_telemetry::error;
+    use common_time::Timestamp;
+    use datafusion_physical_expr::expressions::Column;
+    use futures::StreamExt;
+    use store_api::region_engine::PartitionRange;
+
+    use super::*;
+    use crate::test_util::{new_ts_array, MockInputExec};
+
+    #[tokio::test]
+    async fn fuzzy_test() {
+        let test_cnt = 100;
+        let part_cnt_bound = 100;
+        let range_size_bound = 100;
+        let range_offset_bound = 100;
+        let batch_cnt_bound = 20;
+        let batch_size_bound = 100;
+
+        let mut rng = fastrand::Rng::new();
+        rng.seed(1337);
+
+        for case_id in 0..test_cnt {
+            let mut bound_val: Option<i64> = None;
+            let descending = rng.bool();
+            let nulls_first = rng.bool();
+            let opt = SortOptions {
+                descending,
+                nulls_first,
+            };
+            let unit = match rng.u8(0..3) {
+                0 => TimeUnit::Second,
+                1 => TimeUnit::Millisecond,
+                2 => TimeUnit::Microsecond,
+                _ => TimeUnit::Nanosecond,
+            };
+
+            let schema = Schema::new(vec![Field::new(
+                "ts",
+                DataType::Timestamp(unit.clone(), None),
+                false,
+            )]);
+            let schema = Arc::new(schema);
+
+            let mut input_ranged_data = vec![];
+            let mut output_data = vec![];
+            // generate each input `PartitionRange`
+            for part_id in 0..rng.usize(0..part_cnt_bound) {
+                // generate each `PartitionRange`'s timestamp range
+                let (start, end) = if descending {
+                    let end = bound_val
+                        .map(|i| {
+                            i.checked_sub(rng.i64(0..range_offset_bound)).expect(
+                                "Bad luck, fuzzy test generate data that will overflow, change seed and try again",
+                            )
+                        })
+                        .unwrap_or_else(|| rng.i64(..));
+                    bound_val = Some(end);
+                    let start = end - rng.i64(1..range_size_bound);
+                    let start = Timestamp::new(start, unit.clone().into());
+                    let end = Timestamp::new(end, unit.clone().into());
+                    (start, end)
+                } else {
+                    let start = bound_val
+                        .map(|i| i + rng.i64(0..range_offset_bound))
+                        .unwrap_or_else(|| rng.i64(..));
+                    bound_val = Some(start);
+                    let end = start + rng.i64(1..range_size_bound);
+                    let start = Timestamp::new(start, unit.clone().into());
+                    let end = Timestamp::new(end, unit.clone().into());
+                    (start, end)
+                };
+                assert!(start < end);
+
+                let mut sort_data = vec![];
+                let mut batches = vec![];
+                for _batch_idx in 0..rng.usize(1..batch_cnt_bound) {
+                    let cnt = rng.usize(0..batch_size_bound) + 2;
+                    let iter = 0..rng.usize(1..cnt);
+                    let data_gen = iter
+                        .map(|_| rng.i64(start.value()..end.value()))
+                        .collect_vec();
+                    sort_data.extend(data_gen.clone());
+                    let arr = new_ts_array(unit.clone(), data_gen.clone());
+
+                    let batch = DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap();
+                    batches.push(batch);
+                }
+                assert!(batches.iter().all(|i| i.num_rows() >= 1));
+
+                let range = PartitionRange {
+                    start,
+                    end,
+                    num_rows: batches.iter().map(|b| b.num_rows()).sum(),
+                    identifier: part_id,
+                };
+                input_ranged_data.push((range, batches));
+
+                if descending {
+                    sort_data.sort_by(|a, b| b.cmp(a));
+                } else {
+                    sort_data.sort();
+                }
+
+                output_data.push(sort_data);
+            }
+
+            let expected_output = output_data
+                .into_iter()
+                .map(|a| {
+                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit.clone(), a)])
+                        .unwrap()
+                })
+                .collect_vec();
+
+            assert!(!expected_output.is_empty());
+            run_test(case_id, input_ranged_data, schema, opt, expected_output).await;
+        }
+    }
+
+    #[tokio::test]
+    async fn simple_case() {
+        let testcases = vec![
+            (
+                TimeUnit::Millisecond,
+                vec![
+                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
+                    ((5, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
+                ],
+                false,
+                vec![
+                    vec![1, 2, 3, 4, 5, 6, 7, 8, 9],
+                    vec![1, 2, 3, 4, 5, 6, 7, 8],
+                ],
+            ),
+            (
+                TimeUnit::Millisecond,
+                vec![
+                    ((5, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
+                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
+                ],
+                true,
+                vec![
+                    vec![9, 8, 7, 6, 5, 4, 3, 2, 1],
+                    vec![8, 7, 6, 5, 4, 3, 2, 1],
+                ],
+            ),
+        ];
+
+        for (identifier, (unit, input_ranged_data, descending, expected_output)) in
+            testcases.into_iter().enumerate()
+        {
+            let schema = Schema::new(vec![Field::new(
+                "ts",
+                DataType::Timestamp(unit.clone(), None),
+                false,
+            )]);
+            let schema = Arc::new(schema);
+            let opt = SortOptions {
+                descending,
+                ..Default::default()
+            };
+            let input_ranged_data = input_ranged_data
+                .into_iter()
+                .map(|(range, data)| {
+                    let part = PartitionRange {
+                        start: Timestamp::new(range.0, unit.clone().into()),
+                        end: Timestamp::new(range.1, unit.clone().into()),
+                        num_rows: data.iter().map(|b| b.len()).sum(),
+                        identifier,
+                    };
+
+                    let batches = data
+                        .into_iter()
+                        .map(|b| {
+                            let arr = new_ts_array(unit.clone(), b);
+                            DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap()
+                        })
+                        .collect_vec();
+                    (part, batches)
+                })
+                .collect_vec();
+
+            let expected_output = expected_output
+                .into_iter()
+                .map(|a| {
+                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit.clone(), a)])
+                        .unwrap()
+                })
+                .collect_vec();
+
+            run_test(0, input_ranged_data, schema.clone(), opt, expected_output).await;
+        }
+    }
+
+    async fn run_test(
+        case_id: usize,
+        input_ranged_data: Vec<(PartitionRange, Vec<DfRecordBatch>)>,
+        schema: SchemaRef,
+        opt: SortOptions,
+        expected_output: Vec<DfRecordBatch>,
+    ) {
+        let (_ranges, batches): (Vec<_>, Vec<_>) = input_ranged_data.clone().into_iter().unzip();
+
+        let batches = batches
+            .into_iter()
+            .flat_map(|mut cols| {
+                cols.push(DfRecordBatch::new_empty(schema.clone()));
+                cols
+            })
+            .collect_vec();
+        let mock_input = MockInputExec::new(batches, schema.clone());
+
+        let exec = PartSortExec::try_new(
+            PhysicalSortExpr {
+                expr: Arc::new(Column::new("ts", 0)),
+                options: opt,
+            },
+            Arc::new(mock_input),
+        )
+        .unwrap();
+
+        let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
+
+        let real_output = exec_stream.map(|r| r.unwrap()).collect::<Vec<_>>().await;
+
+        // a makeshift solution for comparing large data
+        if real_output != expected_output {
+            {
+                let mut buf = Vec::with_capacity(10 * real_output.len());
+                for batch in &real_output {
+                    let mut rb_json: Vec<u8> = Vec::new();
+                    let mut writer = ArrayWriter::new(&mut rb_json);
+                    writer.write(batch).unwrap();
+                    writer.finish().unwrap();
+                    buf.append(&mut rb_json);
+                    buf.push(b',');
+                }
+                let buf = String::from_utf8_lossy(&buf);
+                error!("case_id:{case_id}, real_output: [{buf}]");
+            }
+            {
+                let mut buf = Vec::with_capacity(10 * real_output.len());
+                for batch in &expected_output {
+                    let mut rb_json: Vec<u8> = Vec::new();
+                    let mut writer = ArrayWriter::new(&mut rb_json);
+                    writer.write(batch).unwrap();
+                    writer.finish().unwrap();
+                    buf.append(&mut rb_json);
+                    buf.push(b',');
+                }
+                let buf = String::from_utf8_lossy(&buf);
+                error!("case_id:{case_id}, expected_output: [{buf}]");
+            }
+            panic!("case_{} failed, opt: {:?}", case_id, opt);
+        }
+    }
+}
diff --git a/src/query/src/test_util.rs b/src/query/src/test_util.rs
new file mode 100644
index 0000000000..77a2dee1de
--- /dev/null
+++ b/src/query/src/test_util.rs
@@ -0,0 +1,135 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use arrow::array::{
+    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
+    TimestampSecondArray,
+};
+use arrow_schema::{SchemaRef, TimeUnit};
+use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
+use datafusion::execution::{RecordBatchStream, TaskContext};
+use datafusion::physical_plan::{
+    DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
+};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
+use futures::Stream;
+
+pub fn new_ts_array(unit: TimeUnit, arr: Vec<i64>) -> ArrayRef {
+    match unit {
+        TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter_values(arr)) as ArrayRef,
+        TimeUnit::Millisecond => {
+            Arc::new(TimestampMillisecondArray::from_iter_values(arr)) as ArrayRef
+        }
+        TimeUnit::Microsecond => {
+            Arc::new(TimestampMicrosecondArray::from_iter_values(arr)) as ArrayRef
+        }
+        TimeUnit::Nanosecond => {
+            Arc::new(TimestampNanosecondArray::from_iter_values(arr)) as ArrayRef
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct MockInputExec {
+    input: Vec<DfRecordBatch>,
+    schema: SchemaRef,
+    properties: PlanProperties,
+}
+
+impl MockInputExec {
+    pub fn new(input: Vec<DfRecordBatch>, schema: SchemaRef) -> Self {
+        Self {
+            properties: PlanProperties::new(
+                EquivalenceProperties::new(schema.clone()),
+                Partitioning::UnknownPartitioning(1),
+                ExecutionMode::Bounded,
+            ),
+            input,
+            schema,
+        }
+    }
+}
+
+impl DisplayAs for MockInputExec {
+    fn fmt_as(&self, _t: DisplayFormatType, _f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        unimplemented!()
+    }
+}
+
+impl ExecutionPlan for MockInputExec {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
+        let stream = MockStream {
+            stream: self.input.clone(),
+            schema: self.schema.clone(),
+            idx: 0,
+        };
+        Ok(Box::pin(stream))
+    }
+}
+
+struct MockStream {
+    stream: Vec<DfRecordBatch>,
+    schema: SchemaRef,
+    idx: usize,
+}
+
+impl Stream for MockStream {
+    type Item = datafusion_common::Result<DfRecordBatch>;
+    fn poll_next(
+        mut self: Pin<&mut Self>,
+        _cx: &mut Context<'_>,
+    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
+        if self.idx < self.stream.len() {
+            let ret = self.stream[self.idx].clone();
+            self.idx += 1;
+            Poll::Ready(Some(Ok(ret)))
+        } else {
+            Poll::Ready(None)
+        }
+    }
+}
+
+impl RecordBatchStream for MockStream {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
diff --git a/src/query/src/window_sort.rs b/src/query/src/window_sort.rs
index c2cc9b2389..25dd4b5fb8 100644
--- a/src/query/src/window_sort.rs
+++ b/src/query/src/window_sort.rs
@@ -31,6 +31,7 @@ use arrow_schema::{DataType, SchemaRef, SortOptions};
 use common_error::ext::{BoxedError, PlainError};
 use common_error::status_code::StatusCode;
 use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
+use common_telemetry::error;
 use common_time::Timestamp;
 use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
 use datafusion::execution::{RecordBatchStream, TaskContext};
@@ -376,9 +377,11 @@ impl WindowedSortStream {
 
         if sort_column.options.unwrap_or_default().descending {
             if cur_range.end > working_range.end {
+                error!("Invalid range: {:?} > {:?}", cur_range, working_range);
                 internal_err!("Current batch have data on the right side of working range, something is very wrong")?;
             }
         } else if cur_range.start < working_range.start {
+            error!("Invalid range: {:?} < {:?}", cur_range, working_range);
             internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
         }
 
@@ -1123,20 +1126,17 @@ mod test {
     use std::io::Write;
     use std::sync::Arc;
 
-    use arrow::array::{
-        ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
-        TimestampSecondArray,
-    };
+    use arrow::array::{ArrayRef, TimestampMillisecondArray};
     use arrow::compute::concat_batches;
     use arrow::json::ArrayWriter;
     use arrow_schema::{Field, Schema, TimeUnit};
-    use datafusion::physical_plan::ExecutionMode;
-    use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
     use futures::StreamExt;
     use pretty_assertions::assert_eq;
     use serde_json::json;
 
     use super::*;
+    use crate::test_util::{new_ts_array, MockInputExec};
+
     #[test]
     fn test_overlapping() {
         let testcases = [
@@ -2423,95 +2423,6 @@ mod test {
         }
     }
 
-    #[derive(Debug)]
-    struct MockInputExec {
-        input: Vec<DfRecordBatch>,
-        schema: SchemaRef,
-        properties: PlanProperties,
-    }
-
-    impl MockInputExec {
-        fn new(input: Vec<DfRecordBatch>, schema: SchemaRef) -> Self {
-            Self {
-                properties: PlanProperties::new(
-                    EquivalenceProperties::new(schema.clone()),
-                    Partitioning::UnknownPartitioning(1),
-                    ExecutionMode::Bounded,
-                ),
-                input,
-                schema,
-            }
-        }
-    }
-
-    impl DisplayAs for MockInputExec {
-        fn fmt_as(&self, _t: DisplayFormatType, _f: &mut std::fmt::Formatter) -> std::fmt::Result {
-            unimplemented!()
-        }
-    }
-
-    impl ExecutionPlan for MockInputExec {
-        fn as_any(&self) -> &dyn Any {
-            self
-        }
-
-        fn properties(&self) -> &PlanProperties {
-            &self.properties
-        }
-
-        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
-            vec![]
-        }
-
-        fn with_new_children(
-            self: Arc<Self>,
-            _children: Vec<Arc<dyn ExecutionPlan>>,
-        ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
-            Ok(self)
-        }
-
-        fn execute(
-            &self,
-            _partition: usize,
-            _context: Arc<TaskContext>,
-        ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
-            let stream = MockStream {
-                stream: self.input.clone(),
-                schema: self.schema.clone(),
-                idx: 0,
-            };
-            Ok(Box::pin(stream))
-        }
-    }
-
-    struct MockStream {
-        stream: Vec<DfRecordBatch>,
-        schema: SchemaRef,
-        idx: usize,
-    }
-
-    impl Stream for MockStream {
-        type Item = datafusion_common::Result<DfRecordBatch>;
-        fn poll_next(
-            mut self: Pin<&mut Self>,
-            _cx: &mut Context<'_>,
-        ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
-            if self.idx < self.stream.len() {
-                let ret = self.stream[self.idx].clone();
-                self.idx += 1;
-                Poll::Ready(Some(Ok(ret)))
-            } else {
-                Poll::Ready(None)
-            }
-        }
-    }
-
-    impl RecordBatchStream for MockStream {
-        fn schema(&self) -> SchemaRef {
-            self.schema.clone()
-        }
-    }
-
     #[tokio::test]
     async fn test_window_sort_stream() {
         let test_cases = [
@@ -3045,21 +2956,6 @@ mod test {
         }
     }
 
-    fn new_array(unit: TimeUnit, arr: Vec<i64>) -> ArrayRef {
-        match unit {
-            TimeUnit::Second => Arc::new(TimestampSecondArray::from_iter_values(arr)) as ArrayRef,
-            TimeUnit::Millisecond => {
-                Arc::new(TimestampMillisecondArray::from_iter_values(arr)) as ArrayRef
-            }
-            TimeUnit::Microsecond => {
-                Arc::new(TimestampMicrosecondArray::from_iter_values(arr)) as ArrayRef
-            }
-            TimeUnit::Nanosecond => {
-                Arc::new(TimestampNanosecondArray::from_iter_values(arr)) as ArrayRef
-            }
-        }
-    }
-
     #[tokio::test]
     async fn fuzzy_ish_test_window_sort_stream() {
         let test_cnt = 100;
@@ -3067,6 +2963,7 @@ mod test {
         let range_size_bound = 100;
         let range_offset_bound = 100;
         let in_range_datapoint_cnt_bound = 100;
+        let fetch_bound = 100;
 
         let mut rng = fastrand::Rng::new();
         rng.seed(1337);
@@ -3088,6 +2985,11 @@ mod test {
                 2 => TimeUnit::Microsecond,
                 _ => TimeUnit::Nanosecond,
             };
+            let fetch = if rng.bool() {
+                Some(rng.usize(0..fetch_bound))
+            } else {
+                None
+            };
 
             let mut input_ranged_data = vec![];
             let mut output_data: Vec<i64> = vec![];
@@ -3119,7 +3021,7 @@ mod test {
                     .sorted_by(ret_cmp_fn(descending))
                    .collect_vec();
                output_data.extend(data_gen.clone());
-                let arr = new_array(unit.clone(), data_gen);
+                let arr = new_ts_array(unit.clone(), data_gen);
                let range = PartitionRange {
                    start,
                    end,
@@ -3130,7 +3032,10 @@ mod test {
            }
 
            output_data.sort_by(ret_cmp_fn(descending));
-            let output_arr = new_array(unit.clone(), output_data);
+            if let Some(fetch) = fetch {
+                output_data.truncate(fetch);
+            }
+            let output_arr = new_ts_array(unit.clone(), output_data);
 
            let test_stream = TestStream::new(
                Column::new("ts", 0),
@@ -3138,7 +3043,7 @@ mod test {
                    descending,
                    nulls_first: true,
                },
-                None,
+                fetch,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(unit.clone(), None),
@@ -3158,8 +3063,7 @@ mod test {
 
            if res_concat != expected_concat {
                {
-                    let mut f_input =
-                        std::fs::File::create(format!("case_{}_input.json", case_id)).unwrap();
+                    let mut f_input = std::io::stderr();
                    f_input.write_all(b"[").unwrap();
                    for (input_range, input_arr) in test_stream.input {
                        let range_json = json!({
@@ -3181,8 +3085,7 @@ mod test {
                    f_input.write_all(b"]").unwrap();
                }
                {
-                    let mut f_res =
-                        std::fs::File::create(format!("case_{}_result.json", case_id)).unwrap();
+                    let mut f_res = std::io::stderr();
                    f_res.write_all(b"[").unwrap();
                    for batch in &res {
                        let mut res_writer = ArrayWriter::new(f_res);
@@ -3193,21 +3096,18 @@ mod test {
                    }
                    f_res.write_all(b"]").unwrap();
 
-                    let f_res_concat =
-                        std::fs::File::create(format!("case_{}_result_concat.json", case_id))
-                            .unwrap();
+                    let f_res_concat = std::io::stderr();
                    let mut res_writer = ArrayWriter::new(f_res_concat);
                    res_writer.write(&res_concat).unwrap();
                    res_writer.finish().unwrap();
 
-                    let f_expected =
-                        std::fs::File::create(format!("case_{}_expected.json", case_id)).unwrap();
+                    let f_expected = std::io::stderr();
                    let mut expected_writer = ArrayWriter::new(f_expected);
                    expected_writer.write(&expected_concat).unwrap();
                    expected_writer.finish().unwrap();
                }
                panic!(
-                    "case failed, case id: {0}, output and expected save to case_{0}_*.json file",
+                    "case failed, case id: {0}, output and expected output to stderr",
                    case_id
                );
            }