diff --git a/src/common/datasource/src/error.rs b/src/common/datasource/src/error.rs
index 4554b476c8..fe73238635 100644
--- a/src/common/datasource/src/error.rs
+++ b/src/common/datasource/src/error.rs
@@ -113,9 +113,8 @@ pub enum Error {
         source: datafusion::parquet::errors::ParquetError,
     },
 
-    #[snafu(display("Failed to infer schema from file: {}, source: {}", path, source))]
+    #[snafu(display("Failed to infer schema from file, source: {}", source))]
     InferSchema {
-        path: String,
         location: Location,
         source: arrow_schema::ArrowError,
     },
diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index 0b9bb3b119..e5bff711eb 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -78,7 +78,7 @@ impl TryFrom<&HashMap> for Format {
 
 #[async_trait]
 pub trait FileFormat: Send + Sync + std::fmt::Debug {
-    async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result;
+    async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result;
 }
 
 pub trait ArrowDecoder: Send + 'static {
@@ -167,7 +167,7 @@ pub async fn infer_schemas(
 ) -> Result {
     let mut schemas = Vec::with_capacity(files.len());
     for file in files {
-        schemas.push(file_format.infer_schema(store, file.to_string()).await?)
+        schemas.push(file_format.infer_schema(store, file).await?)
     }
     ArrowSchema::try_merge(schemas).context(error::MergeSchemaSnafu)
 }
@@ -175,14 +175,14 @@ pub async fn infer_schemas(
 pub async fn stream_to_file T>(
     mut stream: SendableRecordBatchStream,
     store: ObjectStore,
-    path: String,
+    path: &str,
     threshold: usize,
     encoder_factory: U,
 ) -> Result {
     let writer = store
-        .writer(&path)
+        .writer(path)
         .await
-        .context(error::WriteObjectSnafu { path: &path })?
+        .context(error::WriteObjectSnafu { path })?
.compat_write(); let buffer = SharedBuffer::with_capacity(threshold); diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs index dd2080e29f..27ff047579 100644 --- a/src/common/datasource/src/file_format/csv.rs +++ b/src/common/datasource/src/file_format/csv.rs @@ -162,11 +162,11 @@ impl FileOpener for CsvOpener { #[async_trait] impl FileFormat for CsvFormat { - async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result { + async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { let reader = store - .reader(&path) + .reader(path) .await - .context(error::ReadObjectSnafu { path: &path })?; + .context(error::ReadObjectSnafu { path })?; let decoded = self.compression_type.convert_async_read(reader); @@ -179,7 +179,7 @@ impl FileFormat for CsvFormat { let (schema, _records_read) = infer_csv_schema(reader, delimiter, schema_infer_max_record, has_header) - .context(error::InferSchemaSnafu { path: &path })?; + .context(error::InferSchemaSnafu)?; Ok(schema) }) .await @@ -190,7 +190,7 @@ impl FileFormat for CsvFormat { pub async fn stream_to_csv( stream: SendableRecordBatchStream, store: ObjectStore, - path: String, + path: &str, threshold: usize, ) -> Result { stream_to_file(stream, store, path, threshold, |buffer| { @@ -223,10 +223,7 @@ mod tests { async fn infer_schema_basic() { let csv = CsvFormat::default(); let store = test_store(&test_data_root()); - let schema = csv - .infer_schema(&store, "simple.csv".to_string()) - .await - .unwrap(); + let schema = csv.infer_schema(&store, "simple.csv").await.unwrap(); let formatted: Vec<_> = format_schema(schema); assert_eq!( @@ -257,7 +254,7 @@ mod tests { }; let store = test_store(&test_data_root()); let schema = json - .infer_schema(&store, "schema_infer_limit.csv".to_string()) + .infer_schema(&store, "schema_infer_limit.csv") .await .unwrap(); let formatted: Vec<_> = format_schema(schema); @@ -275,7 +272,7 @@ mod tests { let json = CsvFormat::default(); let store = test_store(&test_data_root()); let schema = json - .infer_schema(&store, "schema_infer_limit.csv".to_string()) + .infer_schema(&store, "schema_infer_limit.csv") .await .unwrap(); let formatted: Vec<_> = format_schema(schema); diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs index 85526e5943..134b23bce0 100644 --- a/src/common/datasource/src/file_format/json.rs +++ b/src/common/datasource/src/file_format/json.rs @@ -80,11 +80,11 @@ impl Default for JsonFormat { #[async_trait] impl FileFormat for JsonFormat { - async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result { + async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { let reader = store - .reader(&path) + .reader(path) .await - .context(error::ReadObjectSnafu { path: &path })?; + .context(error::ReadObjectSnafu { path })?; let decoded = self.compression_type.convert_async_read(reader); @@ -95,8 +95,7 @@ impl FileFormat for JsonFormat { let iter = ValueIter::new(&mut reader, schema_infer_max_record); - let schema = infer_json_schema_from_iterator(iter) - .context(error::InferSchemaSnafu { path: &path })?; + let schema = infer_json_schema_from_iterator(iter).context(error::InferSchemaSnafu)?; Ok(schema) }) @@ -149,7 +148,7 @@ impl FileOpener for JsonOpener { pub async fn stream_to_json( stream: SendableRecordBatchStream, store: ObjectStore, - path: String, + path: &str, threshold: usize, ) -> Result { stream_to_file(stream, store, path, threshold, |buffer| { @@ 
-179,10 +178,7 @@ mod tests { async fn infer_schema_basic() { let json = JsonFormat::default(); let store = test_store(&test_data_root()); - let schema = json - .infer_schema(&store, "simple.json".to_string()) - .await - .unwrap(); + let schema = json.infer_schema(&store, "simple.json").await.unwrap(); let formatted: Vec<_> = format_schema(schema); assert_eq!( @@ -204,7 +200,7 @@ mod tests { }; let store = test_store(&test_data_root()); let schema = json - .infer_schema(&store, "schema_infer_limit.json".to_string()) + .infer_schema(&store, "schema_infer_limit.json") .await .unwrap(); let formatted: Vec<_> = format_schema(schema); diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index fa4b8d3a74..e29729e028 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -40,11 +40,11 @@ pub struct ParquetFormat {} #[async_trait] impl FileFormat for ParquetFormat { - async fn infer_schema(&self, store: &ObjectStore, path: String) -> Result { + async fn infer_schema(&self, store: &ObjectStore, path: &str) -> Result { let mut reader = store - .reader(&path) + .reader(path) .await - .context(error::ReadObjectSnafu { path: &path })?; + .context(error::ReadObjectSnafu { path })?; let metadata = reader .get_metadata() @@ -171,10 +171,7 @@ mod tests { async fn infer_schema_basic() { let json = ParquetFormat::default(); let store = test_store(&test_data_root()); - let schema = json - .infer_schema(&store, "basic.parquet".to_string()) - .await - .unwrap(); + let schema = json.infer_schema(&store, "basic.parquet").await.unwrap(); let formatted: Vec<_> = format_schema(schema); assert_eq!(vec!["num: Int64: NULL", "str: Utf8: NULL"], formatted); diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index 70d81ffa31..a72a194a7d 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -116,7 +116,7 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi stream_to_json( Box::pin(stream), tmp_store.clone(), - output_path.clone(), + &output_path, threshold(size), ) .await @@ -158,7 +158,7 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz stream_to_csv( Box::pin(stream), tmp_store.clone(), - output_path.clone(), + &output_path, threshold(size), ) .await diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index 179cedb2d7..4be42c1175 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -19,6 +19,7 @@ use std::task::{Context, Poll}; use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef; use datafusion::error::Result as DfResult; +use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream}; use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream; use datafusion_common::DataFusionError; use datatypes::schema::{Schema, SchemaRef}; @@ -39,6 +40,40 @@ type FutureStream = Pin< >, >; +/// ParquetRecordBatchStream -> DataFusion RecordBatchStream +pub struct ParquetRecordBatchStreamAdapter { + stream: ParquetRecordBatchStream, +} + +impl ParquetRecordBatchStreamAdapter { + pub fn new(stream: ParquetRecordBatchStream) -> Self { + Self { stream } + } +} + +impl DfRecordBatchStream + for ParquetRecordBatchStreamAdapter +{ + fn schema(&self) -> DfSchemaRef { + self.stream.schema().clone() + } +} + +impl Stream for 
ParquetRecordBatchStreamAdapter { + type Item = DfResult; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let batch = futures::ready!(Pin::new(&mut self.stream).poll_next(cx)) + .map(|r| r.map_err(|e| DataFusionError::External(Box::new(e)))); + Poll::Ready(batch) + } + + #[inline] + fn size_hint(&self) -> (usize, Option) { + self.stream.size_hint() + } +} + /// Greptime SendableRecordBatchStream -> DataFusion RecordBatchStream pub struct DfRecordBatchStreamAdapter { stream: SendableRecordBatchStream, diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 214ed11b5e..a34661ed29 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -435,6 +435,12 @@ pub enum Error { source: common_datasource::error::Error, }, + #[snafu(display("Failed to parse file format, source: {}", source))] + ParseFileFormat { + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + #[snafu(display("Failed to build data source backend, source: {}", source))] BuildBackend { #[snafu(backtrace)] @@ -447,6 +453,26 @@ pub enum Error { source: common_datasource::error::Error, }, + #[snafu(display("Failed to infer schema from path: {}, source: {}", path, source))] + InferSchema { + path: String, + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + + #[snafu(display("Failed to build csv config: {}", source))] + BuildCsvConfig { + source: common_datasource::file_format::csv::CsvConfigBuilderError, + location: Location, + }, + + #[snafu(display("Failed to write stream to path: {}, source: {}", path, source))] + WriteStreamToFile { + path: String, + #[snafu(backtrace)] + source: common_datasource::error::Error, + }, + #[snafu(display("Failed to read object in path: {}, source: {}", path, source))] ReadObject { path: String, @@ -454,6 +480,12 @@ pub enum Error { source: object_store::Error, }, + #[snafu(display("Failed to read record batch, source: {}", source))] + ReadRecordBatch { + source: datafusion::error::DataFusionError, + location: Location, + }, + #[snafu(display("Failed to read parquet file, source: {}", source))] ReadParquet { source: parquet::errors::ParquetError, @@ -466,6 +498,12 @@ pub enum Error { source: parquet::errors::ParquetError, }, + #[snafu(display("Failed to build file stream, source: {}", source))] + BuildFileStream { + location: Location, + source: datafusion::error::DataFusionError, + }, + #[snafu(display("Failed to write parquet file, source: {}", source))] WriteParquet { #[snafu(backtrace)] @@ -517,7 +555,8 @@ impl ErrorExt for Error { | Error::ColumnNoneDefaultValue { .. } | Error::BuildRegex { .. } | Error::InvalidSchema { .. } - | Error::PrepareImmutableTable { .. } => StatusCode::InvalidArguments, + | Error::PrepareImmutableTable { .. } + | Error::BuildCsvConfig { .. } => StatusCode::InvalidArguments, Error::NotSupported { .. } => StatusCode::Unsupported, @@ -532,6 +571,10 @@ impl ErrorExt for Error { source.status_code() } + Error::ParseFileFormat { source, .. } | Error::InferSchema { source, .. } => { + source.status_code() + } + Error::Table { source } | Error::CopyTable { source, .. } | Error::Insert { source, .. } => source.status_code(), @@ -560,7 +603,12 @@ impl ErrorExt for Error { Error::TableNotFound { .. } => StatusCode::TableNotFound, Error::ColumnNotFound { .. } => StatusCode::TableColumnNotFound, - Error::JoinTask { .. } => StatusCode::Unexpected, + Error::JoinTask { .. } + | Error::BuildParquetRecordBatchStream { .. } + | Error::ReadRecordBatch { .. } + | Error::BuildFileStream { .. 
} + | Error::WriteStreamToFile { .. } => StatusCode::Unexpected, + Error::Catalog { source, .. } => source.status_code(), Error::CatalogEntrySerde { source, .. } => source.status_code(), @@ -594,9 +642,7 @@ impl ErrorExt for Error { Error::TableScanExec { source, .. } => source.status_code(), - Error::ReadObject { .. } - | Error::ReadParquet { .. } - | Error::BuildParquetRecordBatchStream { .. } => StatusCode::StorageUnavailable, + Error::ReadObject { .. } | Error::ReadParquet { .. } => StatusCode::StorageUnavailable, Error::ListObjects { source } | Error::ParseUrl { source } diff --git a/src/frontend/src/statement/copy_table_from.rs b/src/frontend/src/statement/copy_table_from.rs index 5fce5517ae..1ef4ffb758 100644 --- a/src/frontend/src/statement/copy_table_from.rs +++ b/src/frontend/src/statement/copy_table_from.rs @@ -14,17 +14,28 @@ use std::collections::HashMap; use std::future::Future; +use std::sync::Arc; use async_compat::CompatExt; use common_base::readable_size::ReadableSize; +use common_datasource::file_format::csv::{CsvConfigBuilder, CsvOpener}; +use common_datasource::file_format::json::JsonOpener; +use common_datasource::file_format::{FileFormat, Format}; use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::{build_backend, parse_url}; use common_datasource::util::find_dir_and_filename; use common_query::Output; +use common_recordbatch::adapter::ParquetRecordBatchStreamAdapter; +use common_recordbatch::DfSendableRecordBatchStream; +use datafusion::datasource::listing::PartitionedFile; +use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::parquet::arrow::ParquetRecordBatchStreamBuilder; +use datafusion::physical_plan::file_format::{FileOpener, FileScanConfig, FileStream}; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; use datatypes::arrow::datatypes::{DataType, SchemaRef}; use datatypes::vectors::Helper; use futures_util::StreamExt; +use object_store::{Entry, EntryMode, Metakey, ObjectStore}; use regex::Regex; use snafu::ResultExt; use table::engine::TableReference; @@ -34,15 +45,13 @@ use tokio::io::BufReader; use crate::error::{self, IntoVectorsSnafu, Result}; use crate::statement::StatementExecutor; -impl StatementExecutor { - pub(crate) async fn copy_table_from(&self, req: CopyTableRequest) -> Result { - let table_ref = TableReference { - catalog: &req.catalog_name, - schema: &req.schema_name, - table: &req.table_name, - }; - let table = self.get_table(&table_ref).await?; +const DEFAULT_BATCH_SIZE: usize = 8192; +impl StatementExecutor { + async fn list_copy_from_entries( + &self, + req: &CopyTableRequest, + ) -> Result<(ObjectStore, Vec)> { let (_schema, _host, path) = parse_url(&req.location).context(error::ParseUrlSnafu)?; let object_store = @@ -66,6 +75,137 @@ impl StatementExecutor { let entries = lister.list().await.context(error::ListObjectsSnafu)?; + Ok((object_store, entries)) + } + + async fn infer_schema( + &self, + format: &Format, + object_store: ObjectStore, + path: &str, + ) -> Result { + match format { + Format::Csv(format) => Ok(Arc::new( + format + .infer_schema(&object_store, path) + .await + .context(error::InferSchemaSnafu { path })?, + )), + Format::Json(format) => Ok(Arc::new( + format + .infer_schema(&object_store, path) + .await + .context(error::InferSchemaSnafu { path })?, + )), + Format::Parquet(_) => { + let reader = object_store + .reader(path) + .await + .context(error::ReadObjectSnafu { path })?; + + let buf_reader = BufReader::new(reader); + + let builder = 
ParquetRecordBatchStreamBuilder::new(buf_reader) + .await + .context(error::ReadParquetSnafu)?; + Ok(builder.schema().clone()) + } + } + } + + async fn build_file_stream( + &self, + opener: F, + filename: &str, + file_schema: SchemaRef, + ) -> Result { + let stream = FileStream::new( + &FileScanConfig { + object_store_url: ObjectStoreUrl::parse("empty://").unwrap(), // won't be used + file_schema, + file_groups: vec![vec![PartitionedFile::new(filename.to_string(), 10)]], + statistics: Default::default(), + projection: None, + limit: None, + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }, + 0, + opener, + &ExecutionPlanMetricsSet::new(), + ) + .context(error::BuildFileStreamSnafu)?; + + Ok(Box::pin(stream)) + } + + async fn build_read_stream( + &self, + format: &Format, + object_store: ObjectStore, + path: &str, + schema: SchemaRef, + ) -> Result { + match format { + Format::Csv(format) => { + let csv_conf = CsvConfigBuilder::default() + .batch_size(DEFAULT_BATCH_SIZE) + .file_schema(schema.clone()) + .build() + .context(error::BuildCsvConfigSnafu)?; + + self.build_file_stream( + CsvOpener::new(csv_conf, object_store, format.compression_type), + path, + schema, + ) + .await + } + Format::Json(format) => { + self.build_file_stream( + JsonOpener::new( + DEFAULT_BATCH_SIZE, + schema.clone(), + object_store, + format.compression_type, + ), + path, + schema, + ) + .await + } + Format::Parquet(_) => { + let reader = object_store + .reader(path) + .await + .context(error::ReadObjectSnafu { path })?; + + let buf_reader = BufReader::new(reader.compat()); + + let builder = ParquetRecordBatchStreamBuilder::new(buf_reader) + .await + .context(error::ReadParquetSnafu)?; + + let upstream = builder + .build() + .context(error::BuildParquetRecordBatchStreamSnafu)?; + + Ok(Box::pin(ParquetRecordBatchStreamAdapter::new(upstream))) + } + } + } + + pub(crate) async fn copy_table_from(&self, req: CopyTableRequest) -> Result { + let table_ref = TableReference { + catalog: &req.catalog_name, + schema: &req.schema_name, + table: &req.table_name, + }; + let table = self.get_table(&table_ref).await?; + + let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?; + let fields = table .schema() .arrow_schema() @@ -74,29 +214,33 @@ impl StatementExecutor { .map(|f| f.name().to_string()) .collect::>(); - let mut rows_inserted = 0; + let (object_store, entries) = self.list_copy_from_entries(&req).await?; + + let mut files = Vec::with_capacity(entries.len()); + for entry in entries.iter() { - let path = entry.path(); - // skips directories. 
- if entry.path().ends_with('/') { + let metadata = object_store + .metadata(entry, Metakey::Mode) + .await + .context(error::ReadObjectSnafu { path: entry.path() })?; + if metadata.mode() != EntryMode::FILE { continue; } - let reader = object_store - .reader(path) - .await - .context(error::ReadObjectSnafu { path })?; + let path = entry.path(); + let file_schema = self + .infer_schema(&format, object_store.clone(), path) + .await?; - let buf_reader = BufReader::new(reader.compat()); + ensure_schema_matches_ignore_timezone(&file_schema, table.schema().arrow_schema())?; - let builder = ParquetRecordBatchStreamBuilder::new(buf_reader) - .await - .context(error::ReadParquetSnafu)?; + files.push((file_schema, path)) + } - ensure_schema_matches_ignore_timezone(builder.schema(), table.schema().arrow_schema())?; - - let mut stream = builder - .build() - .context(error::BuildParquetRecordBatchStreamSnafu)?; + let mut rows_inserted = 0; + for (schema, path) in files { + let mut stream = self + .build_read_stream(&format, object_store.clone(), path, schema) + .await?; // TODO(hl): make this configurable through options. let pending_mem_threshold = ReadableSize::mb(32).as_bytes(); @@ -104,7 +248,7 @@ impl StatementExecutor { let mut pending = vec![]; while let Some(r) = stream.next().await { - let record_batch = r.context(error::ReadParquetSnafu)?; + let record_batch = r.context(error::ReadRecordBatchSnafu)?; let vectors = Helper::try_into_vectors(record_batch.columns()).context(IntoVectorsSnafu)?; diff --git a/src/frontend/src/statement/copy_table_to.rs b/src/frontend/src/statement/copy_table_to.rs index 7f37478511..aed13b57e4 100644 --- a/src/frontend/src/statement/copy_table_to.rs +++ b/src/frontend/src/statement/copy_table_to.rs @@ -12,9 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_base::readable_size::ReadableSize; +use common_datasource::file_format::csv::stream_to_csv; +use common_datasource::file_format::json::stream_to_json; +use common_datasource::file_format::Format; use common_datasource::object_store::{build_backend, parse_url}; use common_query::physical_plan::SessionContext; use common_query::Output; +use common_recordbatch::adapter::DfRecordBatchStreamAdapter; +use common_recordbatch::SendableRecordBatchStream; +use object_store::ObjectStore; use snafu::ResultExt; use storage::sst::SstInfo; use storage::{ParquetWriter, Source}; @@ -25,6 +32,46 @@ use crate::error::{self, Result, WriteParquetSnafu}; use crate::statement::StatementExecutor; impl StatementExecutor { + async fn stream_to_file( + &self, + stream: SendableRecordBatchStream, + format: &Format, + object_store: ObjectStore, + path: &str, + ) -> Result { + let threshold = ReadableSize::mb(4).as_bytes() as usize; + + match format { + Format::Csv(_) => stream_to_csv( + Box::pin(DfRecordBatchStreamAdapter::new(stream)), + object_store, + path, + threshold, + ) + .await + .context(error::WriteStreamToFileSnafu { path }), + Format::Json(_) => stream_to_json( + Box::pin(DfRecordBatchStreamAdapter::new(stream)), + object_store, + path, + threshold, + ) + .await + .context(error::WriteStreamToFileSnafu { path }), + Format::Parquet(_) => { + let writer = ParquetWriter::new(path, Source::Stream(stream), object_store); + let rows_copied = writer + .write_sst(&storage::sst::WriteOptions::default()) + .await + .context(WriteParquetSnafu)? + .map(|SstInfo { num_rows, .. 
}| num_rows) + .unwrap_or(0); + + Ok(rows_copied) + } + } + } + pub(crate) async fn copy_table_to(&self, req: CopyTableRequest) -> Result { let table_ref = TableReference { catalog: &req.catalog_name, @@ -33,6 +80,8 @@ impl StatementExecutor { }; let table = self.get_table(&table_ref).await?; + let format = Format::try_from(&req.with).context(error::ParseFileFormatSnafu)?; + let stream = table .scan(None, &[], None) .await @@ -48,14 +97,9 @@ impl StatementExecutor { let object_store = build_backend(&req.location, &req.connection).context(error::BuildBackendSnafu)?; - let writer = ParquetWriter::new(&path, Source::Stream(stream), object_store); - - let rows_copied = writer - .write_sst(&storage::sst::WriteOptions::default()) - .await - .context(WriteParquetSnafu)? - .map(|SstInfo { num_rows, .. }| num_rows) - .unwrap_or(0); + let rows_copied = self + .stream_to_file(stream, &format, object_store, &path) + .await?; Ok(Output::AffectedRows(rows_copied)) } diff --git a/tests/cases/standalone/common/copy/copy_from_fs.result b/tests/cases/standalone/common/copy/copy_from_fs.result index 0662e9c759..6366bdbaa0 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs.result +++ b/tests/cases/standalone/common/copy/copy_from_fs.result @@ -6,7 +6,7 @@ insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 16552765570 Affected Rows: 2 -Copy demo TO '/tmp/demo/export/demo.parquet'; +Copy demo TO '/tmp/demo/export/parquet/demo.parquet'; Affected Rows: 2 @@ -14,7 +14,7 @@ CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp Affected Rows: 0 -Copy with_filename FROM '/tmp/demo/export/demo.parquet'; +Copy with_filename FROM '/tmp/demo/export/parquet/demo.parquet'; Affected Rows: 2 @@ -31,7 +31,7 @@ CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time Affected Rows: 0 -Copy with_path FROM '/tmp/demo/export/'; +Copy with_path FROM '/tmp/demo/export/parquet/'; Affected Rows: 2 @@ -48,7 +48,7 @@ CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp t Affected Rows: 0 -Copy with_pattern FROM '/tmp/demo/export/' WITH (PATTERN = 'demo.*'); +Copy with_pattern FROM '/tmp/demo/export/parquet/' WITH (PATTERN = 'demo.*'); Affected Rows: 2 diff --git a/tests/cases/standalone/common/copy/copy_from_fs.sql b/tests/cases/standalone/common/copy/copy_from_fs.sql index d7b924021e..6de053f190 100644 --- a/tests/cases/standalone/common/copy/copy_from_fs.sql +++ b/tests/cases/standalone/common/copy/copy_from_fs.sql @@ -2,23 +2,23 @@ CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time inde insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); -Copy demo TO '/tmp/demo/export/demo.parquet'; +Copy demo TO '/tmp/demo/export/parquet/demo.parquet'; CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index); -Copy with_filename FROM '/tmp/demo/export/demo.parquet'; +Copy with_filename FROM '/tmp/demo/export/parquet/demo.parquet'; select * from with_filename order by ts; CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index); -Copy with_path FROM '/tmp/demo/export/'; +Copy with_path FROM '/tmp/demo/export/parquet/'; select * from with_path order by ts; CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index); -Copy with_pattern FROM '/tmp/demo/export/' WITH (PATTERN = 'demo.*'); +Copy with_pattern FROM '/tmp/demo/export/parquet/' WITH (PATTERN = 
'demo.*'); select * from with_pattern order by ts; diff --git a/tests/cases/standalone/common/copy/copy_from_fs_csv.result b/tests/cases/standalone/common/copy/copy_from_fs_csv.result new file mode 100644 index 0000000000..2cdcf4b872 --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_from_fs_csv.result @@ -0,0 +1,79 @@ +CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index); + +Affected Rows: 0 + +insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); + +Affected Rows: 2 + +Copy demo TO '/tmp/demo/export/csv/demo.csv' with (format='csv'); + +Affected Rows: 2 + +CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy with_filename FROM '/tmp/demo/export/csv/demo.csv' with (format='csv'); + +Affected Rows: 2 + +select * from with_filename order by ts; + ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | ++-------+------+--------+---------------------+ + +CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy with_path FROM '/tmp/demo/export/csv/' with (format='csv'); + +Affected Rows: 2 + +select * from with_path order by ts; + ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | ++-------+------+--------+---------------------+ + +CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy with_pattern FROM '/tmp/demo/export/csv/' WITH (pattern = 'demo.*',format='csv'); + +Affected Rows: 2 + +select * from with_pattern order by ts; + ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | ++-------+------+--------+---------------------+ + +drop table demo; + +Affected Rows: 1 + +drop table with_filename; + +Affected Rows: 1 + +drop table with_path; + +Affected Rows: 1 + +drop table with_pattern; + +Affected Rows: 1 + diff --git a/tests/cases/standalone/common/copy/copy_from_fs_csv.sql b/tests/cases/standalone/common/copy/copy_from_fs_csv.sql new file mode 100644 index 0000000000..b38c566e2a --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_from_fs_csv.sql @@ -0,0 +1,31 @@ +CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index); + +insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); + +Copy demo TO '/tmp/demo/export/csv/demo.csv' with (format='csv'); + +CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index); + +Copy with_filename FROM '/tmp/demo/export/csv/demo.csv' with (format='csv'); + +select * from with_filename order by ts; + +CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index); + +Copy with_path FROM '/tmp/demo/export/csv/' with (format='csv'); + +select * from with_path order by ts; + +CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index); + +Copy with_pattern FROM 
'/tmp/demo/export/csv/' WITH (pattern = 'demo.*',format='csv'); + +select * from with_pattern order by ts; + +drop table demo; + +drop table with_filename; + +drop table with_path; + +drop table with_pattern; diff --git a/tests/cases/standalone/common/copy/copy_from_fs_json.result b/tests/cases/standalone/common/copy/copy_from_fs_json.result new file mode 100644 index 0000000000..9ddda24dd5 --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_from_fs_json.result @@ -0,0 +1,79 @@ +CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index); + +Affected Rows: 0 + +insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); + +Affected Rows: 2 + +Copy demo TO '/tmp/demo/export/json/demo.json' with (format='json'); + +Affected Rows: 2 + +CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy with_filename FROM '/tmp/demo/export/json/demo.json' with (format='json'); + +Affected Rows: 2 + +select * from with_filename order by ts; + ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | ++-------+------+--------+---------------------+ + +CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy with_path FROM '/tmp/demo/export/json/' with (format='json'); + +Affected Rows: 2 + +select * from with_path order by ts; + ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | ++-------+------+--------+---------------------+ + +CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index); + +Affected Rows: 0 + +Copy with_pattern FROM '/tmp/demo/export/json/' WITH (pattern = 'demo.*',format='json'); + +Affected Rows: 2 + +select * from with_pattern order by ts; + ++-------+------+--------+---------------------+ +| host | cpu | memory | ts | ++-------+------+--------+---------------------+ +| host1 | 66.6 | 1024.0 | 2022-06-15T07:02:37 | +| host2 | 88.8 | 333.3 | 2022-06-15T07:02:38 | ++-------+------+--------+---------------------+ + +drop table demo; + +Affected Rows: 1 + +drop table with_filename; + +Affected Rows: 1 + +drop table with_path; + +Affected Rows: 1 + +drop table with_pattern; + +Affected Rows: 1 + diff --git a/tests/cases/standalone/common/copy/copy_from_fs_json.sql b/tests/cases/standalone/common/copy/copy_from_fs_json.sql new file mode 100644 index 0000000000..e1f751bd83 --- /dev/null +++ b/tests/cases/standalone/common/copy/copy_from_fs_json.sql @@ -0,0 +1,31 @@ +CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index); + +insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000); + +Copy demo TO '/tmp/demo/export/json/demo.json' with (format='json'); + +CREATE TABLE with_filename(host string, cpu double, memory double, ts timestamp time index); + +Copy with_filename FROM '/tmp/demo/export/json/demo.json' with (format='json'); + +select * from with_filename order by ts; + +CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index); + +Copy with_path FROM '/tmp/demo/export/json/' with 
(format='json');
+
+select * from with_path order by ts;
+
+CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
+
+Copy with_pattern FROM '/tmp/demo/export/json/' WITH (pattern = 'demo.*',format='json');
+
+select * from with_pattern order by ts;
+
+drop table demo;
+
+drop table with_filename;
+
+drop table with_path;
+
+drop table with_pattern;
diff --git a/tests/cases/standalone/common/copy/copy_to_fs.result b/tests/cases/standalone/common/copy/copy_to_fs.result
index 698f4f6cde..b8c16dc565 100644
--- a/tests/cases/standalone/common/copy/copy_to_fs.result
+++ b/tests/cases/standalone/common/copy/copy_to_fs.result
@@ -10,6 +10,14 @@ Copy demo TO '/tmp/export/demo.parquet';
 
 Affected Rows: 2
 
+Copy demo TO '/tmp/export/demo.csv' with (format='csv');
+
+Affected Rows: 2
+
+Copy demo TO '/tmp/export/demo.json' with (format='json');
+
+Affected Rows: 2
+
 drop table demo;
 
 Affected Rows: 1
diff --git a/tests/cases/standalone/common/copy/copy_to_fs.sql b/tests/cases/standalone/common/copy/copy_to_fs.sql
index 0fb6c713cc..2329dda7ca 100644
--- a/tests/cases/standalone/common/copy/copy_to_fs.sql
+++ b/tests/cases/standalone/common/copy/copy_to_fs.sql
@@ -4,4 +4,8 @@ insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 16552765570
 
 Copy demo TO '/tmp/export/demo.parquet';
 
+Copy demo TO '/tmp/export/demo.csv' with (format='csv');
+
+Copy demo TO '/tmp/export/demo.json' with (format='json');
+
 drop table demo;
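
Note (illustration, not part of the patch): the sketch below shows how the pieces introduced here are meant to compose for `COPY TO` — `Format::try_from` over the `WITH` options, `build_backend` for the target location, and the new `stream_to_csv`/`stream_to_json` writers with the 4 MiB flush threshold used in `copy_table_to.rs`. The types of the options maps, the stream alias, and the writers' return value are assumptions inferred from the call sites above, not verified signatures.

```rust
// Sketch only: mirrors the COPY TO dispatch added in copy_table_to.rs.
// Assumptions: `with`/`connection` are HashMap<String, String>, the writers
// accept a DfSendableRecordBatchStream and return the number of rows written,
// and the snafu error types convert into Box<dyn std::error::Error>.
use std::collections::HashMap;

use common_datasource::file_format::csv::stream_to_csv;
use common_datasource::file_format::json::stream_to_json;
use common_datasource::file_format::Format;
use common_datasource::object_store::build_backend;
use common_recordbatch::DfSendableRecordBatchStream;

async fn export_stream(
    stream: DfSendableRecordBatchStream,
    with: &HashMap<String, String>,
    location: &str,
    connection: &HashMap<String, String>,
    path: &str,
) -> Result<usize, Box<dyn std::error::Error>> {
    // COPY ... WITH (format='csv'|'json') selects the writer; parquet keeps the
    // pre-existing ParquetWriter/SST path and is omitted from this sketch.
    let format = Format::try_from(with)?;
    let object_store = build_backend(location, connection)?;

    // Flush the shared buffer roughly every 4 MiB, matching copy_table_to.rs.
    let threshold = 4 * 1024 * 1024;

    let rows = match format {
        Format::Csv(_) => stream_to_csv(stream, object_store, path, threshold).await?,
        Format::Json(_) => stream_to_json(stream, object_store, path, threshold).await?,
        _ => unimplemented!("parquet is handled by storage::ParquetWriter in the patch"),
    };
    Ok(rows)
}
```

This matches the split visible in the patch: CSV and JSON go through the buffered `stream_to_file` path added in `file_format.rs` (errors wrapped in `WriteStreamToFileSnafu`), while parquet continues to use `ParquetWriter::write_sst` with `WriteParquetSnafu`.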