From e3ac3298b1d410adcb2c0b53b00a8e06dd9a0916 Mon Sep 17 00:00:00 2001
From: gobraves
Date: Fri, 21 Jul 2023 13:54:02 +0800
Subject: [PATCH] feat: add orc stream (#1981)

* add orc stream #1820

* update orc stream

* fix: create orcstreamadapter with opt projection

* fix: license header

* docs: delete comment
---
 src/common/datasource/src/file_format/orc.rs  | 127 ++++++++++++++++--
 .../datasource/src/file_format/tests.rs       |  54 +++++++-
 src/common/datasource/tests/orc/README.md     |  43 ++++++
 src/file-table-engine/src/table/format.rs     |  27 +++-
 src/frontend/src/statement/copy_table_from.rs |   2 +-
 tests-integration/src/tests/instance_test.rs  |  80 +++++++++++
 6 files changed, 320 insertions(+), 13 deletions(-)

diff --git a/src/common/datasource/src/file_format/orc.rs b/src/common/datasource/src/file_format/orc.rs
index 7b9858661a..00d455c72e 100644
--- a/src/common/datasource/src/file_format/orc.rs
+++ b/src/common/datasource/src/file_format/orc.rs
@@ -13,19 +13,20 @@
 // limitations under the License.
 
 use std::pin::Pin;
+use std::sync::Arc;
 use std::task::{Context, Poll};
 
 use arrow::compute::cast;
-use arrow_schema::{Schema, SchemaRef};
+use arrow_schema::{ArrowError, Schema, SchemaRef};
 use async_trait::async_trait;
 use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
 use datafusion::error::{DataFusionError, Result as DfResult};
+use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
 use datafusion::physical_plan::RecordBatchStream;
-use futures::Stream;
+use futures::{Stream, StreamExt, TryStreamExt};
 use object_store::ObjectStore;
 use orc_rust::arrow_reader::{create_arrow_schema, Cursor};
 use orc_rust::async_arrow_reader::ArrowStreamReader;
-pub use orc_rust::error::Error as OrcError;
 use orc_rust::reader::Reader;
 use snafu::ResultExt;
 use tokio::io::{AsyncRead, AsyncSeek};
@@ -62,14 +63,26 @@ pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>
 
 pub struct OrcArrowStreamReaderAdapter<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> {
     output_schema: SchemaRef,
+    projection: Vec<usize>,
     stream: ArrowStreamReader<T>,
 }
 
 impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> OrcArrowStreamReaderAdapter<T> {
-    pub fn new(output_schema: SchemaRef, stream: ArrowStreamReader<T>) -> Self {
+    pub fn new(
+        output_schema: SchemaRef,
+        stream: ArrowStreamReader<T>,
+        projection: Option<Vec<usize>>,
+    ) -> Self {
+        let projection = if let Some(projection) = projection {
+            projection
+        } else {
+            (0..output_schema.fields().len()).collect()
+        };
+
         Self {
-            stream,
             output_schema,
+            projection,
+            stream,
         }
     }
 }
@@ -89,18 +102,23 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream for OrcArrowStreamReaderAdapter<T>
         let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx))
             .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
 
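+        // The ORC reader may decode columns to Arrow types that differ from
+        // the declared output schema, so below each projected column is cast
+        // to the expected field type before the batch is reassembled.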
+        let projected_schema = self.output_schema.project(&self.projection)?;
         let batch = batch.map(|b| {
             b.and_then(|b| {
-                let mut columns = Vec::with_capacity(b.num_columns());
-                for (idx, column) in b.columns().iter().enumerate() {
-                    if column.data_type() != self.output_schema.field(idx).data_type() {
-                        let output = cast(&column, self.output_schema.field(idx).data_type())?;
+                let mut columns = Vec::with_capacity(self.projection.len());
+                for idx in self.projection.iter() {
+                    let column = b.column(*idx);
+                    let field = self.output_schema.field(*idx);
+
+                    if column.data_type() != field.data_type() {
+                        let output = cast(&column, field.data_type())?;
                         columns.push(output)
                     } else {
                         columns.push(column.clone())
                     }
                 }
-                let record_batch = DfRecordBatch::try_new(self.output_schema.clone(), columns)?;
+
+                let record_batch = DfRecordBatch::try_new(projected_schema.into(), columns)?;
 
                 Ok(record_batch)
             })
@@ -123,3 +141,92 @@ impl FileFormat for OrcFormat {
         Ok(schema)
     }
 }
+
+#[derive(Debug, Clone)]
+pub struct OrcOpener {
+    object_store: Arc<ObjectStore>,
+    output_schema: SchemaRef,
+    projection: Option<Vec<usize>>,
+}
+
+impl OrcOpener {
+    pub fn new(
+        object_store: ObjectStore,
+        output_schema: SchemaRef,
+        projection: Option<Vec<usize>>,
+    ) -> Self {
+        Self {
+            object_store: Arc::from(object_store),
+            output_schema,
+            projection,
+        }
+    }
+}
+
+impl FileOpener for OrcOpener {
+    fn open(&self, meta: FileMeta) -> DfResult<FileOpenFuture> {
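+        // The returned future must own everything it touches ('static), so
+        // clone the store, schema, and projection out of `self` first.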
+        let object_store = self.object_store.clone();
+        let output_schema = self.output_schema.clone();
+        let projection = self.projection.clone();
+        Ok(Box::pin(async move {
+            let reader = object_store
+                .reader(meta.location().to_string().as_str())
+                .await
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+            let stream_reader = new_orc_stream_reader(reader)
+                .await
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+            let stream = OrcArrowStreamReaderAdapter::new(output_schema, stream_reader, projection);
+
+            let adopted = stream.map_err(|e| ArrowError::ExternalError(Box::new(e)));
+            Ok(adopted.boxed())
+        }))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::file_format::FileFormat;
+    use crate::test_util::{self, format_schema, test_store};
+
+    fn test_data_root() -> String {
+        test_util::get_data_dir("tests/orc").display().to_string()
+    }
+
+    #[tokio::test]
+    async fn test_orc_infer_schema() {
+        let orc = OrcFormat::default();
+        let store = test_store(&test_data_root());
+        let schema = orc.infer_schema(&store, "test.orc").await.unwrap();
+        let formatted: Vec<_> = format_schema(schema);
+
+        assert_eq!(
+            vec![
+                "double_a: Float64: NULL",
+                "a: Float32: NULL",
+                "b: Boolean: NULL",
+                "str_direct: Utf8: NULL",
+                "d: Utf8: NULL",
+                "e: Utf8: NULL",
+                "f: Utf8: NULL",
+                "int_short_repeated: Int32: NULL",
+                "int_neg_short_repeated: Int32: NULL",
+                "int_delta: Int32: NULL",
+                "int_neg_delta: Int32: NULL",
+                "int_direct: Int32: NULL",
+                "int_neg_direct: Int32: NULL",
+                "bigint_direct: Int64: NULL",
+                "bigint_neg_direct: Int64: NULL",
+                "bigint_other: Int64: NULL",
+                "utf8_increase: Utf8: NULL",
+                "utf8_decrease: Utf8: NULL",
+                "timestamp_simple: Timestamp(Nanosecond, None): NULL",
+                "date_simple: Date32: NULL"
+            ],
+            formatted
+        );
+    }
+}
diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs
index 6ed3c8486f..bc01abe16c 100644
--- a/src/common/datasource/src/file_format/tests.rs
+++ b/src/common/datasource/src/file_format/tests.rs
@@ -30,8 +30,9 @@ use crate::compression::CompressionType;
 use crate::error;
 use crate::file_format::csv::{CsvConfigBuilder, CsvOpener};
 use crate::file_format::json::JsonOpener;
+use crate::file_format::orc::{OrcFormat, OrcOpener};
 use crate::file_format::parquet::DefaultParquetFileReaderFactory;
-use crate::file_format::Format;
+use crate::file_format::{FileFormat, Format};
 use crate::test_util::{self, scan_config, test_basic_schema, test_store};
 
 struct Test<'a, T: FileOpener> {
@@ -193,6 +194,51 @@ async fn test_parquet_exec() {
     );
 }
 
+#[tokio::test]
+async fn test_orc_opener() {
+    let root = test_util::get_data_dir("tests/orc").display().to_string();
+    let store = test_store(&root);
+    let orc = OrcFormat::default();
+    let schema = orc.infer_schema(&store, "test.orc").await.unwrap();
+    let schema = Arc::new(schema);
+
+    let orc_opener = OrcOpener::new(store.clone(), schema.clone(), None);
+    let path = &test_util::get_data_dir("/test.orc").display().to_string();
+
+    let tests = [
+        Test {
+            config: scan_config(schema.clone(), None, path),
+            opener: orc_opener.clone(),
+            expected: vec![
+                "+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
+                "| double_a | a   | b     | str_direct | d   | e   | f     | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple           | date_simple |",
+                "+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
+                "| 1.0      | 1.0 | true  | a          | a   | ddd | aaaaa | 5                  | -5                     | 1         | 5             | 1          | -1             | 1             | -1                | 5            | a             | eeeee         | 2023-04-01T20:15:30.002    | 2023-04-01  |",
+                "| 2.0      | 2.0 | false | cccccc     | bb  | cc  | bbbbb | 5                  | -5                     | 2         | 4             | 6          | -6             | 6             | -6                | -5           | bb            | dddd          | 2021-08-22T07:26:44.525777 | 2023-03-01  |",
+                "| 3.0      |     |       |            |     |     |       |                    |                        |           |               |            |                |               |                   | 1            | ccc           | ccc           | 2023-01-01T00:00:00        | 2023-01-01  |",
+                "| 4.0      | 4.0 | true  | ddd        | ccc | bb  | ccccc | 5                  | -5                     | 4         | 2             | 3          | -3             | 3             | -3                | 5            | dddd          | bb            | 2023-02-01T00:00:00        | 2023-02-01  |",
+                "| 5.0      | 5.0 | false | ee         | ddd | a   | ddddd | 5                  | -5                     | 5         | 1             | 2          | -2             | 2             | -2                | 5            | eeeee         | a             | 2023-03-01T00:00:00        | 2023-03-01  |",
+                "+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
+            ],
+        },
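+        // `Some(1)` sets a row limit on the scan, so only the first row of
+        // the file should come back.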
"ORC".to_string())] + .into_iter() + .collect::>(); + + assert_matches!(Format::try_from(&value).unwrap(), Format::Orc(_)); + let value = [(FORMAT_TYPE.to_string(), "Foobar".to_string())] .into_iter() .collect::>(); diff --git a/src/common/datasource/tests/orc/README.md b/src/common/datasource/tests/orc/README.md index a6d75884d9..addae16a24 100644 --- a/src/common/datasource/tests/orc/README.md +++ b/src/common/datasource/tests/orc/README.md @@ -9,3 +9,46 @@ venv/bin/pip install -U pyorc cargo test ``` + +Schema: +``` ++------------------------+-----------------------------+-------------+ +| column_name | data_type | is_nullable | ++------------------------+-----------------------------+-------------+ +| double_a | Float64 | YES | +| a | Float32 | YES | +| b | Boolean | YES | +| str_direct | Utf8 | YES | +| d | Utf8 | YES | +| e | Utf8 | YES | +| f | Utf8 | YES | +| int_short_repeated | Int32 | YES | +| int_neg_short_repeated | Int32 | YES | +| int_delta | Int32 | YES | +| int_neg_delta | Int32 | YES | +| int_direct | Int32 | YES | +| int_neg_direct | Int32 | YES | +| bigint_direct | Int64 | YES | +| bigint_neg_direct | Int64 | YES | +| bigint_other | Int64 | YES | +| utf8_increase | Utf8 | YES | +| utf8_decrease | Utf8 | YES | +| timestamp_simple | Timestamp(Nanosecond, None) | YES | +| date_simple | Date32 | YES | ++------------------------+-----------------------------+-------------+ +``` + + +Data: +``` +"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+", +"| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |", +"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+", +"| 1.0 | 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 |", +"| 2.0 | 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 |", +"| 3.0 | | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 |", +"| 4.0 | 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 |", +"| 5.0 | 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 |", +"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+", + +``` diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs index 13d8fd9f6f..811e6d78a1 100644 --- a/src/file-table-engine/src/table/format.rs +++ b/src/file-table-engine/src/table/format.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use 
diff --git a/src/file-table-engine/src/table/format.rs b/src/file-table-engine/src/table/format.rs
index 13d8fd9f6f..811e6d78a1 100644
--- a/src/file-table-engine/src/table/format.rs
+++ b/src/file-table-engine/src/table/format.rs
@@ -16,6 +16,7 @@ use std::sync::Arc;
 
 use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
 use common_datasource::file_format::json::{JsonFormat, JsonOpener};
+use common_datasource::file_format::orc::{OrcFormat, OrcOpener};
 use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat};
 use common_datasource::file_format::Format;
 use common_query::prelude::Expr;
@@ -85,6 +86,14 @@ fn build_json_opener(
     ))
 }
 
+fn build_orc_opener(output_schema: Arc<ArrowSchema>, config: &ScanPlanConfig) -> Result<OrcOpener> {
+    Ok(OrcOpener::new(
+        config.store.clone(),
+        output_schema,
+        config.projection.cloned(),
+    ))
+}
+
 fn build_record_batch_stream<T: FileOpener + Send + 'static>(
     opener: T,
     file_schema: Arc<ArrowSchema>,
@@ -213,6 +222,22 @@ fn new_parquet_stream_with_exec_plan(
     ))
 }
 
+fn new_orc_stream(
+    _ctx: &CreateScanPlanContext,
+    config: &ScanPlanConfig,
+    _format: &OrcFormat,
+) -> Result<SendableRecordBatchStream> {
+    let file_schema = config.file_schema.arrow_schema().clone();
+    let opener = build_orc_opener(file_schema.clone(), config)?;
+    build_record_batch_stream(
+        opener,
+        file_schema,
+        config.files,
+        config.projection,
+        config.limit,
+    )
+}
+
 #[derive(Debug, Clone)]
 pub struct ScanPlanConfig<'a> {
     pub file_schema: SchemaRef,
@@ -232,6 +257,6 @@ pub fn create_stream(
         Format::Csv(format) => new_csv_stream(ctx, config, format),
         Format::Json(format) => new_json_stream(ctx, config, format),
         Format::Parquet(format) => new_parquet_stream_with_exec_plan(ctx, config, format),
-        _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
+        Format::Orc(format) => new_orc_stream(ctx, config, format),
     }
 }
diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs
index 45807a08f6..041d0cb1a6 100644
--- a/src/frontend/src/statement/copy_table_from.rs
+++ b/src/frontend/src/statement/copy_table_from.rs
@@ -224,7 +224,7 @@ impl StatementExecutor {
                 let stream = new_orc_stream_reader(reader)
                     .await
                     .context(error::ReadOrcSnafu)?;
-                let stream = OrcArrowStreamReaderAdapter::new(schema, stream);
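+                // Forward the COPY FROM projection so the adapter casts and
+                // returns only the projected columns.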
+                let stream = OrcArrowStreamReaderAdapter::new(schema, stream, Some(projection));
 
                 Ok(Box::pin(stream))
             }
diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs
index e2a6a41101..3570ab533f 100644
--- a/tests-integration/src/tests/instance_test.rs
+++ b/tests-integration/src/tests/instance_test.rs
@@ -584,6 +584,86 @@ async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstance>) {
     check_output_stream(output, expect).await;
 }
 
+#[apply(both_instances_cases)]
+async fn test_execute_query_external_table_orc(instance: Arc<dyn MockInstance>) {
+    let instance = instance.frontend();
+    let format = "orc";
+    let location = get_data_dir("../src/common/datasource/tests/orc/test.orc")
+        .canonicalize()
+        .unwrap()
+        .display()
+        .to_string();
+
+    let table_name = "various_type_orc";
+
+    let output = execute_sql(
+        &instance,
+        &format!(
+            r#"create external table {table_name} with (location='{location}', format='{format}');"#,
+        ),
+    )
+    .await;
+    assert!(matches!(output, Output::AffectedRows(0)));
+
+    let output = execute_sql(&instance, &format!("desc table {table_name};")).await;
+    let expect = "\
++------------------------+---------------------+------+---------+---------------+
+| Field                  | Type                | Null | Default | Semantic Type |
++------------------------+---------------------+------+---------+---------------+
+| double_a               | Float64             | YES  |         | FIELD         |
+| a                      | Float32             | YES  |         | FIELD         |
+| b                      | Boolean             | YES  |         | FIELD         |
+| str_direct             | String              | YES  |         | FIELD         |
+| d                      | String              | YES  |         | FIELD         |
+| e                      | String              | YES  |         | FIELD         |
+| f                      | String              | YES  |         | FIELD         |
+| int_short_repeated     | Int32               | YES  |         | FIELD         |
+| int_neg_short_repeated | Int32               | YES  |         | FIELD         |
+| int_delta              | Int32               | YES  |         | FIELD         |
+| int_neg_delta          | Int32               | YES  |         | FIELD         |
+| int_direct             | Int32               | YES  |         | FIELD         |
+| int_neg_direct         | Int32               | YES  |         | FIELD         |
+| bigint_direct          | Int64               | YES  |         | FIELD         |
+| bigint_neg_direct      | Int64               | YES  |         | FIELD         |
+| bigint_other           | Int64               | YES  |         | FIELD         |
+| utf8_increase          | String              | YES  |         | FIELD         |
+| utf8_decrease          | String              | YES  |         | FIELD         |
+| timestamp_simple       | TimestampNanosecond | YES  |         | FIELD         |
+| date_simple            | Date                | YES  |         | FIELD         |
++------------------------+---------------------+------+---------+---------------+";
+    check_output_stream(output, expect).await;
+
+    let output = execute_sql(&instance, &format!("select * from {table_name};")).await;
+    let expect = "\
++----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
+| double_a | a   | b     | str_direct | d   | e   | f     | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple           | date_simple |
++----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
+| 1.0      | 1.0 | true  | a          | a   | ddd | aaaaa | 5                  | -5                     | 1         | 5             | 1          | -1             | 1             | -1                | 5            | a             | eeeee         | 2023-04-01T20:15:30.002    | 2023-04-01  |
+| 2.0      | 2.0 | false | cccccc     | bb  | cc  | bbbbb | 5                  | -5                     | 2         | 4             | 6          | -6             | 6             | -6                | -5           | bb            | dddd          | 2021-08-22T07:26:44.525777 | 2023-03-01  |
+| 3.0      |     |       |            |     |     |       |                    |                        |           |               |            |                |               |                   | 1            | ccc           | ccc           | 2023-01-01T00:00:00        | 2023-01-01  |
+| 4.0      | 4.0 | true  | ddd        | ccc | bb  | ccccc | 5                  | -5                     | 4         | 2             | 3          | -3             | 3             | -3                | 5            | dddd          | bb            | 2023-02-01T00:00:00        | 2023-02-01  |
+| 5.0      | 5.0 | false | ee         | ddd | a   | ddddd | 5                  | -5                     | 5         | 1             | 2          | -2             | 2             | -2                | 5            | eeeee         | a             | 2023-03-01T00:00:00        | 2023-03-01  |
++----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+";
+    check_output_stream(output, expect).await;
+
+    let output = execute_sql(
+        &instance,
+        &format!("select double_a,a, str_direct as c, a as another_a from {table_name};"),
+    )
+    .await;
+    let expect = "\
++----------+-----+--------+-----------+
+| double_a | a   | c      | another_a |
++----------+-----+--------+-----------+
+| 1.0      | 1.0 | a      | 1.0       |
+| 2.0      | 2.0 | cccccc | 2.0       |
+| 3.0      |     |        |           |
+| 4.0      | 4.0 | ddd    | 4.0       |
+| 5.0      | 5.0 | ee     | 5.0       |
++----------+-----+--------+-----------+";
+    check_output_stream(output, expect).await;
+}
+
 #[apply(both_instances_cases)]
 async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>) {
     let instance = instance.frontend();