diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs
index fad03da2ee..b6d4d6c30a 100644
--- a/src/common/datasource/src/file_format.rs
+++ b/src/common/datasource/src/file_format.rs
@@ -54,8 +54,11 @@ pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
 pub const FORMAT_HAS_HEADER: &str = "has_header";
 pub const FORMAT_TYPE: &str = "format";
 pub const FILE_PATTERN: &str = "pattern";
+pub const TIMESTAMP_FORMAT: &str = "timestamp_format";
+pub const TIME_FORMAT: &str = "time_format";
+pub const DATE_FORMAT: &str = "date_format";
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub enum Format {
     Csv(CsvFormat),
     Json(JsonFormat),
diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs
index 9b817d35e3..efffce8d12 100644
--- a/src/common/datasource/src/file_format/csv.rs
+++ b/src/common/datasource/src/file_format/csv.rs
@@ -15,8 +15,8 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use arrow::csv;
 use arrow::csv::reader::Format;
+use arrow::csv::{self, WriterBuilder};
 use arrow::record_batch::RecordBatch;
 use arrow_schema::Schema;
 use async_trait::async_trait;
@@ -33,12 +33,15 @@
 use crate::error::{self, Result};
 use crate::file_format::{self, FileFormat, stream_to_file};
 use crate::share_buffer::SharedBuffer;
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct CsvFormat {
     pub has_header: bool,
     pub delimiter: u8,
     pub schema_infer_max_record: Option<usize>,
     pub compression_type: CompressionType,
+    pub timestamp_format: Option<String>,
+    pub time_format: Option<String>,
+    pub date_format: Option<String>,
 }
 
 impl TryFrom<&HashMap<String, String>> for CsvFormat {
@@ -79,6 +82,15 @@
                 }
                 .build()
             })?;
         };
+        if let Some(timestamp_format) = value.get(file_format::TIMESTAMP_FORMAT) {
+            format.timestamp_format = Some(timestamp_format.clone());
+        }
+        if let Some(time_format) = value.get(file_format::TIME_FORMAT) {
+            format.time_format = Some(time_format.clone());
+        }
+        if let Some(date_format) = value.get(file_format::DATE_FORMAT) {
+            format.date_format = Some(date_format.clone());
+        }
         Ok(format)
     }
@@ -91,6 +103,9 @@ impl Default for CsvFormat {
             delimiter: b',',
             schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
             compression_type: CompressionType::Uncompressed,
+            timestamp_format: None,
+            time_format: None,
+            date_format: None,
         }
     }
 }
@@ -140,9 +155,20 @@ pub async fn stream_to_csv(
     path: &str,
     threshold: usize,
     concurrency: usize,
+    format: &CsvFormat,
 ) -> Result<usize> {
     stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
-        csv::Writer::new(buffer)
+        let mut builder = WriterBuilder::new();
+        if let Some(timestamp_format) = &format.timestamp_format {
+            builder = builder.with_timestamp_format(timestamp_format.to_owned())
+        }
+        if let Some(date_format) = &format.date_format {
+            builder = builder.with_date_format(date_format.to_owned())
+        }
+        if let Some(time_format) = &format.time_format {
+            builder = builder.with_time_format(time_format.to_owned())
+        }
+        builder.build(buffer)
     })
     .await
 }
@@ -265,6 +291,9 @@ mod tests {
                 schema_infer_max_record: Some(2000),
                 delimiter: b'\t',
                 has_header: false,
+                timestamp_format: None,
+                time_format: None,
+                date_format: None
             }
         );
    }
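Note (not part of the patch): the builder closure above is the whole formatting story; stream_to_csv simply forwards the three optional patterns to arrow's csv::WriterBuilder. A minimal standalone sketch of that mapping, using the same arrow APIs the patch calls:

    use std::sync::Arc;

    use arrow::array::{Int64Array, TimestampSecondArray};
    use arrow::csv::WriterBuilder;
    use arrow::record_batch::RecordBatch;
    use arrow_schema::{DataType, Field, Schema, TimeUnit};

    fn main() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("num", DataType::Int64, false),
            Field::new("ts", DataType::Timestamp(TimeUnit::Second, None), false),
        ]));
        // 1_680_307_200 seconds is 2023-04-01T00:00:00, matching the test fixture.
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![5])),
                Arc::new(TimestampSecondArray::from(vec![1_680_307_200])),
            ],
        )
        .unwrap();

        let mut out = Vec::new();
        let mut writer = WriterBuilder::new()
            .with_timestamp_format("%m-%d-%Y".to_string())
            .build(&mut out);
        writer.write(&batch).unwrap();
        drop(writer); // release the mutable borrow of `out`

        // Expected output: "num,ts\n5,04-01-2023\n"
        println!("{}", String::from_utf8(out).unwrap());
    }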
diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs
index d44f0f70fa..d3e5adf5a3 100644
--- a/src/common/datasource/src/file_format/tests.rs
+++ b/src/common/datasource/src/file_format/tests.rs
@@ -34,7 +34,7 @@
 use object_store::ObjectStore;
 
 use super::FORMAT_TYPE;
 use crate::file_format::parquet::DefaultParquetFileReaderFactory;
 use crate::file_format::{FileFormat, Format, OrcFormat};
-use crate::test_util::{scan_config, test_basic_schema, test_store};
+use crate::test_util::{csv_basic_schema, scan_config, test_basic_schema, test_store};
 use crate::{error, test_util};
 
 struct Test<'a> {
@@ -107,7 +107,7 @@ async fn test_json_opener() {
 #[tokio::test]
 async fn test_csv_opener() {
     let store = test_store("/");
-    let schema = test_basic_schema();
+    let schema = csv_basic_schema();
     let path = &find_workspace_path("/src/common/datasource/tests/csv/basic.csv")
         .display()
         .to_string();
@@ -121,24 +121,24 @@ async fn test_csv_opener() {
            config: scan_config(schema.clone(), None, path, file_source.clone()),
            file_source: file_source.clone(),
            expected: vec![
-                "+-----+-------+",
-                "| num | str   |",
-                "+-----+-------+",
-                "| 5   | test  |",
-                "| 2   | hello |",
-                "| 4   | foo   |",
-                "+-----+-------+",
+                "+-----+-------+---------------------+----------+------------+",
+                "| num | str   | ts                  | t        | date       |",
+                "+-----+-------+---------------------+----------+------------+",
+                "| 5   | test  | 2023-04-01T00:00:00 | 00:00:10 | 2023-04-01 |",
+                "| 2   | hello | 2023-04-01T00:00:00 | 00:00:20 | 2023-04-01 |",
+                "| 4   | foo   | 2023-04-01T00:00:00 | 00:00:30 | 2023-04-01 |",
+                "+-----+-------+---------------------+----------+------------+",
            ],
        },
        Test {
            config: scan_config(schema, Some(1), path, file_source.clone()),
            file_source,
            expected: vec![
-                "+-----+------+",
-                "| num | str  |",
-                "+-----+------+",
-                "| 5   | test |",
-                "+-----+------+",
+                "+-----+------+---------------------+----------+------------+",
+                "| num | str  | ts                  | t        | date       |",
+                "+-----+------+---------------------+----------+------------+",
+                "| 5   | test | 2023-04-01T00:00:00 | 00:00:10 | 2023-04-01 |",
+                "+-----+------+---------------------+----------+------------+",
            ],
        },
    ];
diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs
index 5f9bffe5a1..0813030fa3 100644
--- a/src/common/datasource/src/test_util.rs
+++ b/src/common/datasource/src/test_util.rs
@@ -14,7 +14,7 @@
 
 use std::sync::Arc;
 
-use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
 use common_test_util::temp_dir::{TempDir, create_temp_dir};
 use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::datasource::listing::PartitionedFile;
@@ -27,7 +27,7 @@
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use object_store::ObjectStore;
 use object_store::services::Fs;
 
-use crate::file_format::csv::stream_to_csv;
+use crate::file_format::csv::{CsvFormat, stream_to_csv};
 use crate::file_format::json::stream_to_json;
 use crate::test_util;
@@ -68,6 +68,17 @@
     Arc::new(schema)
 }
 
+pub fn csv_basic_schema() -> SchemaRef {
+    let schema = Schema::new(vec![
+        Field::new("num", DataType::Int64, false),
+        Field::new("str", DataType::Utf8, false),
+        Field::new("ts", DataType::Timestamp(TimeUnit::Second, None), false),
+        Field::new("t", DataType::Time32(TimeUnit::Second), false),
+        Field::new("date", DataType::Date32, false),
+    ]);
+    Arc::new(schema)
+}
+
 pub(crate) fn scan_config(
     file_schema: SchemaRef,
     limit: Option<usize>,
@@ -128,10 +139,14 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
     assert_eq_lines(written.to_vec(), origin.to_vec());
 }
 
-pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
+pub async fn setup_stream_to_csv_test(
+    origin_path: &str,
+    format_path: &str,
+    threshold: impl Fn(usize) -> usize,
+) {
     let store = test_store("/");
-    let schema = test_basic_schema();
+    let schema = csv_basic_schema();
 
     let csv_source = CsvSource::new(true, b',', b'"')
         .with_schema(schema.clone())
@@ -150,21 +165,29 @@
     let output_path = format!("{}/{}", dir.path().display(), "output");
 
+    let csv_format = CsvFormat {
+        timestamp_format: Some("%m-%d-%Y".to_string()),
+        date_format: Some("%m-%d-%Y".to_string()),
+        time_format: Some("%Ss".to_string()),
+        ..Default::default()
+    };
+
     assert!(
         stream_to_csv(
             Box::pin(stream),
             tmp_store.clone(),
             &output_path,
             threshold(size),
-            8
+            8,
+            &csv_format,
         )
         .await
        .is_ok()
    );

    let written = tmp_store.read(&output_path).await.unwrap();
-    let origin = store.read(origin_path).await.unwrap();
-    assert_eq_lines(written.to_vec(), origin.to_vec());
+    let format_expect = store.read(format_path).await.unwrap();
+    assert_eq_lines(written.to_vec(), format_expect.to_vec());
 }
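Note (not part of the patch): the formats picked here drive the expected fixture below, basic_format.csv. They follow chrono's strftime rules, which arrow's CSV writer uses for these options: %m-%d-%Y reorders the date, and %Ss is %S (seconds) followed by a literal 's'. A quick sketch of both:

    use chrono::{NaiveDate, NaiveTime};

    fn main() {
        let date = NaiveDate::from_ymd_opt(2023, 4, 1).unwrap();
        assert_eq!(date.format("%m-%d-%Y").to_string(), "04-01-2023");

        // "%Ss" renders the seconds value plus a literal 's', e.g. "10s".
        let time = NaiveTime::from_hms_opt(0, 0, 10).unwrap();
        assert_eq!(time.format("%Ss").to_string(), "10s");
    }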
diff --git a/src/common/datasource/src/tests.rs b/src/common/datasource/src/tests.rs
index f6391028bf..8be9add7c8 100644
--- a/src/common/datasource/src/tests.rs
+++ b/src/common/datasource/src/tests.rs
@@ -37,11 +37,15 @@ async fn test_stream_to_csv() {
         .display()
         .to_string();
 
+    let format_path = &find_workspace_path("/src/common/datasource/tests/csv/basic_format.csv")
+        .display()
+        .to_string();
+
     // A small threshold
     // Triggers the flush each writes
-    test_util::setup_stream_to_csv_test(origin_path, |size| size / 2).await;
+    test_util::setup_stream_to_csv_test(origin_path, format_path, |size| size / 2).await;
 
     // A large threshold
     // Only triggers the flush at last
-    test_util::setup_stream_to_csv_test(origin_path, |size| size * 2).await;
+    test_util::setup_stream_to_csv_test(origin_path, format_path, |size| size * 2).await;
 }
diff --git a/src/common/datasource/tests/csv/basic.csv b/src/common/datasource/tests/csv/basic.csv
index 77051beb27..85c8ef1ccf 100644
--- a/src/common/datasource/tests/csv/basic.csv
+++ b/src/common/datasource/tests/csv/basic.csv
@@ -1,4 +1,4 @@
-num,str
-5,test
-2,hello
-4,foo
\ No newline at end of file
+num,str,ts,t,date
+5,test,2023-04-01 00:00:00,10,2023-04-01
+2,hello,2023-04-01 00:00:00,20,2023-04-01
+4,foo,2023-04-01 00:00:00,30,2023-04-01
diff --git a/src/common/datasource/tests/csv/basic_format.csv b/src/common/datasource/tests/csv/basic_format.csv
new file mode 100644
index 0000000000..fbc60e8491
--- /dev/null
+++ b/src/common/datasource/tests/csv/basic_format.csv
@@ -0,0 +1,4 @@
+num,str,ts,t,date
+5,test,04-01-2023,10s,04-01-2023
+2,hello,04-01-2023,20s,04-01-2023
+4,foo,04-01-2023,30s,04-01-2023
diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs
index ddf370e7c7..b1006b0ba3 100644
--- a/src/operator/src/error.rs
+++ b/src/operator/src/error.rs
@@ -864,6 +864,18 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display(
+        "{} not supported when transforming to {} format type",
+        format,
+        file_format
+    ))]
+    TimestampFormatNotSupported {
+        file_format: String,
+        format: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[cfg(feature = "enterprise")]
     #[snafu(display("Too large duration"))]
     TooLargeDuration {
@@ -1002,6 +1014,7 @@ impl ErrorExt for Error {
             Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
             Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
             Error::PathNotFound { .. } => StatusCode::InvalidArguments,
+            Error::TimestampFormatNotSupported { .. } => StatusCode::InvalidArguments,
             Error::SqlCommon { source, .. } => source.status_code(),
         }
     }
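Note (not part of the patch): a standalone sketch of how the new snafu variant renders. The freestanding struct here is hypothetical; only the display attribute mirrors the patch:

    use snafu::Snafu;

    #[derive(Debug, Snafu)]
    #[snafu(display(
        "{} not supported when transforming to {} format type",
        format,
        file_format
    ))]
    struct TimestampFormatNotSupported {
        file_format: String,
        format: String,
    }

    fn main() {
        // snafu generates a context selector with a `Snafu` suffix; build() yields the error.
        let err = TimestampFormatNotSupportedSnafu {
            file_format: "json",
            format: "timestamp_format",
        }
        .build();
        // Prints: timestamp_format not supported when transforming to json format type
        println!("{err}");
    }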
diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index 9434cc37e9..ab0f94be80 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -640,6 +640,41 @@ fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest> {
         with: stmt.with.into_map(),
     })
 }
+
+fn verify_time_related_format(with: &OptionMap) -> Result<()> {
+    let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
+    let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
+    let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
+    let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);
+
+    if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
+        ensure!(
+            time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
+            error::TimestampFormatNotSupportedSnafu {
+                format: "".to_string(),
+                file_format: file_format.cloned().unwrap_or_default(),
+            }
+        );
+    }
+
+    for (key, format_opt) in [
+        (common_datasource::file_format::TIME_FORMAT, time_format),
+        (common_datasource::file_format::DATE_FORMAT, date_format),
+        (
+            common_datasource::file_format::TIMESTAMP_FORMAT,
+            timestamp_format,
+        ),
+    ] {
+        if let Some(format) = format_opt {
+            chrono::format::strftime::StrftimeItems::new(format)
+                .parse()
+                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
+        }
+    }
+
+    Ok(())
+}
+
 fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
     let direction = match stmt {
         CopyTable::To(_) => CopyDirection::Export,
@@ -664,6 +699,8 @@
 
     let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
 
+    verify_time_related_format(&with)?;
+
     let pattern = with
         .get(common_datasource::file_format::FILE_PATTERN)
         .cloned();
@@ -828,7 +865,7 @@ mod tests {
     use crate::statement::copy_database::{
         COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
     };
-    use crate::statement::timestamp_range_from_option_map;
+    use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};
 
     fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<(Timestamp, Timestamp)>> {
         let query_ctx = QueryContextBuilder::default()
@@ -864,4 +901,62 @@ mod tests {
             error::Error::InvalidTimestampRange { .. }
         );
     }
+
+    #[test]
+    fn test_verify_timestamp_format() {
+        let map = OptionMap::from(
+            [
+                (
+                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
+                    "%Y-%m-%d %H:%M:%S".to_string(),
+                ),
+                (
+                    common_datasource::file_format::FORMAT_TYPE.to_string(),
+                    "csv".to_string(),
+                ),
+            ]
+            .into_iter()
+            .collect::<HashMap<String, String>>(),
+        );
+        assert!(verify_time_related_format(&map).is_ok());
+
+        let map = OptionMap::from(
+            [
+                (
+                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
+                    "%Y-%m-%d %H:%M:%S".to_string(),
+                ),
+                (
+                    common_datasource::file_format::FORMAT_TYPE.to_string(),
+                    "json".to_string(),
+                ),
+            ]
+            .into_iter()
+            .collect::<HashMap<String, String>>(),
+        );
+
+        assert_matches!(
+            verify_time_related_format(&map).unwrap_err(),
+            error::Error::TimestampFormatNotSupported { .. }
+        );
+        let map = OptionMap::from(
+            [
+                (
+                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
+                    "%111112".to_string(),
+                ),
+                (
+                    common_datasource::file_format::FORMAT_TYPE.to_string(),
+                    "csv".to_string(),
+                ),
+            ]
+            .into_iter()
+            .collect::<HashMap<String, String>>(),
+        );
+
+        assert_matches!(
+            verify_time_related_format(&map).unwrap_err(),
+            error::Error::InvalidCopyParameter { .. }
+        );
+    }
 }
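Note (not part of the patch): verify_time_related_format leans on chrono to reject malformed patterns up front, which is what the %111112 case in test_verify_timestamp_format exercises. The check in isolation:

    use chrono::format::strftime::StrftimeItems;

    /// Returns true when `fmt` is a well-formed strftime pattern.
    fn is_valid_strftime(fmt: &str) -> bool {
        StrftimeItems::new(fmt).parse().is_ok()
    }

    fn main() {
        assert!(is_valid_strftime("%Y-%m-%d %H:%M:%S"));
        assert!(!is_valid_strftime("%111112")); // unknown specifier, rejected
    }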
} + ); + let map = OptionMap::from( + [ + ( + common_datasource::file_format::TIMESTAMP_FORMAT.to_string(), + "%111112".to_string(), + ), + ( + common_datasource::file_format::FORMAT_TYPE.to_string(), + "csv".to_string(), + ), + ] + .into_iter() + .collect::>(), + ); + + assert_matches!( + verify_time_related_format(&map).unwrap_err(), + error::Error::InvalidCopyParameter { .. } + ); + } } diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 550bdf52a6..3a6a9d15c6 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -386,7 +386,7 @@ impl StatementExecutor { } let path = entry.path(); let file_metadata = self - .collect_metadata(&object_store, format, path.to_string()) + .collect_metadata(&object_store, format.clone(), path.to_string()) .await?; let file_schema = file_metadata.schema(); diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs index 00261139b1..d542f8acbb 100644 --- a/src/operator/src/statement/copy_table_to.rs +++ b/src/operator/src/statement/copy_table_to.rs @@ -66,12 +66,13 @@ impl StatementExecutor { map_json_type_to_string_schema, )); match format { - Format::Csv(_) => stream_to_csv( + Format::Csv(format) => stream_to_csv( Box::pin(DfRecordBatchStreamAdapter::new(stream)), object_store, path, threshold, WRITE_CONCURRENCY, + format, ) .await .context(error::WriteStreamToFileSnafu { path }), @@ -96,7 +97,10 @@ impl StatementExecutor { .await .context(error::WriteStreamToFileSnafu { path }) } - _ => error::UnsupportedFormatSnafu { format: *format }.fail(), + _ => error::UnsupportedFormatSnafu { + format: format.clone(), + } + .fail(), } } diff --git a/src/sql/src/statements/copy.rs b/src/sql/src/statements/copy.rs index 48f13f1fb2..8c18a836f9 100644 --- a/src/sql/src/statements/copy.rs +++ b/src/sql/src/statements/copy.rs @@ -160,6 +160,12 @@ impl CopyTableArgument { .get(common_datasource::file_format::FILE_PATTERN) .cloned() } + + pub fn timestamp_pattern(&self) -> Option { + self.with + .get(common_datasource::file_format::TIMESTAMP_FORMAT) + .cloned() + } } #[cfg(test)] diff --git a/tests/cases/standalone/common/copy/copy_to_fs.result b/tests/cases/standalone/common/copy/copy_to_fs.result index eed4281806..6f5632ac60 100644 --- a/tests/cases/standalone/common/copy/copy_to_fs.result +++ b/tests/cases/standalone/common/copy/copy_to_fs.result @@ -1,4 +1,4 @@ -CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX); +CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX, ts2 DATE DEFAULT '2022-06-15'); Affected Rows: 0 @@ -34,6 +34,10 @@ COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_ Affected Rows: 1 +COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d'); + +Affected Rows: 1 + drop table demo; Affected Rows: 0 diff --git a/tests/cases/standalone/common/copy/copy_to_fs.sql b/tests/cases/standalone/common/copy/copy_to_fs.sql index 405ea87518..5d2bf72c71 100644 --- a/tests/cases/standalone/common/copy/copy_to_fs.sql +++ b/tests/cases/standalone/common/copy/copy_to_fs.sql @@ -1,4 +1,4 @@ -CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX); +CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX, 
diff --git a/tests/cases/standalone/common/copy/copy_to_fs.result b/tests/cases/standalone/common/copy/copy_to_fs.result
index eed4281806..6f5632ac60 100644
--- a/tests/cases/standalone/common/copy/copy_to_fs.result
+++ b/tests/cases/standalone/common/copy/copy_to_fs.result
@@ -1,4 +1,4 @@
-CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
+CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX, ts2 DATE DEFAULT '2022-06-15');
 
 Affected Rows: 0
 
@@ -34,6 +34,10 @@ COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_
 
 Affected Rows: 1
 
+COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d');
+
+Affected Rows: 1
+
 drop table demo;
 
 Affected Rows: 0
diff --git a/tests/cases/standalone/common/copy/copy_to_fs.sql b/tests/cases/standalone/common/copy/copy_to_fs.sql
index 405ea87518..5d2bf72c71 100644
--- a/tests/cases/standalone/common/copy/copy_to_fs.sql
+++ b/tests/cases/standalone/common/copy/copy_to_fs.sql
@@ -1,4 +1,4 @@
-CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
+CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX, ts2 DATE DEFAULT '2022-06-15');
 
 insert into demo(host, cpu, memory, jsons, ts) values ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000), ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
 
@@ -16,4 +16,6 @@ COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_
 
+COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d');
+
 drop table demo;
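Note (not part of the patch): a sketch of what the new COPY should emit for the host2 row, assuming the writer renders the timestamp in UTC; the values mirror the chrono formatting the WITH options select:

    use chrono::{DateTime, NaiveDate};

    fn main() {
        // ts for host2 is 1655276558000 ms, i.e. 2022-06-15T07:02:38Z.
        let ts = DateTime::from_timestamp_millis(1_655_276_558_000).unwrap();
        assert_eq!(ts.format("%m-%d-%Y").to_string(), "06-15-2022");

        // ts2 falls back to its column default, DATE '2022-06-15'.
        let d = NaiveDate::from_ymd_opt(2022, 6, 15).unwrap();
        assert_eq!(d.format("%Y/%m/%d").to_string(), "2022/06/15");
    }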