refactor: add scan_to_stream() to Table trait to postpone the stream generation (#1639)

* add scan_to_stream to Table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl parquet stream

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* reorganise adapters

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement scan_to_stream for mito table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add location info

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: table scan

* UT pass

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl project record batch

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix information schema

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove one todo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix errors generated by merge commit

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add output_ordering method to record batch stream

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix rustfmt

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* enhance error types

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Ruihang Xia
2023-05-29 20:03:47 +08:00
committed by GitHub
parent 0eaae634fa
commit b27c569ae0
34 changed files with 824 additions and 327 deletions

View File

@@ -142,7 +142,7 @@ pub enum Error {
#[snafu(display("Failed to build stream: {}", source))]
BuildStream {
source: datafusion::error::DataFusionError,
source: DataFusionError,
location: Location,
},

View File

@@ -21,21 +21,24 @@ use common_datasource::file_format::Format;
use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef};
use common_query::prelude::Expr;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::common::ToDFSchema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_expr::execution_props::ExecutionProps;
use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream, ParquetExec};
use datafusion::physical_plan::file_format::{
FileOpener, FileScanConfig, FileStream, ParquetExec, ParquetOpener,
};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::{Schema, SchemaRef};
use object_store::ObjectStore;
use snafu::ResultExt;
use table::table::scan::SimpleTableScan;
use table::table::scan::StreamScanAdapter;
use crate::error::{self, Result};
use crate::error::{self, BuildStreamSnafu, Result};
const DEFAULT_BATCH_SIZE: usize = 8192;
@@ -113,7 +116,39 @@ fn build_scan_plan<T: FileOpener + Send + 'static>(
.context(error::BuildStreamSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
.context(error::BuildStreamAdapterSnafu)?;
Ok(Arc::new(SimpleTableScan::new(Box::pin(adapter))))
Ok(Arc::new(StreamScanAdapter::new(Box::pin(adapter))))
}
/// Creates a [`SendableRecordBatchStream`] over `files`, decoding each one with `opener`.
///
/// All files are placed into a single file group and read as partition `0`.
/// `projection` and `limit` are forwarded unchanged into the [`FileScanConfig`].
///
/// # Errors
///
/// Fails when the [`FileStream`] cannot be constructed, or when the resulting
/// stream cannot be wrapped in a [`RecordBatchStreamAdapter`].
fn build_record_batch_stream<T: FileOpener + Send + 'static>(
    opener: T,
    file_schema: Arc<ArrowSchema>,
    files: &[String],
    projection: Option<&Vec<usize>>,
    limit: Option<usize>,
) -> Result<SendableRecordBatchStream> {
    // Every input file belongs to the one and only file group.
    let single_group: Vec<_> = files
        .iter()
        .map(|path| PartitionedFile::new(path.to_string(), 0))
        .collect();

    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
        file_schema,
        file_groups: vec![single_group],
        statistics: Default::default(),
        projection: projection.cloned(),
        limit,
        table_partition_cols: vec![],
        output_ordering: None,
        infinite_source: false,
    };

    let metrics = ExecutionPlanMetricsSet::new();
    let file_stream = FileStream::new(
        &scan_config,
        0, // partition: hard-code
        opener,
        &metrics,
    )
    .context(error::BuildStreamSnafu)?;

    let adapted = RecordBatchStreamAdapter::try_new(Box::pin(file_stream))
        .context(error::BuildStreamAdapterSnafu)?;
    Ok(Box::pin(adapted))
}
fn new_csv_scan_plan(
@@ -132,6 +167,22 @@ fn new_csv_scan_plan(
)
}
/// Builds a record batch stream over the files listed in `config`, using the
/// opener produced by `build_csv_opener` for `format`.
///
/// The scan context is currently unused.
///
/// # Errors
///
/// Propagates failures from `build_csv_opener` and `build_record_batch_stream`.
fn new_csv_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> Result<SendableRecordBatchStream> {
    let arrow_schema = config.file_schema.arrow_schema().clone();
    // The opener gets its own handle to the schema; the other one is handed to the stream.
    let csv_opener = build_csv_opener(arrow_schema.clone(), config, format)?;

    build_record_batch_stream(
        csv_opener,
        arrow_schema,
        config.files,
        config.projection,
        config.limit,
    )
}
fn new_json_scan_plan(
_ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
@@ -148,6 +199,22 @@ fn new_json_scan_plan(
)
}
/// Builds a record batch stream over the files listed in `config`, using the
/// opener produced by `build_json_opener` for `format`.
///
/// The scan context is currently unused.
///
/// # Errors
///
/// Propagates failures from `build_json_opener` and `build_record_batch_stream`.
fn new_json_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    format: &JsonFormat,
) -> Result<SendableRecordBatchStream> {
    let arrow_schema = config.file_schema.arrow_schema().clone();
    // The opener gets its own handle to the schema; the other one is handed to the stream.
    let json_opener = build_json_opener(arrow_schema.clone(), config, format)?;

    build_record_batch_stream(
        json_opener,
        arrow_schema,
        config.files,
        config.projection,
        config.limit,
    )
}
fn new_parquet_scan_plan(
_ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
@@ -218,6 +285,84 @@ fn new_parquet_scan_plan(
)))
}
/// Builds a record batch stream that reads the Parquet files listed in `config`
/// through a [`ParquetOpener`] driven by a [`FileStream`].
///
/// The filters in `config` are combined with `conjunction`; when that yields an
/// expression it is converted into a physical expression and passed to the
/// opener as its predicate.
///
/// When `config.projection` is `None`, the opener is given the index of every
/// column in the file schema, so that it agrees with the (unprojected)
/// [`FileScanConfig`] instead of being asked for an empty set of columns.
///
/// The scan context and the format options are currently unused.
///
/// # Errors
///
/// Fails when the filter expression cannot be planned against the file schema,
/// when the [`FileStream`] cannot be constructed, or when the stream cannot be
/// wrapped in a [`RecordBatchStreamAdapter`].
fn new_parquet_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    _format: &ParquetFormat,
) -> Result<SendableRecordBatchStream> {
    let file_schema = config.file_schema.arrow_schema().clone();
    let ScanPlanConfig {
        files,
        projection,
        limit,
        filters,
        store,
        ..
    } = config;

    let scan_config = FileScanConfig {
        object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
        file_schema: file_schema.clone(),
        file_groups: vec![files
            .iter()
            .map(|filename| PartitionedFile::new(filename.to_string(), 0))
            .collect::<Vec<_>>()],
        statistics: Default::default(),
        projection: projection.cloned(),
        limit: *limit,
        table_partition_cols: vec![],
        output_ordering: None,
        infinite_source: false,
    };

    let filters = filters
        .iter()
        .map(|f| f.df_expr().clone())
        .collect::<Vec<_>>();
    let predicate = match conjunction(filters) {
        Some(expr) => {
            let df_schema = file_schema
                .clone()
                .to_dfschema_ref()
                .context(error::ParquetScanPlanSnafu)?;
            let physical_expr =
                create_physical_expr(&expr, &df_schema, &file_schema, &ExecutionProps::new())
                    .context(error::ParquetScanPlanSnafu)?;
            Some(physical_expr)
        }
        None => None,
    };

    // No projection means "all columns". An empty index list would instead tell
    // the opener to read no columns at all, which disagrees with `scan_config`.
    let column_indices: Vec<usize> = projection
        .cloned()
        .unwrap_or_else(|| (0..file_schema.fields().len()).collect());

    let parquet_opener = ParquetOpener {
        partition_index: 0, // partition: hard-code. This is only for statistics purpose
        projection: Arc::from(column_indices),
        batch_size: DEFAULT_BATCH_SIZE,
        limit: *limit,
        predicate,
        pruning_predicate: None,
        page_pruning_predicate: None,
        table_schema: file_schema.clone(),
        metadata_size_hint: None,
        metrics: ExecutionPlanMetricsSet::new(),
        parquet_file_reader_factory: Arc::new(DefaultParquetFileReaderFactory::new(store.clone())),
        pushdown_filters: true,
        reorder_filters: true,
        enable_page_index: true,
    };

    let stream = FileStream::new(
        &scan_config,
        0,
        parquet_opener,
        &ExecutionPlanMetricsSet::new(),
    )
    .context(BuildStreamSnafu)?;

    let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
        .context(error::BuildStreamAdapterSnafu)?;
    Ok(Box::pin(adapter))
}
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
pub file_schema: SchemaRef,
@@ -239,3 +384,15 @@ pub fn create_physical_plan(
Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format),
}
}
/// Creates a record batch stream for the files described by `config`,
/// dispatching to the stream builder that matches `format`.
///
/// # Errors
///
/// Returns whatever error the format-specific builder produces.
pub fn create_stream(
    format: &Format,
    ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
) -> Result<SendableRecordBatchStream> {
    match format {
        Format::Csv(csv) => new_csv_stream(ctx, config, csv),
        Format::Json(json) => new_json_stream(ctx, config, json),
        Format::Parquet(parquet) => new_parquet_stream(ctx, config, parquet),
    }
}

View File

@@ -21,15 +21,17 @@ use common_datasource::object_store::build_backend;
use common_error::prelude::BoxedError;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use common_recordbatch::SendableRecordBatchStream;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use store_api::storage::{RegionNumber, ScanRequest};
use table::error::{self as table_error, Result as TableResult};
use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType};
use table::{requests, Table};
use super::format::create_stream;
use crate::error::{self, ConvertRawSnafu, Result};
use crate::manifest::immutable::{
read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION,
@@ -96,6 +98,23 @@ impl Table for ImmutableFileTable {
.context(table_error::TableOperationSnafu)
}
/// Scans the table's files and returns the result as a record batch stream.
///
/// The projection, filters and limit of `request` are forwarded to
/// `create_stream` together with this table's schema, file list, format and
/// object store.
///
/// # Errors
///
/// Any error from `create_stream` is boxed and reported as a table operation error.
async fn scan_to_stream(&self, request: ScanRequest) -> TableResult<SendableRecordBatchStream> {
    let scan_ctx = CreateScanPlanContext::default();
    let scan_config = ScanPlanConfig {
        file_schema: self.schema(),
        files: &self.files,
        projection: request.projection.as_ref(),
        filters: &request.filters,
        limit: request.limit,
        store: self.object_store.clone(),
    };

    create_stream(&self.format, &scan_ctx, &scan_config)
        .map_err(BoxedError::new)
        .context(table_error::TableOperationSnafu)
}
async fn flush(
&self,
_region_number: Option<RegionNumber>,