diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs index 3dd62e8d95..3c24a4b7e2 100644 --- a/src/common/datasource/src/file_format/json.rs +++ b/src/common/datasource/src/file_format/json.rs @@ -17,6 +17,7 @@ use std::io::BufReader; use std::str::FromStr; use arrow::json; +use arrow::json::WriterBuilder; use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator}; use arrow::json::writer::LineDelimited; use arrow::record_batch::RecordBatch; @@ -36,10 +37,13 @@ use crate::file_format::{self, FileFormat, stream_to_file}; use crate::share_buffer::SharedBuffer; use crate::util::normalize_infer_schema; -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct JsonFormat { pub schema_infer_max_record: Option, pub compression_type: CompressionType, + pub timestamp_format: Option, + pub time_format: Option, + pub date_format: Option, } impl TryFrom<&HashMap> for JsonFormat { @@ -62,6 +66,9 @@ impl TryFrom<&HashMap> for JsonFormat { .build() })?); }; + format.timestamp_format = value.get(file_format::TIMESTAMP_FORMAT).cloned(); + format.time_format = value.get(file_format::TIME_FORMAT).cloned(); + format.date_format = value.get(file_format::DATE_FORMAT).cloned(); Ok(format) } } @@ -71,6 +78,9 @@ impl Default for JsonFormat { Self { schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD), compression_type: CompressionType::Uncompressed, + timestamp_format: None, + time_format: None, + date_format: None, } } } @@ -125,10 +135,18 @@ pub async fn stream_to_json( threshold, concurrency, format.compression_type, - |b| { - arrow::json::writer::WriterBuilder::new() - .with_explicit_nulls(true) - .build::<_, LineDelimited>(b) + |buffer| { + let mut builder = WriterBuilder::new().with_explicit_nulls(true); + if let Some(timestamp_format) = &format.timestamp_format { + builder = builder.with_timestamp_format(timestamp_format.to_owned()); + } + if let Some(time_format) = &format.time_format { + builder = builder.with_time_format(time_format.to_owned()); + } + if let Some(date_format) = &format.date_format { + builder = builder.with_date_format(date_format.to_owned()); + } + builder.build::<_, LineDelimited>(buffer) }, ) .await @@ -236,6 +254,7 @@ mod tests { JsonFormat { compression_type: CompressionType::Zstd, schema_infer_max_record: Some(2000), + ..JsonFormat::default() } ); } diff --git a/src/common/datasource/src/file_format/tests.rs b/src/common/datasource/src/file_format/tests.rs index 29a23c07da..50570d6d3c 100644 --- a/src/common/datasource/src/file_format/tests.rs +++ b/src/common/datasource/src/file_format/tests.rs @@ -34,7 +34,7 @@ use object_store::ObjectStore; use super::FORMAT_TYPE; use crate::file_format::parquet::DefaultParquetFileReaderFactory; use crate::file_format::{FileFormat, Format, OrcFormat}; -use crate::test_util::{csv_basic_schema, scan_config, test_basic_schema, test_store}; +use crate::test_util::{basic_schema_with_time_format, scan_config, test_basic_schema, test_store}; use crate::{error, test_util}; struct Test<'a> { @@ -66,35 +66,36 @@ impl Test<'_> { #[tokio::test] async fn test_json_opener() { let store = test_store("/"); - let schema = test_basic_schema(); + let schema = basic_schema_with_time_format(); let file_source = Arc::new(JsonSource::new(schema)).with_batch_size(test_util::TEST_BATCH_SIZE); let path = &find_workspace_path("/src/common/datasource/tests/json/basic.json") .display() .to_string(); + let tests = [ Test { config: scan_config(None, path, file_source.clone()), file_source: file_source.clone(), expected: vec![ - "+-----+-------+", - "| num | str |", - "+-----+-------+", - "| 5 | test |", - "| 2 | hello |", - "| 4 | foo |", - "+-----+-------+", + "+-----+-------+---------------------+----------+------------+", + "| num | str | ts | t | date |", + "+-----+-------+---------------------+----------+------------+", + "| 5 | test | 2023-04-01T00:00:00 | 00:00:10 | 2023-04-01 |", + "| 2 | hello | 2023-04-01T00:00:00 | 00:00:20 | 2023-04-01 |", + "| 4 | foo | 2023-04-01T00:00:00 | 00:00:30 | 2023-04-01 |", + "+-----+-------+---------------------+----------+------------+", ], }, Test { config: scan_config(Some(1), path, file_source.clone()), file_source, expected: vec![ - "+-----+------+", - "| num | str |", - "+-----+------+", - "| 5 | test |", - "+-----+------+", + "+-----+------+---------------------+----------+------------+", + "| num | str | ts | t | date |", + "+-----+------+---------------------+----------+------------+", + "| 5 | test | 2023-04-01T00:00:00 | 00:00:10 | 2023-04-01 |", + "+-----+------+---------------------+----------+------------+", ], }, ]; @@ -107,7 +108,7 @@ async fn test_json_opener() { #[tokio::test] async fn test_csv_opener() { let store = test_store("/"); - let schema = csv_basic_schema(); + let schema = basic_schema_with_time_format(); let path = &find_workspace_path("/src/common/datasource/tests/csv/basic.csv") .display() .to_string(); diff --git a/src/common/datasource/src/test_util.rs b/src/common/datasource/src/test_util.rs index f647d424ad..88f0dc37ad 100644 --- a/src/common/datasource/src/test_util.rs +++ b/src/common/datasource/src/test_util.rs @@ -68,7 +68,7 @@ pub fn test_basic_schema() -> SchemaRef { Arc::new(schema) } -pub fn csv_basic_schema() -> SchemaRef { +pub fn basic_schema_with_time_format() -> SchemaRef { let schema = Schema::new(vec![ Field::new("num", DataType::Int64, false), Field::new("str", DataType::Utf8, false), @@ -97,7 +97,7 @@ pub(crate) fn scan_config( pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usize) -> usize) { let store = test_store("/"); - let schema = test_basic_schema(); + let schema = basic_schema_with_time_format(); let json_opener = JsonOpener::new( test_util::TEST_BATCH_SIZE, @@ -148,7 +148,7 @@ pub async fn setup_stream_to_csv_test( ) { let store = test_store("/"); - let schema = csv_basic_schema(); + let schema = basic_schema_with_time_format(); let csv_source = CsvSource::new(schema).with_batch_size(TEST_BATCH_SIZE); let config = scan_config(None, origin_path, csv_source.clone()); diff --git a/src/common/datasource/tests/json/basic.json b/src/common/datasource/tests/json/basic.json index 8abf0d6f83..ccabaf6f4f 100644 --- a/src/common/datasource/tests/json/basic.json +++ b/src/common/datasource/tests/json/basic.json @@ -1,3 +1,3 @@ -{"num":5,"str":"test"} -{"num":2,"str":"hello"} -{"num":4,"str":"foo"} \ No newline at end of file +{"num":5,"str":"test","ts":"2023-04-01T00:00:00","t":"00:00:10","date":"2023-04-01"} +{"num":2,"str":"hello","ts":"2023-04-01T00:00:00","t":"00:00:20","date":"2023-04-01"} +{"num":4,"str":"foo","ts":"2023-04-01T00:00:00","t":"00:00:30","date":"2023-04-01"} \ No newline at end of file diff --git a/tests/cases/standalone/common/copy/copy_to_fs.result b/tests/cases/standalone/common/copy/copy_to_fs.result index 6f5632ac60..9cd9b6a4f7 100644 --- a/tests/cases/standalone/common/copy/copy_to_fs.result +++ b/tests/cases/standalone/common/copy/copy_to_fs.result @@ -38,6 +38,10 @@ COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQL Affected Rows: 1 +COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d'); + +Affected Rows: 1 + drop table demo; Affected Rows: 0 diff --git a/tests/cases/standalone/common/copy/copy_to_fs.sql b/tests/cases/standalone/common/copy/copy_to_fs.sql index 5d2bf72c71..4246136902 100644 --- a/tests/cases/standalone/common/copy/copy_to_fs.sql +++ b/tests/cases/standalone/common/copy/copy_to_fs.sql @@ -18,4 +18,6 @@ COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_ COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d'); +COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d'); + drop table demo;