diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index c2c14b4680..9c8e8d6ce8 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -23,7 +23,9 @@ use datafusion::error::Result as DatafusionResult; use datafusion::parquet::arrow::async_reader::AsyncFileReader; use datafusion::parquet::arrow::{ArrowWriter, parquet_to_arrow_schema}; use datafusion::parquet::errors::{ParquetError, Result as ParquetResult}; -use datafusion::parquet::file::metadata::ParquetMetaData; +use datafusion::parquet::file::metadata::{ + PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, +}; use datafusion::physical_plan::SendableRecordBatchStream; use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_datasource::PartitionedFile; @@ -94,35 +96,40 @@ impl DefaultParquetFileReaderFactory { } impl ParquetFileReaderFactory for DefaultParquetFileReaderFactory { - // TODO(weny): Supports [`metadata_size_hint`]. - // The upstream has a implementation supports [`metadata_size_hint`], - // however it coupled with Box. fn create_reader( &self, _partition_index: usize, partitioned_file: PartitionedFile, - _metadata_size_hint: Option, + metadata_size_hint: Option, _metrics: &ExecutionPlanMetricsSet, ) -> DatafusionResult> { let path = partitioned_file.path().to_string(); let object_store = self.object_store.clone(); - Ok(Box::new(LazyParquetFileReader::new(object_store, path))) + Ok(Box::new(LazyParquetFileReader::new( + object_store, + path, + metadata_size_hint, + ))) } } pub struct LazyParquetFileReader { object_store: ObjectStore, reader: Option>, + file_size: Option, + metadata_size_hint: Option, path: String, } impl LazyParquetFileReader { - pub fn new(object_store: ObjectStore, path: String) -> Self { + pub fn new(object_store: ObjectStore, path: String, metadata_size_hint: Option) -> Self { LazyParquetFileReader { object_store, path, reader: None, + file_size: None, + metadata_size_hint, } } @@ -130,6 +137,7 @@ impl LazyParquetFileReader { async fn maybe_initialize(&mut self) -> result::Result<(), object_store::Error> { if self.reader.is_none() { let meta = self.object_store.stat(&self.path).await?; + self.file_size = Some(meta.content_length()); let reader = self .object_store .reader(&self.path) @@ -166,8 +174,19 @@ impl AsyncFileReader for LazyParquetFileReader { self.maybe_initialize() .await .map_err(|e| ParquetError::External(Box::new(e)))?; - // Safety: Must initialized - self.reader.as_mut().unwrap().get_metadata(options).await + + let metadata_opts = options.map(|o| o.metadata_options().clone()); + let metadata_reader = ParquetMetaDataReader::new() + .with_metadata_options(metadata_opts) + .with_page_index_policy(PageIndexPolicy::from( + options.is_some_and(|o| o.page_index()), + )) + .with_prefetch_hint(self.metadata_size_hint); + + let metadata = metadata_reader + .load_and_finish(self.reader.as_mut().unwrap(), self.file_size.unwrap()) + .await?; + Ok(Arc::new(metadata)) }) } } diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 278838b369..9b987c810b 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -288,6 +288,17 @@ pub(crate) struct FileCache { pub(crate) type FileCacheRef = Arc; impl FileCache { + /// Splits the configured total capacity between parquet and puffin caches + /// without exceeding the requested overall budget. + fn split_cache_capacities(total_capacity: u64, index_percent: u8) -> (u64, u64) { + let desired_puffin_capacity = total_capacity * u64::from(index_percent) / 100; + let min_cache_capacity = MIN_CACHE_CAPACITY.min(total_capacity / 2); + let puffin_capacity = + desired_puffin_capacity.clamp(min_cache_capacity, total_capacity - min_cache_capacity); + let parquet_capacity = total_capacity - puffin_capacity; + (parquet_capacity, puffin_capacity) + } + /// Creates a new file cache. pub(crate) fn new( local_store: ObjectStore, @@ -302,14 +313,8 @@ impl FileCache { .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT); let total_capacity = capacity.as_bytes(); - // Convert percent to ratio and calculate capacity for each cache - let index_ratio = index_percent as f64 / 100.0; - let puffin_capacity = (total_capacity as f64 * index_ratio) as u64; - let parquet_capacity = total_capacity - puffin_capacity; - - // Ensure both capacities are at least 512MB - let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY); - let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY); + let (parquet_capacity, puffin_capacity) = + Self::split_cache_capacities(total_capacity, index_percent); info!( "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}", @@ -1064,6 +1069,28 @@ mod tests { assert_eq!(data, bytes[3].as_ref()); } + #[test] + fn test_file_cache_capacity_respects_total_budget() { + let total_capacity = ReadableSize::mb(256).as_bytes(); + let (parquet_capacity, puffin_capacity) = + FileCache::split_cache_capacities(total_capacity, 20); + + assert_eq!(total_capacity, parquet_capacity + puffin_capacity); + assert_eq!(ReadableSize::mb(128).as_bytes(), parquet_capacity); + assert_eq!(ReadableSize::mb(128).as_bytes(), puffin_capacity); + } + + #[test] + fn test_file_cache_capacity_keeps_split_when_total_allows_it() { + let total_capacity = ReadableSize::gb(5).as_bytes(); + let (parquet_capacity, puffin_capacity) = + FileCache::split_cache_capacities(total_capacity, 20); + + assert_eq!(total_capacity, parquet_capacity + puffin_capacity); + assert_eq!(ReadableSize::gb(4).as_bytes(), parquet_capacity); + assert_eq!(ReadableSize::gb(1).as_bytes(), puffin_capacity); + } + #[test] fn test_cache_file_path() { let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap(); diff --git a/src/mito2/src/memtable/bulk.rs b/src/mito2/src/memtable/bulk.rs index 9d25d0c39f..24b2bebaa9 100644 --- a/src/mito2/src/memtable/bulk.rs +++ b/src/mito2/src/memtable/bulk.rs @@ -137,7 +137,7 @@ struct CollectedParts { /// All parts in a bulk memtable. #[derive(Default)] struct BulkParts { - /// Unordered small parts (< 1024 rows). + /// Unordered small parts. unordered_part: UnorderedPart, /// All parts (raw and encoded). parts: Vec, diff --git a/src/mito2/src/memtable/partition_tree/data.rs b/src/mito2/src/memtable/partition_tree/data.rs index a6d40bdcbf..f6e2a59bec 100644 --- a/src/mito2/src/memtable/partition_tree/data.rs +++ b/src/mito2/src/memtable/partition_tree/data.rs @@ -50,6 +50,7 @@ use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource use crate::metrics::{ PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED, }; +use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE; const PK_INDEX_COLUMN_NAME: &str = "__pk_index"; @@ -821,7 +822,11 @@ impl DataPart { /// Reads frozen data part and yields [DataBatch]es. pub fn read(&self) -> Result { match self { - DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None), + // Keep encoded memtable scans aligned with mito/DataFusion batch sizing instead of + // parquet-rs's implicit 1024-row default. + DataPart::Parquet(data_bytes) => { + DataPartReader::new(data_bytes.data.clone(), Some(DEFAULT_READ_BATCH_SIZE)) + } } } diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 2ca83ca8cf..2447824ad9 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -41,9 +41,17 @@ pub mod writer; pub const PARQUET_METADATA_KEY: &str = "greptime:metadata"; /// Default batch size to read parquet files. -pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024; +/// +/// This is a runtime-only scan granularity, so we align it with DataFusion's +/// default execution batch size to reduce rebatching and concatenation in the +/// query pipeline. +pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 8 * 1024; /// Default row group size for parquet files. -pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE; +/// +/// Keep the existing persisted/on-disk default stable. It intentionally stays +/// decoupled from [`DEFAULT_READ_BATCH_SIZE`] so we can tune runtime scan +/// batching without changing the row group layout of newly written SSTs. +pub const DEFAULT_ROW_GROUP_SIZE: usize = 100 * 1024; /// Parquet write options. #[derive(Debug, Clone)] diff --git a/src/promql/src/extension_plan/absent.rs b/src/promql/src/extension_plan/absent.rs index db31a3d901..71af413029 100644 --- a/src/promql/src/extension_plan/absent.rs +++ b/src/promql/src/extension_plan/absent.rs @@ -49,9 +49,6 @@ use snafu::ResultExt; use crate::error::DeserializeSnafu; use crate::extension_plan::{Millisecond, resolve_column_name, serialize_column_index}; -/// Maximum number of rows per output batch -const ABSENT_BATCH_SIZE: usize = 8192; - #[derive(Debug, PartialEq, Eq, Hash)] pub struct Absent { start: Millisecond, @@ -390,11 +387,13 @@ impl ExecutionPlan for AbsentExec { context: Arc, ) -> DataFusionResult { let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let batch_size = context.session_config().batch_size(); let input = self.input.execute(partition, context)?; Ok(Box::pin(AbsentStream { end: self.end, step: self.step, + batch_size, time_index_column_index: self .input .schema() @@ -407,6 +406,8 @@ impl ExecutionPlan for AbsentExec { metric: baseline_metric, // Buffer for streaming output timestamps output_timestamps: Vec::new(), + input_timestamps: Vec::new(), + input_timestamp_offset: 0, // Current timestamp in the output range output_ts_cursor: self.start, input_finished: false, @@ -441,6 +442,7 @@ impl DisplayAs for AbsentExec { pub struct AbsentStream { end: Millisecond, step: Millisecond, + batch_size: usize, time_index_column_index: usize, output_schema: SchemaRef, fake_labels: Vec<(String, String)>, @@ -448,6 +450,9 @@ pub struct AbsentStream { metric: BaselineMetrics, // Buffer for streaming output timestamps output_timestamps: Vec, + // Current input timestamps being processed incrementally. + input_timestamps: Vec, + input_timestamp_offset: usize, // Current timestamp in the output range output_ts_cursor: Millisecond, input_finished: bool, @@ -464,52 +469,53 @@ impl Stream for AbsentStream { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - if !self.input_finished { - match ready!(self.input.poll_next_unpin(cx)) { - Some(Ok(batch)) => { - let timer = std::time::Instant::now(); - if let Err(e) = self.process_input_batch(&batch) { - return Poll::Ready(Some(Err(e))); - } - self.metric.elapsed_compute().add_elapsed(timer); - - // If we have enough data for a batch, output it - if self.output_timestamps.len() >= ABSENT_BATCH_SIZE { - let timer = std::time::Instant::now(); - let result = self.flush_output_batch(); - self.metric.elapsed_compute().add_elapsed(timer); - - match result { - Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))), - Ok(None) => continue, - Err(e) => return Poll::Ready(Some(Err(e))), - } - } - } - Some(Err(e)) => return Poll::Ready(Some(Err(e))), - None => { - self.input_finished = true; - - let timer = std::time::Instant::now(); - // Process any remaining absent timestamps - if let Err(e) = self.process_remaining_absent_timestamps() { - return Poll::Ready(Some(Err(e))); - } - let result = self.flush_output_batch(); - self.metric.elapsed_compute().add_elapsed(timer); - return Poll::Ready(result.transpose()); - } + if self.has_pending_input_timestamps() { + let timer = std::time::Instant::now(); + if let Err(e) = self.process_input_batch() { + return Poll::Ready(Some(Err(e))); + } + self.metric.elapsed_compute().add_elapsed(timer); + + match self.flush_output_batch() { + Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))), + Ok(None) => continue, + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + if self.input_finished { + let timer = std::time::Instant::now(); + if let Err(e) = self.process_remaining_absent_timestamps() { + return Poll::Ready(Some(Err(e))); + } + self.metric.elapsed_compute().add_elapsed(timer); + + match self.flush_output_batch() { + Ok(Some(batch)) => return Poll::Ready(Some(Ok(batch))), + Ok(None) => return Poll::Ready(None), + Err(e) => return Poll::Ready(Some(Err(e))), + } + } + + match ready!(self.input.poll_next_unpin(cx)) { + Some(Ok(batch)) => { + let timer = std::time::Instant::now(); + if let Err(e) = self.buffer_input_timestamps(&batch) { + return Poll::Ready(Some(Err(e))); + } + self.metric.elapsed_compute().add_elapsed(timer); + } + Some(Err(e)) => return Poll::Ready(Some(Err(e))), + None => { + self.input_finished = true; } - } else { - return Poll::Ready(None); } } } } impl AbsentStream { - fn process_input_batch(&mut self, batch: &RecordBatch) -> DataFusionResult<()> { - // Extract timestamps from this batch + fn buffer_input_timestamps(&mut self, batch: &RecordBatch) -> DataFusionResult<()> { let timestamp_array = batch.column(self.time_index_column_index); let milli_ts_array = arrow::compute::cast( timestamp_array, @@ -519,29 +525,52 @@ impl AbsentStream { .as_any() .downcast_ref::() .unwrap(); + self.input_timestamps.clear(); + self.input_timestamps + .extend_from_slice(timestamp_array.values()); + self.input_timestamp_offset = 0; + Ok(()) + } + + fn has_pending_input_timestamps(&self) -> bool { + self.input_timestamp_offset < self.input_timestamps.len() + } + + fn process_input_batch(&mut self) -> DataFusionResult<()> { + while self.input_timestamp_offset < self.input_timestamps.len() { + let input_ts = self.input_timestamps[self.input_timestamp_offset]; - // Process against current output cursor position - for &input_ts in timestamp_array.values() { // Generate absent timestamps up to this input timestamp while self.output_ts_cursor < input_ts && self.output_ts_cursor <= self.end { self.output_timestamps.push(self.output_ts_cursor); self.output_ts_cursor += self.step; + + if self.output_timestamps.len() >= self.batch_size { + return Ok(()); + } } // Skip the input timestamp if it matches our cursor if self.output_ts_cursor == input_ts { self.output_ts_cursor += self.step; } + + self.input_timestamp_offset += 1; } + self.input_timestamps.clear(); + self.input_timestamp_offset = 0; Ok(()) } fn process_remaining_absent_timestamps(&mut self) -> DataFusionResult<()> { - // Generate all remaining absent timestamps (input is finished) while self.output_ts_cursor <= self.end { self.output_timestamps.push(self.output_ts_cursor); self.output_ts_cursor += self.step; + + if self.output_timestamps.len() >= self.batch_size { + return Ok(()); + } } Ok(()) } @@ -551,11 +580,16 @@ impl AbsentStream { return Ok(None); } + let timestamps = if self.output_timestamps.len() <= self.batch_size { + std::mem::take(&mut self.output_timestamps) + } else { + let remaining = self.output_timestamps.split_off(self.batch_size); + std::mem::replace(&mut self.output_timestamps, remaining) + }; + let mut columns: Vec = Vec::with_capacity(self.output_schema.fields().len()); - let num_rows = self.output_timestamps.len(); - columns.push(Arc::new(TimestampMillisecondArray::from( - self.output_timestamps.clone(), - )) as _); + let num_rows = timestamps.len(); + columns.push(Arc::new(TimestampMillisecondArray::from(timestamps)) as _); columns.push(Arc::new(Float64Array::from(vec![1.0; num_rows])) as _); for (_, value) in self.fake_labels.iter() { @@ -567,7 +601,6 @@ impl AbsentStream { let batch = RecordBatch::try_new(self.output_schema.clone(), columns)?; - self.output_timestamps.clear(); Ok(Some(batch)) } } @@ -580,7 +613,7 @@ mod tests { use datafusion::arrow::record_batch::RecordBatch; use datafusion::catalog::memory::DataSourceExec; use datafusion::datasource::memory::MemorySourceConfig; - use datafusion::prelude::SessionContext; + use datafusion::prelude::{SessionConfig, SessionContext}; use datatypes::arrow::array::{Float64Array, TimestampMillisecondArray}; use super::*; @@ -725,4 +758,146 @@ mod tests { // Should output all timestamps in range: 0, 1000, 2000 assert_eq!(output_timestamps, vec![0, 1000, 2000]); } + + #[tokio::test] + async fn test_absent_respects_session_batch_size_for_large_gap() { + let schema = Arc::new(Schema::new(vec![ + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + Field::new("value", DataType::Float64, true), + ])); + + let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![9])); + let value_array = Arc::new(Float64Array::from(vec![1.0])); + let batch = + RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap(); + + let memory_exec = DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(), + )); + + let output_schema = Arc::new(Schema::new(vec![ + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + Field::new("value", DataType::Float64, true), + ])); + + let absent_exec = AbsentExec { + start: 0, + end: 10, + step: 1, + time_index_column: "timestamp".to_string(), + value_column: "value".to_string(), + fake_labels: vec![], + output_schema: output_schema.clone(), + input: Arc::new(memory_exec), + properties: Arc::new(PlanProperties::new( + EquivalenceProperties::new(output_schema.clone()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + )), + metric: ExecutionPlanMetricsSet::new(), + }; + + let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3)); + let task_ctx = session_ctx.task_ctx(); + let mut stream = absent_exec.execute(0, task_ctx).unwrap(); + + let mut batch_sizes = Vec::new(); + let mut output_timestamps = Vec::new(); + while let Some(batch_result) = stream.next().await { + let batch = batch_result.unwrap(); + batch_sizes.push(batch.num_rows()); + + let ts_array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..ts_array.len() { + if !ts_array.is_null(i) { + output_timestamps.push(ts_array.value(i)); + } + } + } + + assert_eq!(batch_sizes, vec![3, 3, 3, 1]); + assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10]); + } + + #[tokio::test] + async fn test_absent_resumes_same_input_timestamp_after_batch_flush() { + let schema = Arc::new(Schema::new(vec![ + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + Field::new("value", DataType::Float64, true), + ])); + + let timestamp_array = Arc::new(TimestampMillisecondArray::from(vec![9])); + let value_array = Arc::new(Float64Array::from(vec![1.0])); + let batch = + RecordBatch::try_new(schema.clone(), vec![timestamp_array, value_array]).unwrap(); + + let memory_exec = DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(), + )); + + let output_schema = Arc::new(Schema::new(vec![ + Field::new( + "timestamp", + DataType::Timestamp(TimeUnit::Millisecond, None), + true, + ), + Field::new("value", DataType::Float64, true), + ])); + + let absent_exec = AbsentExec { + start: 0, + end: 9, + step: 1, + time_index_column: "timestamp".to_string(), + value_column: "value".to_string(), + fake_labels: vec![], + output_schema: output_schema.clone(), + input: Arc::new(memory_exec), + properties: Arc::new(PlanProperties::new( + EquivalenceProperties::new(output_schema.clone()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + )), + metric: ExecutionPlanMetricsSet::new(), + }; + + let session_ctx = SessionContext::new_with_config(SessionConfig::new().with_batch_size(3)); + let task_ctx = session_ctx.task_ctx(); + let mut stream = absent_exec.execute(0, task_ctx).unwrap(); + + let mut output_timestamps = Vec::new(); + while let Some(batch_result) = stream.next().await { + let batch = batch_result.unwrap(); + let ts_array = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..ts_array.len() { + if !ts_array.is_null(i) { + output_timestamps.push(ts_array.value(i)); + } + } + } + + assert_eq!(output_timestamps, vec![0, 1, 2, 3, 4, 5, 6, 7, 8]); + } } diff --git a/src/query/src/range_select/plan.rs b/src/query/src/range_select/plan.rs index e863aaced0..d83514f435 100644 --- a/src/query/src/range_select/plan.rs +++ b/src/query/src/range_select/plan.rs @@ -836,6 +836,7 @@ impl ExecutionPlan for RangeSelectExec { context: Arc, ) -> DfResult { let baseline_metric = BaselineMetrics::new(&self.metric, partition); + let batch_size = context.session_config().batch_size(); let input = self.input.execute(partition, context)?; let schema = input.schema(); let time_index = schema @@ -852,6 +853,7 @@ impl ExecutionPlan for RangeSelectExec { .collect(), )?; Ok(Box::pin(RangeSelectStream { + batch_size, schema: self.schema.clone(), range_exec: self.range_exec.clone(), input, @@ -868,6 +870,8 @@ impl ExecutionPlan for RangeSelectExec { metric: baseline_metric, schema_project: self.schema_project.clone(), schema_before_project: self.schema_before_project.clone(), + output_batch: None, + output_batch_offset: 0, })) } @@ -881,6 +885,7 @@ impl ExecutionPlan for RangeSelectExec { } struct RangeSelectStream { + batch_size: usize, /// the schema of output column schema: SchemaRef, range_exec: Vec, @@ -907,6 +912,8 @@ struct RangeSelectStream { metric: BaselineMetrics, schema_project: Option>, schema_before_project: SchemaRef, + output_batch: Option, + output_batch_offset: usize, } #[derive(Debug)] @@ -1149,6 +1156,36 @@ impl RangeSelectStream { }; Ok(project_output) } + + fn next_output_batch(&mut self) -> DfResult> { + if self.output_batch.is_none() { + self.output_batch = Some(self.generate_output()?); + self.output_batch_offset = 0; + } + + let num_rows = self.output_batch.as_ref().unwrap().num_rows(); + if num_rows == 0 { + self.output_batch = None; + self.output_batch_offset = 0; + return Ok(None); + } + + if self.output_batch_offset == 0 && num_rows <= self.batch_size { + return Ok(self.output_batch.take()); + } + + let offset = self.output_batch_offset; + let len = (num_rows - offset).min(self.batch_size); + let batch = self.output_batch.as_ref().unwrap().slice(offset, len); + self.output_batch_offset += len; + + if self.output_batch_offset >= num_rows { + self.output_batch = None; + self.output_batch_offset = 0; + } + + Ok(Some(batch)) + } } enum ExecutionState { @@ -1191,13 +1228,19 @@ impl Stream for RangeSelectStream { } } ExecutionState::ProducingOutput => { - let result = self.generate_output(); + let result = self.next_output_batch(); return match result { // made output - Ok(batch) => { - self.exec_state = ExecutionState::Done; + Ok(Some(batch)) => { + if self.output_batch.is_none() { + self.exec_state = ExecutionState::Done; + } Poll::Ready(Some(Ok(batch))) } + Ok(None) => { + self.exec_state = ExecutionState::Done; + Poll::Ready(None) + } // error making output Err(error) => Poll::Ready(Some(Err(error))), }; @@ -1251,7 +1294,7 @@ mod test { use datafusion::prelude::SessionContext; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr::expressions::Column; - use datatypes::arrow::array::TimestampMillisecondArray; + use datatypes::arrow::array::{Float64Array, Int64Array, TimestampMillisecondArray}; use datatypes::arrow_array::StringArray; use super::*; @@ -1313,15 +1356,49 @@ mod test { )) } - async fn do_range_select_test( + fn prepare_empty_test_data(is_float: bool) -> DataSourceExec { + let schema = Arc::new(Schema::new(vec![ + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new( + "value", + if is_float { + DataType::Float64 + } else { + DataType::Int64 + }, + true, + ), + Field::new("host", DataType::Utf8, true), + ])); + let timestamp_column: Arc = + Arc::new(TimestampMillisecondArray::from(Vec::::new())) as _; + let value_column: Arc = if is_float { + Arc::new(Float64Array::from(Vec::>::new())) as _ + } else { + Arc::new(Int64Array::from(Vec::>::new())) as _ + }; + let host_column: Arc = + Arc::new(StringArray::from(Vec::>::new())) as _; + let data = RecordBatch::try_new( + schema.clone(), + vec![timestamp_column, value_column, host_column], + ) + .unwrap(); + + DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(), + )) + } + + async fn collect_range_select_test( range1: Millisecond, range2: Millisecond, align: Millisecond, fill: Option, is_float: bool, is_gap: bool, - expected: String, - ) { + batch_size: usize, + ) -> Vec { let data_type = if is_float { DataType::Float64 } else { @@ -1412,11 +1489,25 @@ mod test { .into(), range_select_exec, ); - let session_context = SessionContext::default(); + let session_context = SessionContext::new_with_config( + datafusion::execution::config::SessionConfig::new().with_batch_size(batch_size), + ); + datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx()) + .await + .unwrap() + } + + async fn do_range_select_test( + range1: Millisecond, + range2: Millisecond, + align: Millisecond, + fill: Option, + is_float: bool, + is_gap: bool, + expected: String, + ) { let result = - datafusion::physical_plan::collect(Arc::new(sort_exec), session_context.task_ctx()) - .await - .unwrap(); + collect_range_select_test(range1, range2, align, fill, is_float, is_gap, 8192).await; let result_literal = arrow::util::pretty::pretty_format_batches(&result) .unwrap() @@ -1700,6 +1791,88 @@ mod test { .await; } + #[tokio::test] + async fn range_select_respects_session_batch_size() { + let result = + collect_range_select_test(10_000, 5_000, 5_000, Some(Fill::Null), true, false, 3).await; + + let row_counts = result + .iter() + .map(|batch| batch.num_rows()) + .collect::>(); + assert_eq!(vec![3, 3, 3, 3], row_counts); + } + + #[tokio::test] + async fn range_select_skips_empty_output_batch() { + let memory_exec = Arc::new(prepare_empty_test_data(true)); + let schema = Arc::new(Schema::new(vec![ + Field::new("MIN(value)", DataType::Float64, true), + Field::new("MAX(value)", DataType::Float64, true), + Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true), + Field::new("host", DataType::Utf8, true), + ])); + let cache = Arc::new(PlanProperties::new( + EquivalenceProperties::new(schema.clone()), + Partitioning::UnknownPartitioning(1), + EmissionType::Incremental, + Boundedness::Bounded, + )); + let input_schema = memory_exec.schema().clone(); + let range_select_exec = Arc::new(RangeSelectExec { + input: memory_exec, + range_exec: vec![ + RangeFnExec { + expr: Arc::new( + AggregateExprBuilder::new( + min_max::min_udaf(), + vec![Arc::new(Column::new("value", 1))], + ) + .schema(input_schema.clone()) + .alias("MIN(value)") + .build() + .unwrap(), + ), + range: 10_000, + fill: Some(Fill::Null), + need_cast: None, + }, + RangeFnExec { + expr: Arc::new( + AggregateExprBuilder::new( + min_max::max_udaf(), + vec![Arc::new(Column::new("value", 1))], + ) + .schema(input_schema) + .alias("MAX(value)") + .build() + .unwrap(), + ), + range: 5_000, + fill: Some(Fill::Null), + need_cast: None, + }, + ], + align: 5_000, + align_to: 0, + by: vec![Arc::new(Column::new("host", 2))], + time_index: TIME_INDEX_COLUMN.to_string(), + schema: schema.clone(), + schema_before_project: schema.clone(), + schema_project: None, + by_schema: Arc::new(Schema::new(vec![Field::new("host", DataType::Utf8, true)])), + metric: ExecutionPlanMetricsSet::new(), + cache, + }); + let session_context = SessionContext::new(); + let result = + datafusion::physical_plan::collect(range_select_exec, session_context.task_ctx()) + .await + .unwrap(); + + assert!(result.is_empty()); + } + #[test] fn fill_test() { assert!(Fill::try_from_str("", &DataType::UInt8).unwrap().is_none());