feat: support COPY FROM ORC format (#1814)

* feat: support COPY FROM ORC format

* test: add COPY FROM ORC test

* chore: add license header

* refactor: remove unimplemented macro

* chore: apply suggestions from CR

* chore: bump orc-rust to 0.2.3
Author: Weny Xu
Committed: 2023-06-25 15:07:16 +09:00 (by GitHub)
Parent 62f660e439, commit 223cf31409
16 changed files with 361 additions and 4 deletions

.gitignore (vendored): 2 changes

@@ -44,3 +44,5 @@ benchmarks/data
# Vscode workspace
*.code-workspace
venv/

Cargo.lock (generated): 37 changes

@@ -1679,6 +1679,7 @@ dependencies = [
"derive_builder 0.12.0",
"futures",
"object-store",
"orc-rust",
"paste",
"regex",
"snafu",
@@ -3069,6 +3070,12 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fallible-streaming-iterator"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
[[package]]
name = "fastrand"
version = "1.9.0"
@@ -5983,6 +5990,27 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978aa494585d3ca4ad74929863093e87cac9790d81fe7aba2b3dc2890643a0fc"
[[package]]
name = "orc-rust"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e15d3f67795da54d9526e46b7808181ce6236d518f56ca1ee556d3a3fdd77c66"
dependencies = [
"arrow",
"bytes",
"chrono",
"fallible-streaming-iterator",
"flate2",
"futures",
"futures-util",
"lazy_static",
"paste",
"prost",
"snafu",
"tokio",
"zigzag",
]
[[package]]
name = "ordered-float"
version = "1.1.1"
@@ -11214,6 +11242,15 @@ version = "1.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a0956f1ba7c7909bfb66c2e9e4124ab6f6482560f6628b5aaeba39207c9aad9"
[[package]]
name = "zigzag"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70b40401a28d86ce16a330b863b86fd7dbee4d7c940587ab09ab8c019f9e3fdf"
dependencies = [
"num-traits",
]
[[package]]
name = "zstd"
version = "0.11.2+zstd.1.5.2"


@@ -24,6 +24,7 @@ datafusion.workspace = true
derive_builder = "0.12"
futures.workspace = true
object-store = { path = "../../object-store" }
orc-rust = "0.2.3"
regex = "1.7"
snafu.workspace = true
tokio.workspace = true


@@ -54,6 +54,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to build orc reader, source: {}", source))]
OrcReader {
location: Location,
source: orc_rust::error::Error,
},
#[snafu(display("Failed to read object from path: {}, source: {}", path, source))]
ReadObject {
path: String,
@@ -171,7 +177,8 @@ impl ErrorExt for Error {
| ReadRecordBatch { .. }
| WriteRecordBatch { .. }
| EncodeRecordBatch { .. }
| BufferedWriterClosed { .. } => StatusCode::Unexpected,
| BufferedWriterClosed { .. }
| OrcReader { .. } => StatusCode::Unexpected,
}
}
@@ -182,6 +189,7 @@ impl ErrorExt for Error {
fn location_opt(&self) -> Option<common_error::snafu::Location> {
use Error::*;
match self {
OrcReader { location, .. } => Some(*location),
BuildBackend { location, .. } => Some(*location),
ReadObject { location, .. } => Some(*location),
ListObjects { location, .. } => Some(*location),


@@ -14,6 +14,7 @@
pub mod csv;
pub mod json;
pub mod orc;
pub mod parquet;
#[cfg(test)]
pub mod tests;
@@ -38,6 +39,7 @@ use snafu::ResultExt;
use self::csv::CsvFormat;
use self::json::JsonFormat;
use self::orc::OrcFormat;
use self::parquet::ParquetFormat;
use crate::buffered_writer::{DfRecordBatchEncoder, LazyBufferedWriter};
use crate::compression::CompressionType;
@@ -56,6 +58,7 @@ pub enum Format {
Csv(CsvFormat),
Json(JsonFormat),
Parquet(ParquetFormat),
Orc(OrcFormat),
}
impl Format {
@@ -64,6 +67,7 @@ impl Format {
Format::Csv(_) => ".csv",
Format::Json(_) => ".json",
Format::Parquet(_) => ".parquet",
Format::Orc(_) => ".orc",
}
}
}
@@ -81,6 +85,7 @@ impl TryFrom<&HashMap<String, String>> for Format {
"CSV" => Ok(Self::Csv(CsvFormat::try_from(options)?)),
"JSON" => Ok(Self::Json(JsonFormat::try_from(options)?)),
"PARQUET" => Ok(Self::Parquet(ParquetFormat::default())),
"ORC" => Ok(Self::Orc(OrcFormat)),
_ => error::UnsupportedFormatSnafu { format: &format }.fail(),
}
}
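
For orientation, a minimal sketch of how this dispatch is reached. The option key and the uppercasing of its value sit outside the hunks shown here, so both are assumptions:

```rust
use std::collections::HashMap;

// Hypothetical driver: the key name "format" and the uppercasing step are
// assumptions; only the match arms above appear in this diff.
fn pick_orc() -> Result<(), Box<dyn std::error::Error>> {
    let mut options = HashMap::new();
    options.insert("format".to_string(), "orc".to_string());
    let format = Format::try_from(&options)?;
    assert!(matches!(format, Format::Orc(_)));
    Ok(())
}
```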


@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::pin::Pin;
use std::task::{Context, Poll};
use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch as DfRecordBatch;
use datafusion::error::{DataFusionError, Result as DfResult};
use datafusion::physical_plan::RecordBatchStream;
use futures::Stream;
use object_store::ObjectStore;
use orc_rust::arrow_reader::{create_arrow_schema, Cursor};
use orc_rust::async_arrow_reader::ArrowStreamReader;
pub use orc_rust::error::Error as OrcError;
use orc_rust::reader::Reader;
use snafu::ResultExt;
use tokio::io::{AsyncRead, AsyncSeek};
use crate::error::{self, Result};
use crate::file_format::FileFormat;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct OrcFormat;
pub async fn new_orc_cursor<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Cursor<R>> {
let reader = Reader::new_async(reader)
.await
.context(error::OrcReaderSnafu)?;
let cursor = Cursor::root(reader).context(error::OrcReaderSnafu)?;
Ok(cursor)
}
pub async fn new_orc_stream_reader<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<ArrowStreamReader<R>> {
let cursor = new_orc_cursor(reader).await?;
Ok(ArrowStreamReader::new(cursor, None))
}
pub async fn infer_orc_schema<R: AsyncRead + AsyncSeek + Unpin + Send + 'static>(
reader: R,
) -> Result<Schema> {
let cursor = new_orc_cursor(reader).await?;
Ok(create_arrow_schema(&cursor))
}
pub struct OrcArrowStreamReaderAdapter<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> {
stream: ArrowStreamReader<T>,
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> OrcArrowStreamReaderAdapter<T> {
pub fn new(stream: ArrowStreamReader<T>) -> Self {
Self { stream }
}
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> RecordBatchStream
for OrcArrowStreamReaderAdapter<T>
{
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
}
impl<T: AsyncRead + AsyncSeek + Unpin + Send + 'static> Stream for OrcArrowStreamReaderAdapter<T> {
type Item = DfResult<DfRecordBatch>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx))
.map(|r| r.map_err(|e| DataFusionError::External(Box::new(e))));
Poll::Ready(batch)
}
}
#[async_trait]
impl FileFormat for OrcFormat {
async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result<Schema> {
let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let schema = infer_orc_schema(reader).await?;
Ok(schema)
}
}
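
Taken together, the new helpers form a small read pipeline. A minimal sketch, assuming `tokio::fs::File` (which satisfies the `AsyncRead + AsyncSeek + Unpin + Send + 'static` bounds) and assuming the crate's `Error` is `Send + Sync` so it can be boxed into `DataFusionError::External`:

```rust
use datafusion::error::DataFusionError;
use futures::TryStreamExt;
use tokio::fs::File;

// Sketch: drain every record batch from a local ORC file through the
// adapter defined above.
async fn read_orc_file(path: &str) -> Result<Vec<DfRecordBatch>, DataFusionError> {
    let file = File::open(path)
        .await
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
    let stream = new_orc_stream_reader(file)
        .await
        .map_err(|e| DataFusionError::External(Box::new(e)))?;
    // The adapter is a `Stream<Item = DfResult<DfRecordBatch>>`, so
    // `try_collect` gathers the batches or surfaces the first error.
    OrcArrowStreamReaderAdapter::new(stream).try_collect().await
}
```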


@@ -0,0 +1,11 @@
## Generate ORC data
```bash
python3 -m venv venv
venv/bin/pip install -U pip
venv/bin/pip install -U pyorc
./venv/bin/python write.py
cargo test
```
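
The fixture that `write.py` emits is what the Rust tests read back. A hypothetical smoke test (not part of this commit) that checks the inferred arrow schema, assuming the `infer_orc_schema` helper from this crate and a path relative to the crate root:

```rust
use tokio::fs::File;

// Hypothetical check: open the generated fixture and confirm a known
// column survived the round trip.
#[tokio::test]
async fn orc_fixture_schema_smoke() {
    let file = File::open("tests/orc/test.orc").await.unwrap();
    let schema = infer_orc_schema(file).await.unwrap();
    assert!(schema.fields().iter().any(|f| f.name() == "double_a"));
}
```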

test.orc: binary file not shown.


@@ -0,0 +1,103 @@
# Copyright 2023 Greptime Team
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import random
import datetime
import pyorc
data = {
"double_a": [1.0, 2.0, 3.0, 4.0, 5.0],
"a": [1.0, 2.0, None, 4.0, 5.0],
"b": [True, False, None, True, False],
"str_direct": ["a", "cccccc", None, "ddd", "ee"],
"d": ["a", "bb", None, "ccc", "ddd"],
"e": ["ddd", "cc", None, "bb", "a"],
"f": ["aaaaa", "bbbbb", None, "ccccc", "ddddd"],
"int_short_repeated": [5, 5, None, 5, 5],
"int_neg_short_repeated": [-5, -5, None, -5, -5],
"int_delta": [1, 2, None, 4, 5],
"int_neg_delta": [5, 4, None, 2, 1],
"int_direct": [1, 6, None, 3, 2],
"int_neg_direct": [-1, -6, None, -3, -2],
"bigint_direct": [1, 6, None, 3, 2],
"bigint_neg_direct": [-1, -6, None, -3, -2],
"bigint_other": [5, -5, 1, 5, 5],
"utf8_increase": ["a", "bb", "ccc", "dddd", "eeeee"],
"utf8_decrease": ["eeeee", "dddd", "ccc", "bb", "a"],
"timestamp_simple": [datetime.datetime(2023, 4, 1, 20, 15, 30, 2000), datetime.datetime.fromtimestamp(int('1629617204525777000')/1000000000), datetime.datetime(2023, 1, 1), datetime.datetime(2023, 2, 1), datetime.datetime(2023, 3, 1)],
"date_simple": [datetime.date(2023, 4, 1), datetime.date(2023, 3, 1), datetime.date(2023, 1, 1), datetime.date(2023, 2, 1), datetime.date(2023, 3, 1)]
}
def infer_schema(data):
schema = "struct<"
for key, value in data.items():
dt = type(value[0])
if dt == float:
dt = "float"
elif dt == int:
dt = "int"
elif dt == bool:
dt = "boolean"
elif dt == str:
dt = "string"
elif key.startswith("timestamp"):
dt = "timestamp"
elif key.startswith("date"):
dt = "date"
else:
print(key, value, dt)
raise NotImplementedError
if key.startswith("double"):
dt = "double"
if key.startswith("bigint"):
dt = "bigint"
schema += key + ":" + dt + ","
schema = schema[:-1] + ">"
return schema
def _write(
schema: str,
data,
file_name: str,
compression=pyorc.CompressionKind.NONE,
dict_key_size_threshold=0.0,
):
output = open(file_name, "wb")
writer = pyorc.Writer(
output,
schema,
dict_key_size_threshold=dict_key_size_threshold,
# use a small number to ensure that compression crosses value boundaries
compression_block_size=32,
compression=compression,
)
num_rows = len(list(data.values())[0])
for x in range(num_rows):
row = tuple(values[x] for values in data.values())
writer.write(row)
writer.close()
with open(file_name, "rb") as f:
reader = pyorc.Reader(f)
list(reader)
_write(
infer_schema(data),
data,
"test.orc",
)


@@ -14,6 +14,7 @@
use std::any::Any;
use common_datasource::file_format::Format;
use common_error::prelude::*;
use datafusion::arrow::error::ArrowError;
use datafusion::error::DataFusionError;
@@ -175,6 +176,9 @@ pub enum Error {
source: datatypes::error::Error,
location: Location,
},
#[snafu(display("Unsupported format: {:?}", format))]
UnsupportedFormat { format: Format, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -191,7 +195,8 @@ impl ErrorExt for Error {
| BuildCsvConfig { .. }
| ProjectSchema { .. }
| MissingRequiredField { .. }
| ConvertSchema { .. } => StatusCode::InvalidArguments,
| ConvertSchema { .. }
| UnsupportedFormat { .. } => StatusCode::InvalidArguments,
BuildBackend { source, .. } => source.status_code(),
BuildStreamAdapter { source, .. } => source.status_code(),


@@ -361,6 +361,7 @@ pub fn create_physical_plan(
Format::Csv(format) => new_csv_scan_plan(ctx, config, format),
Format::Json(format) => new_json_scan_plan(ctx, config, format),
Format::Parquet(format) => new_parquet_scan_plan(ctx, config, format),
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}
@@ -373,5 +374,6 @@ pub fn create_stream(
Format::Csv(format) => new_csv_stream(ctx, config, format),
Format::Json(format) => new_json_stream(ctx, config, format),
Format::Parquet(format) => new_parquet_stream(ctx, config, format),
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}


@@ -14,6 +14,7 @@
use std::any::Any;
use common_datasource::file_format::Format;
use common_error::prelude::*;
use datafusion::parquet;
use datatypes::arrow::error::ArrowError;
@@ -443,6 +444,9 @@ pub enum Error {
source: common_datasource::error::Error,
},
#[snafu(display("Unsupported format: {:?}", format))]
UnsupportedFormat { location: Location, format: Format },
#[snafu(display("Failed to parse file format, source: {}", source))]
ParseFileFormat {
#[snafu(backtrace)]
@@ -500,6 +504,12 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to read orc schema, source: {}", source))]
ReadOrc {
source: common_datasource::error::Error,
location: Location,
},
#[snafu(display("Failed to build parquet record batch stream, source: {}", source))]
BuildParquetRecordBatchStream {
location: Location,
@@ -575,7 +585,8 @@ impl ErrorExt for Error {
| Error::InvalidSchema { .. }
| Error::PrepareImmutableTable { .. }
| Error::BuildCsvConfig { .. }
| Error::ProjectSchema { .. } => StatusCode::InvalidArguments,
| Error::ProjectSchema { .. }
| Error::UnsupportedFormat { .. } => StatusCode::InvalidArguments,
Error::NotSupported { .. } => StatusCode::Unsupported,
@@ -667,7 +678,9 @@ impl ErrorExt for Error {
Error::TableScanExec { source, .. } => source.status_code(),
Error::ReadObject { .. } | Error::ReadParquet { .. } => StatusCode::StorageUnavailable,
Error::ReadObject { .. } | Error::ReadParquet { .. } | Error::ReadOrc { .. } => {
StatusCode::StorageUnavailable
}
Error::ListObjects { source }
| Error::ParseUrl { source }


@@ -20,6 +20,9 @@ use async_compat::CompatExt;
use common_base::readable_size::ReadableSize;
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvOpener};
use common_datasource::file_format::json::JsonOpener;
use common_datasource::file_format::orc::{
infer_orc_schema, new_orc_stream_reader, OrcArrowStreamReaderAdapter,
};
use common_datasource::file_format::{FileFormat, Format};
use common_datasource::lister::{Lister, Source};
use common_datasource::object_store::{build_backend, parse_url};
@@ -110,6 +113,18 @@ impl StatementExecutor {
.context(error::ReadParquetSnafu)?;
Ok(builder.schema().clone())
}
Format::Orc(_) => {
let reader = object_store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let schema = infer_orc_schema(reader)
.await
.context(error::ReadOrcSnafu)?;
Ok(Arc::new(schema))
}
}
}
@@ -201,6 +216,18 @@ impl StatementExecutor {
Ok(Box::pin(ParquetRecordBatchStreamAdapter::new(upstream)))
}
Format::Orc(_) => {
let reader = object_store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?;
let stream = new_orc_stream_reader(reader)
.await
.context(error::ReadOrcSnafu)?;
let stream = OrcArrowStreamReaderAdapter::new(stream);
Ok(Box::pin(stream))
}
}
}
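
COPY FROM ultimately drains this stream to produce its affected-row count. A hedged sketch of that accounting (the real bookkeeping lives outside this hunk), assuming DataFusion's usual `Pin<Box<dyn RecordBatchStream + Send>>` shape:

```rust
use std::pin::Pin;

use datafusion::error::Result as DfResult;
use datafusion::physical_plan::RecordBatchStream;
use futures::StreamExt;

// Sketch: sum the rows of every batch yielded by the boxed stream
// returned above; an error on any batch aborts the copy.
async fn count_rows(mut stream: Pin<Box<dyn RecordBatchStream + Send>>) -> DfResult<usize> {
    let mut rows = 0;
    while let Some(batch) = stream.next().await {
        rows += batch?.num_rows();
    }
    Ok(rows)
}
```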


@@ -68,6 +68,7 @@ impl StatementExecutor {
Ok(rows_copied)
}
_ => error::UnsupportedFormatSnafu { format: *format }.fail(),
}
}


@@ -332,6 +332,7 @@ fn parse_immutable_file_table_format(
Format::Csv(format) => Box::new(format),
Format::Json(format) => Box::new(format),
Format::Parquet(format) => Box::new(format),
Format::Orc(format) => Box::new(format),
},
)
}


@@ -1227,6 +1227,45 @@ async fn test_execute_copy_from_s3(instance: Arc<dyn MockInstance>) {
}
}
#[apply(both_instances_cases)]
async fn test_execute_copy_from_orc(instance: Arc<dyn MockInstance>) {
logging::init_default_ut_logging();
let instance = instance.frontend();
// setups
execute_sql(
&instance,
"create table demo(double_a double, a float, b boolean, str_direct string, d string, e string, f string, int_short_repeated int, int_neg_short_repeated int, int_delta int, int_neg_delta int, int_direct int, int_neg_direct int, bigint_direct bigint, bigint_neg_direct bigint, bigint_other bigint, utf8_increase string, utf8_decrease string, timestamp_simple timestamp(9) time index, date_simple date);",
)
.await;
let filepath = get_data_dir("../src/common/datasource/tests/orc/test.orc")
.canonicalize()
.unwrap()
.display()
.to_string();
let output = execute_sql(
&instance,
&format!("copy demo from '{}' WITH(FORMAT='orc');", &filepath),
)
.await;
assert!(matches!(output, Output::AffectedRows(5)));
let output = execute_sql(&instance, "select * from demo order by double_a;").await;
let expected = r#"+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| double_a | a | b | str_direct | d | e | f | int_short_repeated | int_neg_short_repeated | int_delta | int_neg_delta | int_direct | int_neg_direct | bigint_direct | bigint_neg_direct | bigint_other | utf8_increase | utf8_decrease | timestamp_simple | date_simple |
+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+
| 1.0 | 1.0 | true | a | a | ddd | aaaaa | 5 | -5 | 1 | 5 | 1 | -1 | 1 | -1 | 5 | a | eeeee | 2023-04-01T20:15:30.002 | 2023-04-01 |
| 2.0 | 2.0 | false | cccccc | bb | cc | bbbbb | 5 | -5 | 2 | 4 | 6 | -6 | 6 | -6 | -5 | bb | dddd | 2021-08-22T07:26:44.525777 | 2023-03-01 |
| 3.0 | | | | | | | | | | | | | | | 1 | ccc | ccc | 2023-01-01T00:00:00 | 2023-01-01 |
| 4.0 | 4.0 | true | ddd | ccc | bb | ccccc | 5 | -5 | 4 | 2 | 3 | -3 | 3 | -3 | 5 | dddd | bb | 2023-02-01T00:00:00 | 2023-02-01 |
| 5.0 | 5.0 | false | ee | ddd | a | ddddd | 5 | -5 | 5 | 1 | 2 | -2 | 2 | -2 | 5 | eeeee | a | 2023-03-01T00:00:00 | 2023-03-01 |
+----------+-----+-------+------------+-----+-----+-------+--------------------+------------------------+-----------+---------------+------------+----------------+---------------+-------------------+--------------+---------------+---------------+----------------------------+-------------+"#;
check_output_stream(output, expected).await;
}
#[apply(both_instances_cases)]
async fn test_cast_type_issue_1594(instance: Arc<dyn MockInstance>) {
let instance = instance.frontend();