From 08348cf37ac3bf3893a0e9cd50461986b630dfd9 Mon Sep 17 00:00:00 2001 From: luofucong Date: Tue, 7 Apr 2026 17:18:07 +0800 Subject: [PATCH] x Signed-off-by: luofucong --- src/mito2/src/read/flat_merge.rs | 41 ++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs index 2b1d5fb00d..4876f1232a 100644 --- a/src/mito2/src/read/flat_merge.rs +++ b/src/mito2/src/read/flat_merge.rs @@ -18,19 +18,20 @@ use std::sync::Arc; use std::time::Instant; use async_stream::try_stream; -use common_recordbatch::recordbatch::maybe_align_json_array_with_schema; use common_telemetry::debug; use datatypes::arrow::array::{Int64Array, UInt64Array}; use datatypes::arrow::compute::interleave; use datatypes::arrow::datatypes::SchemaRef; use datatypes::arrow::record_batch::RecordBatch; use datatypes::arrow_array::BinaryArray; +use datatypes::extension::json::is_json_extension_type; use datatypes::timestamp::timestamp_array_to_primitive; +use datatypes::vectors::json::array::JsonArray; use futures::{Stream, TryStreamExt}; use snafu::ResultExt; use store_api::storage::SequenceNumber; -use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result}; +use crate::error::{ComputeArrowSnafu, DataTypeMismatchSnafu, Result}; use crate::memtable::BoxedRecordBatchIterator; use crate::metrics::READ_STAGE_ELAPSED; use crate::read::BoxedRecordBatchStream; @@ -122,24 +123,28 @@ impl BatchBuilder { return Ok(None); } - let columns = (0..self.schema.fields.len()) - .map(|column_idx| { - let arrays: Vec<_> = self + let columns = self + .schema + .fields() + .iter() + .enumerate() + .map(|(column_idx, field)| { + let arrays = self .batches .iter() - .map(|(_, batch)| batch.column(column_idx).clone()) - .collect(); - common_telemetry::debug!("before aligned:"); - for (i, array) in arrays.iter().enumerate() { - common_telemetry::debug!("{i}. {}", array.data_type()); - } - let aligned = maybe_align_json_array_with_schema(&self.schema, arrays) - .context(RecordBatchSnafu)?; - let aligned = aligned.iter().map(|x| x.as_ref()).collect::>(); - common_telemetry::debug!("after aligned:"); - for (i, array) in aligned.iter().enumerate() { - common_telemetry::debug!("{i}. {}", array.data_type()); - } + .map(|(_, batch)| { + let column = batch.column(column_idx); + let column = if is_json_extension_type(field) { + JsonArray::from(column) + .try_align(field.data_type()) + .context(DataTypeMismatchSnafu)? + } else { + column.clone() + }; + Ok(column) + }) + .collect::>>()?; + let aligned = arrays.iter().map(|x| x.as_ref()).collect::>(); interleave(&aligned, &self.indices).context(ComputeArrowSnafu) }) .collect::>>()?;