refactor: remove unused code (#1913)

This commit is contained in:
Weny Xu
2023-07-10 11:06:22 +09:00
committed by GitHub
parent 195dfdc5d3
commit 00181885cc

View File

@@ -18,7 +18,6 @@ use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener
use common_datasource::file_format::json::{JsonFormat, JsonOpener};
use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat};
use common_datasource::file_format::Format;
use common_query::physical_plan::{PhysicalPlanAdapter, PhysicalPlanRef};
use common_query::prelude::Expr;
use common_query::DfPhysicalPlan;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
@@ -33,10 +32,9 @@ use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStr
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::prelude::SessionContext;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::schema::{Schema, SchemaRef};
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use table::table::scan::StreamScanAdapter;
use crate::error::{self, Result};
@@ -87,17 +85,6 @@ fn build_json_opener(
))
}
/// Builds a physical scan plan by turning the file-backed record-batch
/// stream into a `StreamScanAdapter`-wrapped plan node.
///
/// `projection` selects output columns by index; `limit` caps the number
/// of rows produced. Errors from stream construction are propagated.
fn build_scan_plan<T: FileOpener + Send + 'static>(
    opener: T,
    file_schema: Arc<ArrowSchema>,
    files: &[String],
    projection: Option<&Vec<usize>>,
    limit: Option<usize>,
) -> Result<PhysicalPlanRef> {
    let stream = build_record_batch_stream(opener, file_schema, files, projection, limit)?;
    Ok(Arc::new(StreamScanAdapter::new(stream)))
}
fn build_record_batch_stream<T: FileOpener + Send + 'static>(
opener: T,
file_schema: Arc<ArrowSchema>,
@@ -130,22 +117,6 @@ fn build_record_batch_stream<T: FileOpener + Send + 'static>(
Ok(Box::pin(adapter))
}
/// Creates a physical scan plan over the CSV files described by `config`.
///
/// The scan context is currently unused. The opener is built against the
/// file's Arrow schema, then wrapped into a scan plan honoring the
/// configured projection and row limit.
fn new_csv_scan_plan(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    format: &CsvFormat,
) -> Result<PhysicalPlanRef> {
    let arrow_schema = config.file_schema.arrow_schema().clone();
    let opener = build_csv_opener(arrow_schema.clone(), config, format)?;
    build_scan_plan(
        opener,
        arrow_schema,
        config.files,
        config.projection,
        config.limit,
    )
}
fn new_csv_stream(
_ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
@@ -162,22 +133,6 @@ fn new_csv_stream(
)
}
/// Creates a physical scan plan over the JSON files described by `config`.
///
/// Mirrors `new_csv_scan_plan`: builds a JSON opener against the file's
/// Arrow schema and wraps it into a scan plan with the configured
/// projection and row limit. The scan context is currently unused.
fn new_json_scan_plan(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    format: &JsonFormat,
) -> Result<PhysicalPlanRef> {
    let arrow_schema = config.file_schema.arrow_schema().clone();
    let opener = build_json_opener(arrow_schema.clone(), config, format)?;
    build_scan_plan(
        opener,
        arrow_schema,
        config.files,
        config.projection,
        config.limit,
    )
}
fn new_json_stream(
_ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
@@ -194,76 +149,6 @@ fn new_json_stream(
)
}
/// Creates a physical scan plan over Parquet files using DataFusion's
/// `ParquetExec`, with pushed-down filters and projection applied.
fn new_parquet_scan_plan(
_ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
_format: &ParquetFormat,
) -> Result<PhysicalPlanRef> {
let file_schema = config.file_schema.arrow_schema().clone();
let ScanPlanConfig {
files,
projection,
limit,
filters,
store,
..
} = config;
// All files go into a single file group; the object-store URL is a
// placeholder because reads go through the custom reader factory below.
let scan_config = FileScanConfig {
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
file_schema: file_schema.clone(),
file_groups: vec![files
.iter()
.map(|filename| PartitionedFile::new(filename.to_string(), 0))
.collect::<Vec<_>>()],
statistics: Default::default(),
projection: projection.cloned(),
limit: *limit,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
};
// Convert the query-layer exprs into DataFusion logical exprs.
let filters = filters
.iter()
.map(|f| f.df_expr().clone())
.collect::<Vec<_>>();
// AND all filters together and lower the combined predicate to a
// physical expression for Parquet row-group pruning; `None` when there
// are no filters.
let filters = if let Some(expr) = conjunction(filters) {
let df_schema = file_schema
.clone()
.to_dfschema_ref()
.context(error::ParquetScanPlanSnafu)?;
let filters = create_physical_expr(&expr, &df_schema, &file_schema, &ExecutionProps::new())
.context(error::ParquetScanPlanSnafu)?;
Some(filters)
} else {
None
};
// Route file I/O through the provided object store instead of
// DataFusion's default reader.
let exec = ParquetExec::new(scan_config, filters, None).with_parquet_file_reader_factory(
Arc::new(DefaultParquetFileReaderFactory::new(store.clone())),
);
// The plan's output schema must reflect the projection, if any.
let projected_schema = if let Some(projection) = config.projection {
Arc::new(
file_schema
.project(projection)
.context(error::ProjectSchemaSnafu)?,
)
} else {
file_schema
};
// Wrap the Arrow-schema'd exec plan into this crate's PhysicalPlan type.
let schema = Schema::try_from(projected_schema).context(error::ConvertSchemaSnafu)?;
Ok(Arc::new(PhysicalPlanAdapter::new(
Arc::new(schema),
Arc::new(exec),
)))
}
fn new_parquet_stream_with_exec_plan(
_ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
@@ -338,19 +223,6 @@ pub struct ScanPlanConfig<'a> {
pub store: ObjectStore,
}
/// Builds a physical scan plan for the given file `format`.
///
/// Dispatches to the format-specific builder (CSV, JSON, or Parquet);
/// any other format yields an `UnsupportedFormat` error.
pub fn create_physical_plan(
    format: &Format,
    ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
) -> Result<PhysicalPlanRef> {
    match format {
        Format::Csv(csv) => new_csv_scan_plan(ctx, config, csv),
        Format::Json(json) => new_json_scan_plan(ctx, config, json),
        Format::Parquet(parquet) => new_parquet_scan_plan(ctx, config, parquet),
        _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
    }
}
pub fn create_stream(
format: &Format,
ctx: &CreateScanPlanContext,