feat(copy_to_json): add date_format/timestamp_format/time_format for JSON format copy (#7633)

* feat(copy_to_json): add `date_format`/`timestamp_format`/`time_format` for  JSON format.

Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>

* Update src/common/datasource/src/file_format/json.rs

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>

* chore: Use predefined constants as the time format.

Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>

---------

Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
Lin Yihai
2026-02-04 21:30:41 +08:00
committed by GitHub
parent 26f7c12ffd
commit 64ac9b60e9
6 changed files with 52 additions and 26 deletions

View File

@@ -17,6 +17,7 @@ use std::io::BufReader;
use std::str::FromStr;
use arrow::json;
use arrow::json::WriterBuilder;
use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator};
use arrow::json::writer::LineDelimited;
use arrow::record_batch::RecordBatch;
@@ -36,10 +37,13 @@ use crate::file_format::{self, FileFormat, stream_to_file};
use crate::share_buffer::SharedBuffer;
use crate::util::normalize_infer_schema;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonFormat {
pub schema_infer_max_record: Option<usize>,
pub compression_type: CompressionType,
pub timestamp_format: Option<String>,
pub time_format: Option<String>,
pub date_format: Option<String>,
}
impl TryFrom<&HashMap<String, String>> for JsonFormat {
@@ -62,6 +66,9 @@ impl TryFrom<&HashMap<String, String>> for JsonFormat {
.build()
})?);
};
format.timestamp_format = value.get(file_format::TIMESTAMP_FORMAT).cloned();
format.time_format = value.get(file_format::TIME_FORMAT).cloned();
format.date_format = value.get(file_format::DATE_FORMAT).cloned();
Ok(format)
}
}
@@ -71,6 +78,9 @@ impl Default for JsonFormat {
Self {
schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
compression_type: CompressionType::Uncompressed,
timestamp_format: None,
time_format: None,
date_format: None,
}
}
}
@@ -125,10 +135,18 @@ pub async fn stream_to_json(
threshold,
concurrency,
format.compression_type,
|b| {
arrow::json::writer::WriterBuilder::new()
.with_explicit_nulls(true)
.build::<_, LineDelimited>(b)
|buffer| {
let mut builder = WriterBuilder::new().with_explicit_nulls(true);
if let Some(timestamp_format) = &format.timestamp_format {
builder = builder.with_timestamp_format(timestamp_format.to_owned());
}
if let Some(time_format) = &format.time_format {
builder = builder.with_time_format(time_format.to_owned());
}
if let Some(date_format) = &format.date_format {
builder = builder.with_date_format(date_format.to_owned());
}
builder.build::<_, LineDelimited>(buffer)
},
)
.await
@@ -236,6 +254,7 @@ mod tests {
JsonFormat {
compression_type: CompressionType::Zstd,
schema_infer_max_record: Some(2000),
..JsonFormat::default()
}
);
}

View File

@@ -34,7 +34,7 @@ use object_store::ObjectStore;
use super::FORMAT_TYPE;
use crate::file_format::parquet::DefaultParquetFileReaderFactory;
use crate::file_format::{FileFormat, Format, OrcFormat};
use crate::test_util::{csv_basic_schema, scan_config, test_basic_schema, test_store};
use crate::test_util::{basic_schema_with_time_format, scan_config, test_basic_schema, test_store};
use crate::{error, test_util};
struct Test<'a> {
@@ -66,35 +66,36 @@ impl Test<'_> {
#[tokio::test]
async fn test_json_opener() {
let store = test_store("/");
let schema = test_basic_schema();
let schema = basic_schema_with_time_format();
let file_source = Arc::new(JsonSource::new(schema)).with_batch_size(test_util::TEST_BATCH_SIZE);
let path = &find_workspace_path("/src/common/datasource/tests/json/basic.json")
.display()
.to_string();
let tests = [
Test {
config: scan_config(None, path, file_source.clone()),
file_source: file_source.clone(),
expected: vec![
"+-----+-------+",
"| num | str |",
"+-----+-------+",
"| 5 | test |",
"| 2 | hello |",
"| 4 | foo |",
"+-----+-------+",
"+-----+-------+---------------------+----------+------------+",
"| num | str | ts | t | date |",
"+-----+-------+---------------------+----------+------------+",
"| 5 | test | 2023-04-01T00:00:00 | 00:00:10 | 2023-04-01 |",
"| 2 | hello | 2023-04-01T00:00:00 | 00:00:20 | 2023-04-01 |",
"| 4 | foo | 2023-04-01T00:00:00 | 00:00:30 | 2023-04-01 |",
"+-----+-------+---------------------+----------+------------+",
],
},
Test {
config: scan_config(Some(1), path, file_source.clone()),
file_source,
expected: vec![
"+-----+------+",
"| num | str |",
"+-----+------+",
"| 5 | test |",
"+-----+------+",
"+-----+------+---------------------+----------+------------+",
"| num | str | ts | t | date |",
"+-----+------+---------------------+----------+------------+",
"| 5 | test | 2023-04-01T00:00:00 | 00:00:10 | 2023-04-01 |",
"+-----+------+---------------------+----------+------------+",
],
},
];
@@ -107,7 +108,7 @@ async fn test_json_opener() {
#[tokio::test]
async fn test_csv_opener() {
let store = test_store("/");
let schema = csv_basic_schema();
let schema = basic_schema_with_time_format();
let path = &find_workspace_path("/src/common/datasource/tests/csv/basic.csv")
.display()
.to_string();

View File

@@ -68,7 +68,7 @@ pub fn test_basic_schema() -> SchemaRef {
Arc::new(schema)
}
pub fn csv_basic_schema() -> SchemaRef {
pub fn basic_schema_with_time_format() -> SchemaRef {
let schema = Schema::new(vec![
Field::new("num", DataType::Int64, false),
Field::new("str", DataType::Utf8, false),
@@ -97,7 +97,7 @@ pub(crate) fn scan_config(
pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
let store = test_store("/");
let schema = test_basic_schema();
let schema = basic_schema_with_time_format();
let json_opener = JsonOpener::new(
test_util::TEST_BATCH_SIZE,
@@ -148,7 +148,7 @@ pub async fn setup_stream_to_csv_test(
) {
let store = test_store("/");
let schema = csv_basic_schema();
let schema = basic_schema_with_time_format();
let csv_source = CsvSource::new(schema).with_batch_size(TEST_BATCH_SIZE);
let config = scan_config(None, origin_path, csv_source.clone());

View File

@@ -1,3 +1,3 @@
{"num":5,"str":"test"}
{"num":2,"str":"hello"}
{"num":4,"str":"foo"}
{"num":5,"str":"test","ts":"2023-04-01T00:00:00","t":"00:00:10","date":"2023-04-01"}
{"num":2,"str":"hello","ts":"2023-04-01T00:00:00","t":"00:00:20","date":"2023-04-01"}
{"num":4,"str":"foo","ts":"2023-04-01T00:00:00","t":"00:00:30","date":"2023-04-01"}

View File

@@ -38,6 +38,10 @@ COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQL
Affected Rows: 1
COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d');
Affected Rows: 1
drop table demo;
Affected Rows: 0

View File

@@ -18,4 +18,6 @@ COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_
COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d');
COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d');
drop table demo;