diff --git a/Cargo.lock b/Cargo.lock
index 73ce408e50..85cbbb10ff 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2906,13 +2906,16 @@ version = "0.2.0"
 dependencies = [
  "async-trait",
  "common-catalog",
+ "common-datasource",
  "common-error",
  "common-procedure",
  "common-procedure-test",
  "common-query",
+ "common-recordbatch",
  "common-telemetry",
  "common-test-util",
  "common-time",
+ "datafusion",
  "datatypes",
  "futures",
  "object-store",
diff --git a/src/file-table-engine/Cargo.toml b/src/file-table-engine/Cargo.toml
index 1e73b573cb..a69691a0b5 100644
--- a/src/file-table-engine/Cargo.toml
+++ b/src/file-table-engine/Cargo.toml
@@ -11,12 +11,15 @@ test = ["common-test-util"]
 [dependencies]
 async-trait = "0.1"
 common-catalog = { path = "../common/catalog" }
+common-datasource = { path = "../common/datasource" }
 common-error = { path = "../common/error" }
 common-procedure = { path = "../common/procedure" }
 common-procedure-test = { path = "../common/procedure-test" }
 common-query = { path = "../common/query" }
+common-recordbatch = { path = "../common/recordbatch" }
 common-telemetry = { path = "../common/telemetry" }
 common-time = { path = "../common/time" }
+datafusion.workspace = true
 datatypes = { path = "../datatypes" }
 futures.workspace = true
 object-store = { path = "../object-store" }
diff --git a/src/file-table-engine/src/engine/immutable.rs b/src/file-table-engine/src/engine/immutable.rs
index e37ca3e24d..0a20faa133 100644
--- a/src/file-table-engine/src/engine/immutable.rs
+++ b/src/file-table-engine/src/engine/immutable.rs
@@ -341,7 +341,11 @@ impl EngineInner {
             table_id,
             table_info
         );
-        let table = Arc::new(ImmutableFileTable::new(table_info, metadata));
+        let table = Arc::new(
+            ImmutableFileTable::new(table_info, metadata)
+                .map_err(BoxedError::new)
+                .context(table_error::TableOperationSnafu)?,
+        );

         self.tables
             .write()
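Note: `ImmutableFileTable::new` becomes fallible in this patch (it now parses table options and builds an object-store backend, see the `table/immutable.rs` hunks below), so the engine boxes the crate-local error and wraps it in the generic `TableOperation` variant from `table::error`. A minimal standalone sketch of that pattern follows; `EngineError` and `TableError` are illustrative stand-ins, and `BoxedError` is modeled here as a plain boxed trait object (the real `common_error::prelude::BoxedError` is its own type with a `new` constructor):

```rust
use snafu::{ResultExt, Snafu};

// Stand-in for this crate's error type.
#[derive(Debug, Snafu)]
#[snafu(display("engine-local failure"))]
struct EngineError;

// Stand-in for `common_error`'s BoxedError: erases the concrete type so the
// generic `table` crate needs no dependency on the engine crate.
type BoxedError = Box<dyn std::error::Error + Send + Sync + 'static>;

// Stand-in for `table::error::Error`.
#[derive(Debug, Snafu)]
enum TableError {
    #[snafu(display("Failed to operate table, source: {}", source))]
    TableOperation { source: BoxedError },
}

fn create_table() -> Result<(), TableError> {
    let engine_result: Result<(), EngineError> = Err(EngineError);
    engine_result
        .map_err(|e| Box::new(e) as BoxedError) // plays the role of BoxedError::new
        .context(TableOperationSnafu)?; // attach the generic variant
    Ok(())
}
```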
diff --git a/src/file-table-engine/src/error.rs b/src/file-table-engine/src/error.rs
index c9dbfd4ddb..db172524e6 100644
--- a/src/file-table-engine/src/error.rs
+++ b/src/file-table-engine/src/error.rs
@@ -15,6 +15,7 @@
 use std::any::Any;

 use common_error::prelude::*;
+use datafusion::arrow::error::ArrowError;
 use serde_json::error::Error as JsonError;
 use snafu::Location;
 use table::metadata::{TableInfoBuilderError, TableMetaBuilderError};
@@ -122,6 +123,48 @@ pub enum Error {
         #[snafu(backtrace)]
         source: datatypes::error::Error,
     },
+
+    #[snafu(display("Missing required field: {}", name))]
+    MissingRequiredField { name: String, location: Location },
+
+    #[snafu(display("Failed to build backend, source: {}", source))]
+    BuildBackend {
+        #[snafu(backtrace)]
+        source: common_datasource::error::Error,
+    },
+
+    #[snafu(display("Unsupported file format: {}", format))]
+    UnsupportedFileFormat { format: String, location: Location },
+
+    #[snafu(display("Failed to build csv config: {}", source))]
+    BuildCsvConfig {
+        source: common_datasource::file_format::csv::CsvConfigBuilderError,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to build stream: {}", source))]
+    BuildStream {
+        source: datafusion::error::DataFusionError,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to project schema: {}", source))]
+    ProjectSchema {
+        source: ArrowError,
+        location: Location,
+    },
+
+    #[snafu(display("Failed to build stream adapter: {}", source))]
+    BuildStreamAdapter {
+        #[snafu(backtrace)]
+        source: common_recordbatch::error::Error,
+    },
+
+    #[snafu(display("Failed to parse file format: {}", source))]
+    ParseFileFormat {
+        #[snafu(backtrace)]
+        source: common_datasource::error::Error,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -134,7 +177,15 @@ impl ErrorExt for Error {
             TableExists { .. }
             | BuildTableMeta { .. }
             | BuildTableInfo { .. }
-            | InvalidRawSchema { .. } => StatusCode::InvalidArguments,
+            | InvalidRawSchema { .. }
+            | UnsupportedFileFormat { .. }
+            | BuildCsvConfig { .. }
+            | ProjectSchema { .. }
+            | MissingRequiredField { .. } => StatusCode::InvalidArguments,
+
+            BuildBackend { source, .. } => source.status_code(),
+            BuildStreamAdapter { source, .. } => source.status_code(),
+            ParseFileFormat { source, .. } => source.status_code(),

             WriteTableManifest { .. }
             | DeleteTableManifest { .. }
@@ -145,7 +196,8 @@ impl ErrorExt for Error {
             | DecodeJson { .. }
             | ConvertRaw { .. }
             | DropTable { .. }
-            | WriteImmutableManifest { .. } => StatusCode::Unexpected,
+            | WriteImmutableManifest { .. }
+            | BuildStream { .. } => StatusCode::Unexpected,
         }
     }
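For reference, a minimal sketch of how the new variants get raised at call sites like the ones later in this patch. snafu's `OptionExt::context` turns a `None` into the selected variant, and with the snafu version used here a field of type `snafu::Location` is captured implicitly, so it never appears in the selector. All names below are illustrative:

```rust
use std::collections::HashMap;

use snafu::{Location, OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Missing required field: {}", name))]
    MissingRequiredField { name: String, location: Location },
}

// Hypothetical helper: look up a required key in an options map.
fn require<'a>(options: &'a HashMap<String, String>, key: &str) -> Result<&'a String, Error> {
    options
        .get(key)
        // `location` is filled in implicitly; only `name` is supplied.
        .context(MissingRequiredFieldSnafu { name: key })
}
```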
diff --git a/src/file-table-engine/src/table.rs b/src/file-table-engine/src/table.rs
index 2eb5de637a..859c374d9a 100644
--- a/src/file-table-engine/src/table.rs
+++ b/src/file-table-engine/src/table.rs
@@ -12,4 +12,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+pub mod format;
 pub mod immutable;
diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs
new file mode 100644
index 0000000000..d99cb5ca2b
--- /dev/null
+++ b/src/file-table-engine/src/table/format.rs
@@ -0,0 +1,166 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
+use common_datasource::file_format::json::{JsonFormat, JsonOpener};
+use common_datasource::file_format::Format;
+use common_query::physical_plan::PhysicalPlanRef;
+use common_query::prelude::Expr;
+use common_recordbatch::adapter::RecordBatchStreamAdapter;
+use datafusion::arrow::datatypes::Schema;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use datatypes::schema::SchemaRef;
+use object_store::ObjectStore;
+use snafu::ResultExt;
+use table::table::scan::SimpleTableScan;
+
+use crate::error::{self, Result};
+
+const DEFAULT_BATCH_SIZE: usize = 8192;
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct CreateScanPlanContext {}
+
+fn build_csv_opener(
+    file_schema: Arc<Schema>,
+    config: &ScanPlanConfig,
+    format: &CsvFormat,
+) -> Result<CsvOpener> {
+    let csv_config = CsvConfigBuilder::default()
+        .batch_size(DEFAULT_BATCH_SIZE)
+        .file_schema(file_schema)
+        .file_projection(config.projection.cloned())
+        .delimiter(format.delimiter)
+        .has_header(format.has_header)
+        .build()
+        .context(error::BuildCsvConfigSnafu)?;
+    Ok(CsvOpener::new(
+        csv_config,
+        config.store.clone(),
+        format.compression_type,
+    ))
+}
+
+fn build_json_opener(
+    file_schema: Arc<Schema>,
+    config: &ScanPlanConfig,
+    format: &JsonFormat,
+) -> Result<JsonOpener> {
+    let projected_schema = if let Some(projection) = config.projection {
+        Arc::new(
+            file_schema
+                .project(projection)
+                .context(error::ProjectSchemaSnafu)?,
+        )
+    } else {
+        file_schema
+    };
+    Ok(JsonOpener::new(
+        DEFAULT_BATCH_SIZE,
+        projected_schema,
+        config.store.clone(),
+        format.compression_type,
+    ))
+}
+
+fn build_scan_plan<T: FileOpener + Send + 'static>(
+    opener: T,
+    file_schema: Arc<Schema>,
+    files: &[String],
+    projection: Option<&Vec<usize>>,
+    limit: Option<usize>,
+) -> Result<PhysicalPlanRef> {
+    let stream = FileStream::new(
+        &FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
+            file_schema,
+            file_groups: vec![files
+                .iter()
+                .map(|filename| PartitionedFile::new(filename.to_string(), 0))
+                .collect::<Vec<_>>()],
+            statistics: Default::default(),
+            projection: projection.cloned(),
+            limit,
+            table_partition_cols: vec![],
+            output_ordering: None,
+            infinite_source: false,
+        },
+        0, // partition: hard-code
+        opener,
+        &ExecutionPlanMetricsSet::new(),
+    )
+    .context(error::BuildStreamSnafu)?;
+    let adapter = RecordBatchStreamAdapter::try_new(Box::pin(stream))
+        .context(error::BuildStreamAdapterSnafu)?;
+    Ok(Arc::new(SimpleTableScan::new(Box::pin(adapter))))
+}
+
+fn new_csv_scan_plan(
+    _ctx: &CreateScanPlanContext,
+    config: &ScanPlanConfig,
+    format: &CsvFormat,
+) -> Result<PhysicalPlanRef> {
+    let file_schema = config.file_schema.arrow_schema().clone();
+    let opener = build_csv_opener(file_schema.clone(), config, format)?;
+    build_scan_plan(
+        opener,
+        file_schema,
+        config.files,
+        config.projection,
+        config.limit,
+    )
+}
+
+fn new_json_scan_plan(
+    _ctx: &CreateScanPlanContext,
+    config: &ScanPlanConfig,
+    format: &JsonFormat,
+) -> Result<PhysicalPlanRef> {
+    let file_schema = config.file_schema.arrow_schema().clone();
+    let opener = build_json_opener(file_schema.clone(), config, format)?;
+    build_scan_plan(
+        opener,
+        file_schema,
+        config.files,
+        config.projection,
+        config.limit,
+    )
+}
+
+#[derive(Debug, Clone)]
+pub struct ScanPlanConfig<'a> {
+    pub file_schema: SchemaRef,
+    pub files: &'a Vec<String>,
+    pub projection: Option<&'a Vec<usize>>,
+    pub filters: &'a [Expr],
+    pub limit: Option<usize>,
+    pub store: ObjectStore,
+}
+
+pub fn create_physical_plan(
+    format: &Format,
+    ctx: &CreateScanPlanContext,
+    config: &ScanPlanConfig,
+) -> Result<PhysicalPlanRef> {
+    match format {
+        Format::Csv(format) => new_csv_scan_plan(ctx, config, format),
+        Format::Json(format) => new_json_scan_plan(ctx, config, format),
+        Format::Parquet(_) => error::UnsupportedFileFormatSnafu { format: "parquet" }.fail(),
+    }
+}
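A hypothetical call-site sketch for the module above (not part of this patch): plan a scan over a list of CSV files with an optional projection. It assumes `CsvFormat` implements `Default`; everything else comes from the types defined in the new file:

```rust
use common_datasource::file_format::csv::CsvFormat;
use common_datasource::file_format::Format;
use common_query::physical_plan::PhysicalPlanRef;
use datatypes::schema::SchemaRef;
use object_store::ObjectStore;

use crate::error::Result;
use crate::table::format::{create_physical_plan, CreateScanPlanContext, ScanPlanConfig};

/// Hypothetical helper: plan a scan over `files`, decoding them as CSV.
fn scan_csv_files(
    file_schema: SchemaRef,
    files: &Vec<String>,
    projection: Option<&Vec<usize>>,
    store: ObjectStore,
) -> Result<PhysicalPlanRef> {
    create_physical_plan(
        // Assumes `CsvFormat: Default` (delimiter, header flag, compression).
        &Format::Csv(CsvFormat::default()),
        &CreateScanPlanContext::default(),
        &ScanPlanConfig {
            file_schema,
            files,
            projection,
            filters: &[], // accepted by the config, but not pushed down here
            limit: None,
            store,
        },
    )
}
```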
diff --git a/src/file-table-engine/src/table/immutable.rs b/src/file-table-engine/src/table/immutable.rs
index 2fb7a35107..c034858bcc 100644
--- a/src/file-table-engine/src/table/immutable.rs
+++ b/src/file-table-engine/src/table/immutable.rs
@@ -16,26 +16,45 @@ use std::any::Any;
 use std::sync::Arc;

 use async_trait::async_trait;
+use common_datasource::file_format::Format;
+use common_datasource::object_store::build_backend;
+use common_error::prelude::BoxedError;
 use common_query::physical_plan::PhysicalPlanRef;
 use common_query::prelude::Expr;
 use datatypes::schema::SchemaRef;
 use object_store::ObjectStore;
-use snafu::ResultExt;
+use serde::{Deserialize, Serialize};
+use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionNumber;
-use table::error::Result as TableResult;
+use table::error::{self as table_error, Result as TableResult};
 use table::metadata::{RawTableInfo, TableInfo, TableInfoRef, TableType};
 use table::Table;

-use crate::error::{ConvertRawSnafu, Result};
+use super::format::{create_physical_plan, CreateScanPlanContext, ScanPlanConfig};
+use crate::error::{self, ConvertRawSnafu, Result};
 use crate::manifest::immutable::{
     read_table_manifest, write_table_manifest, ImmutableMetadata, INIT_META_VERSION,
 };
 use crate::manifest::table_manifest_dir;

+pub const IMMUTABLE_TABLE_META_KEY: &str = "IMMUTABLE_TABLE_META";
+pub const IMMUTABLE_TABLE_LOCATION_KEY: &str = "LOCATION";
+pub const IMMUTABLE_TABLE_PATTERN_KEY: &str = "PATTERN";
+pub const IMMUTABLE_TABLE_FORMAT_KEY: &str = "FORMAT";
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
+#[serde(default)]
+pub struct ImmutableFileTableOptions {
+    pub files: Vec<String>,
+}
+
 pub struct ImmutableFileTable {
     metadata: ImmutableMetadata,
     // currently, it's immutable
     table_info: Arc<TableInfo>,
+    object_store: ObjectStore,
+    files: Vec<String>,
+    format: Format,
 }

 pub type ImmutableFileTableRef = Arc<ImmutableFileTable>;

@@ -46,25 +65,40 @@ impl Table for ImmutableFileTable {
         self
     }

+    /// The [`SchemaRef`] before the projection.
+    /// It contains all the columns that may appear in the files (missing columns are filled with NULLs).
     fn schema(&self) -> SchemaRef {
         self.table_info().meta.schema.clone()
     }

-    fn table_type(&self) -> TableType {
-        self.table_info().table_type
-    }
-
     fn table_info(&self) -> TableInfoRef {
         self.table_info.clone()
     }

+    fn table_type(&self) -> TableType {
+        self.table_info().table_type
+    }
+
     async fn scan(
         &self,
-        _projection: Option<&Vec<usize>>,
-        _filters: &[Expr],
-        _limit: Option<usize>,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
     ) -> TableResult<PhysicalPlanRef> {
-        todo!()
+        create_physical_plan(
+            &self.format,
+            &CreateScanPlanContext::default(),
+            &ScanPlanConfig {
+                file_schema: self.schema(),
+                files: &self.files,
+                projection,
+                filters,
+                limit,
+                store: self.object_store.clone(),
+            },
+        )
+        .map_err(BoxedError::new)
+        .context(table_error::TableOperationSnafu)
     }

     async fn flush(
@@ -87,11 +121,36 @@ impl ImmutableFileTable {
         &self.metadata
     }

-    pub(crate) fn new(table_info: TableInfo, metadata: ImmutableMetadata) -> Self {
-        Self {
+    pub(crate) fn new(table_info: TableInfo, metadata: ImmutableMetadata) -> Result<Self> {
+        let table_info = Arc::new(table_info);
+        let options = &table_info.meta.options.extra_options;
+
+        let url = options.get(IMMUTABLE_TABLE_LOCATION_KEY).context(
+            error::MissingRequiredFieldSnafu {
+                name: IMMUTABLE_TABLE_LOCATION_KEY,
+            },
+        )?;
+
+        let meta =
+            options
+                .get(IMMUTABLE_TABLE_META_KEY)
+                .context(error::MissingRequiredFieldSnafu {
+                    name: IMMUTABLE_TABLE_META_KEY,
+                })?;
+
+        let meta: ImmutableFileTableOptions =
+            serde_json::from_str(meta).context(error::DecodeJsonSnafu)?;
+        let format = Format::try_from(options).context(error::ParseFileFormatSnafu)?;
+
+        let object_store = build_backend(url, options).context(error::BuildBackendSnafu)?;
+
+        Ok(Self {
             metadata,
-            table_info: Arc::new(table_info),
-        }
+            table_info,
+            object_store,
+            files: meta.files,
+            format,
+        })
     }

     pub async fn create(
@@ -113,7 +172,7 @@ impl ImmutableFileTable {
         )
         .await?;

-        Ok(ImmutableFileTable::new(table_info, metadata))
+        ImmutableFileTable::new(table_info, metadata)
     }

     pub(crate) async fn recover_table_info(
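For illustration, the smallest `extra_options` map the new constructor accepts; the location URL and file name are made up, and the test helper in the next hunk builds the same thing through `TableOptions`:

```rust
use std::collections::HashMap;

use crate::table::immutable::{
    ImmutableFileTableOptions, IMMUTABLE_TABLE_FORMAT_KEY, IMMUTABLE_TABLE_LOCATION_KEY,
    IMMUTABLE_TABLE_META_KEY,
};

/// Illustrative only: an options map that satisfies `ImmutableFileTable::new`.
fn example_extra_options() -> HashMap<String, String> {
    let mut options = HashMap::new();
    // `build_backend` derives the object-store backend from this URL (value is hypothetical).
    options.insert(
        IMMUTABLE_TABLE_LOCATION_KEY.to_string(),
        "fs:///data/demo/".to_string(),
    );
    // `Format::try_from` reads the format key.
    options.insert(IMMUTABLE_TABLE_FORMAT_KEY.to_string(), "csv".to_string());
    // The file list travels as JSON under the meta key.
    let meta = ImmutableFileTableOptions {
        files: vec!["demo.csv".to_string()],
    };
    options.insert(
        IMMUTABLE_TABLE_META_KEY.to_string(),
        serde_json::to_string(&meta).unwrap(),
    );
    options
}
```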
diff --git a/src/file-table-engine/src/test_util.rs b/src/file-table-engine/src/test_util.rs
index e568763d70..96923d78ae 100644
--- a/src/file-table-engine/src/test_util.rs
+++ b/src/file-table-engine/src/test_util.rs
@@ -28,6 +28,7 @@ use table::TableRef;
 use crate::config::EngineConfig;
 use crate::engine::immutable::ImmutableFileTableEngine;
 use crate::manifest::immutable::ImmutableMetadata;
+use crate::table::immutable::{self, ImmutableFileTableOptions};

 pub const TEST_TABLE_NAME: &str = "demo";

@@ -95,6 +96,20 @@ pub struct TestEngineComponents {
 }

 pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
+    let mut table_options = TableOptions::default();
+    table_options.extra_options.insert(
+        immutable::IMMUTABLE_TABLE_LOCATION_KEY.to_string(),
+        "mock_path".to_string(),
+    );
+    table_options.extra_options.insert(
+        immutable::IMMUTABLE_TABLE_META_KEY.to_string(),
+        serde_json::to_string(&ImmutableFileTableOptions::default()).unwrap(),
+    );
+    table_options.extra_options.insert(
+        immutable::IMMUTABLE_TABLE_FORMAT_KEY.to_string(),
+        "csv".to_string(),
+    );
+
     CreateTableRequest {
         id: 1,
         catalog_name: "greptime".to_string(),
@@ -105,7 +120,7 @@ pub fn new_create_request(schema: SchemaRef) -> CreateTableRequest {
         region_numbers: vec![0],
         create_if_not_exists: true,
         primary_key_indices: vec![0],
-        table_options: TableOptions::default(),
+        table_options,
         engine: IMMUTABLE_FILE_ENGINE.to_string(),
     }
 }
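As a sanity check on the `#[serde(default)]` derive used by the test helper above, the metadata payload round-trips like this (a sketch, not a test from this patch; the crate path is assumed):

```rust
use file_table_engine::table::immutable::ImmutableFileTableOptions;

fn main() {
    // `#[serde(default)]` lets every field be omitted on the wire...
    let empty: ImmutableFileTableOptions = serde_json::from_str("{}").unwrap();
    assert_eq!(empty, ImmutableFileTableOptions::default());
    // ...and the default value serializes as an empty file list, which is
    // exactly what `new_create_request` stores under IMMUTABLE_TABLE_META_KEY.
    assert_eq!(serde_json::to_string(&empty).unwrap(), r#"{"files":[]}"#);
}
```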