feat: add orc stream (#1981)

* add orc stream #1820

* update orc stream

* fix: create orcstreamadapter with opt projection

* fix: license header

* docs: delete comment
This commit is contained in:
gobraves
2023-07-21 13:54:02 +08:00
committed by GitHub
parent 953b8a0132
commit e3ac3298b1
6 changed files with 320 additions and 13 deletions

View File

@@ -13,19 +13,20 @@
// limitations under the License.
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use arrow::compute::cast;
use arrow_schema::{Schema, SchemaRef};
use arrow_schema::{ArrowError, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
use datafusion::physical_plan::RecordBatchStream;
use futures::Stream;
use futures::{Stream, StreamExt, TryStreamExt};
use object_store::ObjectStore;
use orc_rust::arrow_reader::{create_arrow_schema, Cursor};
use orc_rust::async_arrow_reader::ArrowStreamReader;
pub use orc_rust::error::Error as OrcError;
use orc_rust::reader::Reader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};
@@ -62,14 +63,26 @@ pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>
/// Adapts an ORC [`ArrowStreamReader`] into an Arrow `RecordBatch` stream,
/// applying a column projection and casting columns to the `output_schema`
/// field types.
pub struct OrcArrowStreamReaderAdapter<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> {
    /// Full output schema; emitted columns are cast to these field types.
    output_schema: SchemaRef,
    /// Indices (into `output_schema`) of the columns to emit.
    /// Defaults to all columns when no projection is supplied to `new`.
    projection: Vec<usize>,
    /// Underlying ORC stream reader producing the raw record batches.
    stream: ArrowStreamReader<T>,
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> OrcArrowStreamReaderAdapter<T> {
pub fn new(output_schema: SchemaRef, stream: ArrowStreamReader<T>) -> Self {
pub fn new(
output_schema: SchemaRef,
stream: ArrowStreamReader<T>,
projection: Option<Vec<usize>>,
) -> Self {
let projection = if let Some(projection) = projection {
projection
} else {
(0..output_schema.fields().len()).collect()
};
Self {
stream,
output_schema,
projection,
stream,
}
}
}
@@ -89,18 +102,23 @@ impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream for OrcArrowStrea
let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx))
.map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
let projected_schema = self.output_schema.project(&self.projection)?;
let batch = batch.map(|b| {
b.and_then(|b| {
let mut columns = Vec::with_capacity(b.num_columns());
for (idx, column) in b.columns().iter().enumerate() {
if column.data_type() != self.output_schema.field(idx).data_type() {
let output = cast(&column, self.output_schema.field(idx).data_type())?;
let mut columns = Vec::with_capacity(self.projection.len());
for idx in self.projection.iter() {
let column = b.column(*idx);
let field = self.output_schema.field(*idx);
if column.data_type() != field.data_type() {
let output = cast(&column, field.data_type())?;
columns.push(output)
} else {
columns.push(column.clone())
}
}
let record_batch = DfRecordBatch::try_new(self.output_schema.clone(), columns)?;
let record_batch = DfRecordBatch::try_new(projected_schema.into(), columns)?;
Ok(record_batch)
})
@@ -123,3 +141,92 @@ impl FileFormat for OrcFormat {
Ok(schema)
}
}
/// [`FileOpener`] implementation that opens ORC files from an object store.
#[derive(Debug, Clone)]
pub struct OrcOpener {
    /// Object store the ORC files are read from; shared across clones.
    object_store: Arc<ObjectStore>,
    /// Schema the emitted record batches are cast to.
    output_schema: SchemaRef,
    /// Optional column projection; `None` means emit every column.
    projection: Option<Vec<usize>>,
}
impl OrcOpener {
    /// Creates an opener reading from `object_store`.
    ///
    /// `projection` selects which columns of `output_schema` to emit;
    /// `None` keeps every column.
    pub fn new(
        object_store: ObjectStore,
        output_schema: SchemaRef,
        projection: Option<Vec<usize>>,
    ) -> Self {
        // Wrap the store in an `Arc` so cloned openers share one instance.
        let object_store = Arc::new(object_store);
        Self {
            object_store,
            output_schema,
            projection,
        }
    }
}
impl FileOpener for OrcOpener {
    /// Opens the ORC file described by `meta` and returns a future that
    /// resolves to a boxed record-batch stream with this opener's
    /// projection applied.
    fn open(&self, meta: FileMeta) -> DfResult<FileOpenFuture> {
        // Clone the shared state into the async block.
        let object_store = self.object_store.clone();
        let output_schema = self.output_schema.clone();
        let projection = self.projection.clone();
        Ok(Box::pin(async move {
            let path = meta.location().to_string();
            let reader = object_store
                .reader(path.as_str())
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;
            let stream_reader = new_orc_stream_reader(reader)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;
            let adapter =
                OrcArrowStreamReaderAdapter::new(output_schema, stream_reader, projection);
            // `FileOpenFuture` expects an `ArrowError` stream, so wrap the
            // adapter's errors before boxing.
            let adapted = adapter.map_err(|e| ArrowError::ExternalError(Box::new(e)));
            Ok(adapted.boxed())
        }))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::file_format::FileFormat;
    use crate::test_util::{self, format_schema, test_store};

    /// Path to the ORC fixture directory (`tests/orc`).
    fn test_data_root() -> String {
        test_util::get_data_dir("tests/orc").display().to_string()
    }

    /// Verifies that `OrcFormat::infer_schema` reads the expected Arrow
    /// schema from the `test.orc` fixture.
    #[tokio::test]
    async fn test_orc_infer_schema() {
        let orc = OrcFormat::default();
        let store = test_store(&test_data_root());
        let schema = orc.infer_schema(&store, "test.orc").await.unwrap();
        let formatted: Vec<_> = format_schema(schema);
        // Expected `name: type: nullability` entry for every fixture column.
        assert_eq!(
            vec![
                "double_a: Float64: NULL",
                "a: Float32: NULL",
                "b: Boolean: NULL",
                "str_direct: Utf8: NULL",
                "d: Utf8: NULL",
                "e: Utf8: NULL",
                "f: Utf8: NULL",
                "int_short_repeated: Int32: NULL",
                "int_neg_short_repeated: Int32: NULL",
                "int_delta: Int32: NULL",
                "int_neg_delta: Int32: NULL",
                "int_direct: Int32: NULL",
                "int_neg_direct: Int32: NULL",
                "bigint_direct: Int64: NULL",
                "bigint_neg_direct: Int64: NULL",
                "bigint_other: Int64: NULL",
                "utf8_increase: Utf8: NULL",
                "utf8_decrease: Utf8: NULL",
                "timestamp_simple: Timestamp(Nanosecond, None): NULL",
                "date_simple: Date32: NULL"
            ],
            formatted
        );
    }
}

View File

@@ -30,8 +30,9 @@ use crate::compression::CompressionType;
use crate::error;
use crate::file_format::csv::{CsvConfigBuilder, CsvOpener};
use crate::file_format::json::JsonOpener;
use crate::file_format::orc::{OrcFormat, OrcOpener};
use crate::file_format::parquet::DefaultParquetFileReaderFactory;
use crate::file_format::Format;
use crate::file_format::{FileFormat, Format};
use crate::test_util::{self, scan_config, test_basic_schema, test_store};
struct Test<'a, T: FileOpener> {
@@ -193,6 +194,51 @@ async fn test_parquet_exec() {
);
}
#[tokio::test]
async fn test_orc_opener() {
    // Infer the fixture's schema and use it as the scan's output schema.
    let root = test_util::get_data_dir("tests/orc").display().to_string();
    let store = test_store(&root);
    let orc = OrcFormat::default();
    let schema = orc.infer_schema(&store, "test.orc").await.unwrap();
    let schema = Arc::new(schema);
    // No projection: the opener should emit every column.
    let orc_opener = OrcOpener::new(store.clone(), schema.clone(), None);
    // NOTE(review): `root` is built from "tests/orc" but this path is built
    // from "/test.orc" — confirm both resolve to the same fixture file.
    let path = &test_util::get_data_dir("/test.orc").display().to_string();
    let tests = [
        // Full scan: no limit, expect all five rows and every column.
        Test {
            config: scan_config(schema.clone(), None, path),
            opener: orc_opener.clone(),
            expected: vec![
                "+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
                "| double_a | a   | b     | str_direct | d   | e   | f     | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple           | date_simple |",
                "+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
                "| 1.0      | 1.0 | true  | a          | a   | ddd | aaaaa | 5                  | -5                     | 1         | 5             | 1          | -1             | 1             | -1                | 5            | a             | eeeee         | 2023-04-01T20:15:30.002    | 2023-04-01  |",
                "| 2.0      | 2.0 | false | cccccc     | bb  | cc  | bbbbb | 5                  | -5                     | 2         | 4             | 6          | -6             | 6             | -6                | -5           | bb            | dddd          | 2021-08-22T07:26:44.525777 | 2023-03-01  |",
                "| 3.0      |     |       |            |     |     |       |                    |                        |           |               |            |                |               |                   | 1            | ccc           | ccc           | 2023-01-01T00:00:00        | 2023-01-01  |",
                "| 4.0      | 4.0 | true  | ddd        | ccc | bb  | ccccc | 5                  | -5                     | 4         | 2             | 3          | -3             | 3             | -3                | 5            | dddd          | bb            | 2023-02-01T00:00:00        | 2023-02-01  |",
                "| 5.0      | 5.0 | false | ee         | ddd | a   | ddddd | 5                  | -5                     | 5         | 1             | 2          | -2             | 2             | -2                | 5            | eeeee         | a             | 2023-03-01T00:00:00        | 2023-03-01  |",
                "+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
            ],
        },
        // Limited scan: `Some(1)` caps the result at a single row.
        Test {
            config: scan_config(schema.clone(), Some(1), path),
            opener: orc_opener.clone(),
            expected: vec![
                "+----------+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+",
                "| double_a | a   | b    | str_direct | d | e   | f     | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple        | date_simple |",
                "+----------+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+",
                "| 1.0      | 1.0 | true | a          | a | ddd | aaaaa | 5                  | -5                     | 1         | 5             | 1          | -1             | 1             | -1                | 5            | a             | eeeee         | 2023-04-01T20:15:30.002 | 2023-04-01  |",
                "+----------+-----+------+------------+---+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+-------------------------+-------------+",
            ],
        },
    ];
    for test in tests {
        test.run().await;
    }
}
#[test]
fn test_format() {
let value = [(FORMAT_TYPE.to_string(), "csv".to_string())]
@@ -213,6 +259,12 @@ fn test_format() {
assert_matches!(Format::try_from(&value).unwrap(), Format::Json(_));
let value = [(FORMAT_TYPE.to_string(), "ORC".to_string())]
.into_iter()
.collect::<HashMap<_, _>>();
assert_matches!(Format::try_from(&value).unwrap(), Format::Orc(_));
let value = [(FORMAT_TYPE.to_string(), "Foobar".to_string())]
.into_iter()
.collect::<HashMap<_, _>>();

View File

@@ -9,3 +9,46 @@ venv/bin/pip install -U pyorc
cargo test
```
Schema:
```
+------------------------+-----------------------------+-------------+
| column_name | data_type | is_nullable |
+------------------------+-----------------------------+-------------+
| double_a | Float64 | YES |
| a | Float32 | YES |
| b | Boolean | YES |
| str_direct | Utf8 | YES |
| d | Utf8 | YES |
| e | Utf8 | YES |
| f | Utf8 | YES |
| int_short_repeated | Int32 | YES |
| int_neg_short_repeated | Int32 | YES |
| int_delta | Int32 | YES |
| int_neg_delta | Int32 | YES |
| int_direct | Int32 | YES |
| int_neg_direct | Int32 | YES |
| bigint_direct | Int64 | YES |
| bigint_neg_direct | Int64 | YES |
| bigint_other | Int64 | YES |
| utf8_increase | Utf8 | YES |
| utf8_decrease | Utf8 | YES |
| timestamp_simple | Timestamp(Nanosecond, None) | YES |
| date_simple | Date32 | YES |
+------------------------+-----------------------------+-------------+
```
Data:
```
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
"| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |",
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
"| 1.0 | 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 |",
"| 2.0 | 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 |",
"| 3.0 | | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 |",
"| 4.0 | 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 |",
"| 5.0 | 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 |",
"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+",
```

View File

@@ -16,6 +16,7 @@ use std::sync::Arc;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
use common_datasource::file_format::json::{JsonFormat, JsonOpener};
use common_datasource::file_format::orc::{OrcFormat, OrcOpener};
use common_datasource::file_format::parquet::{DefaultParquetFileReaderFactory, ParquetFormat};
use common_datasource::file_format::Format;
use common_query::prelude::Expr;
@@ -85,6 +86,14 @@ fn build_json_opener(
))
}
/// Builds an [`OrcOpener`] that reads from the scan config's store, casting
/// batches to `output_schema` and applying the config's column projection.
fn build_orc_opener(output_schema: Arc<ArrowSchema>, config: &ScanPlanConfig) -> Result<OrcOpener> {
    let store = config.store.clone();
    let projection = config.projection.cloned();
    let opener = OrcOpener::new(store, output_schema, projection);
    Ok(opener)
}
fn build_record_batch_stream<T: FileOpener + Send + 'static>(
opener: T,
file_schema: Arc<ArrowSchema>,
@@ -213,6 +222,22 @@ fn new_parquet_stream_with_exec_plan(
))
}
/// Creates a record-batch stream scanning the ORC files described by `config`.
fn new_orc_stream(
    _ctx: &CreateScanPlanContext,
    config: &ScanPlanConfig,
    _format: &OrcFormat,
) -> Result<SendableRecordBatchStream> {
    // The file schema drives both the opener's casting and the output stream.
    let file_schema = config.file_schema.arrow_schema().clone();
    let opener = build_orc_opener(file_schema.clone(), config)?;
    let stream = build_record_batch_stream(
        opener,
        file_schema,
        config.files,
        config.projection,
        config.limit,
    )?;
    Ok(stream)
}
#[derive(Debug, Clone)]
pub struct ScanPlanConfig<'a> {
pub file_schema: SchemaRef,
@@ -232,6 +257,6 @@ pub fn create_stream(
Format::Csv(format) => new_csv_stream(ctx, config, format),
Format::Json(format) => new_json_stream(ctx, config, format),
Format::Parquet(format) => new_parquet_stream_with_exec_plan(ctx, config, format),
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
Format::Orc(format) => new_orc_stream(ctx, config, format),
}
}

View File

@@ -224,7 +224,7 @@ impl StatementExecutor {
let stream = new_orc_stream_reader(reader)
.await
.context(error::ReadOrcSnafu)?;
let stream = OrcArrowStreamReaderAdapter::new(schema, stream);
let stream = OrcArrowStreamReaderAdapter::new(schema, stream, Some(projection));
Ok(Box::pin(stream))
}

View File

@@ -584,6 +584,86 @@ async fn test_execute_query_external_table_parquet(instance: Arc<dyn MockInstanc
check_output_stream(output, expect).await;
}
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_orc(instance: Arc<dyn MockInstance>) {
    let instance = instance.frontend();
    let format = "orc";
    // Absolute path to the shared ORC fixture from the datasource crate.
    let location = get_data_dir("../src/common/datasource/tests/orc/test.orc")
        .canonicalize()
        .unwrap()
        .display()
        .to_string();
    let table_name = "various_type_orc";
    // Create an external table backed by the ORC file.
    let output = execute_sql(
        &instance,
        &format!(
            r#"create external table {table_name} with (location='{location}', format='{format}');"#,
        ),
    )
    .await;
    assert!(matches!(output, Output::AffectedRows(0)));
    // The described table schema should match the ORC file's inferred schema.
    let output = execute_sql(&instance, &format!("desc table {table_name};")).await;
    let expect = "\
+------------------------+---------------------+------+---------+---------------+
| Field                  | Type                | Null | Default | Semantic Type |
+------------------------+---------------------+------+---------+---------------+
| double_a               | Float64             | YES  |         | FIELD         |
| a                      | Float32             | YES  |         | FIELD         |
| b                      | Boolean             | YES  |         | FIELD         |
| str_direct             | String              | YES  |         | FIELD         |
| d                      | String              | YES  |         | FIELD         |
| e                      | String              | YES  |         | FIELD         |
| f                      | String              | YES  |         | FIELD         |
| int_short_repeated     | Int32               | YES  |         | FIELD         |
| int_neg_short_repeated | Int32               | YES  |         | FIELD         |
| int_delta              | Int32               | YES  |         | FIELD         |
| int_neg_delta          | Int32               | YES  |         | FIELD         |
| int_direct             | Int32               | YES  |         | FIELD         |
| int_neg_direct         | Int32               | YES  |         | FIELD         |
| bigint_direct          | Int64               | YES  |         | FIELD         |
| bigint_neg_direct      | Int64               | YES  |         | FIELD         |
| bigint_other           | Int64               | YES  |         | FIELD         |
| utf8_increase          | String              | YES  |         | FIELD         |
| utf8_decrease          | String              | YES  |         | FIELD         |
| timestamp_simple       | TimestampNanosecond | YES  |         | FIELD         |
| date_simple            | Date                | YES  |         | FIELD         |
+------------------------+---------------------+------+---------+---------------+";
    check_output_stream(output, expect).await;
    // A full `select *` should return every row and column of the fixture.
    let output = execute_sql(&instance, &format!("select * from {table_name};")).await;
    let expect = "\
+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| double_a | a   | b     | str_direct | d   | e   | f     | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple           | date_simple |
+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| 1.0      | 1.0 | true  | a          | a   | ddd | aaaaa | 5                  | -5                     | 1         | 5             | 1          | -1             | 1             | -1                | 5            | a             | eeeee         | 2023-04-01T20:15:30.002    | 2023-04-01  |
| 2.0      | 2.0 | false | cccccc     | bb  | cc  | bbbbb | 5                  | -5                     | 2         | 4             | 6          | -6             | 6             | -6                | -5           | bb            | dddd          | 2021-08-22T07:26:44.525777 | 2023-03-01  |
| 3.0      |     |       |            |     |     |       |                    |                        |           |               |            |                |               |                   | 1            | ccc           | ccc           | 2023-01-01T00:00:00        | 2023-01-01  |
| 4.0      | 4.0 | true  | ddd        | ccc | bb  | ccccc | 5                  | -5                     | 4         | 2             | 3          | -3             | 3             | -3                | 5            | dddd          | bb            | 2023-02-01T00:00:00        | 2023-02-01  |
| 5.0      | 5.0 | false | ee         | ddd | a   | ddddd | 5                  | -5                     | 5         | 1             | 2          | -2             | 2             | -2                | 5            | eeeee         | a             | 2023-03-01T00:00:00        | 2023-03-01  |
+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+";
    check_output_stream(output, expect).await;
    // Projection with aliases: the same source column (`a`) may appear twice.
    let output = execute_sql(
        &instance,
        &format!("select double_a,a, str_direct as c, a as another_a from {table_name};"),
    )
    .await;
    let expect = "\
+----------+-----+--------+-----------+
| double_a | a   | c      | another_a |
+----------+-----+--------+-----------+
| 1.0      | 1.0 | a      | 1.0       |
| 2.0      | 2.0 | cccccc | 2.0       |
| 3.0      |     |        |           |
| 4.0      | 4.0 | ddd    | 4.0       |
| 5.0      | 5.0 | ee     | 5.0       |
+----------+-----+--------+-----------+";
    check_output_stream(output, expect).await;
}
#[apply(both_instances_cases)]
async fn test_execute_query_external_table_csv(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();