diff --git a/Cargo.lock b/Cargo.lock index 91090f3087..a1e82d45e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7596,6 +7596,7 @@ dependencies = [ "file-engine", "futures", "futures-util", + "jsonb", "lazy_static", "meta-client", "meter-core", diff --git a/src/common/grpc-expr/src/util.rs b/src/common/grpc-expr/src/util.rs index 9cbc4ad8cd..725adf82a1 100644 --- a/src/common/grpc-expr/src/util.rs +++ b/src/common/grpc-expr/src/util.rs @@ -14,10 +14,11 @@ use std::collections::HashSet; +use api::v1::column_data_type_extension::TypeExt; use api::v1::column_def::contains_fulltext; use api::v1::{ AddColumn, AddColumns, Column, ColumnDataType, ColumnDataTypeExtension, ColumnDef, - ColumnOptions, ColumnSchema, CreateTableExpr, SemanticType, + ColumnOptions, ColumnSchema, CreateTableExpr, JsonTypeExtension, SemanticType, }; use datatypes::schema::Schema; use snafu::{ensure, OptionExt, ResultExt}; @@ -25,8 +26,9 @@ use table::metadata::TableId; use table::table_reference::TableReference; use crate::error::{ - DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu, InvalidFulltextColumnTypeSnafu, - MissingTimestampColumnSnafu, Result, UnknownColumnDataTypeSnafu, + self, DuplicatedColumnNameSnafu, DuplicatedTimestampColumnSnafu, + InvalidFulltextColumnTypeSnafu, MissingTimestampColumnSnafu, Result, + UnknownColumnDataTypeSnafu, }; pub struct ColumnExpr<'a> { pub column_name: &'a str, @@ -72,6 +74,28 @@ impl<'a> From<&'a ColumnSchema> for ColumnExpr<'a> { } } +fn infer_column_datatype( + datatype: i32, + datatype_extension: &Option, +) -> Result { + let column_type = + ColumnDataType::try_from(datatype).context(UnknownColumnDataTypeSnafu { datatype })?; + + if matches!(&column_type, ColumnDataType::Binary) { + if let Some(ext) = datatype_extension { + let type_ext = ext + .type_ext + .as_ref() + .context(error::MissingFieldSnafu { field: "type_ext" })?; + if *type_ext == TypeExt::JsonType(JsonTypeExtension::JsonBinary.into()) { + return Ok(ColumnDataType::Json); + } + } + } + + Ok(column_type) +} + pub fn build_create_table_expr( table_id: Option, table_name: &TableReference<'_>, @@ -124,8 +148,7 @@ pub fn build_create_table_expr( _ => {} } - let column_type = - ColumnDataType::try_from(datatype).context(UnknownColumnDataTypeSnafu { datatype })?; + let column_type = infer_column_datatype(datatype, datatype_extension)?; ensure!( !contains_fulltext(options) || column_type == ColumnDataType::String, diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 665fb211d8..ca8ea462a8 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -19,7 +19,7 @@ use api::v1::{DeleteRequests, DropFlowExpr, InsertRequests, RowDeleteRequests, R use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_query::Output; -use common_telemetry::tracing; +use common_telemetry::tracing::{self}; use query::parser::PromQuery; use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; use servers::query_handler::grpc::GrpcQueryHandler; diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index dda47abebe..5d3d18f8aa 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -36,6 +36,7 @@ datatypes.workspace = true file-engine.workspace = true futures = "0.3" futures-util.workspace = true +jsonb.workspace = true lazy_static.workspace = true meta-client.workspace = true meter-core.workspace = true diff --git a/src/operator/src/delete.rs b/src/operator/src/delete.rs index 756195c83a..b139cfc29a 100644 --- a/src/operator/src/delete.rs +++ b/src/operator/src/delete.rs @@ -35,6 +35,7 @@ use crate::error::{ MissingTimeIndexColumnSnafu, RequestDeletesSnafu, Result, TableNotFoundSnafu, }; use crate::region_req_factory::RegionRequestFactory; +use crate::req_convert::common::preprocess_row_delete_requests; use crate::req_convert::delete::{ColumnToRow, RowToRegion, TableToRegion}; pub struct Deleter { @@ -72,6 +73,7 @@ impl Deleter { mut requests: RowDeleteRequests, ctx: QueryContextRef, ) -> Result { + preprocess_row_delete_requests(&mut requests.deletes)?; // remove empty requests requests.deletes.retain(|req| { req.rows diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 1931814c8b..15b4e4e15b 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -770,6 +770,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Invalid json text: {}", json))] + InvalidJsonFormat { + #[snafu(implicit)] + location: Location, + json: String, + }, } pub type Result = std::result::Result; @@ -808,7 +815,8 @@ impl ErrorExt for Error { | Error::BuildAdminFunctionArgs { .. } | Error::FunctionArityMismatch { .. } | Error::InvalidPartition { .. } - | Error::PhysicalExpr { .. } => StatusCode::InvalidArguments, + | Error::PhysicalExpr { .. } + | Error::InvalidJsonFormat { .. } => StatusCode::InvalidArguments, Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. } => { StatusCode::TableAlreadyExists diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index e0f0d39236..c2fc69d7db 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -54,6 +54,7 @@ use crate::error::{ }; use crate::expr_factory::CreateExprFactory; use crate::region_req_factory::RegionRequestFactory; +use crate::req_convert::common::preprocess_row_insert_requests; use crate::req_convert::insert::{ColumnToRow, RowToRegion, StatementToRegion, TableToRegion}; use crate::statement::StatementExecutor; @@ -119,10 +120,11 @@ impl Inserter { /// Handles row inserts request and creates a physical table on demand. pub async fn handle_row_inserts( &self, - requests: RowInsertRequests, + mut requests: RowInsertRequests, ctx: QueryContextRef, statement_executor: &StatementExecutor, ) -> Result { + preprocess_row_insert_requests(&mut requests.inserts)?; self.handle_row_inserts_with_create_type( requests, ctx, diff --git a/src/operator/src/req_convert/common.rs b/src/operator/src/req_convert/common.rs index 355a908d2a..d619a602e8 100644 --- a/src/operator/src/req_convert/common.rs +++ b/src/operator/src/req_convert/common.rs @@ -17,9 +17,13 @@ pub(crate) mod partitioner; use std::collections::HashMap; use api::helper::ColumnDataTypeWrapper; +use api::v1::column_data_type_extension::TypeExt; use api::v1::column_def::options_from_column_schema; use api::v1::value::ValueData; -use api::v1::{Column, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; +use api::v1::{ + Column, ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row, + RowDeleteRequest, RowInsertRequest, Rows, SemanticType, Value, +}; use common_base::BitVec; use datatypes::vectors::VectorRef; use snafu::prelude::*; @@ -27,10 +31,77 @@ use snafu::ResultExt; use table::metadata::TableInfo; use crate::error::{ - ColumnDataTypeSnafu, ColumnNotFoundSnafu, InvalidInsertRequestSnafu, - MissingTimeIndexColumnSnafu, Result, + ColumnDataTypeSnafu, ColumnNotFoundSnafu, InvalidInsertRequestSnafu, InvalidJsonFormatSnafu, + MissingTimeIndexColumnSnafu, Result, UnexpectedSnafu, }; +/// Encodes a string value as JSONB binary data if the value is of `StringValue` type. +fn encode_string_to_jsonb_binary(value_data: ValueData) -> Result { + if let ValueData::StringValue(json) = &value_data { + let binary = jsonb::parse_value(json.as_bytes()) + .map_err(|_| InvalidJsonFormatSnafu { json }.build()) + .map(|jsonb| jsonb.to_vec())?; + Ok(ValueData::BinaryValue(binary)) + } else { + UnexpectedSnafu { + violated: "Expected to value data to be a string.", + } + .fail() + } +} + +/// Prepares row insertion requests by converting any JSON values to binary JSONB format. +pub fn preprocess_row_insert_requests(requests: &mut Vec) -> Result<()> { + for request in requests { + prepare_rows(&mut request.rows)?; + } + + Ok(()) +} + +/// Prepares row deletion requests by converting any JSON values to binary JSONB format. +pub fn preprocess_row_delete_requests(requests: &mut Vec) -> Result<()> { + for request in requests { + prepare_rows(&mut request.rows)?; + } + + Ok(()) +} + +fn prepare_rows(rows: &mut Option) -> Result<()> { + if let Some(rows) = rows { + let indexes = rows + .schema + .iter() + .enumerate() + .filter_map(|(idx, schema)| { + if schema.datatype() == ColumnDataType::Json { + Some(idx) + } else { + None + } + }) + .collect::>(); + for idx in &indexes { + let column = &mut rows.schema[*idx]; + column.datatype_extension = Some(ColumnDataTypeExtension { + type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())), + }); + column.datatype = ColumnDataType::Binary.into(); + } + + for idx in &indexes { + for row in &mut rows.rows { + if let Some(value_data) = row.values[*idx].value_data.take() { + row.values[*idx].value_data = Some(encode_string_to_jsonb_binary(value_data)?); + } + } + } + } + + Ok(()) +} + pub fn columns_to_rows(columns: Vec, row_count: u32) -> Result { let row_count = row_count as usize; let column_count = columns.len(); diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index b785f4ad37..3acfa0d863 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -203,9 +203,10 @@ mod test { CREATE TABLE {table_name} ( a INT, b STRING, + c JSON, ts TIMESTAMP, TIME INDEX (ts), - PRIMARY KEY (a, b) + PRIMARY KEY (a, b, c) ) PARTITION ON COLUMNS(a) ( a < 10, a >= 10 AND a < 20, @@ -291,13 +292,14 @@ CREATE TABLE {table_name} ( #[tokio::test(flavor = "multi_thread")] async fn test_standalone_insert_and_query() { + common_telemetry::init_default_ut_logging(); let standalone = GreptimeDbStandaloneBuilder::new("test_standalone_insert_and_query") .build() .await; let instance = &standalone.instance; let table_name = "my_table"; - let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b))"); + let sql = format!("CREATE TABLE {table_name} (a INT, b STRING, c JSON, ts TIMESTAMP, TIME INDEX (ts), PRIMARY KEY (a, b, c))"); create_table(instance, sql).await; test_insert_delete_and_query_on_existing_table(instance, table_name).await; @@ -332,6 +334,25 @@ CREATE TABLE {table_name} ( 1672557986000, 1672557987000, ]; + let json_strings = vec![ + r#"{ "id": 1, "name": "Alice", "age": 30, "active": true }"#.to_string(), + r#"{ "id": 2, "name": "Bob", "balance": 1234.56, "active": false }"#.to_string(), + r#"{ "id": 3, "tags": ["rust", "testing", "json"], "age": 28 }"#.to_string(), + r#"{ "id": 4, "metadata": { "created_at": "2024-10-30T12:00:00Z", "status": "inactive" } }"#.to_string(), + r#"{ "id": 5, "name": null, "phone": "+1234567890" }"#.to_string(), + r#"{ "id": 6, "height": 5.9, "weight": 72.5, "active": true }"#.to_string(), + r#"{ "id": 7, "languages": ["English", "Spanish"], "age": 29 }"#.to_string(), + r#"{ "id": 8, "contact": { "email": "hank@example.com", "phone": "+0987654321" } }"#.to_string(), + r#"{ "id": 9, "preferences": { "notifications": true, "theme": "dark" } }"#.to_string(), + r#"{ "id": 10, "scores": [88, 92, 76], "active": false }"#.to_string(), + r#"{ "id": 11, "birthday": "1996-07-20", "location": { "city": "New York", "zip": "10001" } }"#.to_string(), + r#"{ "id": 12, "subscription": { "type": "premium", "expires": "2025-01-01" } }"#.to_string(), + r#"{ "id": 13, "settings": { "volume": 0.8, "brightness": 0.6 }, "active": true }"#.to_string(), + r#"{ "id": 14, "notes": ["first note", "second note"], "priority": 1 }"#.to_string(), + r#"{ "id": 15, "transactions": [{ "amount": 500, "date": "2024-01-01" }, { "amount": -200, "date": "2024-02-01" }] }"#.to_string(), + r#"{ "id": 16, "transactions": [{ "amount": 500, "date": "2024-01-01" }] }"#.to_string(), + ]; + let insert = InsertRequest { table_name: table_name.to_string(), columns: vec![ @@ -359,6 +380,16 @@ CREATE TABLE {table_name} ( datatype: ColumnDataType::String as i32, ..Default::default() }, + Column { + column_name: "c".to_string(), + values: Some(Values { + string_values: json_strings, + ..Default::default() + }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::Json as i32, + ..Default::default() + }, Column { column_name: "ts".to_string(), values: Some(Values { @@ -383,7 +414,7 @@ CREATE TABLE {table_name} ( let request = Request::Query(QueryRequest { query: Some(Query::Sql(format!( - "SELECT ts, a, b FROM {table_name} ORDER BY ts" + "SELECT ts, a, b, json_to_string(c) as c FROM {table_name} ORDER BY ts" ))), }); let output = query(instance, request.clone()).await; @@ -391,30 +422,29 @@ CREATE TABLE {table_name} ( unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+----+-------------------+ -| ts | a | b | -+---------------------+----+-------------------+ -| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | -| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 | -| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | -| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | -| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | -| 2023-01-01T07:26:17 | | ts: 1672557977000 | -| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | -| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 | -| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | -| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | -| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 | -| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | -| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | -| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | -| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 | -| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | -+---------------------+----+-------------------+"; + let expected = r#"+---------------------+----+-------------------+---------------------------------------------------------------------------------------------------+ +| ts | a | b | c | ++---------------------+----+-------------------+---------------------------------------------------------------------------------------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | {"active":true,"age":30,"id":1,"name":"Alice"} | +| 2023-01-01T07:26:13 | 2 | ts: 1672557973000 | {"active":false,"balance":1234.56,"id":2,"name":"Bob"} | +| 2023-01-01T07:26:14 | 3 | ts: 1672557974000 | {"age":28,"id":3,"tags":["rust","testing","json"]} | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | {"id":4,"metadata":{"created_at":"2024-10-30T12:00:00Z","status":"inactive"}} | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | {"id":5,"name":null,"phone":"+1234567890"} | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | {"active":true,"height":5.9,"id":6,"weight":72.5} | +| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | {"age":29,"id":7,"languages":["English","Spanish"]} | +| 2023-01-01T07:26:19 | 12 | ts: 1672557979000 | {"contact":{"email":"hank@example.com","phone":"+0987654321"},"id":8} | +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | {"id":9,"preferences":{"notifications":true,"theme":"dark"}} | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | {"active":false,"id":10,"scores":[88,92,76]} | +| 2023-01-01T07:26:22 | 22 | ts: 1672557982000 | {"birthday":"1996-07-20","id":11,"location":{"city":"New York","zip":"10001"}} | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | {"id":12,"subscription":{"expires":"2025-01-01","type":"premium"}} | +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | {"active":true,"id":13,"settings":{"brightness":0.6,"volume":0.8}} | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | {"id":14,"notes":["first note","second note"],"priority":1} | +| 2023-01-01T07:26:26 | 52 | ts: 1672557986000 | {"id":15,"transactions":[{"amount":500,"date":"2024-01-01"},{"amount":-200,"date":"2024-02-01"}]} | +| 2023-01-01T07:26:27 | 53 | ts: 1672557987000 | {"id":16,"transactions":[{"amount":500,"date":"2024-01-01"}]} | ++---------------------+----+-------------------+---------------------------------------------------------------------------------------------------+"#; assert_eq!(recordbatches.pretty_print().unwrap(), expected); - let new_grpc_delete_request = |a, b, ts, row_count| DeleteRequest { + let new_grpc_delete_request = |a, b, c, ts, row_count| DeleteRequest { table_name: table_name.to_string(), key_columns: vec![ Column { @@ -437,6 +467,16 @@ CREATE TABLE {table_name} ( datatype: ColumnDataType::String as i32, ..Default::default() }, + Column { + column_name: "c".to_string(), + values: Some(Values { + string_values: c, + ..Default::default() + }), + semantic_type: SemanticType::Tag as i32, + datatype: ColumnDataType::Json as i32, + ..Default::default() + }, Column { column_name: "ts".to_string(), semantic_type: SemanticType::Timestamp as i32, @@ -458,6 +498,12 @@ CREATE TABLE {table_name} ( "ts: 1672557982000".to_string(), "ts: 1672557986000".to_string(), ], + vec![ + r#"{ "id": 2, "name": "Bob", "balance": 1234.56, "active": false }"#.to_string(), + r#"{ "id": 8, "contact": { "email": "hank@example.com", "phone": "+0987654321" } }"#.to_string(), + r#"{ "id": 11, "birthday": "1996-07-20", "location": { "city": "New York", "zip": "10001" } }"#.to_string(), + r#"{ "id": 15, "transactions": [{ "amount": 500, "date": "2024-01-01" }, { "amount": -200, "date": "2024-02-01" }] }"#.to_string(), + ], vec![1672557973000, 1672557979000, 1672557982000, 1672557986000], 4, ); @@ -467,6 +513,11 @@ CREATE TABLE {table_name} ( "ts: 1672557974000".to_string(), "ts: 1672557987000".to_string(), ], + vec![ + r#"{ "id": 3, "tags": ["rust", "testing", "json"], "age": 28 }"#.to_string(), + r#"{ "id": 16, "transactions": [{ "amount": 500, "date": "2024-01-01" }] }"# + .to_string(), + ], vec![1672557974000, 1672557987000], 2, ); @@ -484,22 +535,21 @@ CREATE TABLE {table_name} ( unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+----+-------------------+ -| ts | a | b | -+---------------------+----+-------------------+ -| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | -| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | -| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | -| 2023-01-01T07:26:17 | | ts: 1672557977000 | -| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | -| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | -| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | -| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | -| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | -| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | -+---------------------+----+-------------------+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); + let expected = r#"+---------------------+----+-------------------+-------------------------------------------------------------------------------+ +| ts | a | b | c | ++---------------------+----+-------------------+-------------------------------------------------------------------------------+ +| 2023-01-01T07:26:12 | 1 | ts: 1672557972000 | {"active":true,"age":30,"id":1,"name":"Alice"} | +| 2023-01-01T07:26:15 | 4 | ts: 1672557975000 | {"id":4,"metadata":{"created_at":"2024-10-30T12:00:00Z","status":"inactive"}} | +| 2023-01-01T07:26:16 | 5 | ts: 1672557976000 | {"id":5,"name":null,"phone":"+1234567890"} | +| 2023-01-01T07:26:17 | | ts: 1672557977000 | {"active":true,"height":5.9,"id":6,"weight":72.5} | +| 2023-01-01T07:26:18 | 11 | ts: 1672557978000 | {"age":29,"id":7,"languages":["English","Spanish"]} | +| 2023-01-01T07:26:20 | 20 | ts: 1672557980000 | {"id":9,"preferences":{"notifications":true,"theme":"dark"}} | +| 2023-01-01T07:26:21 | 21 | ts: 1672557981000 | {"active":false,"id":10,"scores":[88,92,76]} | +| 2023-01-01T07:26:23 | 23 | ts: 1672557983000 | {"id":12,"subscription":{"expires":"2025-01-01","type":"premium"}} | +| 2023-01-01T07:26:24 | 50 | ts: 1672557984000 | {"active":true,"id":13,"settings":{"brightness":0.6,"volume":0.8}} | +| 2023-01-01T07:26:25 | 51 | ts: 1672557985000 | {"id":14,"notes":["first note","second note"],"priority":1} | ++---------------------+----+-------------------+-------------------------------------------------------------------------------+"#; + similar_asserts::assert_eq!(recordbatches.pretty_print().unwrap(), expected); } async fn verify_data_distribution( @@ -586,6 +636,22 @@ CREATE TABLE {table_name} ( datatype: ColumnDataType::Int32 as i32, ..Default::default() }, + Column { + column_name: "c".to_string(), + values: Some(Values { + string_values: vec![ + r#"{ "id": 1, "name": "Alice", "age": 30, "active": true }"# + .to_string(), + r#"{ "id": 2, "name": "Bob", "balance": 1234.56, "active": false }"# + .to_string(), + ], + ..Default::default() + }), + null_mask: vec![2], + semantic_type: SemanticType::Field as i32, + datatype: ColumnDataType::Json as i32, + ..Default::default() + }, Column { column_name: "ts".to_string(), values: Some(Values { @@ -652,7 +718,8 @@ CREATE TABLE {table_name} ( let request = Request::Query(QueryRequest { query: Some(Query::Sql( - "SELECT ts, a, b FROM auto_created_table order by ts".to_string(), + "SELECT ts, a, b, json_to_string(c) as c FROM auto_created_table order by ts" + .to_string(), )), }); let output = query(instance, request.clone()).await; @@ -660,18 +727,17 @@ CREATE TABLE {table_name} ( unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2023-01-01T07:26:15 | 4 | | -| 2023-01-01T07:26:16 | | | -| 2023-01-01T07:26:17 | 6 | | -| 2023-01-01T07:26:18 | | x | -| 2023-01-01T07:26:19 | | | -| 2023-01-01T07:26:20 | | z | -+---------------------+---+---+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); + let expected = r#"+---------------------+---+---+--------------------------------------------------------+ +| ts | a | b | c | ++---------------------+---+---+--------------------------------------------------------+ +| 2023-01-01T07:26:15 | 4 | | {"active":true,"age":30,"id":1,"name":"Alice"} | +| 2023-01-01T07:26:16 | | | | +| 2023-01-01T07:26:17 | 6 | | {"active":false,"balance":1234.56,"id":2,"name":"Bob"} | +| 2023-01-01T07:26:18 | | x | | +| 2023-01-01T07:26:19 | | | | +| 2023-01-01T07:26:20 | | z | | ++---------------------+---+---+--------------------------------------------------------+"#; + similar_asserts::assert_eq!(recordbatches.pretty_print().unwrap(), expected); let delete = DeleteRequest { table_name: "auto_created_table".to_string(), @@ -702,16 +768,15 @@ CREATE TABLE {table_name} ( unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); - let expected = "\ -+---------------------+---+---+ -| ts | a | b | -+---------------------+---+---+ -| 2023-01-01T07:26:16 | | | -| 2023-01-01T07:26:17 | 6 | | -| 2023-01-01T07:26:18 | | x | -| 2023-01-01T07:26:20 | | z | -+---------------------+---+---+"; - assert_eq!(recordbatches.pretty_print().unwrap(), expected); + let expected = r#"+---------------------+---+---+--------------------------------------------------------+ +| ts | a | b | c | ++---------------------+---+---+--------------------------------------------------------+ +| 2023-01-01T07:26:16 | | | | +| 2023-01-01T07:26:17 | 6 | | {"active":false,"balance":1234.56,"id":2,"name":"Bob"} | +| 2023-01-01T07:26:18 | | x | | +| 2023-01-01T07:26:20 | | z | | ++---------------------+---+---+--------------------------------------------------------+"#; + similar_asserts::assert_eq!(recordbatches.pretty_print().unwrap(), expected); } #[tokio::test(flavor = "multi_thread")]