diff --git a/src/file-engine/src/query.rs b/src/file-engine/src/query.rs
index b449311606..e305376b2a 100644
--- a/src/file-engine/src/query.rs
+++ b/src/file-engine/src/query.rs
@@ -25,6 +25,7 @@ use common_query::prelude::Expr;
 use common_recordbatch::error::{CastVectorSnafu, ExternalSnafu, Result as RecordBatchResult};
 use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use datafusion::logical_expr::utils as df_logical_expr_utils;
+use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
 use datatypes::vectors::VectorRef;
 use futures::Stream;
@@ -160,38 +161,13 @@ impl Stream for FileToScanRegionStream {
             Poll::Pending => Poll::Pending,
             Poll::Ready(Some(file_record_batch)) => {
                 let file_record_batch = file_record_batch?;
+                let scan_record_batch = if self.schema_eq(&file_record_batch) {
+                    Ok(file_record_batch)
+                } else {
+                    self.convert_record_batch(&file_record_batch)
+                };
-                if self.schema_type_match(&file_record_batch) {
-                    return Poll::Ready(Some(Ok(file_record_batch)));
-                }
-
-                let file_row_count = file_record_batch.num_rows();
-                let scan_schema = self.scan_schema.clone();
-                let mut columns = Vec::with_capacity(scan_schema.num_columns());
-                for scan_column_schema in scan_schema.column_schemas() {
-                    let scan_data_type = &scan_column_schema.data_type;
-
-                    let file_column = file_record_batch.column_by_name(&scan_column_schema.name);
-                    let column = if let Some(file_column) = file_column {
-                        if &file_column.data_type() == scan_data_type {
-                            file_column.clone()
-                        } else {
-                            file_column.cast(scan_data_type).context(CastVectorSnafu {
-                                from_type: file_column.data_type(),
-                                to_type: scan_data_type.clone(),
-                            })?
-                        }
-                    } else {
-                        Self::create_default_vector(scan_column_schema, file_row_count)
-                            .map_err(BoxedError::new)
-                            .context(ExternalSnafu)?
-                    };
-
-                    columns.push(column);
-                }
-
-                let scan_record_batch = RecordBatch::new(scan_schema, columns)?;
-                Poll::Ready(Some(Ok(scan_record_batch)))
+                Poll::Ready(Some(scan_record_batch))
             }
             Poll::Ready(None) => Poll::Ready(None),
         }
@@ -206,7 +182,7 @@ impl FileToScanRegionStream {
         }
     }
 
-    fn schema_type_match(&self, file_record_batch: &RecordBatch) -> bool {
+    fn schema_eq(&self, file_record_batch: &RecordBatch) -> bool {
         self.scan_schema
             .column_schemas()
             .iter()
@@ -218,6 +194,59 @@ impl FileToScanRegionStream {
             })
     }
 
+    /// Converts a RecordBatch from file schema to scan schema.
+    ///
+    /// This function performs the following operations:
+    /// - Projection: Only columns present in scan schema are retained.
+    /// - Cast Type: Columns present in both file schema and scan schema but with different types are cast to the type in scan schema.
+    /// - Backfill: Columns present in scan schema but not in file schema are backfilled with default values.
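
Reviewer note: the doc comment on `convert_record_batch` above describes a three-step rule (project, cast, backfill). Below is a minimal, self-contained sketch of that rule using toy stand-in types; `ToyType`, `ToyColumn`, `TargetColumn`, and `convert` are invented for illustration only, while the real code operates on `VectorRef`/`ConcreteDataType` and surfaces failures through `CastVectorSnafu`/`ExternalSnafu` rather than cloning values.

```rust
// Toy illustration of the project/cast/backfill rule in convert_record_batch.
// All names here are hypothetical; the engine's real column type is VectorRef.
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum ToyType {
    Int64,
    Float64,
}

#[derive(Clone, Debug)]
struct ToyColumn {
    ty: ToyType,
    values: Vec<f64>, // store everything as f64 for simplicity
}

/// Target column description: name, type, and a default used for backfill.
struct TargetColumn {
    name: &'static str,
    ty: ToyType,
    default: f64,
}

fn convert(
    file_batch: &HashMap<&str, ToyColumn>,
    scan_schema: &[TargetColumn],
    num_rows: usize,
) -> Vec<ToyColumn> {
    scan_schema
        .iter()
        .map(|target| match file_batch.get(target.name) {
            // Column exists with the right type: reuse it as-is.
            Some(col) if col.ty == target.ty => col.clone(),
            // Column exists with a different type: cast (a no-op on f64 storage here).
            Some(col) => ToyColumn { ty: target.ty, values: col.values.clone() },
            // Column missing from the file: backfill with the default value.
            None => ToyColumn { ty: target.ty, values: vec![target.default; num_rows] },
        })
        .collect() // columns not named in scan_schema are implicitly projected away
}

fn main() {
    let mut file_batch = HashMap::new();
    file_batch.insert("a", ToyColumn { ty: ToyType::Int64, values: vec![1.0, 2.0] });
    file_batch.insert("dropped", ToyColumn { ty: ToyType::Int64, values: vec![9.0, 9.0] });

    let scan_schema = [
        TargetColumn { name: "a", ty: ToyType::Float64, default: 0.0 }, // cast path
        TargetColumn { name: "b", ty: ToyType::Float64, default: 0.0 }, // backfill path
    ];

    let out = convert(&file_batch, &scan_schema, 2);
    assert_eq!(out.len(), 2); // "dropped" was projected out
    println!("{out:?}");
}
```
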
+    fn convert_record_batch(
+        &self,
+        file_record_batch: &RecordBatch,
+    ) -> RecordBatchResult<RecordBatch> {
+        let file_row_count = file_record_batch.num_rows();
+        let columns = self
+            .scan_schema
+            .column_schemas()
+            .iter()
+            .map(|scan_column_schema| {
+                let file_column = file_record_batch.column_by_name(&scan_column_schema.name);
+                if let Some(file_column) = file_column {
+                    Self::cast_column_type(file_column, &scan_column_schema.data_type)
+                } else {
+                    Self::backfill_column(scan_column_schema, file_row_count)
+                }
+            })
+            .collect::<RecordBatchResult<Vec<_>>>()?;
+
+        RecordBatch::new(self.scan_schema.clone(), columns)
+    }
+
+    fn cast_column_type(
+        source_column: &VectorRef,
+        target_data_type: &ConcreteDataType,
+    ) -> RecordBatchResult<VectorRef> {
+        if &source_column.data_type() == target_data_type {
+            Ok(source_column.clone())
+        } else {
+            source_column
+                .cast(target_data_type)
+                .context(CastVectorSnafu {
+                    from_type: source_column.data_type(),
+                    to_type: target_data_type.clone(),
+                })
+        }
+    }
+
+    fn backfill_column(
+        column_schema: &ColumnSchema,
+        num_rows: usize,
+    ) -> RecordBatchResult<VectorRef> {
+        Self::create_default_vector(column_schema, num_rows)
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)
+    }
+
     fn create_default_vector(column_schema: &ColumnSchema, num_rows: usize) -> Result<VectorRef> {
         column_schema
             .create_default_vector(num_rows)
diff --git a/src/file-engine/src/query/file_stream.rs b/src/file-engine/src/query/file_stream.rs
index 7b98f09cd2..105785979e 100644
--- a/src/file-engine/src/query/file_stream.rs
+++ b/src/file-engine/src/query/file_stream.rs
@@ -133,13 +133,9 @@ fn new_csv_stream(
 ) -> Result<SendableRecordBatchStream> {
     let file_schema = config.file_schema.arrow_schema().clone();
     let opener = build_csv_opener(file_schema.clone(), config, format)?;
-    build_record_batch_stream(
-        opener,
-        file_schema,
-        config.files,
-        config.projection,
-        config.limit,
-    )
+    // push down limit only if there is no filter
+    let limit = config.filters.is_empty().then_some(config.limit).flatten();
+    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
 }
 
 fn new_json_stream(
@@ -149,13 +145,9 @@ fn new_json_stream(
 ) -> Result<SendableRecordBatchStream> {
     let file_schema = config.file_schema.arrow_schema().clone();
     let opener = build_json_opener(file_schema.clone(), config, format)?;
-    build_record_batch_stream(
-        opener,
-        file_schema,
-        config.files,
-        config.projection,
-        config.limit,
-    )
+    // push down limit only if there is no filter
+    let limit = config.filters.is_empty().then_some(config.limit).flatten();
+    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
 }
 
 fn new_parquet_stream_with_exec_plan(
@@ -229,13 +221,9 @@ fn new_orc_stream(
 ) -> Result<SendableRecordBatchStream> {
     let file_schema = config.file_schema.arrow_schema().clone();
     let opener = build_orc_opener(file_schema.clone(), config)?;
-    build_record_batch_stream(
-        opener,
-        file_schema,
-        config.files,
-        config.projection,
-        config.limit,
-    )
+    // push down limit only if there is no filter
+    let limit = config.filters.is_empty().then_some(config.limit).flatten();
+    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
 }
 
 #[derive(Debug, Clone)]
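
Reviewer note: all three stream constructors now gate the limit pushdown on `config.filters.is_empty()`, since a limit applied below a filter could truncate the scan before filtering and return too few rows. A standalone sketch of how the `bool::then_some(Option<T>).flatten()` combinator behaves; the `effective_limit` helper is hypothetical, named here only to exercise the expression from the diff:

```rust
// Demonstrates the `bool::then_some(Option<T>).flatten()` gating used above:
// the limit reaches the file scan only when no filters remain.
fn effective_limit(filters_is_empty: bool, limit: Option<usize>) -> Option<usize> {
    filters_is_empty.then_some(limit).flatten()
}

fn main() {
    assert_eq!(effective_limit(true, Some(10)), Some(10)); // no filters: push down
    assert_eq!(effective_limit(true, None), None);         // no limit requested
    assert_eq!(effective_limit(false, Some(10)), None);    // filters present: scan fully
    println!("ok");
}
```
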