diff --git a/rust/lancedb/src/table/add_data.rs b/rust/lancedb/src/table/add_data.rs index be8ec28ad..6c92e1b43 100644 --- a/rust/lancedb/src/table/add_data.rs +++ b/rust/lancedb/src/table/add_data.rs @@ -982,4 +982,105 @@ mod tests { table2.add(struct_batch).execute().await.unwrap(); assert_eq!(table2.count_rows(None).await.unwrap(), 2); } + + /// Regression test: appending `arrow.json` (PyArrow `pa.json_()`) data into a table + /// whose schema was created with `pa.json_()` (internally stored as `lance.json`, backed + /// by `LargeBinary`) must succeed without a schema-mismatch error. + /// + /// Previously `build_field_exprs` would attempt a `Utf8 → LargeBinary` DataFusion cast, + /// which produced a field whose Arrow extension metadata still read `arrow.json` instead + /// of `lance.json`. Lance-core then rejected the append with + /// `"json vs large_binary" schema mismatch`. + /// + /// PyArrow's `pa.json_()` may be backed by either `Utf8` or `LargeUtf8` depending on the + /// constructor used, so the test is parameterized over the input backing type. + #[rstest::rstest] + #[case::utf8(DataType::Utf8)] + #[case::large_utf8(DataType::LargeUtf8)] + #[tokio::test] + async fn test_add_arrow_json_into_lance_json_table(#[case] input_type: DataType) { + use arrow_array::{Array, cast::AsArray}; + use lance_arrow::ARROW_EXT_NAME_KEY; + use lance_arrow::json::{ARROW_JSON_EXT_NAME, JSON_EXT_NAME}; + + // Build a table whose "data" column is lance.json (LargeBinary + + // ARROW:extension:name = "lance.json"). + let lance_json_field = lance_arrow::json::json_field("data", true); + let table_schema = Arc::new(Schema::new(vec![lance_json_field])); + + let db = connect("memory://").execute().await.unwrap(); + let table = db + .create_empty_table("json_test", table_schema) + .execute() + .await + .unwrap(); + + // Sanity-check the stored schema. + let stored_field = table.schema().await.unwrap(); + let data_field = stored_field.field_with_name("data").unwrap(); + assert_eq!(data_field.data_type(), &DataType::LargeBinary); + assert_eq!( + data_field + .metadata() + .get(ARROW_EXT_NAME_KEY) + .map(|s| s.as_str()), + Some(JSON_EXT_NAME), + ); + + // Build an arrow.json input field (Utf8/LargeUtf8 + arrow.json extension). + // This is what PyArrow produces for pa.json_() arrays. + let arrow_json_metadata = std::collections::HashMap::from([( + ARROW_EXT_NAME_KEY.to_string(), + ARROW_JSON_EXT_NAME.to_string(), + )]); + let arrow_json_field = + Field::new("data", input_type.clone(), true).with_metadata(arrow_json_metadata); + let arrow_json_schema = Arc::new(Schema::new(vec![arrow_json_field])); + + let rows: Vec> = vec![None, Some(r#"{"a": 1}"#), Some(r#"{"b": 2}"#)]; + let string_array: Arc = match input_type { + DataType::Utf8 => Arc::new(arrow_array::StringArray::from(rows.clone())), + DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(rows.clone())), + other => panic!("unsupported arrow.json backing type for this test: {other:?}"), + }; + let batch = RecordBatch::try_new(arrow_json_schema, vec![string_array]).unwrap(); + + // This must not fail with a schema-mismatch error. + table.add(batch).execute().await.unwrap(); + + assert_eq!(table.count_rows(None).await.unwrap(), rows.len()); + + // A lance.json column is read back as Utf8 carrying arrow.json extension metadata. + let results: Vec = table + .query() + .select(Select::columns(&["data"])) + .execute() + .await + .unwrap() + .try_collect() + .await + .unwrap(); + + assert_eq!(results.len(), 1); + let batch = &results[0]; + assert_eq!(batch.num_rows(), rows.len()); + + let json_col = batch.column(0); + assert_eq!(json_col.data_type(), &DataType::Utf8); + let json_strs = json_col.as_string::(); + + for (i, expected) in rows.iter().enumerate() { + match expected { + None => assert!(json_strs.is_null(i), "row {i} expected null"), + Some(raw) => { + assert!(!json_strs.is_null(i), "row {i} expected non-null"); + let actual: serde_json::Value = serde_json::from_str(json_strs.value(i)) + .expect("read-back JSON should be valid"); + let expected: serde_json::Value = + serde_json::from_str(raw).expect("expected JSON should be valid"); + assert_eq!(actual, expected, "row {i} JSON mismatch"); + } + } + } + } } diff --git a/rust/lancedb/src/table/datafusion/cast.rs b/rust/lancedb/src/table/datafusion/cast.rs index b4abb16c5..ccf72ccb8 100644 --- a/rust/lancedb/src/table/datafusion/cast.rs +++ b/rust/lancedb/src/table/datafusion/cast.rs @@ -13,6 +13,7 @@ use datafusion_physical_expr::expressions::{CastExpr, Literal}; use datafusion_physical_plan::expressions::Column; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::{ExecutionPlan, PhysicalExpr}; +use lance_arrow::json::{is_arrow_json_field, is_json_field}; use crate::{Error, Result}; @@ -64,6 +65,18 @@ fn build_field_exprs( let input_field = &input_fields[input_idx]; let input_expr = get_input_expr(input_idx); + // Special case: input is arrow.json (PyArrow pa.json_() extension type backed by + // Utf8/LargeUtf8) and the table field is lance.json (backed by LargeBinary). + // Lance-core's write path already handles the arrow.json → lance.json conversion + // (including JSONB encoding), so we pass the expression through unchanged and let + // lance-core deal with it. Attempting to cast Utf8 → LargeBinary here would + // produce a field whose metadata still identifies it as arrow.json, which then + // causes a schema-mismatch error inside lance-core. + if is_arrow_json_field(input_field) && is_json_field(table_field) { + result.push((input_expr, Arc::clone(input_field) as FieldRef)); + continue; + } + let expr = match (input_field.data_type(), table_field.data_type()) { // Both are structs: recurse into sub-fields to handle subschemas and casts. (DataType::Struct(in_children), DataType::Struct(tbl_children)) @@ -618,4 +631,75 @@ mod tests { .unwrap(); assert_eq!(a.values(), &[1, 3]); } + + /// `arrow.json` input (PyArrow `pa.json_()`, Utf8/LargeUtf8 + extension metadata) against a + /// `lance.json` table field (LargeBinary + extension metadata) must be passed through + /// without a cast so that lance-core can perform its own arrow.json → JSONB conversion. + /// + /// Before the fix, `cast_to_table_schema` attempted a `Utf8 → LargeBinary` DataFusion + /// cast that preserved the wrong extension metadata, causing lance-core to reject the + /// batch with a "json vs large_binary" schema-mismatch error. + #[rstest::rstest] + #[case::utf8(DataType::Utf8)] + #[case::large_utf8(DataType::LargeUtf8)] + #[tokio::test] + async fn test_arrow_json_passthrough_to_lance_json(#[case] input_type: DataType) { + use lance_arrow::ARROW_EXT_NAME_KEY; + use lance_arrow::json::{ARROW_JSON_EXT_NAME, json_field}; + + // Build a table schema with a lance.json field (LargeBinary + lance.json metadata). + let lance_field = json_field("data", true); + let table_schema = Schema::new(vec![lance_field]); + + // Build an input batch with an arrow.json field (Utf8/LargeUtf8 + arrow.json metadata). + let arrow_meta = std::collections::HashMap::from([( + ARROW_EXT_NAME_KEY.to_string(), + ARROW_JSON_EXT_NAME.to_string(), + )]); + let arrow_field = Field::new("data", input_type.clone(), true).with_metadata(arrow_meta); + let input_schema = Arc::new(Schema::new(vec![arrow_field])); + + let values = vec![Some(r#"{"x": 1}"#), None, Some(r#"{"y": 2}"#)]; + let input_array: Arc = match input_type { + DataType::Utf8 => Arc::new(StringArray::from(values)), + DataType::LargeUtf8 => Arc::new(arrow_array::LargeStringArray::from(values)), + other => panic!("unsupported arrow.json backing type for this test: {other:?}"), + }; + let input_batch = RecordBatch::try_new(input_schema, vec![input_array]).unwrap(); + + let plan = plan_from_batch(input_batch).await; + let projected = cast_to_table_schema(plan, &table_schema).unwrap(); + + // The projected schema's "data" field must carry arrow.json metadata + // (the input field), not be silently dropped or miscast. + let out_field = projected.schema().field_with_name("data").unwrap().clone(); + assert_eq!(out_field.data_type(), &input_type); + assert_eq!( + out_field + .metadata() + .get(ARROW_EXT_NAME_KEY) + .map(|s| s.as_str()), + Some(ARROW_JSON_EXT_NAME), + "output field must still carry arrow.json metadata so lance-core can handle it" + ); + + // The data must flow through correctly (3 rows, no panic). + let result = collect(projected).await; + assert_eq!(result.num_rows(), 3); + let (v0, v2) = match input_type { + DataType::Utf8 => { + let col: &StringArray = result.column(0).as_any().downcast_ref().unwrap(); + (col.value(0).to_string(), col.value(2).to_string()) + } + DataType::LargeUtf8 => { + let col: &arrow_array::LargeStringArray = + result.column(0).as_any().downcast_ref().unwrap(); + (col.value(0).to_string(), col.value(2).to_string()) + } + _ => unreachable!(), + }; + assert_eq!(v0, r#"{"x": 1}"#); + assert!(result.column(0).is_null(1)); + assert_eq!(v2, r#"{"y": 2}"#); + } }