diff --git a/Cargo.lock b/Cargo.lock
index 787aa7560b..5a1035c3c4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1566,9 +1566,11 @@ dependencies = [
  "arrow-schema",
  "async-compression",
  "async-trait",
+ "bytes",
  "common-error",
  "common-runtime",
  "datafusion",
+ "derive_builder 0.12.0",
  "futures",
  "object-store",
  "regex",
diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml
index 0c58fe801c..cf55f9fe85 100644
--- a/src/common/datasource/Cargo.toml
+++ b/src/common/datasource/Cargo.toml
@@ -16,9 +16,11 @@ async-compression = { version = "0.3", features = [
     "tokio",
 ] }
 async-trait.workspace = true
+bytes = "1.1"
 common-error = { path = "../error" }
 common-runtime = { path = "../runtime" }
 datafusion.workspace = true
+derive_builder = "0.12"
 futures.workspace = true
 object-store = { path = "../../object-store" }
 regex = "1.7"
diff --git a/src/common/datasource/src/compression.rs b/src/common/datasource/src/compression.rs
index 0ea12b401c..85701b0bf5 100644
--- a/src/common/datasource/src/compression.rs
+++ b/src/common/datasource/src/compression.rs
@@ -13,10 +13,14 @@
 // limitations under the License.
 
 use std::fmt::Display;
+use std::io;
 use std::str::FromStr;
 
 use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder};
+use bytes::Bytes;
+use futures::Stream;
 use tokio::io::{AsyncRead, BufReader};
+use tokio_util::io::{ReaderStream, StreamReader};
 
 use crate::error::{self, Error, Result};
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
@@ -81,4 +85,25 @@ impl CompressionType {
             CompressionType::UNCOMPRESSED => Box::new(s),
         }
     }
+
+    pub fn convert_stream<T: Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static>(
+        &self,
+        s: T,
+    ) -> Box<dyn Stream<Item = io::Result<Bytes>> + Send + Unpin> {
+        match self {
+            CompressionType::GZIP => {
+                Box::new(ReaderStream::new(GzipDecoder::new(StreamReader::new(s))))
+            }
+            CompressionType::BZIP2 => {
+                Box::new(ReaderStream::new(BzDecoder::new(StreamReader::new(s))))
+            }
+            CompressionType::XZ => {
+                Box::new(ReaderStream::new(XzDecoder::new(StreamReader::new(s))))
+            }
+            CompressionType::ZSTD => {
+                Box::new(ReaderStream::new(ZstdDecoder::new(StreamReader::new(s))))
+            }
+            CompressionType::UNCOMPRESSED => Box::new(s),
+        }
+    }
 }
diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index e4d651fec9..463d17ae82 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -15,16 +15,108 @@
 pub mod csv;
 pub mod json;
 pub mod parquet;
+#[cfg(test)]
+pub mod tests;
 
 pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
 
+use std::result;
+use std::sync::Arc;
+use std::task::Poll;
+
 use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use arrow_schema::ArrowError;
 use async_trait::async_trait;
+use bytes::{Buf, Bytes};
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
+use datafusion::physical_plan::file_format::FileOpenFuture;
+use futures::StreamExt;
 use object_store::ObjectStore;
 
+use crate::compression::CompressionType;
 use crate::error::Result;
 
 #[async_trait]
 pub trait FileFormat: Send + Sync + std::fmt::Debug {
     async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<SchemaRef>;
 }
+
+pub trait ArrowDecoder: Send + 'static {
+    /// Decode records from `buf` returning the number of bytes read.
+    ///
+    /// This method returns `Ok(0)` once `batch_size` objects have been parsed since the
+    /// last call to [`Self::flush`], or `buf` is exhausted.
+    ///
+    /// Any remaining bytes should be included in the next call to [`Self::decode`].
+    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError>;
+
+    /// Flushes the currently buffered data to a [`RecordBatch`].
+    ///
+    /// This should only be called after [`Self::decode`] has returned `Ok(0)`,
+    /// otherwise it may return an error if part way through decoding a record.
+    ///
+    /// Returns `Ok(None)` if there is no buffered data.
+    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError>;
+}
+
+impl ArrowDecoder for arrow::csv::reader::Decoder {
+    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
+        self.decode(buf)
+    }
+
+    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
+        self.flush()
+    }
+}
+
+impl ArrowDecoder for arrow::json::RawDecoder {
+    fn decode(&mut self, buf: &[u8]) -> result::Result<usize, ArrowError> {
+        self.decode(buf)
+    }
+
+    fn flush(&mut self) -> result::Result<Option<RecordBatch>, ArrowError> {
+        self.flush()
+    }
+}
+
+pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
+    object_store: Arc<ObjectStore>,
+    path: String,
+    compression_type: CompressionType,
+    decoder_factory: F,
+) -> DataFusionResult<FileOpenFuture> {
+    let mut decoder = decoder_factory()?;
+    Ok(Box::pin(async move {
+        let reader = object_store
+            .reader(&path)
+            .await
+            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        let mut upstream = compression_type.convert_stream(reader).fuse();
+
+        let mut buffered = Bytes::new();
+
+        let stream = futures::stream::poll_fn(move |cx| {
+            loop {
+                if buffered.is_empty() {
+                    if let Some(result) = futures::ready!(upstream.poll_next_unpin(cx)) {
+                        buffered = result?;
+                    };
+                }
+
+                let decoded = decoder.decode(buffered.as_ref())?;
+
+                if decoded == 0 {
+                    break;
+                } else {
+                    buffered.advance(decoded);
+                }
+            }
+
+            Poll::Ready(decoder.flush().transpose())
+        });
+
+        Ok(stream.boxed())
+    }))
+}
diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs
index 73473e41cf..07f5287cb4 100644
--- a/src/common/datasource/src/file_format/csv.rs
+++ b/src/common/datasource/src/file_format/csv.rs
@@ -14,17 +14,21 @@
 
 use std::sync::Arc;
 
+use arrow::csv;
 use arrow::csv::reader::infer_reader_schema as infer_csv_schema;
 use arrow_schema::SchemaRef;
 use async_trait::async_trait;
 use common_runtime;
+use datafusion::error::Result as DataFusionResult;
+use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
+use derive_builder::Builder;
 use object_store::ObjectStore;
 use snafu::ResultExt;
 use tokio_util::io::SyncIoBridge;
 
 use crate::compression::CompressionType;
 use crate::error::{self, Result};
-use crate::file_format::{self, FileFormat};
+use crate::file_format::{self, open_with_decoder, FileFormat};
 
 #[derive(Debug)]
 pub struct CsvFormat {
@@ -45,6 +49,67 @@ impl Default for CsvFormat {
     }
 }
 
+#[derive(Debug, Clone, Builder)]
+pub struct CsvConfig {
+    batch_size: usize,
+    file_schema: SchemaRef,
+    #[builder(default = "None")]
+    file_projection: Option<Vec<usize>>,
+    #[builder(default = "true")]
+    has_header: bool,
+    #[builder(default = "b','")]
+    delimiter: u8,
+}
+
+impl CsvConfig {
+    fn builder(&self) -> csv::ReaderBuilder {
+        let mut builder = csv::ReaderBuilder::new()
+            .with_schema(self.file_schema.clone())
+            .with_delimiter(self.delimiter)
+            .with_batch_size(self.batch_size)
+            .has_header(self.has_header);
+
+        if let Some(proj) = &self.file_projection {
+            builder = builder.with_projection(proj.clone());
+        }
+
+        builder
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct CsvOpener {
+    config: Arc<CsvConfig>,
+    object_store: Arc<ObjectStore>,
+    compression_type: CompressionType,
+}
+
+impl CsvOpener {
+    /// Return a new [`CsvOpener`]. The caller must ensure that the [`CsvConfig`]'s `file_schema` corresponds to the file being opened.
+    pub fn new(
+        config: CsvConfig,
+        object_store: ObjectStore,
+        compression_type: CompressionType,
+    ) -> Self {
+        CsvOpener {
+            config: Arc::new(config),
+            object_store: Arc::new(object_store),
+            compression_type,
+        }
+    }
+}
+
+impl FileOpener for CsvOpener {
+    fn open(&self, meta: FileMeta) -> DataFusionResult<FileOpenFuture> {
+        open_with_decoder(
+            self.object_store.clone(),
+            meta.location().to_string(),
+            self.compression_type,
+            || Ok(self.config.builder().build_decoder()),
+        )
+    }
+}
+
 #[async_trait]
 impl FileFormat for CsvFormat {
     async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result<SchemaRef> {
diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs
index 49c9f535ca..26a4b3d561 100644
--- a/src/common/datasource/src/file_format/json.rs
+++ b/src/common/datasource/src/file_format/json.rs
@@ -17,15 +17,18 @@ use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
 use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter};
+use arrow::json::RawReaderBuilder;
 use async_trait::async_trait;
 use common_runtime;
+use datafusion::error::{DataFusionError, Result as DataFusionResult};
+use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener};
 use object_store::ObjectStore;
 use snafu::ResultExt;
 use tokio_util::io::SyncIoBridge;
 
 use crate::compression::CompressionType;
 use crate::error::{self, Result};
-use crate::file_format::{self, FileFormat};
+use crate::file_format::{self, open_with_decoder, FileFormat};
 
 #[derive(Debug)]
 pub struct JsonFormat {
@@ -69,6 +72,47 @@ impl FileFormat for JsonFormat {
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct JsonOpener {
+    batch_size: usize,
+    projected_schema: SchemaRef,
+    object_store: Arc<ObjectStore>,
+    compression_type: CompressionType,
+}
+
+impl JsonOpener {
+    /// Return a new [`JsonOpener`]. Any fields not present in `projected_schema` will be ignored.
+    pub fn new(
+        batch_size: usize,
+        projected_schema: SchemaRef,
+        object_store: ObjectStore,
+        compression_type: CompressionType,
+    ) -> Self {
+        Self {
+            batch_size,
+            projected_schema,
+            object_store: Arc::new(object_store),
+            compression_type,
+        }
+    }
+}
+
+impl FileOpener for JsonOpener {
+    fn open(&self, meta: FileMeta) -> DataFusionResult<FileOpenFuture> {
+        open_with_decoder(
+            self.object_store.clone(),
+            meta.location().to_string(),
+            self.compression_type,
+            || {
+                RawReaderBuilder::new(self.projected_schema.clone())
+                    .with_batch_size(self.batch_size)
+                    .build_decoder()
+                    .map_err(DataFusionError::from)
+            },
+        )
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs
new file mode 100644
index 0000000000..7df3ffd216
--- /dev/null
+++ b/src/common/datasource/src/file_format/tests.rs
@@ -0,0 +1,161 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::vec;
+
+use arrow_schema::SchemaRef;
+use datafusion::assert_batches_eq;
+use datafusion::datasource::listing::PartitionedFile;
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream};
+use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
+use futures::StreamExt;
+
+use crate::compression::CompressionType;
+use crate::file_format::csv::{CsvConfigBuilder, CsvOpener};
+use crate::file_format::json::JsonOpener;
+use crate::test_util::{self, test_basic_schema, test_store};
+
+fn scan_config(file_schema: SchemaRef, limit: Option<usize>, filename: &str) -> FileScanConfig {
+    FileScanConfig {
+        object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used
+        file_schema,
+        file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]],
+        statistics: Default::default(),
+        projection: None,
+        limit,
+        table_partition_cols: vec![],
+        output_ordering: None,
+        infinite_source: false,
+    }
+}
+
+struct Test<'a, T: FileOpener> {
+    config: FileScanConfig,
+    opener: T,
+    expected: Vec<&'a str>,
+}
+
+impl<'a, T: FileOpener> Test<'a, T> {
+    pub async fn run(self) {
+        let result = FileStream::new(
+            &self.config,
+            0,
+            self.opener,
+            &ExecutionPlanMetricsSet::new(),
+        )
+        .unwrap()
+        .map(|b| b.unwrap())
+        .collect::<Vec<_>>()
+        .await;
+
+        assert_batches_eq!(self.expected, &result);
+    }
+}
+
+#[tokio::test]
+async fn test_json_opener() {
+    let store = test_store("/");
+
+    let schema = test_basic_schema();
+
+    let json_opener = JsonOpener::new(
+        100,
+        schema.clone(),
+        store.clone(),
+        CompressionType::UNCOMPRESSED,
+    );
+
+    let path = &test_util::get_data_dir("tests/json/basic.json")
+        .display()
+        .to_string();
+    let tests = [
+        Test {
+            config: scan_config(schema.clone(), None, path),
+            opener: json_opener.clone(),
+            expected: vec![
+                "+-----+-------+",
+                "| num | str   |",
+                "+-----+-------+",
+                "| 5   | test  |",
+                "| 2   | hello |",
+                "| 4   | foo   |",
+                "+-----+-------+",
+            ],
+        },
+        Test {
+            config: scan_config(schema.clone(), Some(1), path),
+            opener: json_opener.clone(),
+            expected: vec![
+                "+-----+------+",
+                "| num | str  |",
+                "+-----+------+",
+                "| 5   | test |",
+                "+-----+------+",
+            ],
+        },
+    ];
+
+    for test in tests {
+        test.run().await;
+    }
+}
+
+#[tokio::test]
+async fn test_csv_opener() {
+    let store = test_store("/");
+
+    let schema = test_basic_schema();
+    let path = &test_util::get_data_dir("tests/csv/basic.csv")
+        .display()
+        .to_string();
+    let csv_conf = CsvConfigBuilder::default()
+        .batch_size(test_util::TEST_BATCH_SIZE)
+        .file_schema(schema.clone())
+        .build()
+        .unwrap();
+
+    let csv_opener = CsvOpener::new(csv_conf, store, CompressionType::UNCOMPRESSED);
+
+    let tests = [
+        Test {
+            config: scan_config(schema.clone(), None, path),
+            opener: csv_opener.clone(),
+            expected: vec![
+                "+-----+-------+",
+                "| num | str   |",
+                "+-----+-------+",
+                "| 5   | test  |",
+                "| 2   | hello |",
+                "| 4   | foo   |",
+                "+-----+-------+",
+            ],
+        },
+        Test {
+            config: scan_config(schema.clone(), Some(1), path),
+            opener: csv_opener.clone(),
+            expected: vec![
+                "+-----+------+",
+                "| num | str  |",
+                "+-----+------+",
+                "| 5   | test |",
+                "+-----+------+",
+            ],
+        },
+    ];
+
+    for test in tests {
+        test.run().await;
+    }
+}
diff --git a/src/common/datasource/src/lib.rs b/src/common/datasource/src/lib.rs
index fc41d18cf9..458516cd42 100644
--- a/src/common/datasource/src/lib.rs
+++ b/src/common/datasource/src/lib.rs
@@ -17,5 +17,6 @@ pub mod error;
 pub mod file_format;
 pub mod lister;
 pub mod object_store;
+#[cfg(test)]
 pub mod test_util;
 pub mod util;
diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs
index 3e65022e10..fcf735d175 100644
--- a/src/common/datasource/src/test_util.rs
+++ b/src/common/datasource/src/test_util.rs
@@ -13,11 +13,14 @@
 // limitations under the License.
 
 use std::path::PathBuf;
+use std::sync::Arc;
 
-use arrow_schema::SchemaRef;
+use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use object_store::services::Fs;
 use object_store::ObjectStore;
 
+pub const TEST_BATCH_SIZE: usize = 100;
+
 pub fn get_data_dir(path: &str) -> PathBuf {
     // https://doc.rust-lang.org/cargo/reference/environment-variables.html
     let dir = env!("CARGO_MANIFEST_DIR");
@@ -46,3 +49,11 @@
 
     ObjectStore::new(builder).unwrap().finish()
 }
+
+pub fn test_basic_schema() -> SchemaRef {
+    let schema = Schema::new(vec![
+        Field::new("num", DataType::Int64, false),
+        Field::new("str", DataType::Utf8, false),
+    ]);
+    Arc::new(schema)
+}
diff --git a/src/common/datasource/tests/json/basic.json b/src/common/datasource/tests/json/basic.json
new file mode 100644
index 0000000000..8abf0d6f83
--- /dev/null
+++ b/src/common/datasource/tests/json/basic.json
@@ -0,0 +1,3 @@
+{"num":5,"str":"test"}
+{"num":2,"str":"hello"}
+{"num":4,"str":"foo"}
\ No newline at end of file
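
For reference, the decode/flush contract that `open_with_decoder` drives is the push-based API of arrow's CSV and JSON decoders. The sketch below exercises it directly against a CSV decoder using the same builder calls as `CsvConfig::builder` in this patch; it assumes the arrow version this change pins, and the schema, batch size, and inline input bytes are illustrative only, not taken from the patch.

use std::sync::Arc;

use arrow::csv;
use arrow_schema::{ArrowError, DataType, Field, Schema};

fn main() -> Result<(), ArrowError> {
    // Same shape as test_basic_schema() in test_util.rs.
    let schema = Arc::new(Schema::new(vec![
        Field::new("num", DataType::Int64, false),
        Field::new("str", DataType::Utf8, false),
    ]));

    // Same builder calls as CsvConfig::builder; the batch size is made up.
    let mut decoder = csv::ReaderBuilder::new()
        .with_schema(schema)
        .with_batch_size(1024)
        .has_header(false)
        .build_decoder();

    // Feed bytes until the decoder reports 0 consumed (input exhausted or a full
    // batch buffered); any unconsumed tail would be re-fed on the next call.
    let mut buf: &[u8] = b"5,test\n2,hello\n4,foo\n";
    loop {
        let consumed = decoder.decode(buf)?;
        if consumed == 0 {
            break;
        }
        buf = &buf[consumed..];
    }

    // Flush the buffered rows as a RecordBatch, as open_with_decoder does per poll.
    if let Some(batch) = decoder.flush()? {
        println!("decoded {} rows", batch.num_rows());
    }
    Ok(())
}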