From fac9c17a9bd3211bfb3b0103c74ad4181c0b345b Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Tue, 11 Apr 2023 17:59:30 +0900 Subject: [PATCH] feat: implement infer schema from single file (#1348) * feat: implement infer schema from file * feat: implement compression type * refactor: remove unnecessary BufReader * refactor: remove SyncIoBridge and using tokio_util::io::SyncIoBridge instead * chore: apply suggestions from CR --- Cargo.lock | 8 + Cargo.toml | 2 +- src/common/datasource/Cargo.toml | 15 ++ src/common/datasource/src/compression.rs | 84 ++++++++++ src/common/datasource/src/error.rs | 76 ++++++++- src/common/datasource/src/file_format.rs | 30 ++++ src/common/datasource/src/file_format/csv.rs | 158 ++++++++++++++++++ src/common/datasource/src/file_format/json.rs | 121 ++++++++++++++ .../datasource/src/file_format/parquet.rs | 78 +++++++++ src/common/datasource/src/lib.rs | 3 + src/common/datasource/src/test_util.rs | 48 ++++++ src/common/datasource/tests/README.md | 24 +++ src/common/datasource/tests/csv/basic.csv | 4 + .../tests/csv/schema_infer_limit.csv | 5 + src/common/datasource/tests/csv/simple.csv | 11 ++ .../tests/json/schema_infer_limit.json | 4 + src/common/datasource/tests/json/simple.json | 12 ++ .../datasource/tests/parquet/basic.parquet | Bin 0 -> 806 bytes 18 files changed, 673 insertions(+), 10 deletions(-) create mode 100644 src/common/datasource/src/compression.rs create mode 100644 src/common/datasource/src/file_format.rs create mode 100644 src/common/datasource/src/file_format/csv.rs create mode 100644 src/common/datasource/src/file_format/json.rs create mode 100644 src/common/datasource/src/file_format/parquet.rs create mode 100644 src/common/datasource/src/test_util.rs create mode 100644 src/common/datasource/tests/README.md create mode 100644 src/common/datasource/tests/csv/basic.csv create mode 100644 src/common/datasource/tests/csv/schema_infer_limit.csv create mode 100644 src/common/datasource/tests/csv/simple.csv create mode 100644 src/common/datasource/tests/json/schema_infer_limit.json create mode 100644 src/common/datasource/tests/json/simple.json create mode 100644 src/common/datasource/tests/parquet/basic.parquet diff --git a/Cargo.lock b/Cargo.lock index 5b758f5f4a..1a6925a663 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1561,11 +1561,19 @@ dependencies = [ name = "common-datasource" version = "0.1.1" dependencies = [ + "arrow", + "arrow-schema", + "async-compression", + "async-trait", "common-error", + "common-runtime", + "datafusion", "futures", "object-store", "regex", "snafu", + "tokio", + "tokio-util", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 662ad5f50a..2b250157bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,7 +77,7 @@ snafu = { version = "0.7", features = ["backtraces"] } sqlparser = "0.32" tempfile = "3" tokio = { version = "1.24.2", features = ["full"] } -tokio-util = "0.7" +tokio-util = { version = "0.7", features = ["io-util"] } tonic = { version = "0.8", features = ["tls"] } uuid = { version = "1", features = ["serde", "v4", "fast-rng"] } diff --git a/src/common/datasource/Cargo.toml b/src/common/datasource/Cargo.toml index a077fde9c6..0c58fe801c 100644 --- a/src/common/datasource/Cargo.toml +++ b/src/common/datasource/Cargo.toml @@ -5,9 +5,24 @@ edition.workspace = true license.workspace = true [dependencies] +arrow.workspace = true +arrow-schema.workspace = true +async-compression = { version = "0.3", features = [ + "bzip2", + "gzip", + "xz", + "zstd", + "futures-io", + "tokio", +] } +async-trait.workspace = true common-error = { path = "../error" } +common-runtime = { path = "../runtime" } +datafusion.workspace = true futures.workspace = true object-store = { path = "../../object-store" } regex = "1.7" snafu.workspace = true +tokio.workspace = true +tokio-util.workspace = true url = "2.3" diff --git a/src/common/datasource/src/compression.rs b/src/common/datasource/src/compression.rs new file mode 100644 index 0000000000..0ea12b401c --- /dev/null +++ b/src/common/datasource/src/compression.rs @@ -0,0 +1,84 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::str::FromStr; + +use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder}; +use tokio::io::{AsyncRead, BufReader}; + +use crate::error::{self, Error, Result}; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum CompressionType { + /// Gzip-ed file + GZIP, + /// Bzip2-ed file + BZIP2, + /// Xz-ed file (liblzma) + XZ, + /// Zstd-ed file, + ZSTD, + /// Uncompressed file + UNCOMPRESSED, +} + +impl FromStr for CompressionType { + type Err = Error; + + fn from_str(s: &str) -> Result { + let s = s.to_uppercase(); + match s.as_str() { + "GZIP" | "GZ" => Ok(Self::GZIP), + "BZIP2" | "BZ2" => Ok(Self::BZIP2), + "XZ" => Ok(Self::XZ), + "ZST" | "ZSTD" => Ok(Self::ZSTD), + "" => Ok(Self::UNCOMPRESSED), + _ => error::UnsupportedCompressionTypeSnafu { + compression_type: s, + } + .fail(), + } + } +} + +impl Display for CompressionType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(match self { + Self::GZIP => "GZIP", + Self::BZIP2 => "BZIP2", + Self::XZ => "XZ", + Self::ZSTD => "ZSTD", + Self::UNCOMPRESSED => "", + }) + } +} + +impl CompressionType { + pub const fn is_compressed(&self) -> bool { + !matches!(self, &Self::UNCOMPRESSED) + } + + pub fn convert_async_read( + &self, + s: T, + ) -> Box { + match self { + CompressionType::GZIP => Box::new(GzipDecoder::new(BufReader::new(s))), + CompressionType::BZIP2 => Box::new(BzDecoder::new(BufReader::new(s))), + CompressionType::XZ => Box::new(XzDecoder::new(BufReader::new(s))), + CompressionType::ZSTD => Box::new(ZstdDecoder::new(BufReader::new(s))), + CompressionType::UNCOMPRESSED => Box::new(s), + } + } +} diff --git a/src/common/datasource/src/error.rs b/src/common/datasource/src/error.rs index 657a536098..b8f4f61831 100644 --- a/src/common/datasource/src/error.rs +++ b/src/common/datasource/src/error.rs @@ -21,6 +21,9 @@ use url::ParseError; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Unsupported compression type: {}", compression_type))] + UnsupportedCompressionType { compression_type: String }, + #[snafu(display("Unsupported backend protocol: {}", protocol))] UnsupportedBackendProtocol { protocol: String }, @@ -33,12 +36,44 @@ pub enum Error { #[snafu(display("Invalid url: {}, error :{}", url, source))] InvalidUrl { url: String, source: ParseError }, + #[snafu(display("Failed to decompression, source: {}", source))] + Decompression { + source: object_store::Error, + location: Location, + }, + #[snafu(display("Failed to build backend, source: {}", source))] BuildBackend { source: object_store::Error, location: Location, }, + #[snafu(display("Failed to read object from path: {}, source: {}", path, source))] + ReadObject { + path: String, + location: Location, + source: object_store::Error, + }, + + #[snafu(display("Failed to read parquet source: {}", source))] + ReadParquetSnafu { + location: Location, + source: datafusion::parquet::errors::ParquetError, + }, + + #[snafu(display("Failed to convert parquet to schema: {}", source))] + ParquetToSchema { + location: Location, + source: datafusion::parquet::errors::ParquetError, + }, + + #[snafu(display("Failed to infer schema from file: {}, source: {}", path, source))] + InferSchema { + path: String, + location: Location, + source: arrow_schema::ArrowError, + }, + #[snafu(display("Failed to list object in path: {}, source: {}", path, source))] ListObjects { path: String, @@ -48,6 +83,12 @@ pub enum Error { #[snafu(display("Invalid connection: {}", msg))] InvalidConnection { msg: String }, + + #[snafu(display("Failed to join handle: {}", source))] + JoinHandle { + location: Location, + source: tokio::task::JoinError, + }, } pub type Result = std::result::Result; @@ -56,13 +97,21 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; match self { - BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable, + BuildBackend { .. } | ListObjects { .. } | ReadObject { .. } => { + StatusCode::StorageUnavailable + } UnsupportedBackendProtocol { .. } + | UnsupportedCompressionType { .. } | InvalidConnection { .. } | InvalidUrl { .. } | EmptyHostPath { .. } - | InvalidPath { .. } => StatusCode::InvalidArguments, + | InvalidPath { .. } + | InferSchema { .. } + | ReadParquetSnafu { .. } + | ParquetToSchema { .. } => StatusCode::InvalidArguments, + + Decompression { .. } | JoinHandle { .. } => StatusCode::Unexpected, } } @@ -71,14 +120,23 @@ impl ErrorExt for Error { } fn location_opt(&self) -> Option { + use Error::*; match self { - Error::BuildBackend { location, .. } => Some(*location), - Error::ListObjects { location, .. } => Some(*location), - Error::UnsupportedBackendProtocol { .. } - | Error::EmptyHostPath { .. } - | Error::InvalidPath { .. } - | Error::InvalidUrl { .. } - | Error::InvalidConnection { .. } => None, + BuildBackend { location, .. } => Some(*location), + ReadObject { location, .. } => Some(*location), + ListObjects { location, .. } => Some(*location), + InferSchema { location, .. } => Some(*location), + ReadParquetSnafu { location, .. } => Some(*location), + ParquetToSchema { location, .. } => Some(*location), + Decompression { location, .. } => Some(*location), + JoinHandle { location, .. } => Some(*location), + + UnsupportedBackendProtocol { .. } + | EmptyHostPath { .. } + | InvalidPath { .. } + | InvalidUrl { .. } + | InvalidConnection { .. } + | UnsupportedCompressionType { .. } => None, } } } diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs new file mode 100644 index 0000000000..e4d651fec9 --- /dev/null +++ b/src/common/datasource/src/file_format.rs @@ -0,0 +1,30 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod csv; +pub mod json; +pub mod parquet; + +pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000; + +use arrow::datatypes::SchemaRef; +use async_trait::async_trait; +use object_store::ObjectStore; + +use crate::error::Result; + +#[async_trait] +pub trait FileFormat: Send + Sync + std::fmt::Debug { + async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result; +} diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs new file mode 100644 index 0000000000..73473e41cf --- /dev/null +++ b/src/common/datasource/src/file_format/csv.rs @@ -0,0 +1,158 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow::csv::reader::infer_reader_schema as infer_csv_schema; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use common_runtime; +use object_store::ObjectStore; +use snafu::ResultExt; +use tokio_util::io::SyncIoBridge; + +use crate::compression::CompressionType; +use crate::error::{self, Result}; +use crate::file_format::{self, FileFormat}; + +#[derive(Debug)] +pub struct CsvFormat { + pub has_header: bool, + pub delimiter: u8, + pub schema_infer_max_record: Option, + pub compression_type: CompressionType, +} + +impl Default for CsvFormat { + fn default() -> Self { + Self { + has_header: true, + delimiter: b',', + schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD), + compression_type: CompressionType::UNCOMPRESSED, + } + } +} + +#[async_trait] +impl FileFormat for CsvFormat { + async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result { + let reader = store + .reader(&path) + .await + .context(error::ReadObjectSnafu { path: &path })?; + + let decoded = self.compression_type.convert_async_read(reader); + + let delimiter = self.delimiter; + let schema_infer_max_record = self.schema_infer_max_record; + let has_header = self.has_header; + + common_runtime::spawn_blocking_read(move || { + let reader = SyncIoBridge::new(decoded); + + let (schema, _records_read) = + infer_csv_schema(reader, delimiter, schema_infer_max_record, has_header) + .context(error::InferSchemaSnafu { path: &path })?; + + Ok(Arc::new(schema)) + }) + .await + .context(error::JoinHandleSnafu)? + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use crate::file_format::FileFormat; + use crate::test_util::{self, format_schema, test_store}; + + fn test_data_root() -> String { + test_util::get_data_dir("tests/csv").display().to_string() + } + + #[tokio::test] + async fn infer_schema_basic() { + let csv = CsvFormat::default(); + let store = test_store(&test_data_root()); + let schema = csv + .infer_schema(&store, "simple.csv".to_string()) + .await + .unwrap(); + let formatted: Vec<_> = format_schema(schema); + + assert_eq!( + vec![ + "c1: Utf8: NULL", + "c2: Int64: NULL", + "c3: Int64: NULL", + "c4: Int64: NULL", + "c5: Int64: NULL", + "c6: Int64: NULL", + "c7: Int64: NULL", + "c8: Int64: NULL", + "c9: Int64: NULL", + "c10: Int64: NULL", + "c11: Float64: NULL", + "c12: Float64: NULL", + "c13: Utf8: NULL" + ], + formatted, + ); + } + + #[tokio::test] + async fn infer_schema_with_limit() { + let json = CsvFormat { + schema_infer_max_record: Some(3), + ..CsvFormat::default() + }; + let store = test_store(&test_data_root()); + let schema = json + .infer_schema(&store, "schema_infer_limit.csv".to_string()) + .await + .unwrap(); + let formatted: Vec<_> = format_schema(schema); + + assert_eq!( + vec![ + "a: Int64: NULL", + "b: Float64: NULL", + "c: Int64: NULL", + "d: Int64: NULL" + ], + formatted + ); + + let json = CsvFormat::default(); + let store = test_store(&test_data_root()); + let schema = json + .infer_schema(&store, "schema_infer_limit.csv".to_string()) + .await + .unwrap(); + let formatted: Vec<_> = format_schema(schema); + + assert_eq!( + vec![ + "a: Int64: NULL", + "b: Float64: NULL", + "c: Int64: NULL", + "d: Utf8: NULL" + ], + formatted + ); + } +} diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs new file mode 100644 index 0000000000..49c9f535ca --- /dev/null +++ b/src/common/datasource/src/file_format/json.rs @@ -0,0 +1,121 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::BufReader; +use std::sync::Arc; + +use arrow::datatypes::SchemaRef; +use arrow::json::reader::{infer_json_schema_from_iterator, ValueIter}; +use async_trait::async_trait; +use common_runtime; +use object_store::ObjectStore; +use snafu::ResultExt; +use tokio_util::io::SyncIoBridge; + +use crate::compression::CompressionType; +use crate::error::{self, Result}; +use crate::file_format::{self, FileFormat}; + +#[derive(Debug)] +pub struct JsonFormat { + pub schema_infer_max_record: Option, + pub compression_type: CompressionType, +} + +impl Default for JsonFormat { + fn default() -> Self { + Self { + schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD), + compression_type: CompressionType::UNCOMPRESSED, + } + } +} + +#[async_trait] +impl FileFormat for JsonFormat { + async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result { + let reader = store + .reader(&path) + .await + .context(error::ReadObjectSnafu { path: &path })?; + + let decoded = self.compression_type.convert_async_read(reader); + + let schema_infer_max_record = self.schema_infer_max_record; + + common_runtime::spawn_blocking_read(move || { + let mut reader = BufReader::new(SyncIoBridge::new(decoded)); + + let iter = ValueIter::new(&mut reader, schema_infer_max_record); + + let schema = infer_json_schema_from_iterator(iter) + .context(error::InferSchemaSnafu { path: &path })?; + + Ok(Arc::new(schema)) + }) + .await + .context(error::JoinHandleSnafu)? + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::file_format::FileFormat; + use crate::test_util::{self, format_schema, test_store}; + + fn test_data_root() -> String { + test_util::get_data_dir("tests/json").display().to_string() + } + + #[tokio::test] + async fn infer_schema_basic() { + let json = JsonFormat::default(); + let store = test_store(&test_data_root()); + let schema = json + .infer_schema(&store, "simple.json".to_string()) + .await + .unwrap(); + let formatted: Vec<_> = format_schema(schema); + + assert_eq!( + vec![ + "a: Int64: NULL", + "b: Float64: NULL", + "c: Boolean: NULL", + "d: Utf8: NULL", + ], + formatted + ); + } + + #[tokio::test] + async fn infer_schema_with_limit() { + let json = JsonFormat { + schema_infer_max_record: Some(3), + ..JsonFormat::default() + }; + let store = test_store(&test_data_root()); + let schema = json + .infer_schema(&store, "schema_infer_limit.json".to_string()) + .await + .unwrap(); + let formatted: Vec<_> = format_schema(schema); + + assert_eq!( + vec!["a: Int64: NULL", "b: Float64: NULL", "c: Boolean: NULL"], + formatted + ); + } +} diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs new file mode 100644 index 0000000000..c0fd8e5abf --- /dev/null +++ b/src/common/datasource/src/file_format/parquet.rs @@ -0,0 +1,78 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion::parquet::arrow::async_reader::AsyncFileReader; +use datafusion::parquet::arrow::parquet_to_arrow_schema; +use object_store::ObjectStore; +use snafu::ResultExt; + +use crate::error::{self, Result}; +use crate::file_format::FileFormat; + +#[derive(Debug, Default)] +pub struct ParquetFormat {} + +#[async_trait] +impl FileFormat for ParquetFormat { + async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result { + let mut reader = store + .reader(&path) + .await + .context(error::ReadObjectSnafu { path: &path })?; + + let metadata = reader + .get_metadata() + .await + .context(error::ReadParquetSnafuSnafu)?; + + let file_metadata = metadata.file_metadata(); + let schema = parquet_to_arrow_schema( + file_metadata.schema_descr(), + file_metadata.key_value_metadata(), + ) + .context(error::ParquetToSchemaSnafu)?; + + Ok(Arc::new(schema)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::file_format::FileFormat; + use crate::test_util::{self, format_schema, test_store}; + + fn test_data_root() -> String { + test_util::get_data_dir("tests/parquet") + .display() + .to_string() + } + + #[tokio::test] + async fn infer_schema_basic() { + let json = ParquetFormat::default(); + let store = test_store(&test_data_root()); + let schema = json + .infer_schema(&store, "basic.parquet".to_string()) + .await + .unwrap(); + let formatted: Vec<_> = format_schema(schema); + + assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted); + } +} diff --git a/src/common/datasource/src/lib.rs b/src/common/datasource/src/lib.rs index 6044d52033..fc41d18cf9 100644 --- a/src/common/datasource/src/lib.rs +++ b/src/common/datasource/src/lib.rs @@ -12,7 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod compression; pub mod error; +pub mod file_format; pub mod lister; pub mod object_store; +pub mod test_util; pub mod util; diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs new file mode 100644 index 0000000000..3e65022e10 --- /dev/null +++ b/src/common/datasource/src/test_util.rs @@ -0,0 +1,48 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::path::PathBuf; + +use arrow_schema::SchemaRef; +use object_store::services::Fs; +use object_store::ObjectStore; + +pub fn get_data_dir(path: &str) -> PathBuf { + // https://doc.rust-lang.org/cargo/reference/environment-variables.html + let dir = env!("CARGO_MANIFEST_DIR"); + + PathBuf::from(dir).join(path) +} + +pub fn format_schema(schema: SchemaRef) -> Vec { + schema + .fields() + .iter() + .map(|f| { + format!( + "{}: {:?}: {}", + f.name(), + f.data_type(), + if f.is_nullable() { "NULL" } else { "NOT NULL" } + ) + }) + .collect() +} + +pub fn test_store(root: &str) -> ObjectStore { + let mut builder = Fs::default(); + builder.root(root); + + ObjectStore::new(builder).unwrap().finish() +} diff --git a/src/common/datasource/tests/README.md b/src/common/datasource/tests/README.md new file mode 100644 index 0000000000..df750faf86 --- /dev/null +++ b/src/common/datasource/tests/README.md @@ -0,0 +1,24 @@ +### Parquet +The `parquet/basic.parquet` was converted from `csv/basic.csv` via [bdt](https://github.com/andygrove/bdt). + +Internal of `parquet/basic.parquet`: + +Data: +``` ++-----+-------+ +| num | str | ++-----+-------+ +| 5 | test | +| 2 | hello | +| 4 | foo | ++-----+-------+ +``` +Schema: +``` ++-------------+-----------+-------------+ +| column_name | data_type | is_nullable | ++-------------+-----------+-------------+ +| num | Int64 | YES | +| str | Utf8 | YES | ++-------------+-----------+-------------+ +``` \ No newline at end of file diff --git a/src/common/datasource/tests/csv/basic.csv b/src/common/datasource/tests/csv/basic.csv new file mode 100644 index 0000000000..77051beb27 --- /dev/null +++ b/src/common/datasource/tests/csv/basic.csv @@ -0,0 +1,4 @@ +num,str +5,test +2,hello +4,foo \ No newline at end of file diff --git a/src/common/datasource/tests/csv/schema_infer_limit.csv b/src/common/datasource/tests/csv/schema_infer_limit.csv new file mode 100644 index 0000000000..40e5a598a7 --- /dev/null +++ b/src/common/datasource/tests/csv/schema_infer_limit.csv @@ -0,0 +1,5 @@ +a,b,c,d +1,2,3,4 +1,2,3,4 +1,2.0,3,4 +1,2,4,test \ No newline at end of file diff --git a/src/common/datasource/tests/csv/simple.csv b/src/common/datasource/tests/csv/simple.csv new file mode 100644 index 0000000000..67fd31494b --- /dev/null +++ b/src/common/datasource/tests/csv/simple.csv @@ -0,0 +1,11 @@ +c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13 +c,2,1,18109,2033001162,-6513304855495910254,25,43062,1491205016,5863949479783605708,0.110830784,0.9294097332465232,6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW +d,5,-40,22614,706441268,-7542719935673075327,155,14337,3373581039,11720144131976083864,0.69632107,0.3114712539863804,C2GT5KVyOPZpgKVl110TyZO0NcJ434 +b,1,29,-18218,994303988,5983957848665088916,204,9489,3275293996,14857091259186476033,0.53840446,0.17909035118828576,AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz +a,1,-85,-15154,1171968280,1919439543497968449,77,52286,774637006,12101411955859039553,0.12285209,0.6864391962767343,0keZ5G8BffGwgF2RwQD59TFzMStxCB +b,5,-82,22080,1824882165,7373730676428214987,208,34331,3342719438,3330177516592499461,0.82634634,0.40975383525297016,Ig1QcuKsjHXkproePdERo2w0mYzIqd +b,4,-111,-1967,-4229382,1892872227362838079,67,9832,1243785310,8382489916947120498,0.06563997,0.152498292971736,Sfx0vxv1skzZWT1PqVdoRDdO6Sb6xH +e,3,104,-25136,1738331255,300633854973581194,139,20807,3577318119,13079037564113702254,0.40154034,0.7764360990307122,DuJNG8tufSqW0ZstHqWj3aGvFLMg4A +a,3,13,12613,1299719633,2020498574254265315,191,17835,3998790955,14881411008939145569,0.041445434,0.8813167497816289,Amn2K87Db5Es3dFQO9cw9cvpAM6h35 +d,1,38,18384,-335410409,-1632237090406591229,26,57510,2712615025,1842662804748246269,0.6064476,0.6404495093354053,4HX6feIvmNXBN7XGqgO4YVBkhu8GDI +a,4,-38,20744,762932956,308913475857409919,7,45465,1787652631,878137512938218976,0.7459874,0.02182578039211991,ydkwycaISlYSlEq3TlkS2m15I2pcp8 \ No newline at end of file diff --git a/src/common/datasource/tests/json/schema_infer_limit.json b/src/common/datasource/tests/json/schema_infer_limit.json new file mode 100644 index 0000000000..bfacf2fa56 --- /dev/null +++ b/src/common/datasource/tests/json/schema_infer_limit.json @@ -0,0 +1,4 @@ +{"a":1} +{"a":-10, "b":-3.5} +{"a":2, "b":0.6, "c":false} +{"a":1, "b":2.0, "c":false, "d":"4"} \ No newline at end of file diff --git a/src/common/datasource/tests/json/simple.json b/src/common/datasource/tests/json/simple.json new file mode 100644 index 0000000000..dafd2dd2e4 --- /dev/null +++ b/src/common/datasource/tests/json/simple.json @@ -0,0 +1,12 @@ +{"a":1, "b":2.0, "c":false, "d":"4"} +{"a":-10, "b":-3.5, "c":true, "d":"4"} +{"a":2, "b":0.6, "c":false, "d":"text"} +{"a":1, "b":2.0, "c":false, "d":"4"} +{"a":7, "b":-3.5, "c":true, "d":"4"} +{"a":1, "b":0.6, "c":false, "d":"text"} +{"a":1, "b":2.0, "c":false, "d":"4"} +{"a":5, "b":-3.5, "c":true, "d":"4"} +{"a":1, "b":0.6, "c":false, "d":"text"} +{"a":1, "b":2.0, "c":false, "d":"4"} +{"a":1, "b":-3.5, "c":true, "d":"4"} +{"a":100000000000000, "b":0.6, "c":false, "d":"text"} \ No newline at end of file diff --git a/src/common/datasource/tests/parquet/basic.parquet b/src/common/datasource/tests/parquet/basic.parquet new file mode 100644 index 0000000000000000000000000000000000000000..b95269cbcfce8c25ad8c1d06592d371cea67ba4e GIT binary patch literal 806 zcmb7DK~L0B5T3r4W)FKnv-8rI^uR%STCtEo(1^*bnxJlAcY^{avXHWifvvX1{SU@p zVE7Fl_ebo(pF#Wuo}GDZBZQdXq@9^J^UZuSFL^y0tfGf8#$EIg9s(>tDXOwwWf8E2 ztu~_;`S-(vMvV*_HI|{``i^oR0eWT~{6!PElgkSP?du;7f6-pl_($>WL4>$S@y`F3-dW$E-NKN%+%L-hD2mX*B)z@~US&oXpFhHKx1 zrX9_k@c(a1|Msw@zj^oqqL~}w{nyoB7f|cDhi%YvrF~u?XwqP?yZ@}5Ir!x~5!vVy zi&bpj5l!jkh~7$wEm23(EPXx`y%Co~wdIqzCCKWXkKA