feat(copy_to_csv): add date_format/timestamp_format/time_format. (#6995)

feat(copy_to_csv): add `date_format`, `time_format`, and `timestamp_format` options to the `COPY ... TO ... WITH` syntax

Signed-off-by: Yihai Lin <yihai-lin@foxmail.com>
Author: Lin Yihai
Date: 2025-09-24 14:22:53 +08:00
Committed by: GitHub
Parent: c7050831db
Commit: b5a8725582
14 changed files with 224 additions and 37 deletions
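With this change, a CSV export can render temporal columns using chrono strftime patterns, e.g. `COPY ... TO '...' WITH (format='csv', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d')` as exercised in the sqlness tests below; `time_format` works the same way. The options are rejected for non-CSV formats and for malformed strftime strings.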

View File

@@ -54,8 +54,11 @@ pub const FORMAT_SCHEMA_INFER_MAX_RECORD: &str = "schema_infer_max_record";
 pub const FORMAT_HAS_HEADER: &str = "has_header";
 pub const FORMAT_TYPE: &str = "format";
 pub const FILE_PATTERN: &str = "pattern";
+pub const TIMESTAMP_FORMAT: &str = "timestamp_format";
+pub const TIME_FORMAT: &str = "time_format";
+pub const DATE_FORMAT: &str = "date_format";
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub enum Format {
     Csv(CsvFormat),
     Json(JsonFormat),

View File

@@ -15,8 +15,8 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 
-use arrow::csv;
 use arrow::csv::reader::Format;
+use arrow::csv::{self, WriterBuilder};
 use arrow::record_batch::RecordBatch;
 use arrow_schema::Schema;
 use async_trait::async_trait;
@@ -33,12 +33,15 @@ use crate::error::{self, Result};
 use crate::file_format::{self, FileFormat, stream_to_file};
 use crate::share_buffer::SharedBuffer;
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[derive(Debug, Clone, PartialEq, Eq)]
 pub struct CsvFormat {
     pub has_header: bool,
     pub delimiter: u8,
     pub schema_infer_max_record: Option<usize>,
     pub compression_type: CompressionType,
+    pub timestamp_format: Option<String>,
+    pub time_format: Option<String>,
+    pub date_format: Option<String>,
 }
 
 impl TryFrom<&HashMap<String, String>> for CsvFormat {
impl TryFrom<&HashMap<String, String>> for CsvFormat {
@@ -79,6 +82,15 @@ impl TryFrom<&HashMap<String, String>> for CsvFormat {
                 }
                 .build()
             })?;
         };
+        if let Some(timestamp_format) = value.get(file_format::TIMESTAMP_FORMAT) {
+            format.timestamp_format = Some(timestamp_format.clone());
+        }
+        if let Some(time_format) = value.get(file_format::TIME_FORMAT) {
+            format.time_format = Some(time_format.clone());
+        }
+        if let Some(date_format) = value.get(file_format::DATE_FORMAT) {
+            format.date_format = Some(date_format.clone());
+        }
         Ok(format)
     }
@@ -91,6 +103,9 @@ impl Default for CsvFormat {
             delimiter: b',',
             schema_infer_max_record: Some(file_format::DEFAULT_SCHEMA_INFER_MAX_RECORD),
             compression_type: CompressionType::Uncompressed,
+            timestamp_format: None,
+            time_format: None,
+            date_format: None,
         }
     }
 }
@@ -140,9 +155,20 @@ pub async fn stream_to_csv(
     path: &str,
     threshold: usize,
     concurrency: usize,
+    format: &CsvFormat,
 ) -> Result<usize> {
     stream_to_file(stream, store, path, threshold, concurrency, |buffer| {
-        csv::Writer::new(buffer)
+        let mut builder = WriterBuilder::new();
+        if let Some(timestamp_format) = &format.timestamp_format {
+            builder = builder.with_timestamp_format(timestamp_format.to_owned())
+        }
+        if let Some(date_format) = &format.date_format {
+            builder = builder.with_date_format(date_format.to_owned())
+        }
+        if let Some(time_format) = &format.time_format {
+            builder = builder.with_time_format(time_format.to_owned())
+        }
+        builder.build(buffer)
     })
     .await
 }
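
The writer is now constructed per flush via arrow's `csv::WriterBuilder`, so every buffer picks up the configured formats. A minimal standalone sketch of those builder calls (the schema and values here are illustrative, not taken from the diff; header output assumes arrow-csv's default `has_header = true`):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Date32Array, TimestampSecondArray};
use arrow::csv::WriterBuilder;
use arrow::record_batch::RecordBatch;
use arrow_schema::{DataType, Field, Schema, TimeUnit};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("ts", DataType::Timestamp(TimeUnit::Second, None), false),
        Field::new("date", DataType::Date32, false),
    ]));
    // 2023-04-01T00:00:00 as seconds since the Unix epoch,
    // and 2023-04-01 as days since the Unix epoch.
    let ts: ArrayRef = Arc::new(TimestampSecondArray::from(vec![1_680_307_200]));
    let date: ArrayRef = Arc::new(Date32Array::from(vec![19_448]));
    let batch = RecordBatch::try_new(schema, vec![ts, date])?;

    let mut buf = Vec::new();
    let mut writer = WriterBuilder::new()
        .with_timestamp_format("%m-%d-%Y".to_string())
        .with_date_format("%m-%d-%Y".to_string())
        .build(&mut buf);
    writer.write(&batch)?;
    drop(writer); // release the mutable borrow on `buf`

    // Expected output: a "ts,date" header, then "04-01-2023,04-01-2023".
    print!("{}", String::from_utf8(buf)?);
    Ok(())
}
```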
@@ -265,6 +291,9 @@ mod tests {
                 schema_infer_max_record: Some(2000),
                 delimiter: b'\t',
                 has_header: false,
+                timestamp_format: None,
+                time_format: None,
+                date_format: None
             }
         );
     }

View File

@@ -34,7 +34,7 @@ use object_store::ObjectStore;
 use super::FORMAT_TYPE;
 use crate::file_format::parquet::DefaultParquetFileReaderFactory;
 use crate::file_format::{FileFormat, Format, OrcFormat};
-use crate::test_util::{scan_config, test_basic_schema, test_store};
+use crate::test_util::{csv_basic_schema, scan_config, test_basic_schema, test_store};
 use crate::{error, test_util};
 
 struct Test<'a> {
@@ -107,7 +107,7 @@ async fn test_json_opener() {
 #[tokio::test]
 async fn test_csv_opener() {
     let store = test_store("/");
-    let schema = test_basic_schema();
+    let schema = csv_basic_schema();
     let path = &find_workspace_path("/src/common/datasource/tests/csv/basic.csv")
         .display()
         .to_string();
@@ -121,24 +121,24 @@ async fn test_csv_opener() {
             config: scan_config(schema.clone(), None, path, file_source.clone()),
             file_source: file_source.clone(),
             expected: vec![
-                "+-----+-------+",
-                "| num | str   |",
-                "+-----+-------+",
-                "| 5   | test  |",
-                "| 2   | hello |",
-                "| 4   | foo   |",
-                "+-----+-------+",
+                "+-----+-------+---------------------+----------+------------+",
+                "| num | str   | ts                  | t        | date       |",
+                "+-----+-------+---------------------+----------+------------+",
+                "| 5   | test  | 2023-04-01T00:00:00 | 00:00:10 | 2023-04-01 |",
+                "| 2   | hello | 2023-04-01T00:00:00 | 00:00:20 | 2023-04-01 |",
+                "| 4   | foo   | 2023-04-01T00:00:00 | 00:00:30 | 2023-04-01 |",
+                "+-----+-------+---------------------+----------+------------+",
             ],
         },
         Test {
            config: scan_config(schema, Some(1), path, file_source.clone()),
            file_source,
            expected: vec![
-                "+-----+------+",
-                "| num | str  |",
-                "+-----+------+",
-                "| 5   | test |",
-                "+-----+------+",
+                "+-----+------+---------------------+----------+------------+",
+                "| num | str  | ts                  | t        | date       |",
+                "+-----+------+---------------------+----------+------------+",
+                "| 5   | test | 2023-04-01T00:00:00 | 00:00:10 | 2023-04-01 |",
+                "+-----+------+---------------------+----------+------------+",
             ],
         },
     ];

View File

@@ -14,7 +14,7 @@
 use std::sync::Arc;
 
-use arrow_schema::{DataType, Field, Schema, SchemaRef};
+use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
 use common_test_util::temp_dir::{TempDir, create_temp_dir};
 use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
 use datafusion::datasource::listing::PartitionedFile;
@@ -27,7 +27,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use object_store::ObjectStore;
 use object_store::services::Fs;
 
-use crate::file_format::csv::stream_to_csv;
+use crate::file_format::csv::{CsvFormat, stream_to_csv};
 use crate::file_format::json::stream_to_json;
 use crate::test_util;
@@ -68,6 +68,17 @@ pub fn test_basic_schema() -> SchemaRef {
     Arc::new(schema)
 }
 
+pub fn csv_basic_schema() -> SchemaRef {
+    let schema = Schema::new(vec![
+        Field::new("num", DataType::Int64, false),
+        Field::new("str", DataType::Utf8, false),
+        Field::new("ts", DataType::Timestamp(TimeUnit::Second, None), false),
+        Field::new("t", DataType::Time32(TimeUnit::Second), false),
+        Field::new("date", DataType::Date32, false),
+    ]);
+    Arc::new(schema)
+}
+
 pub(crate) fn scan_config(
     file_schema: SchemaRef,
     limit: Option<usize>,
@@ -128,10 +139,14 @@ pub async fn setup_stream_to_json_test(origin_path: &str, threshold: impl Fn(usi
     assert_eq_lines(written.to_vec(), origin.to_vec());
 }
 
-pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usize) -> usize) {
+pub async fn setup_stream_to_csv_test(
+    origin_path: &str,
+    format_path: &str,
+    threshold: impl Fn(usize) -> usize,
+) {
     let store = test_store("/");
-    let schema = test_basic_schema();
+    let schema = csv_basic_schema();
 
     let csv_source = CsvSource::new(true, b',', b'"')
         .with_schema(schema.clone())
@@ -150,21 +165,29 @@ pub async fn setup_stream_to_csv_test(origin_path: &str, threshold: impl Fn(usiz
     let output_path = format!("{}/{}", dir.path().display(), "output");
 
+    let csv_format = CsvFormat {
+        timestamp_format: Some("%m-%d-%Y".to_string()),
+        date_format: Some("%m-%d-%Y".to_string()),
+        time_format: Some("%Ss".to_string()),
+        ..Default::default()
+    };
+
     assert!(
         stream_to_csv(
             Box::pin(stream),
             tmp_store.clone(),
             &output_path,
             threshold(size),
-            8
+            8,
+            &csv_format,
         )
         .await
         .is_ok()
     );
 
     let written = tmp_store.read(&output_path).await.unwrap();
-    let origin = store.read(origin_path).await.unwrap();
-    assert_eq_lines(written.to_vec(), origin.to_vec());
+    let format_expect = store.read(format_path).await.unwrap();
+    assert_eq_lines(written.to_vec(), format_expect.to_vec());
 }
 
 // Ignore the CRLF difference across operating systems.

View File

@@ -37,11 +37,15 @@ async fn test_stream_to_csv() {
         .display()
         .to_string();
 
+    let format_path = &find_workspace_path("/src/common/datasource/tests/csv/basic_format.csv")
+        .display()
+        .to_string();
+
     // A small threshold
     // Triggers the flush each writes
-    test_util::setup_stream_to_csv_test(origin_path, |size| size / 2).await;
+    test_util::setup_stream_to_csv_test(origin_path, format_path, |size| size / 2).await;
 
     // A large threshold
     // Only triggers the flush at last
-    test_util::setup_stream_to_csv_test(origin_path, |size| size * 2).await;
+    test_util::setup_stream_to_csv_test(origin_path, format_path, |size| size * 2).await;
 }

View File

@@ -1,4 +1,4 @@
-num,str
-5,test
-2,hello
-4,foo
+num,str,ts,t,date
+5,test,2023-04-01 00:00:00,10,2023-04-01
+2,hello,2023-04-01 00:00:00,20,2023-04-01
+4,foo,2023-04-01 00:00:00,30,2023-04-01

View File

@@ -0,0 +1,4 @@
+num,str,ts,t,date
+5,test,04-01-2023,10s,04-01-2023
+2,hello,04-01-2023,20s,04-01-2023
+4,foo,04-01-2023,30s,04-01-2023
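The new `basic_format.csv` fixture encodes the formats configured in `setup_stream_to_csv_test`: `%m-%d-%Y` renders both the timestamp and the date columns as `04-01-2023`, and the `%Ss` time format renders `00:00:10` as `10s`.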

View File

@@ -864,6 +864,18 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display(
+        "{} not supported when transforming to {} format type",
+        format,
+        file_format
+    ))]
+    TimestampFormatNotSupported {
+        file_format: String,
+        format: String,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[cfg(feature = "enterprise")]
     #[snafu(display("Too large duration"))]
     TooLargeDuration {
@@ -1002,6 +1014,7 @@ impl ErrorExt for Error {
             Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
             Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
             Error::PathNotFound { .. } => StatusCode::InvalidArguments,
+            Error::TimestampFormatNotSupported { .. } => StatusCode::InvalidArguments,
             Error::SqlCommon { source, .. } => source.status_code(),
         }
     }

View File

@@ -640,6 +640,41 @@ fn to_copy_query_request(stmt: CopyQueryToArgument) -> Result<CopyQueryToRequest
     })
 }
 
+// Verifies time related format is valid
+fn verify_time_related_format(with: &OptionMap) -> Result<()> {
+    let time_format = with.get(common_datasource::file_format::TIME_FORMAT);
+    let date_format = with.get(common_datasource::file_format::DATE_FORMAT);
+    let timestamp_format = with.get(common_datasource::file_format::TIMESTAMP_FORMAT);
+    let file_format = with.get(common_datasource::file_format::FORMAT_TYPE);
+
+    if !matches!(file_format, Some(f) if f.eq_ignore_ascii_case("csv")) {
+        ensure!(
+            time_format.is_none() && date_format.is_none() && timestamp_format.is_none(),
+            error::TimestampFormatNotSupportedSnafu {
+                format: "<unknown>".to_string(),
+                file_format: file_format.cloned().unwrap_or_default(),
+            }
+        );
+    }
+
+    for (key, format_opt) in [
+        (common_datasource::file_format::TIME_FORMAT, time_format),
+        (common_datasource::file_format::DATE_FORMAT, date_format),
+        (
+            common_datasource::file_format::TIMESTAMP_FORMAT,
+            timestamp_format,
+        ),
+    ] {
+        if let Some(format) = format_opt {
+            chrono::format::strftime::StrftimeItems::new(format)
+                .parse()
+                .map_err(|_| error::InvalidCopyParameterSnafu { key, value: format }.build())?;
+        }
+    }
+
+    Ok(())
+}
+
 fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<CopyTableRequest> {
     let direction = match stmt {
         CopyTable::To(_) => CopyDirection::Export,
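
The validation rides on chrono's strftime parser: building `StrftimeItems` and calling `parse()` fails for malformed patterns. A standalone sketch of that check, assuming chrono 0.4.31 or later (where `StrftimeItems::parse` is available):

```rust
use chrono::format::strftime::StrftimeItems;

/// Returns true when `fmt` is a well-formed strftime pattern.
fn is_valid_strftime(fmt: &str) -> bool {
    StrftimeItems::new(fmt).parse().is_ok()
}

fn main() {
    assert!(is_valid_strftime("%Y-%m-%d %H:%M:%S"));
    assert!(!is_valid_strftime("%111112")); // rejected, matching the test below
    println!("strftime validation behaves as expected");
}
```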
@@ -664,6 +699,8 @@ fn to_copy_table_request(stmt: CopyTable, query_ctx: QueryContextRef) -> Result<
     let timestamp_range = timestamp_range_from_option_map(&with, &query_ctx)?;
 
+    verify_time_related_format(&with)?;
+
     let pattern = with
         .get(common_datasource::file_format::FILE_PATTERN)
         .cloned();
@@ -828,7 +865,7 @@ mod tests {
     use crate::statement::copy_database::{
         COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY,
     };
-    use crate::statement::timestamp_range_from_option_map;
+    use crate::statement::{timestamp_range_from_option_map, verify_time_related_format};
 
     fn check_timestamp_range((start, end): (&str, &str)) -> error::Result<Option<TimestampRange>> {
         let query_ctx = QueryContextBuilder::default()
@@ -864,4 +901,62 @@ mod tests {
             error::Error::InvalidTimestampRange { .. }
         );
     }
+
+    #[test]
+    fn test_verify_timestamp_format() {
+        let map = OptionMap::from(
+            [
+                (
+                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
+                    "%Y-%m-%d %H:%M:%S".to_string(),
+                ),
+                (
+                    common_datasource::file_format::FORMAT_TYPE.to_string(),
+                    "csv".to_string(),
+                ),
+            ]
+            .into_iter()
+            .collect::<HashMap<_, _>>(),
+        );
+        assert!(verify_time_related_format(&map).is_ok());
+
+        let map = OptionMap::from(
+            [
+                (
+                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
+                    "%Y-%m-%d %H:%M:%S".to_string(),
+                ),
+                (
+                    common_datasource::file_format::FORMAT_TYPE.to_string(),
+                    "json".to_string(),
+                ),
+            ]
+            .into_iter()
+            .collect::<HashMap<_, _>>(),
+        );
+        assert_matches!(
+            verify_time_related_format(&map).unwrap_err(),
+            error::Error::TimestampFormatNotSupported { .. }
+        );
+
+        let map = OptionMap::from(
+            [
+                (
+                    common_datasource::file_format::TIMESTAMP_FORMAT.to_string(),
+                    "%111112".to_string(),
+                ),
+                (
+                    common_datasource::file_format::FORMAT_TYPE.to_string(),
+                    "csv".to_string(),
+                ),
+            ]
+            .into_iter()
+            .collect::<HashMap<_, _>>(),
+        );
+        assert_matches!(
+            verify_time_related_format(&map).unwrap_err(),
+            error::Error::InvalidCopyParameter { .. }
+        );
+    }
 }

View File

@@ -386,7 +386,7 @@ impl StatementExecutor {
         }
 
         let path = entry.path();
         let file_metadata = self
-            .collect_metadata(&object_store, format, path.to_string())
+            .collect_metadata(&object_store, format.clone(), path.to_string())
             .await?;
         let file_schema = file_metadata.schema();

View File

@@ -66,12 +66,13 @@ impl StatementExecutor {
             map_json_type_to_string_schema,
         ));
 
         match format {
-            Format::Csv(_) => stream_to_csv(
+            Format::Csv(format) => stream_to_csv(
                 Box::pin(DfRecordBatchStreamAdapter::new(stream)),
                 object_store,
                 path,
                 threshold,
                 WRITE_CONCURRENCY,
+                format,
             )
             .await
             .context(error::WriteStreamToFileSnafu { path }),
@@ -96,7 +97,10 @@ impl StatementExecutor {
                 .await
                 .context(error::WriteStreamToFileSnafu { path })
             }
-            _ => error::UnsupportedFormatSnafu { format: *format }.fail(),
+            _ => error::UnsupportedFormatSnafu {
+                format: format.clone(),
+            }
+            .fail(),
         }
     }
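
Because `CsvFormat` now owns `Option<String>` fields, the `Copy` derive had to be dropped (see the first hunk); that is why the `*format` dereference becomes `format.clone()` here, and why `collect_metadata` above now receives `format.clone()` as well.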

View File

@@ -160,6 +160,12 @@ impl CopyTableArgument {
             .get(common_datasource::file_format::FILE_PATTERN)
             .cloned()
     }
+
+    pub fn timestamp_pattern(&self) -> Option<String> {
+        self.with
+            .get(common_datasource::file_format::TIMESTAMP_FORMAT)
+            .cloned()
+    }
 }
 
 #[cfg(test)]

View File

@@ -1,4 +1,4 @@
-CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
+CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX, ts2 DATE DEFAULT '2022-06-15');
 
 Affected Rows: 0
 
@@ -34,6 +34,10 @@ COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_
 Affected Rows: 1
 
+COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d');
+
+Affected Rows: 1
+
 drop table demo;
 
 Affected Rows: 0

View File

@@ -1,4 +1,4 @@
-CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
+CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX, ts2 DATE DEFAULT '2022-06-15');
 
 insert into demo(host, cpu, memory, jsons, ts) values ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000), ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
 
@@ -16,4 +16,6 @@ COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_
 COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
 
+COPY (select host, cpu, jsons, ts, ts2 from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', timestamp_format='%m-%d-%Y', date_format='%Y/%m/%d');
+
 drop table demo;