diff --git a/Cargo.lock b/Cargo.lock
index f40a67dce1..6c4e421fae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2662,6 +2662,7 @@ dependencies = [
  "datafusion-common",
  "datatypes",
  "futures",
+ "jsonb",
  "pin-project",
  "regex",
  "serde",
@@ -12849,6 +12850,7 @@ dependencies = [
  "http 1.1.0",
  "hyper-util",
  "itertools 0.14.0",
+ "jsonb",
  "log-query",
  "loki-proto",
  "meta-client",
diff --git a/src/common/recordbatch/Cargo.toml b/src/common/recordbatch/Cargo.toml
index bb8ba13907..4b0629db16 100644
--- a/src/common/recordbatch/Cargo.toml
+++ b/src/common/recordbatch/Cargo.toml
@@ -16,6 +16,7 @@ datafusion.workspace = true
 datafusion-common.workspace = true
 datatypes.workspace = true
 futures.workspace = true
+jsonb.workspace = true
 pin-project.workspace = true
 regex.workspace = true
 serde.workspace = true
diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs
index ec3754a242..af35d2c24a 100644
--- a/src/common/recordbatch/src/adapter.rs
+++ b/src/common/recordbatch/src/adapter.rs
@@ -16,6 +16,7 @@ use std::fmt::{self, Display};
 use std::future::Future;
 use std::marker::PhantomData;
 use std::pin::Pin;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
@@ -34,8 +35,10 @@ use datafusion::physical_plan::{
 use datafusion_common::arrow::error::ArrowError;
 use datafusion_common::{DataFusionError, ToDFSchema};
 use datatypes::arrow::array::Array;
-use datatypes::schema::{Schema, SchemaRef};
+use datatypes::arrow::datatypes::DataType as ArrowDataType;
+use datatypes::schema::{ColumnExtType, Schema, SchemaRef};
 use futures::ready;
+use jsonb;
 use pin_project::pin_project;
 use snafu::ResultExt;
 
@@ -141,12 +144,9 @@ where
             let mut columns = Vec::with_capacity(projected_schema.fields.len());
             for (idx, field) in projected_schema.fields.iter().enumerate() {
                 let column = projected_column.column(idx);
-                if column.data_type() != field.data_type() {
-                    let output = cast(&column, field.data_type())?;
-                    columns.push(output)
-                } else {
-                    columns.push(column.clone())
-                }
+                let extype = field.metadata().get("greptime:type").and_then(|s| ColumnExtType::from_str(s).ok());
+                let output = custom_cast(&column, field.data_type(), extype)?;
+                columns.push(output)
             }
             let record_batch = DfRecordBatch::try_new(projected_schema, columns)?;
             let record_batch = if let Some(predicate) = predicate {
@@ -542,11 +542,137 @@ impl Stream for AsyncRecordBatchStreamAdapter {
     }
 }
 
+/// Custom cast function that handles Map -> Binary (JSON) conversion
+fn custom_cast(
+    array: &dyn Array,
+    target_type: &ArrowDataType,
+    extype: Option<ColumnExtType>,
+) -> std::result::Result<Arc<dyn Array>, ArrowError> {
+    if let ArrowDataType::Map(_, _) = array.data_type() {
+        if let ArrowDataType::Binary = target_type {
+            return convert_map_to_json_binary(array, extype);
+        }
+    }
+
+    cast(array, target_type)
+}
+
+/// Convert a Map array to a Binary array containing JSON data
+fn convert_map_to_json_binary(
+    array: &dyn Array,
+    extype: Option<ColumnExtType>,
+) -> std::result::Result<Arc<dyn Array>, ArrowError> {
+    use datatypes::arrow::array::{BinaryArray, MapArray, StringArray, StructArray};
+    use serde_json::Value;
+
+    let map_array = array
+        .as_any()
+        .downcast_ref::<MapArray>()
+        .ok_or_else(|| ArrowError::CastError("Failed to downcast to MapArray".to_string()))?;
+
+    let mut json_values = Vec::with_capacity(map_array.len());
+
+    for i in 0..map_array.len() {
+        if map_array.is_null(i) {
+            json_values.push(None);
+        } else {
+            // Extract the map entry at index i
+            let map_entry = map_array.value(i);
+            let key_value_array = map_entry
+                .as_any()
+                .downcast_ref::<StructArray>()
+                .ok_or_else(|| {
ArrowError::CastError("Failed to downcast to StructArray".to_string()) + })?; + + // Convert to JSON object + let mut json_obj = serde_json::Map::with_capacity(key_value_array.len()); + + for j in 0..key_value_array.len() { + if key_value_array.is_null(j) { + continue; + } + let key_field = key_value_array.column(0); + let value_field = key_value_array.column(1); + + if key_field.is_null(j) { + continue; + } + + let key = key_field + .as_any() + .downcast_ref::() + .ok_or_else(|| { + ArrowError::CastError("Failed to downcast key to StringArray".to_string()) + })? + .value(j); + + let value = if value_field.is_null(j) { + Value::Null + } else { + let value_str = value_field + .as_any() + .downcast_ref::() + .ok_or_else(|| { + ArrowError::CastError( + "Failed to downcast value to StringArray".to_string(), + ) + })? + .value(j); + Value::String(value_str.to_string()) + }; + + json_obj.insert(key.to_string(), value); + } + + let json_value = Value::Object(json_obj); + let json_bytes = match extype { + Some(ColumnExtType::Json) => { + let json_string = match serde_json::to_string(&json_value) { + Ok(s) => s, + Err(e) => { + return Err(ArrowError::CastError(format!( + "Failed to serialize JSON: {}", + e + ))) + } + }; + match jsonb::parse_value(json_string.as_bytes()) { + Ok(jsonb_value) => jsonb_value.to_vec(), + Err(e) => { + return Err(ArrowError::CastError(format!( + "Failed to serialize JSONB: {}", + e + ))) + } + } + } + _ => match serde_json::to_vec(&json_value) { + Ok(b) => b, + Err(e) => { + return Err(ArrowError::CastError(format!( + "Failed to serialize JSON: {}", + e + ))) + } + }, + }; + json_values.push(Some(json_bytes)); + } + } + + let binary_array = BinaryArray::from_iter(json_values); + Ok(Arc::new(binary_array)) +} + #[cfg(test)] mod test { use common_error::ext::BoxedError; use common_error::mock::MockError; use common_error::status_code::StatusCode; + use datatypes::arrow::array::{ArrayRef, MapArray, StringArray, StructArray}; + use datatypes::arrow::buffer::OffsetBuffer; + use datatypes::arrow::datatypes::Field; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::vectors::Int32Vector; @@ -650,4 +776,77 @@ mod test { "unexpected err {err}" ); } + + #[test] + fn test_convert_map_to_json_binary() { + let keys = StringArray::from(vec![Some("a"), Some("b"), Some("c"), Some("x")]); + let values = StringArray::from(vec![Some("1"), None, Some("3"), Some("42")]); + let key_field = Arc::new(Field::new("key", ArrowDataType::Utf8, false)); + let value_field = Arc::new(Field::new("value", ArrowDataType::Utf8, true)); + let struct_type = ArrowDataType::Struct(vec![key_field, value_field].into()); + + let entries_field = Arc::new(Field::new("entries", struct_type, false)); + + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("key", ArrowDataType::Utf8, false)), + Arc::new(keys) as ArrayRef, + ), + ( + Arc::new(Field::new("value", ArrowDataType::Utf8, true)), + Arc::new(values) as ArrayRef, + ), + ]); + + let offsets = OffsetBuffer::from_lengths([3, 0, 1]); + let nulls = datatypes::arrow::buffer::NullBuffer::from(vec![true, false, true]); + + let map_array = MapArray::new( + entries_field, + offsets, + struct_array, + Some(nulls), // nulls + false, + ); + + let result = convert_map_to_json_binary(&map_array, None).unwrap(); + let binary_array = result + .as_any() + .downcast_ref::() + .unwrap(); + + let expected_jsons = [ + Some(r#"{"a":"1","b":null,"c":"3"}"#), + None, + Some(r#"{"x":"42"}"#), + ]; + + for (i, _) in 
+            if let Some(expected) = &expected_jsons[i] {
+                assert!(!binary_array.is_null(i));
+                let actual_bytes = binary_array.value(i);
+                let actual_str = std::str::from_utf8(actual_bytes).unwrap();
+                assert_eq!(actual_str, *expected);
+            } else {
+                assert!(binary_array.is_null(i));
+            }
+        }
+
+        let result_json =
+            convert_map_to_json_binary(&map_array, Some(ColumnExtType::Json)).unwrap();
+        let binary_array_json = result_json
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+
+        for (i, _) in expected_jsons.iter().enumerate() {
+            if expected_jsons[i].is_some() {
+                assert!(!binary_array_json.is_null(i));
+                let actual_bytes = binary_array_json.value(i);
+                assert_ne!(actual_bytes, expected_jsons[i].unwrap().as_bytes());
+            } else {
+                assert!(binary_array_json.is_null(i));
+            }
+        }
+    }
 }
diff --git a/src/datatypes/src/schema.rs b/src/datatypes/src/schema.rs
index 1f27eb01c2..d792e93c8b 100644
--- a/src/datatypes/src/schema.rs
+++ b/src/datatypes/src/schema.rs
@@ -21,14 +21,13 @@ use std::fmt;
 use std::sync::Arc;
 
 use arrow::datatypes::{Field, Schema as ArrowSchema};
-use column_schema::ColumnExtType;
 use datafusion_common::DFSchemaRef;
 use snafu::{ensure, ResultExt};
 
 use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
 use crate::prelude::ConcreteDataType;
 pub use crate::schema::column_schema::{
-    ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, Metadata,
+    ColumnExtType, ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, Metadata,
     SkippingIndexOptions, SkippingIndexType, COLUMN_FULLTEXT_CHANGE_OPT_KEY_ENABLE,
     COLUMN_FULLTEXT_OPT_KEY_ANALYZER, COLUMN_FULLTEXT_OPT_KEY_BACKEND,
     COLUMN_FULLTEXT_OPT_KEY_CASE_SENSITIVE, COLUMN_FULLTEXT_OPT_KEY_FALSE_POSITIVE_RATE,
diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs
index a85dca024f..6ee85ada67 100644
--- a/src/operator/src/statement/copy_table_from.rs
+++ b/src/operator/src/statement/copy_table_from.rs
@@ -41,7 +41,7 @@ use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_common::{Constraints, Statistics};
 use datafusion_expr::Expr;
 use datatypes::arrow::compute::can_cast_types;
-use datatypes::arrow::datatypes::{Schema, SchemaRef};
+use datatypes::arrow::datatypes::{DataType as ArrowDataType, Schema, SchemaRef};
 use datatypes::vectors::Helper;
 use futures_util::StreamExt;
 use object_store::{Entry, EntryMode, ObjectStore};
@@ -532,6 +532,19 @@ async fn batch_insert(
     Ok(result)
 }
 
+/// Custom type compatibility check for GreptimeDB that handles Map -> Binary (JSON) conversion
+fn can_cast_types_for_greptime(from: &ArrowDataType, to: &ArrowDataType) -> bool {
+    // Handle Map -> Binary conversion for JSON types
+    if let ArrowDataType::Map(_, _) = from {
+        if let ArrowDataType::Binary = to {
+            return true;
+        }
+    }
+
+    // For all other cases, use Arrow's built-in can_cast_types
+    can_cast_types(from, to)
+}
+
 fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
     let not_match = from
         .fields
@@ -539,7 +552,7 @@
         .zip(to.fields.iter())
         .map(|(l, r)| (l.data_type(), r.data_type()))
         .enumerate()
-        .find(|(_, (l, r))| !can_cast_types(l, r));
+        .find(|(_, (l, r))| !can_cast_types_for_greptime(l, r));
 
     if let Some((index, _)) = not_match {
         error::InvalidSchemaSnafu {
@@ -719,6 +732,30 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_map_to_binary_json_compatibility() {
+        // Test Map -> Binary conversion for JSON types
+        let map_type = DataType::Map(
+            Arc::new(Field::new(
+                "key_value",
+                DataType::Struct(
+                    vec![
+                        Field::new("key", DataType::Utf8, false),
+                        Field::new("value", DataType::Utf8, false),
+                    ]
+                    .into(),
+                ),
+                false,
+            )),
+            false,
+        );
+
+        test_schema_matches((map_type, false), (DataType::Binary, true), true);
+
+        test_schema_matches((DataType::Int8, true), (DataType::Int16, true), true);
+        test_schema_matches((DataType::Utf8, true), (DataType::Binary, true), true);
+    }
+
     fn make_test_schema(v: &[Field]) -> Arc<Schema> {
         Arc::new(Schema::new(v.to_vec()))
     }
diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml
index f2f538f528..a9cabe1eb2 100644
--- a/tests-integration/Cargo.toml
+++ b/tests-integration/Cargo.toml
@@ -90,6 +90,7 @@ datafusion-expr.workspace = true
 hex.workspace = true
 http.workspace = true
 itertools.workspace = true
+jsonb.workspace = true
 opentelemetry-proto.workspace = true
 partition.workspace = true
 paste.workspace = true
diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs
index 8bf4675939..189ff379fe 100644
--- a/tests-integration/src/tests/instance_test.rs
+++ b/tests-integration/src/tests/instance_test.rs
@@ -2199,3 +2199,117 @@
         }
     }
 }
+
+#[apply(both_instances_cases)]
+async fn test_copy_parquet_map_to_json(instance: Arc<dyn MockInstance>) {
+    let instance = instance.frontend();
+
+    let output = execute_sql(
+        &instance,
+        r#"CREATE TABLE map_json_test (
+            "id" INT,
+            map_data JSON,
+            ts TIMESTAMP TIME INDEX
+        );"#,
+    )
+    .await
+    .data;
+    assert!(matches!(output, OutputData::AffectedRows(0)));
+
+    let parquet_path = find_testing_resource("/tests/data/parquet/map_to_json.parquet");
+    let output = execute_sql(
+        &instance,
+        &format!(
+            "COPY map_json_test FROM '{}' WITH (FORMAT='parquet');",
+            parquet_path
+        ),
+    )
+    .await
+    .data;
+    assert!(matches!(output, OutputData::AffectedRows(5)));
+
+    let output = execute_sql(
+        &instance,
+        "SELECT \"id\", json_to_string(map_data), map_data FROM map_json_test ORDER BY \"id\";",
+    )
+    .await
+    .data;
+
+    let expected_jsons = [
+        r#"{"a":"1","b":"2","c":"hello"}"#,
+        r#"{"x":"42","y":"test"}"#,
+        r#"{}"#,
+        r#"{"single":"value"}"#,
+        r#"{"complex":"structure","nested":"data"}"#,
+    ];
+
+    let binary: Vec<String> = expected_jsons
+        .iter()
+        .map(|json_str| {
+            let jsonb_value = jsonb::parse_value(json_str.as_bytes()).unwrap();
+            hex::encode(jsonb_value.to_vec())
+        })
+        .collect();
+
+    let expected = format!(
+        r#"+----+-----------------------------------------+----------------------------------------------------------------------------------------------+
+| id | json_to_string(map_json_test.map_data)  | map_data                                                                                     |
++----+-----------------------------------------+----------------------------------------------------------------------------------------------+
+| 1  | {{"a":"1","b":"2","c":"hello"}}           | {:<92} |
+| 2  | {{"x":"42","y":"test"}}                   | {:<92} |
+| 3  | {{}}                                      | {:<92} |
+| 4  | {{"single":"value"}}                      | {:<92} |
+| 5  | {{"complex":"structure","nested":"data"}} | {:<92} |
++----+-----------------------------------------+----------------------------------------------------------------------------------------------+"#,
+        binary[0], binary[1], binary[2], binary[3], binary[4],
+    );
+
+    check_output_stream(output, &expected).await;
+}
+
+#[apply(both_instances_cases)]
+async fn test_copy_parquet_map_to_binary(instance: Arc<dyn MockInstance>) {
+    let instance = instance.frontend();
+
+    let output = execute_sql(
+        &instance,
+        r#"CREATE TABLE map_bin_test (
+            "id" INT,
+            map_data BINARY,
+            ts TIMESTAMP TIME INDEX
+        );"#,
+    )
+    .await
+    .data;
+    assert!(matches!(output, OutputData::AffectedRows(0)));
+
+    let parquet_path = find_testing_resource("/tests/data/parquet/map_to_json.parquet");
+    let output = execute_sql(
+        &instance,
+        &format!(
+            "COPY map_bin_test FROM '{}' WITH (FORMAT='parquet');",
+            parquet_path
+        ),
+    )
+    .await
+    .data;
+    assert!(matches!(output, OutputData::AffectedRows(5)));
+
+    let output = execute_sql(
+        &instance,
+        "SELECT \"id\", CAST(map_data AS STRING) FROM map_bin_test ORDER BY \"id\";",
+    )
+    .await
+    .data;
+
+    let expected = r#"+----+-----------------------------------------+
+| id | map_bin_test.map_data                   |
++----+-----------------------------------------+
+| 1  | {"a":"1","b":"2","c":"hello"}           |
+| 2  | {"x":"42","y":"test"}                   |
+| 3  | {}                                      |
+| 4  | {"single":"value"}                      |
+| 5  | {"complex":"structure","nested":"data"} |
++----+-----------------------------------------+"#;
+    check_output_stream(output, expected).await;
+}
diff --git a/tests/data/parquet/README.md b/tests/data/parquet/README.md
index c1abae4e6c..5c43dc1bb1 100644
--- a/tests/data/parquet/README.md
+++ b/tests/data/parquet/README.md
@@ -26,4 +26,32 @@ Data:
 | 4     | 4000000  |          | false  |            |                     |
 | 4     | 0.000004 |          | false  |            |                     |
 +-------+----------+----------+--------+------------+---------------------+
+```
+
+### map_to_json.parquet
+
+Schema:
+
+```
++-------------+-----------+-------------+
+| column_name | data_type | is_nullable |
++-------------+-----------+-------------+
+| id          | UInt32    | NO          |
+| map_data    | Map       | YES         |
+| ts          | String    | YES         |
++-------------+-----------+-------------+
+```
+
+Data:
+
+```
++----+-----------------------------------------+---------------------+
+| id | map_data                                | ts                  |
++----+-----------------------------------------+---------------------+
+| 1  | {"a":"1","b":"2","c":"hello"}           | 2023-01-01T10:00:00 |
+| 2  | {"x":"42","y":"test"}                   | 2023-01-01T11:00:00 |
+| 3  | {}                                      | 2023-01-01T12:00:00 |
+| 4  | {"single":"value"}                      | 2023-01-01T13:00:00 |
+| 5  | {"nested":"data","complex":"structure"} | 2023-01-01T14:00:00 |
++----+-----------------------------------------+---------------------+
+```
\ No newline at end of file
diff --git a/tests/data/parquet/map_to_json.parquet b/tests/data/parquet/map_to_json.parquet
new file mode 100644
index 0000000000..a6fef11aa5
Binary files /dev/null and b/tests/data/parquet/map_to_json.parquet differ