feat: defensively specify limit parameter for file stream (#2517)

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Author: Zhenchi
Date: 2023-09-28 14:14:27 +08:00
Committed by: GitHub
Parent: 7edafc3407
Commit: 1f1d72bdb8
2 changed files with 70 additions and 53 deletions
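
For context on the title: a row limit is only safe to apply at the file-scan level when no filter has to run above it; otherwise the scan may stop after `limit` raw rows and the filter then sees too few of them. A minimal standalone sketch of the difference, using plain iterators rather than GreptimeDB's streams:

fn main() {
    let rows = vec![1, 3, 5, 7, 9];

    // Limit applied below the filter: the scan stops after two raw rows,
    // so the filter only ever sees [1, 3] and the query loses matches.
    let limit_then_filter: Vec<_> = rows.iter().take(2).filter(|&&v| v > 4).collect();

    // Limit applied above the filter: the filter sees every row and the
    // limit trims the already-filtered result.
    let filter_then_limit: Vec<_> = rows.iter().filter(|&&v| v > 4).take(2).collect();

    assert_eq!(limit_then_filter, Vec::<&i32>::new()); // too few rows survive
    assert_eq!(filter_then_limit, vec![&5, &7]);       // the expected answer
}

This is why the second file below pushes config.limit into the stream only when config.filters is empty.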


@@ -25,6 +25,7 @@ use common_query::prelude::Expr;
 use common_recordbatch::error::{CastVectorSnafu, ExternalSnafu, Result as RecordBatchResult};
 use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
 use datafusion::logical_expr::utils as df_logical_expr_utils;
+use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
 use datatypes::vectors::VectorRef;
 use futures::Stream;
@@ -160,38 +161,13 @@ impl Stream for FileToScanRegionStream {
             Poll::Pending => Poll::Pending,
             Poll::Ready(Some(file_record_batch)) => {
                 let file_record_batch = file_record_batch?;

+                let scan_record_batch = if self.schema_eq(&file_record_batch) {
+                    Ok(file_record_batch)
+                } else {
+                    self.convert_record_batch(&file_record_batch)
+                };
-                if self.schema_type_match(&file_record_batch) {
-                    return Poll::Ready(Some(Ok(file_record_batch)));
-                }
-
-                let file_row_count = file_record_batch.num_rows();
-                let scan_schema = self.scan_schema.clone();
-                let mut columns = Vec::with_capacity(scan_schema.num_columns());
-
-                for scan_column_schema in scan_schema.column_schemas() {
-                    let scan_data_type = &scan_column_schema.data_type;
-                    let file_column = file_record_batch.column_by_name(&scan_column_schema.name);
-                    let column = if let Some(file_column) = file_column {
-                        if &file_column.data_type() == scan_data_type {
-                            file_column.clone()
-                        } else {
-                            file_column.cast(scan_data_type).context(CastVectorSnafu {
-                                from_type: file_column.data_type(),
-                                to_type: scan_data_type.clone(),
-                            })?
-                        }
-                    } else {
-                        Self::create_default_vector(scan_column_schema, file_row_count)
-                            .map_err(BoxedError::new)
-                            .context(ExternalSnafu)?
-                    };
-                    columns.push(column);
-                }
-
-                let scan_record_batch = RecordBatch::new(scan_schema, columns)?;
-
-                Poll::Ready(Some(Ok(scan_record_batch)))
+                Poll::Ready(Some(scan_record_batch))
             }
             Poll::Ready(None) => Poll::Ready(None),
         }
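
The surviving match keeps the stream-adapter shape: Pending and the end of the stream pass straight through, and only Ready(Some(..)) batches are converted. A stripped-down, runnable sketch of that shape, using a toy Doubling adapter over plain i32 items and the futures crate (0.3) instead of GreptimeDB's RecordBatch stream:

use std::pin::Pin;
use std::task::{Context, Poll};

use futures::stream::{self, Stream, StreamExt};

// Wraps an inner stream and converts each ready item before yielding it.
struct Doubling<S> {
    inner: S,
}

impl<S> Stream for Doubling<S>
where
    S: Stream<Item = i32> + Unpin,
{
    type Item = i32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match Pin::new(&mut self.inner).poll_next(cx) {
            Poll::Pending => Poll::Pending,                          // not ready yet
            Poll::Ready(Some(item)) => Poll::Ready(Some(item * 2)),  // convert each item
            Poll::Ready(None) => Poll::Ready(None),                  // inner stream finished
        }
    }
}

fn main() {
    let doubled = Doubling { inner: stream::iter(vec![1, 2, 3]) };
    let collected: Vec<i32> = futures::executor::block_on(doubled.collect());
    assert_eq!(collected, vec![2, 4, 6]);
}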
@@ -206,7 +182,7 @@ impl FileToScanRegionStream {
         }
     }

-    fn schema_type_match(&self, file_record_batch: &RecordBatch) -> bool {
+    fn schema_eq(&self, file_record_batch: &RecordBatch) -> bool {
         self.scan_schema
             .column_schemas()
             .iter()
@@ -218,6 +194,59 @@ impl FileToScanRegionStream {
             })
     }

+    /// Converts a RecordBatch from file schema to scan schema.
+    ///
+    /// This function performs the following operations:
+    /// - Projection: Only columns present in scan schema are retained.
+    /// - Cast Type: Columns present in both file schema and scan schema but with different types are cast to the type in scan schema.
+    /// - Backfill: Columns present in scan schema but not in file schema are backfilled with default values.
+    fn convert_record_batch(
+        &self,
+        file_record_batch: &RecordBatch,
+    ) -> RecordBatchResult<RecordBatch> {
+        let file_row_count = file_record_batch.num_rows();
+        let columns = self
+            .scan_schema
+            .column_schemas()
+            .iter()
+            .map(|scan_column_schema| {
+                let file_column = file_record_batch.column_by_name(&scan_column_schema.name);
+                if let Some(file_column) = file_column {
+                    Self::cast_column_type(file_column, &scan_column_schema.data_type)
+                } else {
+                    Self::backfill_column(scan_column_schema, file_row_count)
+                }
+            })
+            .collect::<RecordBatchResult<Vec<_>>>()?;
+
+        RecordBatch::new(self.scan_schema.clone(), columns)
+    }
+
+    fn cast_column_type(
+        source_column: &VectorRef,
+        target_data_type: &ConcreteDataType,
+    ) -> RecordBatchResult<VectorRef> {
+        if &source_column.data_type() == target_data_type {
+            Ok(source_column.clone())
+        } else {
+            source_column
+                .cast(target_data_type)
+                .context(CastVectorSnafu {
+                    from_type: source_column.data_type(),
+                    to_type: target_data_type.clone(),
+                })
+        }
+    }
+
+    fn backfill_column(
+        column_schema: &ColumnSchema,
+        num_rows: usize,
+    ) -> RecordBatchResult<VectorRef> {
+        Self::create_default_vector(column_schema, num_rows)
+            .map_err(BoxedError::new)
+            .context(ExternalSnafu)
+    }
+
     fn create_default_vector(column_schema: &ColumnSchema, num_rows: usize) -> Result<VectorRef> {
         column_schema
             .create_default_vector(num_rows)
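
The new convert_record_batch above comes down to a per-column decision: reuse a file column whose type already matches the scan schema, cast it when the type differs, and backfill a default column when the file does not contain it at all; columns the scan schema does not mention are simply never requested. The following is a self-contained sketch of that pattern with toy column and schema types; the real code operates on GreptimeDB's VectorRef and ConcreteDataType, reports cast failures via CastVectorSnafu, and builds defaults through create_default_vector.

// Toy stand-ins for the engine's vector and schema types; only the
// project / cast / backfill decision mirrors convert_record_batch above.
#[derive(Clone, Copy, PartialEq, Debug)]
enum DataType { Int64, Float64 }

#[derive(Clone, Debug, PartialEq)]
enum Column {
    Int64(Vec<i64>),
    Float64(Vec<f64>),
}

impl Column {
    fn data_type(&self) -> DataType {
        match self {
            Column::Int64(_) => DataType::Int64,
            Column::Float64(_) => DataType::Float64,
        }
    }

    // Cast to the target type, or fail (mirrors the CastVectorSnafu error path).
    fn cast(&self, target: DataType) -> Result<Column, String> {
        match (self, target) {
            (c, t) if c.data_type() == t => Ok(c.clone()),
            (Column::Int64(v), DataType::Float64) => {
                Ok(Column::Float64(v.iter().map(|&x| x as f64).collect()))
            }
            (c, t) => Err(format!("cannot cast {:?} to {:?}", c.data_type(), t)),
        }
    }
}

struct ColumnSchema {
    name: &'static str,
    data_type: DataType,
}

// Backfill stand-in for create_default_vector: an all-zeros column of the right type.
fn default_column(data_type: DataType, num_rows: usize) -> Column {
    match data_type {
        DataType::Int64 => Column::Int64(vec![0; num_rows]),
        DataType::Float64 => Column::Float64(vec![0.0; num_rows]),
    }
}

// Align a file batch to the scan schema: project, cast, or backfill each column.
fn convert(
    scan_schema: &[ColumnSchema],
    file_batch: &[(&'static str, Column)],
    num_rows: usize,
) -> Result<Vec<Column>, String> {
    scan_schema
        .iter()
        .map(|col| match file_batch.iter().find(|(name, _)| *name == col.name) {
            // Column exists in the file: reuse it or cast it.
            Some((_, file_col)) => file_col.cast(col.data_type),
            // Column missing from the file: backfill with defaults.
            None => Ok(default_column(col.data_type, num_rows)),
        })
        .collect()
}

fn main() {
    let scan_schema = [
        ColumnSchema { name: "ts", data_type: DataType::Int64 },
        ColumnSchema { name: "value", data_type: DataType::Float64 },
        ColumnSchema { name: "extra", data_type: DataType::Float64 },
    ];
    // "ts" matches, "value" needs an Int64 -> Float64 cast, "extra" is backfilled.
    let file_batch = [
        ("ts", Column::Int64(vec![1, 2])),
        ("value", Column::Int64(vec![10, 20])),
        ("ignored", Column::Int64(vec![0, 0])), // not in the scan schema, so projected away
    ];
    let columns = convert(&scan_schema, &file_batch, 2).unwrap();
    assert_eq!(columns[1], Column::Float64(vec![10.0, 20.0]));
    assert_eq!(columns[2], Column::Float64(vec![0.0, 0.0]));
}

The second changed file, shown next, adjusts the CSV, JSON, and ORC stream constructors so that the query limit is forwarded to build_record_batch_stream only when no filters are present.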


@@ -133,13 +133,9 @@ fn new_csv_stream(
 ) -> Result<SendableRecordBatchStream> {
     let file_schema = config.file_schema.arrow_schema().clone();
     let opener = build_csv_opener(file_schema.clone(), config, format)?;
-    build_record_batch_stream(
-        opener,
-        file_schema,
-        config.files,
-        config.projection,
-        config.limit,
-    )
+    // push down limit only if there is no filter
+    let limit = config.filters.is_empty().then_some(config.limit).flatten();
+    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
 }

 fn new_json_stream(
@@ -149,13 +145,9 @@ fn new_json_stream(
 ) -> Result<SendableRecordBatchStream> {
     let file_schema = config.file_schema.arrow_schema().clone();
     let opener = build_json_opener(file_schema.clone(), config, format)?;
-    build_record_batch_stream(
-        opener,
-        file_schema,
-        config.files,
-        config.projection,
-        config.limit,
-    )
+    // push down limit only if there is no filter
+    let limit = config.filters.is_empty().then_some(config.limit).flatten();
+    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
 }

 fn new_parquet_stream_with_exec_plan(
@@ -229,13 +221,9 @@ fn new_orc_stream(
 ) -> Result<SendableRecordBatchStream> {
     let file_schema = config.file_schema.arrow_schema().clone();
     let opener = build_orc_opener(file_schema.clone(), config)?;
-    build_record_batch_stream(
-        opener,
-        file_schema,
-        config.files,
-        config.projection,
-        config.limit,
-    )
+    // push down limit only if there is no filter
+    let limit = config.filters.is_empty().then_some(config.limit).flatten();
+    build_record_batch_stream(opener, file_schema, config.files, config.projection, limit)
 }

 #[derive(Debug, Clone)]
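
Each constructor above now derives the limit with the same one-line guard. A minimal sketch of that expression with a stand-in Config struct and a hypothetical effective_limit helper; only the is_empty().then_some(..).flatten() chain is taken from the diff, and the real config is the engine's scan configuration whose filters are Expr values rather than strings.

// Stand-in for the scan configuration used by new_csv_stream and friends;
// only `filters` and `limit` matter for this sketch.
struct Config {
    filters: Vec<String>,  // pushed-down filter expressions
    limit: Option<usize>,  // optional row limit from the query
}

// Forward the limit to the file-level stream only when no filters remain;
// with filters present, the limit is left to the layers above the scan.
fn effective_limit(config: &Config) -> Option<usize> {
    config.filters.is_empty().then_some(config.limit).flatten()
}

fn main() {
    let unfiltered = Config { filters: vec![], limit: Some(10) };
    let filtered = Config { filters: vec!["value > 4".to_string()], limit: Some(10) };

    assert_eq!(effective_limit(&unfiltered), Some(10)); // safe to push down
    assert_eq!(effective_limit(&filtered), None);       // defer until after filtering
}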