feat: support creating the physical plan for JSON and CSV files (#1424)

* feat: support creating the physical plan for JSON and CSV files

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* refactor(file-table-engine): use datasource Format instead
Author: Weny Xu
Date: 2023-04-24 11:17:11 +09:00
Committed by: GitHub
Parent: d374859e24
Commit: 22b5a94d02
8 changed files with 323 additions and 20 deletions
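In short: before this commit, `ImmutableFileTable::scan` was a `todo!()`; with this change it builds a DataFusion `FileStream`-based physical plan for CSV and JSON files. A minimal sketch of the new call path (identifiers are from the diff below; obtaining the table from the engine is assumed):

```rust
// Sketch only: `table` is assumed to be an ImmutableFileTable recovered by the engine.
async fn scan_first_two_columns(
    table: &ImmutableFileTable,
) -> table::error::Result<PhysicalPlanRef> {
    // Projection, filters and limit are now forwarded to the file scan.
    table.scan(Some(&vec![0, 1]), &[], Some(1024)).await
}
```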

Cargo.lock (generated)

@@ -2906,13 +2906,16 @@ version = "0.2.0"
dependencies = [
"async-trait",
"common-catalog",
"common-datasource",
"common-error",
"common-procedure",
"common-procedure-test",
"common-query",
"common-recordbatch",
"common-telemetry",
"common-test-util",
"common-time",
"datafusion",
"datatypes",
"futures",
"object-store",

src/file-table-engine/Cargo.toml

@@ -11,12 +11,15 @@ test = ["common-test-util"]
[dependencies]
async-trait = "0.1"
common-catalog = { path = "../common/catalog" }
+common-datasource = { path = "../common/datasource" }
common-error = { path = "../common/error" }
common-procedure = { path = "../common/procedure" }
common-procedure-test = { path = "../common/procedure-test" }
common-query = { path = "../common/query" }
+common-recordbatch = { path = "../common/recordbatch" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
+datafusion.workspace = true
datatypes = { path = "../datatypes" }
futures.workspace = true
object-store = { path = "../object-store" }

src/file-table-engine/src/engine/immutable.rs

@@ -341,7 +341,11 @@ impl EngineInner {
table_id, table_info
);
-let table = Arc::new(ImmutableFileTable::new(table_info, metadata));
+let table = Arc::new(
+ImmutableFileTable::new(table_info, metadata)
+.map_err(BoxedError::new)
+.context(table_error::TableOperationSnafu)?,
+);
self.tables
.write()
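The pattern above converts the engine's own error type into `table::error::Error`: the concrete error is type-erased with `BoxedError::new` and then wrapped by the `TableOperationSnafu` context selector. A self-contained sketch of the same snafu pattern, with `std::io::Error` as a stand-in source error:

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum TableError {
    // snafu derives a `TableOperationSnafu` context selector for this variant.
    #[snafu(display("Table operation failed: {source}"))]
    TableOperation { source: std::io::Error },
}

fn load_manifest(path: &str) -> Result<Vec<u8>, TableError> {
    // `ResultExt::context` converts the io::Error into the selected variant.
    std::fs::read(path).context(TableOperationSnafu)
}
```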

src/file-table-engine/src/error.rs

@@ -15,6 +15,7 @@
use std::any::Any;
use common_error::prelude::*;
+use datafusion::arrow::error::ArrowError;
use serde_json::error::Error as JsonError;
use snafu::Location;
use table::metadata::{TableInfoBuilderError, TableMetaBuilderError};
@@ -122,6 +123,48 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},
#[snafu(display("Missing required field: {}", name))]
MissingRequiredField { name: String, location: Location },
#[snafu(display("Failed to build backend, source: {}", source))]
BuildBackend {
#[snafu(backtrace)]
source: common_datasource::error::Error,
},
#[snafu(display("Unsupported file format: {}", format))]
UnsupportedFileFormat { format: String, location: Location },
#[snafu(display("Failed to build csv config: {}", source))]
BuildCsvConfig {
source: common_datasource::file_format::csv::CsvConfigBuilderError,
location: Location,
},
#[snafu(display("Failed to build stream: {}", source))]
BuildStream {
source: datafusion::error::DataFusionError,
location: Location,
},
#[snafu(display("Failed to project schema: {}", source))]
ProjectSchema {
source: ArrowError,
location: Location,
},
#[snafu(display("Failed to build stream adapter: {}", source))]
BuildStreamAdapter {
#[snafu(backtrace)]
source: common_recordbatch::error::Error,
},
#[snafu(display("Failed to parse file format: {}", source))]
ParseFileFormat {
#[snafu(backtrace)]
source: common_datasource::error::Error,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -134,7 +177,15 @@ impl ErrorExt for Error {
TableExists { .. }
| BuildTableMeta { .. }
| BuildTableInfo { .. }
-| InvalidRawSchema { .. } => StatusCode::InvalidArguments,
+| InvalidRawSchema { .. }
+| UnsupportedFileFormat { .. }
+| BuildCsvConfig { .. }
+| ProjectSchema { .. }
+| MissingRequiredField { .. } => StatusCode::InvalidArguments,
+
+BuildBackend { source, .. } => source.status_code(),
+BuildStreamAdapter { source, .. } => source.status_code(),
+ParseFileFormat { source, .. } => source.status_code(),
WriteTableManifest { .. }
| DeleteTableManifest { .. }
@@ -145,7 +196,8 @@ impl ErrorExt for Error {
| DecodeJson { .. }
| ConvertRaw { .. }
| DropTable { .. }
-| WriteImmutableManifest { .. } => StatusCode::Unexpected,
+| WriteImmutableManifest { .. }
+| BuildStream { .. } => StatusCode::Unexpected,
}
}
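Most of the new variants either wrap a `source` error (and forward its status code) or carry a snafu `Location` and map to `InvalidArguments`. `MissingRequiredField` is typically raised from an `Option` via snafu's `OptionExt`; a stand-alone sketch of that construction (no project crates, `location` omitted for brevity):

```rust
use std::collections::HashMap;
use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Missing required field: {}", name))]
    MissingRequiredField { name: String },
}

// `OptionExt::context` turns a `None` into the selected error variant.
fn require<'a>(options: &'a HashMap<String, String>, key: &str) -> Result<&'a str, Error> {
    options
        .get(key)
        .map(String::as_str)
        .context(MissingRequiredFieldSnafu { name: key })
}
```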

src/file-table-engine/src/table/mod.rs

@@ -12,4 +12,5 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+pub mod format;
pub mod immutable;

src/file-table-engine/src/table/format.rs (new file)

@@ -0,0 +1,166 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
use common_datasource::file_format::json::{JsonFormat, JsonOpener};
use common_datasource::file_format::Format;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use common_recordbatch::adapter::RecordBatchStreamAdapter;
use datafusion::arrow::datatypes::Schema;
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
use snafu::ResultExt;
use table::table::scan::SimpleTableScan;
use crate::error::{self, Result};
const DEFAULT_BATCH_SIZE: usize = 8192;
#[derive(Debug, Clone, Copy, Default)]
pub struct CreateScanPlanContext {}
fn build_csv_opener(
file_schema: Arc<Schema>,
config: &ScanPlanConfig,
format: &CsvFormat,
) -> Result<CsvOpener> {
let csv_config = CsvConfigBuilder::default()
.batch_size(DEFAULT_BATCH_SIZE)
.file_schema(file_schema)
.file_projection(config.projection.cloned())
.delimiter(format.delimiter)
.has_header(format.has_header)
.build()
.context(error::BuildCsvConfigSnafu)?;
Ok(CsvOpener::new(
csv_config,
config.store.clone(),
format.compression_type,
))
}
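`CsvConfigBuilder` comes from common-datasource; judging by the `CsvConfigBuilderError` name it follows the derive_builder convention, where `build()` fails if a required field was never set — which is exactly what the new `BuildCsvConfig` variant captures. A self-contained sketch of that convention (the struct here is an illustration, not the real `CsvConfig`):

```rust
use derive_builder::Builder;

#[derive(Debug, Builder)]
struct CsvConfig {
    batch_size: usize,
    delimiter: u8,
    has_header: bool,
}

fn build_config() -> Result<CsvConfig, CsvConfigBuilderError> {
    // Leaving any field unset would return an `UninitializedField` error.
    CsvConfigBuilder::default()
        .batch_size(8192)
        .delimiter(b',')
        .has_header(true)
        .build()
}
```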
fn build_json_opener(
file_schema: Arc<Schema>,
config: &ScanPlanConfig,
format: &JsonFormat,
) -> Result<JsonOpener> {
let projected_schema = if let Some(projection) = config.projection {
Arc::new(
file_schema
.project(projection)
.context(error::ProjectSchemaSnafu)?,
)
} else {
file_schema
};
Ok(JsonOpener::new(
DEFAULT_BATCH_SIZE,
projected_schema,
config.store.clone(),
format.compression_type,
))
}
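For JSON, the projection is applied by shrinking the Arrow schema up front instead of configuring it on a reader. Arrow's `Schema::project` selects columns by index and errors on out-of-range indices, hence the `ProjectSchema` variant. A quick illustration:

```rust
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::error::ArrowError;

fn project_demo() -> Result<(), ArrowError> {
    let schema = Schema::new(vec![
        Field::new("ts", DataType::Int64, false),
        Field::new("host", DataType::Utf8, true),
        Field::new("cpu", DataType::Float64, true),
    ]);
    // Keeps only the indexed columns, in the given order: (cpu, ts).
    let projected = schema.project(&[2, 0])?;
    assert_eq!(projected.fields().len(), 2);
    Ok(())
}
```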
fn build_scan_plan<T: FileOpener + Send + 'static>(
opener: T,
file_schema: Arc<Schema>,
files: &[String],
projection: Option<&Vec<usize>>,
limit: Option<usize>,
) -> Result<PhysicalPlanRef> {
let stream = FileStream::new(
&FileScanConfig {
object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
file_schema,
file_groups: vec![files
.iter()
.map(|filename| PartitionedFile::new(filename.to_string(), 0))
.collect::<Vec<_>>()],
statistics: Default::default(),
projection: projection.cloned(),
limit,
table_partition_cols: vec![],
output_ordering: None,
infinite_source: false,
},
0, // hard-coded partition index
opener,
&ExecutionPlanMetricsSet::new(),
)
.context(error::BuildStreamSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
.context(error::BuildStreamAdapterSnafu)?;
Ok(Arc::new(SimpleTableScan::new(Box::pin(adapter))))
}
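Two details in `build_scan_plan` are worth flagging. The `ObjectStoreUrl` is a placeholder: each opener already owns its `ObjectStore`, so DataFusion's store registry is never consulted. And `PartitionedFile::new(filename, 0)` passes 0 as the file size, which appears safe here because the openers stream whole files rather than planning byte ranges. Since all files land in a single file group, the resulting stream has exactly one partition, hence the hard-coded partition index 0.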
fn new_csv_scan_plan(
_ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
format: &CsvFormat,
) -> Result<PhysicalPlanRef> {
let file_schema = config.file_schema.arrow_schema().clone();
let opener = build_csv_opener(file_schema.clone(), config, format)?;
build_scan_plan(
opener,
file_schema,
config.files,
config.projection,
config.limit,
)
}
fn new_json_scan_plan(
_ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
format: &JsonFormat,
) -> Result<PhysicalPlanRef> {
let file_schema = config.file_schema.arrow_schema().clone();
let opener = build_json_opener(file_schema.clone(), config, format)?;
build_scan_plan(
opener,
file_schema,
config.files,
config.projection,
config.limit,
)
}
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
pub file_schema: SchemaRef,
pub files: &'a Vec<String>,
pub projection: Option<&'a Vec<usize>>,
pub filters: &'a [Expr],
pub limit: Option<usize>,
pub store: ObjectStore,
}
pub fn create_physical_plan(
format: &Format,
ctx: &CreateScanPlanContext,
config: &ScanPlanConfig,
) -> Result<PhysicalPlanRef> {
match format {
Format::Csv(format) => new_csv_scan_plan(ctx, config, format),
Format::Json(format) => new_json_scan_plan(ctx, config, format),
Format::Parquet(_) => error::UnsupportedFileFormatSnafu { format: "parquet" }.fail(),
}
}
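Putting the module together, a hedged usage sketch of `create_physical_plan` over two CSV files (the schema and store are assumed to exist; `CsvFormat` is assumed to implement `Default`):

```rust
// Sketch, not project code: `file_schema` and `store` come from the caller.
fn plan_csv_scan(file_schema: SchemaRef, store: ObjectStore) -> Result<PhysicalPlanRef> {
    let files = vec!["data/a.csv".to_string(), "data/b.csv".to_string()];
    let config = ScanPlanConfig {
        file_schema,
        files: &files,
        projection: None,
        filters: &[],
        limit: Some(100),
        store,
    };
    create_physical_plan(
        &Format::Csv(CsvFormat::default()),
        &CreateScanPlanContext::default(),
        &config,
    )
}
```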

src/file-table-engine/src/table/immutable.rs

@@ -16,26 +16,45 @@ use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
+use common_datasource::file_format::Format;
+use common_datasource::object_store::build_backend;
use common_error::prelude::BoxedError;
use common_query::physical_plan::PhysicalPlanRef;
use common_query::prelude::Expr;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;
-use snafu::ResultExt;
+use serde::{Deserialize, Serialize};
+use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
-use table::error::Result as TableResult;
+use table::error::{self as table_error, Result as TableResult};
use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType};
use table::Table;
-use crate::error::{ConvertRawSnafu, Result};
+use super::format::{create_physical_plan, CreateScanPlanContext, ScanPlanConfig};
+use crate::error::{self, ConvertRawSnafu, Result};
use crate::manifest::immutable::{
read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION,
};
use crate::manifest::table_manifest_dir;
+pub const IMMUTABLE_TABLE_META_KEY: &str = "IMMUTABLE_TABLE_META";
+pub const IMMUTABLE_TABLE_LOCATION_KEY: &str = "LOCATION";
+pub const IMMUTABLE_TABLE_PATTERN_KEY: &str = "PATTERN";
+pub const IMMUTABLE_TABLE_FORMAT_KEY: &str = "FORMAT";
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct ImmutableFileTableOptions {
+pub files: Vec<String>,
+}
pub struct ImmutableFileTable {
metadata: ImmutableMetadata,
// currently, it's immutable
table_info: Arc<TableInfo>,
+object_store: ObjectStore,
+files: Vec<String>,
+format: Format,
}
pub type ImmutableFileTableRef = Arc<ImmutableFileTable>;
@@ -46,25 +65,40 @@ impl Table for ImmutableFileTable {
self
}
+/// The [`SchemaRef`] before the projection.
+/// It contains all the columns that may appear in the files (all missing columns should be filled with NULLs).
fn schema(&self) -> SchemaRef {
self.table_info().meta.schema.clone()
}
-fn table_type(&self) -> TableType {
-self.table_info().table_type
-}
fn table_info(&self) -> TableInfoRef {
self.table_info.clone()
}
+fn table_type(&self) -> TableType {
+self.table_info().table_type
+}
async fn scan(
&self,
-_projection: Option<&Vec<usize>>,
-_filters: &[Expr],
-_limit: Option<usize>,
+projection: Option<&Vec<usize>>,
+filters: &[Expr],
+limit: Option<usize>,
) -> TableResult<PhysicalPlanRef> {
-todo!()
+create_physical_plan(
+&self.format,
+&CreateScanPlanContext::default(),
+&ScanPlanConfig {
+file_schema: self.schema(),
+files: &self.files,
+projection,
+filters,
+limit,
+store: self.object_store.clone(),
+},
+)
+.map_err(BoxedError::new)
+.context(table_error::TableOperationSnafu)
}
async fn flush(
@@ -87,11 +121,36 @@ impl ImmutableFileTable {
&self.metadata
}
-pub(crate) fn new(table_info: TableInfo, metadata: ImmutableMetadata) -> Self {
-Self {
+pub(crate) fn new(table_info: TableInfo, metadata: ImmutableMetadata) -> Result<Self> {
+let table_info = Arc::new(table_info);
+let options = &table_info.meta.options.extra_options;
+
+let url = options.get(IMMUTABLE_TABLE_LOCATION_KEY).context(
+error::MissingRequiredFieldSnafu {
+name: IMMUTABLE_TABLE_LOCATION_KEY,
+},
+)?;
+
+let meta = options
+.get(IMMUTABLE_TABLE_META_KEY)
+.context(error::MissingRequiredFieldSnafu {
+name: IMMUTABLE_TABLE_META_KEY,
+})?;
+
+let meta: ImmutableFileTableOptions =
+serde_json::from_str(meta).context(error::DecodeJsonSnafu)?;
+let format = Format::try_from(options).context(error::ParseFileFormatSnafu)?;
+let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
+
+Ok(Self {
metadata,
-table_info: Arc::new(table_info),
-}
+table_info,
+object_store,
+files: meta.files,
+format,
+})
}
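The `IMMUTABLE_TABLE_META` option holds a JSON-encoded `ImmutableFileTableOptions`, and `#[serde(default)]` keeps decoding tolerant of missing keys. A stand-alone round-trip of that payload (struct duplicated here for illustration):

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
struct ImmutableFileTableOptions {
    files: Vec<String>,
}

fn meta_roundtrip() -> serde_json::Result<()> {
    // `#[serde(default)]` lets an empty object decode to the default value.
    let empty: ImmutableFileTableOptions = serde_json::from_str("{}")?;
    assert_eq!(empty, ImmutableFileTableOptions::default());

    let meta = ImmutableFileTableOptions {
        files: vec!["data/a.csv".to_string()],
    };
    assert_eq!(serde_json::to_string(&meta)?, r#"{"files":["data/a.csv"]}"#);
    Ok(())
}
```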
pub async fn create(
@@ -113,7 +172,7 @@ impl ImmutableFileTable {
)
.await?;
-Ok(ImmutableFileTable::new(table_info, metadata))
+ImmutableFileTable::new(table_info, metadata)
}
pub(crate) async fn recover_table_info(

src/file-table-engine/src/test_util.rs

@@ -28,6 +28,7 @@ use table::TableRef;
use crate::config::EngineConfig;
use crate::engine::immutable::ImmutableFileTableEngine;
use crate::manifest::immutable::ImmutableMetadata;
+use crate::table::immutable::{self, ImmutableFileTableOptions};
pub const TEST_TABLE_NAME: &str = "demo";
@@ -95,6 +96,20 @@ pub struct TestEngineComponents {
}
pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
+let mut table_options = TableOptions::default();
+table_options.extra_options.insert(
+immutable::IMMUTABLE_TABLE_LOCATION_KEY.to_string(),
+"mock_path".to_string(),
+);
+table_options.extra_options.insert(
+immutable::IMMUTABLE_TABLE_META_KEY.to_string(),
+serde_json::to_string(&ImmutableFileTableOptions::default()).unwrap(),
+);
+table_options.extra_options.insert(
+immutable::IMMUTABLE_TABLE_FORMAT_KEY.to_string(),
+"csv".to_string(),
+);
CreateTableRequest {
id: 1,
catalog_name: "greptime".to_string(),
@@ -105,7 +120,7 @@ pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
region_numbers: vec![0],
create_if_not_exists: true,
primary_key_indices: vec![0],
-table_options: TableOptions::default(),
+table_options,
engine: IMMUTABLE_FILE_ENGINE.to_string(),
}
}
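With those options in place, an engine-level test can create the table and scan it immediately. A rough sketch of such a test body (the engine/context setup and the exact `create_table` signature are assumptions based on this crate's test harness):

```rust
// Hypothetical test flow; `engine`, `ctx` and `schema` come from the harness.
async fn create_and_scan(
    engine: &ImmutableFileTableEngine,
    ctx: &EngineContext,
    schema: SchemaRef,
) {
    let table = engine
        .create_table(ctx, new_create_request(schema))
        .await
        .unwrap();
    // An empty file list still yields a valid (empty) physical plan.
    let _plan = table.scan(None, &[], None).await.unwrap();
}
```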