Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-04-07 17:18:07 +08:00
parent 4cf5d23df4
commit 08348cf37a

View File

@@ -18,19 +18,20 @@ use std::sync::Arc;
use std::time::Instant;
use async_stream::try_stream;
use common_recordbatch::recordbatch::maybe_align_json_array_with_schema;
use common_telemetry::debug;
use datatypes::arrow::array::{Int64Array, UInt64Array};
use datatypes::arrow::compute::interleave;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow_array::BinaryArray;
use datatypes::extension::json::is_json_extension_type;
use datatypes::timestamp::timestamp_array_to_primitive;
use datatypes::vectors::json::array::JsonArray;
use futures::{Stream, TryStreamExt};
use snafu::ResultExt;
use store_api::storage::SequenceNumber;
use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result};
use crate::error::{ComputeArrowSnafu, DataTypeMismatchSnafu, Result};
use crate::memtable::BoxedRecordBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::BoxedRecordBatchStream;
@@ -122,24 +123,28 @@ impl BatchBuilder {
return Ok(None);
}
let columns = (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self
let columns = self
.schema
.fields()
.iter()
.enumerate()
.map(|(column_idx, field)| {
let arrays = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).clone())
.collect();
common_telemetry::debug!("before aligned:");
for (i, array) in arrays.iter().enumerate() {
common_telemetry::debug!("{i}. {}", array.data_type());
}
let aligned = maybe_align_json_array_with_schema(&self.schema, arrays)
.context(RecordBatchSnafu)?;
let aligned = aligned.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
common_telemetry::debug!("after aligned:");
for (i, array) in aligned.iter().enumerate() {
common_telemetry::debug!("{i}. {}", array.data_type());
}
.map(|(_, batch)| {
let column = batch.column(column_idx);
let column = if is_json_extension_type(field) {
JsonArray::from(column)
.try_align(field.data_type())
.context(DataTypeMismatchSnafu)?
} else {
column.clone()
};
Ok(column)
})
.collect::<Result<Vec<_>>>()?;
let aligned = arrays.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
interleave(&aligned, &self.indices).context(ComputeArrowSnafu)
})
.collect::<Result<Vec<_>>>()?;