diff --git a/src/common/recordbatch/src/error.rs b/src/common/recordbatch/src/error.rs
index e0b66eb903..6c89d975e5 100644
--- a/src/common/recordbatch/src/error.rs
+++ b/src/common/recordbatch/src/error.rs
@@ -133,6 +133,18 @@ pub enum Error {
         source: datatypes::error::Error,
     },
 
+    #[snafu(display(
+        "Failed to downcast vector of type '{:?}' to type '{:?}'",
+        from_type,
+        to_type
+    ))]
+    DowncastVector {
+        from_type: ConcreteDataType,
+        to_type: ConcreteDataType,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Error occurs when performing arrow computation"))]
     ArrowCompute {
         #[snafu(source)]
@@ -192,6 +204,8 @@ impl ErrorExt for Error {
             | Error::PhysicalExpr { .. }
             | Error::RecordBatchSliceIndexOverflow { .. } => StatusCode::Internal,
 
+            Error::DowncastVector { .. } => StatusCode::Unexpected,
+
             Error::PollStream { .. } => StatusCode::EngineExecuteQuery,
 
             Error::ArrowCompute { .. } => StatusCode::IllegalState,
diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs
index 0281b45749..863f384a03 100644
--- a/src/common/recordbatch/src/lib.rs
+++ b/src/common/recordbatch/src/lib.rs
@@ -30,13 +30,16 @@ pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
 use datatypes::arrow::compute::SortOptions;
 pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
 use datatypes::arrow::util::pretty;
-use datatypes::prelude::VectorRef;
-use datatypes::schema::{Schema, SchemaRef};
+use datatypes::prelude::{ConcreteDataType, VectorRef};
+use datatypes::scalars::{ScalarVector, ScalarVectorBuilder};
+use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use datatypes::types::json_type_value_to_string;
+use datatypes::vectors::{BinaryVector, StringVectorBuilder};
 use error::Result;
 use futures::task::{Context, Poll};
 use futures::{Stream, TryStreamExt};
 pub use recordbatch::RecordBatch;
-use snafu::{ensure, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 
 pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
     fn name(&self) -> &str {
@@ -58,6 +61,146 @@ pub struct OrderOption {
     pub options: SortOptions,
 }
 
+/// A wrapper that maps a [RecordBatchStream] to a new [RecordBatchStream] by applying a mapper
+/// function to each [RecordBatch].
+///
+/// The schema of the new stream is the inner stream's schema transformed by the schema mapper
+/// function. The output ordering and metrics of the new stream are inherited from the inner
+/// stream.
+pub struct SendableRecordBatchMapper {
+    inner: SendableRecordBatchStream,
+    /// The mapper function applied to each [RecordBatch] in the stream. It receives the batch
+    /// together with the original schema and the mapped schema.
+    mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
+    /// The schema of the mapped stream, i.e. the inner stream's schema after applying the
+    /// schema mapper function.
+    schema: SchemaRef,
+    /// Whether the mapper function needs to be applied to each [RecordBatch] in the stream.
+    apply_mapper: bool,
+}
+
+/// Maps JSON columns to string columns in the batch.
+///
+/// Each JSON value is converted to its string representation. The mapped batch has the same
+/// number of columns as the original batch, with every JSON column replaced by a string column.
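+///
+/// A minimal usage sketch, for illustration only. The `batch` and `original_schema`
+/// values are assumed to come from the caller; in this patch the function is not
+/// called directly but installed via [SendableRecordBatchMapper]:
+///
+/// ```ignore
+/// let (mapped_schema, _) = map_json_type_to_string_schema(original_schema.clone());
+/// let mapped_batch = map_json_type_to_string(batch, &original_schema, &mapped_schema)?;
+/// // JSON columns in `mapped_batch` are now string vectors.
+/// ```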
+pub fn map_json_type_to_string(
+    batch: RecordBatch,
+    original_schema: &SchemaRef,
+    mapped_schema: &SchemaRef,
+) -> Result<RecordBatch> {
+    let mut vectors = Vec::with_capacity(original_schema.column_schemas().len());
+    for (vector, schema) in batch.columns.iter().zip(original_schema.column_schemas()) {
+        if let ConcreteDataType::Json(j) = &schema.data_type {
+            let mut string_vector_builder = StringVectorBuilder::with_capacity(vector.len());
+            let binary_vector = vector
+                .as_any()
+                .downcast_ref::<BinaryVector>()
+                .with_context(|| error::DowncastVectorSnafu {
+                    from_type: schema.data_type.clone(),
+                    to_type: ConcreteDataType::binary_datatype(),
+                })?;
+            for value in binary_vector.iter_data() {
+                let Some(value) = value else {
+                    string_vector_builder.push(None);
+                    continue;
+                };
+                let string_value =
+                    json_type_value_to_string(value, &j.format).with_context(|_| {
+                        error::CastVectorSnafu {
+                            from_type: schema.data_type.clone(),
+                            to_type: ConcreteDataType::string_datatype(),
+                        }
+                    })?;
+                string_vector_builder.push(Some(string_value.as_str()));
+            }
+
+            let string_vector = string_vector_builder.finish();
+            vectors.push(Arc::new(string_vector) as VectorRef);
+        } else {
+            vectors.push(vector.clone());
+        }
+    }
+
+    RecordBatch::new(mapped_schema.clone(), vectors)
+}
+
+/// Maps JSON columns to string columns in the schema.
+///
+/// The mapped schema has the same number of columns as the original schema, with every JSON
+/// column replaced by a string column (nullability preserved).
+///
+/// Returns the new schema and whether any column was mapped, i.e. whether the batch mapper
+/// needs to be applied to the stream.
+pub fn map_json_type_to_string_schema(schema: SchemaRef) -> (SchemaRef, bool) {
+    let mut new_columns = Vec::with_capacity(schema.column_schemas().len());
+    let mut apply_mapper = false;
+    for column in schema.column_schemas() {
+        if matches!(column.data_type, ConcreteDataType::Json(_)) {
+            new_columns.push(ColumnSchema::new(
+                column.name.to_string(),
+                ConcreteDataType::string_datatype(),
+                column.is_nullable(),
+            ));
+            apply_mapper = true;
+        } else {
+            new_columns.push(column.clone());
+        }
+    }
+    (Arc::new(Schema::new(new_columns)), apply_mapper)
+}
+
+impl SendableRecordBatchMapper {
+    /// Creates a new [SendableRecordBatchMapper] from the inner [RecordBatchStream], a batch
+    /// mapper function, and a schema mapper function.
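+    ///
+    /// A sketch of the intended call shape, mirroring the copy-to-file path touched
+    /// by this patch (`stream` is assumed to be a `SendableRecordBatchStream` whose
+    /// schema may contain JSON columns):
+    ///
+    /// ```ignore
+    /// let stream = Box::pin(SendableRecordBatchMapper::new(
+    ///     stream,
+    ///     map_json_type_to_string,
+    ///     map_json_type_to_string_schema,
+    /// ));
+    /// // `stream.schema()` now reports string columns wherever the inner stream
+    /// // had JSON columns.
+    /// ```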
+    pub fn new(
+        inner: SendableRecordBatchStream,
+        mapper: fn(RecordBatch, &SchemaRef, &SchemaRef) -> Result<RecordBatch>,
+        schema_mapper: fn(SchemaRef) -> (SchemaRef, bool),
+    ) -> Self {
+        let (mapped_schema, apply_mapper) = schema_mapper(inner.schema());
+        Self {
+            inner,
+            mapper,
+            schema: mapped_schema,
+            apply_mapper,
+        }
+    }
+}
+
+impl RecordBatchStream for SendableRecordBatchMapper {
+    fn name(&self) -> &str {
+        "SendableRecordBatchMapper"
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_ordering(&self) -> Option<&[OrderOption]> {
+        self.inner.output_ordering()
+    }
+
+    fn metrics(&self) -> Option<RecordBatchMetrics> {
+        self.inner.metrics()
+    }
+}
+
+impl Stream for SendableRecordBatchMapper {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        if self.apply_mapper {
+            Pin::new(&mut self.inner).poll_next(cx).map(|opt| {
+                opt.map(|result| {
+                    result
+                        .and_then(|batch| (self.mapper)(batch, &self.inner.schema(), &self.schema))
+                })
+            })
+        } else {
+            Pin::new(&mut self.inner).poll_next(cx)
+        }
+    }
+}
+
 /// EmptyRecordBatchStream can be used to create a RecordBatchStream
 /// that will produce no results
 pub struct EmptyRecordBatchStream {
diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs
index 012bebbf4a..023641e83f 100644
--- a/src/operator/src/statement/copy_table_to.rs
+++ b/src/operator/src/statement/copy_table_to.rs
@@ -25,7 +25,10 @@ use common_datasource::object_store::{build_backend, parse_url};
 use common_datasource::util::find_dir_and_filename;
 use common_query::Output;
 use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
-use common_recordbatch::SendableRecordBatchStream;
+use common_recordbatch::{
+    map_json_type_to_string, map_json_type_to_string_schema, RecordBatchStream,
+    SendableRecordBatchMapper, SendableRecordBatchStream,
+};
 use common_telemetry::{debug, tracing};
 use datafusion::datasource::DefaultTableSource;
 use datafusion_common::TableReference as DfTableReference;
@@ -57,6 +60,11 @@ impl StatementExecutor {
     ) -> Result<usize> {
         let threshold = WRITE_BUFFER_THRESHOLD.as_bytes() as usize;
 
+        let stream = Box::pin(SendableRecordBatchMapper::new(
+            stream,
+            map_json_type_to_string,
+            map_json_type_to_string_schema,
+        ));
         match format {
             Format::Csv(_) => stream_to_csv(
                 Box::pin(DfRecordBatchStreamAdapter::new(stream)),
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index f6d03a2a8f..a756156d0b 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -569,6 +569,7 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
     #[snafu(display("Convert SQL value error"))]
     ConvertSqlValue {
         source: datatypes::error::Error,
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_csv.result b/tests/cases/standalone/common/copy/copy_from_fs_csv.result
index d1838a5079..28864ed460 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_csv.result
+++ b/tests/cases/standalone/common/copy/copy_from_fs_csv.result
@@ -1,15 +1,21 @@
-CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
+CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
 
 Affected Rows: 0
 
+insert into
+    demo(host, cpu, memory, jsons, ts)
+values
+    ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
+    ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
+
+Affected Rows: 2
+
 insert into
     demo(host, cpu, memory, ts)
 values
-    ('host1', 66.6, 1024, 1655276557000),
-    ('host2', 88.8, 333.3, 1655276558000),
     ('host3', 99.9, 444.4, 1722077263000);
 
-Affected Rows: 3
+Affected Rows: 1
 
 Copy demo TO '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format='csv');
@@ -32,6 +38,44 @@ select * from with_filename order by ts;
 | host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
 +-------+------+--------+---------------------+
 
+CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
+
+Affected Rows: 0
+
+Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
+
+Affected Rows: 3
+
+select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
+
++-------+------+--------+---------------------------------+---------------------+
+| host  | cpu  | memory | json_to_string(with_json.jsons) | ts                  |
++-------+------+--------+---------------------------------+---------------------+
+| host1 | 66.6 | 1024.0 | {"foo":"bar"}                   | 2022-06-15T07:02:37 |
+| host2 | 88.8 | 333.3  | {"a":null,"foo":"bar"}          | 2022-06-15T07:02:38 |
+| host3 | 99.9 | 444.4  |                                 | 2024-07-27T10:47:43 |
++-------+------+--------+---------------------------------+---------------------+
+
+-- SQLNESS PROTOCOL MYSQL
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
++-------+------+--------+------------------------+---------------------+
+| host  | cpu  | memory | jsons                  | ts                  |
++-------+------+--------+------------------------+---------------------+
+| host1 | 66.6 | 1024   | {"foo":"bar"}          | 2022-06-15 07:02:37 |
+| host2 | 88.8 | 333.3  | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38 |
++-------+------+--------+------------------------+---------------------+
+
+-- SQLNESS PROTOCOL POSTGRES
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
++-------+------+--------+------------------------+----------------------------+
+| host  | cpu  | memory | jsons                  | ts                         |
++-------+------+--------+------------------------+----------------------------+
+| host1 | 66.6 | 1024   | {"foo":"bar"}          | 2022-06-15 07:02:37.000000 |
+| host2 | 88.8 | 333.3  | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38.000000 |
++-------+------+--------+------------------------+----------------------------+
+
 CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
@@ -110,6 +154,10 @@ drop table with_filename;
 
 Affected Rows: 0
 
+drop table with_json;
+
+Affected Rows: 0
+
 drop table with_path;
 
 Affected Rows: 0
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_csv.sql b/tests/cases/standalone/common/copy/copy_from_fs_csv.sql
index 8c4e9b3aa9..7d3e712da3 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_csv.sql
+++ b/tests/cases/standalone/common/copy/copy_from_fs_csv.sql
@@ -1,10 +1,14 @@
-CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
+CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
+
+insert into
+    demo(host, cpu, memory, jsons, ts)
+values
+    ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
+    ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
 
 insert into
     demo(host, cpu, memory, ts)
 values
-    ('host1', 66.6, 1024, 1655276557000),
-    ('host2', 88.8, 333.3, 1655276558000),
     ('host3', 99.9, 444.4, 1722077263000);
 
 Copy demo TO '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format='csv');
@@ -15,6 +19,18 @@ Copy with_filename FROM '${SQLNESS_HOME}/demo/export/csv/demo.csv' with (format='csv');
 
 select * from with_filename order by ts;
 
+CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
+
+Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
+
+select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
+
+-- SQLNESS PROTOCOL MYSQL
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
+-- SQLNESS PROTOCOL POSTGRES
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
 CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
 
 Copy with_path FROM '${SQLNESS_HOME}/demo/export/csv/' with (format='csv', start_time='2023-06-15 07:02:37');
@@ -43,6 +59,8 @@ drop table demo;
 
 drop table with_filename;
 
+drop table with_json;
+
 drop table with_path;
 
 drop table with_pattern;
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_json.result b/tests/cases/standalone/common/copy/copy_from_fs_json.result
index 153fac1e87..0c415b1a84 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_json.result
+++ b/tests/cases/standalone/common/copy/copy_from_fs_json.result
@@ -1,15 +1,21 @@
-CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
+CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
 
 Affected Rows: 0
 
 insert into
-    demo(host, cpu, memory, ts)
+    demo(host, cpu, memory, jsons, ts)
 values
-    ('host1', 66.6, 1024, 1655276557000),
-    ('host2', 88.8, 333.3, 1655276558000),
+    ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
+    ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
+
+Affected Rows: 2
+
+insert into
+    demo(host, cpu, memory, ts)
+values
     ('host3', 99.9, 444.4, 1722077263000);
 
-Affected Rows: 3
+Affected Rows: 1
 
 Copy demo TO '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
@@ -32,6 +38,44 @@ select * from with_filename order by ts;
 | host2 | 88.8 | 333.3  | 2022-06-15T07:02:38 |
 +-------+------+--------+---------------------+
 
+CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
+
+Affected Rows: 0
+
+Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
+
+Affected Rows: 3
+
+select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
+
++-------+------+--------+---------------------------------+---------------------+
+| host  | cpu  | memory | json_to_string(with_json.jsons) | ts                  |
++-------+------+--------+---------------------------------+---------------------+
+| host1 | 66.6 | 1024.0 | {"foo":"bar"}                   | 2022-06-15T07:02:37 |
+| host2 | 88.8 | 333.3  | {"a":null,"foo":"bar"}          | 2022-06-15T07:02:38 |
+| host3 | 99.9 | 444.4  |                                 | 2024-07-27T10:47:43 |
++-------+------+--------+---------------------------------+---------------------+
+
+-- SQLNESS PROTOCOL MYSQL
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
++-------+------+--------+------------------------+---------------------+
+| host  | cpu  | memory | jsons                  | ts                  |
++-------+------+--------+------------------------+---------------------+
+| host1 | 66.6 | 1024   | {"foo":"bar"}          | 2022-06-15 07:02:37 |
+| host2 | 88.8 | 333.3  | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38 |
++-------+------+--------+------------------------+---------------------+
+
+-- SQLNESS PROTOCOL POSTGRES
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
++-------+------+--------+------------------------+----------------------------+
+| host  | cpu  | memory | jsons                  | ts                         |
++-------+------+--------+------------------------+----------------------------+
+| host1 | 66.6 | 1024   | {"foo":"bar"}          | 2022-06-15 07:02:37.000000 |
+| host2 | 88.8 | 333.3  | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38.000000 |
++-------+------+--------+------------------------+----------------------------+
+
 CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
@@ -110,6 +154,10 @@ drop table with_filename;
 
 Affected Rows: 0
 
+drop table with_json;
+
+Affected Rows: 0
+
 drop table with_path;
 
 Affected Rows: 0
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_json.sql b/tests/cases/standalone/common/copy/copy_from_fs_json.sql
index b032332262..215877fae6 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_json.sql
+++ b/tests/cases/standalone/common/copy/copy_from_fs_json.sql
@@ -1,10 +1,14 @@
-CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
+CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
 
 insert into
-    demo(host, cpu, memory, ts)
+    demo(host, cpu, memory, jsons, ts)
 values
-    ('host1', 66.6, 1024, 1655276557000),
-    ('host2', 88.8, 333.3, 1655276558000),
+    ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
+    ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
+
+insert into
+    demo(host, cpu, memory, ts)
+values
     ('host3', 99.9, 444.4, 1722077263000);
 
 Copy demo TO '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
@@ -15,6 +19,18 @@ Copy with_filename FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
 
 select * from with_filename order by ts;
 
+CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
+
+Copy with_json FROM '${SQLNESS_HOME}/demo/export/json/demo.json' with (format='json');
+
+select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
+
+-- SQLNESS PROTOCOL MYSQL
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
+-- SQLNESS PROTOCOL POSTGRES
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
 CREATE TABLE with_path(host string, cpu double, memory double, ts timestamp time index);
 
 Copy with_path FROM '${SQLNESS_HOME}/demo/export/json/' with (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:39');
@@ -43,6 +59,8 @@ drop table demo;
 
 drop table with_filename;
 
+drop table with_json;
+
 drop table with_path;
 
 drop table with_pattern;
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result
index ac7fbcddba..c377160f31 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.result
+++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.result
@@ -1,4 +1,4 @@
-CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
+CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
 
 Affected Rows: 0
 
@@ -6,14 +6,20 @@ CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index);
 
 Affected Rows: 0
 
+insert into
+    demo(host, cpu, memory, jsons, ts)
+values
+    ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
+    ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
+
+Affected Rows: 2
+
 insert into
     demo(host, cpu, memory, ts)
 values
-    ('host1', 66.6, 1024, 1655276557000),
-    ('host2', 88.8, 333.3, 1655276558000),
     ('host3', 111.1, 444.4, 1722077263000);
 
-Affected Rows: 3
+Affected Rows: 1
 
 insert into
     demo_2(host, cpu, memory, ts)
@@ -70,6 +76,44 @@ select * from with_path order by ts;
 | host6 | 222.2 | 555.5  | 2024-07-27T10:47:44 |
 +-------+-------+--------+---------------------+
 
+CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
+
+Affected Rows: 0
+
+Copy with_json FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
+
+Affected Rows: 3
+
+select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
+
++-------+-------+--------+---------------------------------+---------------------+
+| host  | cpu   | memory | json_to_string(with_json.jsons) | ts                  |
++-------+-------+--------+---------------------------------+---------------------+
+| host1 | 66.6  | 1024.0 | {"foo":"bar"}                   | 2022-06-15T07:02:37 |
+| host2 | 88.8  | 333.3  | {"a":null,"foo":"bar"}          | 2022-06-15T07:02:38 |
+| host3 | 111.1 | 444.4  |                                 | 2024-07-27T10:47:43 |
++-------+-------+--------+---------------------------------+---------------------+
+
+-- SQLNESS PROTOCOL MYSQL
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
++-------+------+--------+------------------------+---------------------+
+| host  | cpu  | memory | jsons                  | ts                  |
++-------+------+--------+------------------------+---------------------+
+| host1 | 66.6 | 1024   | {"foo":"bar"}          | 2022-06-15 07:02:37 |
+| host2 | 88.8 | 333.3  | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38 |
++-------+------+--------+------------------------+---------------------+
+
+-- SQLNESS PROTOCOL POSTGRES
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
++-------+------+--------+------------------------+----------------------------+
+| host  | cpu  | memory | jsons                  | ts                         |
++-------+------+--------+------------------------+----------------------------+
+| host1 | 66.6 | 1024   | {"foo":"bar"}          | 2022-06-15 07:02:37.000000 |
+| host2 | 88.8 | 333.3  | {"a":null,"foo":"bar"} | 2022-06-15 07:02:38.000000 |
++-------+------+--------+------------------------+----------------------------+
+
 CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
 
 Affected Rows: 0
@@ -171,6 +215,10 @@ drop table with_filename;
 
 Affected Rows: 0
 
+drop table with_json;
+
+Affected Rows: 0
+
 drop table with_path;
 
 Affected Rows: 0
diff --git a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
index 13e15bd91e..8b8fd55ab4 100644
--- a/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
+++ b/tests/cases/standalone/common/copy/copy_from_fs_parquet.sql
@@ -1,12 +1,16 @@
-CREATE TABLE demo(host string, cpu double, memory double, ts TIMESTAMP time index);
+CREATE TABLE demo(host string, cpu double, memory double, jsons JSON, ts TIMESTAMP time index);
 
 CREATE TABLE demo_2(host string, cpu double, memory double, ts TIMESTAMP time index);
 
+insert into
+    demo(host, cpu, memory, jsons, ts)
+values
+    ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000),
+    ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
+
 insert into
     demo(host, cpu, memory, ts)
 values
-    ('host1', 66.6, 1024, 1655276557000),
-    ('host2', 88.8, 333.3, 1655276558000),
     ('host3', 111.1, 444.4, 1722077263000);
 
 insert into
@@ -32,6 +36,18 @@ Copy with_path FROM '${SQLNESS_HOME}/demo/export/parquet_files/';
 
 select * from with_path order by ts;
 
+CREATE TABLE with_json(host string, cpu double, memory double, jsons JSON, ts timestamp time index);
+
+Copy with_json FROM '${SQLNESS_HOME}/demo/export/parquet_files/demo.parquet';
+
+select host, cpu, memory, json_to_string(jsons), ts from with_json order by ts;
+
+-- SQLNESS PROTOCOL MYSQL
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
+-- SQLNESS PROTOCOL POSTGRES
+select host, cpu, memory, jsons, ts from demo where host != 'host3';
+
 CREATE TABLE with_pattern(host string, cpu double, memory double, ts timestamp time index);
 
 Copy with_pattern FROM '${SQLNESS_HOME}/demo/export/parquet_files/' WITH (PATTERN = 'demo.*', start_time='2022-06-15 07:02:39');
@@ -70,6 +86,8 @@ drop table demo_2;
 
 drop table with_filename;
 
+drop table with_json;
+
 drop table with_path;
 
 drop table with_pattern;
diff --git a/tests/cases/standalone/common/copy/copy_to_fs.result b/tests/cases/standalone/common/copy/copy_to_fs.result
index 13ac66b79e..eed4281806 100644
--- a/tests/cases/standalone/common/copy/copy_to_fs.result
+++ b/tests/cases/standalone/common/copy/copy_to_fs.result
@@ -1,11 +1,15 @@
-CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, ts TIMESTAMP TIME INDEX);
+CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
 
 Affected Rows: 0
 
-insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
+insert into demo(host, cpu, memory, jsons, ts) values ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000), ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
 
 Affected Rows: 2
 
+insert into demo(host, cpu, memory, ts) values ('host3', 111.1, 444.4, 1722077263000);
+
+Affected Rows: 1
+
 COPY demo TO '${SQLNESS_HOME}/export/demo.parquet' WITH (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
 
 Affected Rows: 1
@@ -18,15 +22,15 @@ COPY demo TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
 
 Affected Rows: 1
 
-COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
+COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
 
 Affected Rows: 1
 
-COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
+COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
 
 Affected Rows: 1
 
-COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
+COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
 
 Affected Rows: 1
diff --git a/tests/cases/standalone/common/copy/copy_to_fs.sql b/tests/cases/standalone/common/copy/copy_to_fs.sql
index d59ef14dd9..405ea87518 100644
--- a/tests/cases/standalone/common/copy/copy_to_fs.sql
+++ b/tests/cases/standalone/common/copy/copy_to_fs.sql
@@ -1,6 +1,8 @@
-CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, ts TIMESTAMP TIME INDEX);
+CREATE TABLE demo(host string, cpu DOUBLE, memory DOUBLE, jsons JSON, ts TIMESTAMP TIME INDEX);
 
-insert into demo(host, cpu, memory, ts) values ('host1', 66.6, 1024, 1655276557000), ('host2', 88.8, 333.3, 1655276558000);
+insert into demo(host, cpu, memory, jsons, ts) values ('host1', 66.6, 1024, '{"foo":"bar"}', 1655276557000), ('host2', 88.8, 333.3, '{"a":null,"foo":"bar"}', 1655276558000);
+
+insert into demo(host, cpu, memory, ts) values ('host3', 111.1, 444.4, 1722077263000);
 
 COPY demo TO '${SQLNESS_HOME}/export/demo.parquet' WITH (start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
 
@@ -8,10 +10,10 @@ COPY demo TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
 
 COPY demo TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json', start_time='2022-06-15 07:02:37', end_time='2022-06-15 07:02:38');
 
-COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
+COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.parquet';
 
-COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
+COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.csv' WITH (format='csv');
 
-COPY (select host, cpu, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
+COPY (select host, cpu, jsons, ts from demo where host = 'host2') TO '${SQLNESS_HOME}/export/demo.json' WITH (format='json');
 
 drop table demo;
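A reviewer-facing sketch (not part of the patch) of the schema-mapping contract exercised above. It uses only constructors that appear in this diff, plus `ConcreteDataType::json_datatype()`, which is assumed to exist as the JSON counterpart of `string_datatype()`; the column names are hypothetical:

```rust
use std::sync::Arc;

use common_recordbatch::map_json_type_to_string_schema;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};

fn main() {
    // A schema with one JSON column and one non-JSON column (names made up).
    let schema = Arc::new(Schema::new(vec![
        ColumnSchema::new("jsons", ConcreteDataType::json_datatype(), true),
        ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
    ]));

    let (mapped, apply_mapper) = map_json_type_to_string_schema(schema);

    // The JSON column is now reported as a string column, and the flag tells
    // SendableRecordBatchMapper that the batch mapper must run.
    assert_eq!(
        mapped.column_schemas()[0].data_type,
        ConcreteDataType::string_datatype()
    );
    assert!(apply_mapper);
}
```

If `json_datatype()` is named differently, substitute whatever constructor produces a `ConcreteDataType::Json(_)` value; the assertions are unchanged.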