diff --git a/src/mito2/src/read/batch_adapter.rs b/src/mito2/src/read/batch_adapter.rs index 660d56c8ef..360f591658 100644 --- a/src/mito2/src/read/batch_adapter.rs +++ b/src/mito2/src/read/batch_adapter.rs @@ -682,7 +682,7 @@ mod tests { let rb = adapter.into_iter().next().unwrap().unwrap(); let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap(); - assert_eq!(rb.schema(), mapper.input_arrow_schema()); + assert_eq!(rb.schema(), mapper.input_arrow_schema(false, None)); // tag_0 + field_1 + ts + 3 internal columns. assert_eq!(6, rb.num_columns()); assert_eq!(3, rb.num_rows()); diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs index 90df227ae9..1284daa5fa 100644 --- a/src/mito2/src/read/flat_merge.rs +++ b/src/mito2/src/read/flat_merge.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use std::time::Instant; use async_stream::try_stream; +use common_recordbatch::recordbatch::maybe_align_json_array_with_schema; use common_telemetry::debug; use datatypes::arrow::array::{Int64Array, UInt64Array}; use datatypes::arrow::compute::interleave; @@ -29,7 +30,7 @@ use futures::{Stream, TryStreamExt}; use snafu::ResultExt; use store_api::storage::SequenceNumber; -use crate::error::{ComputeArrowSnafu, Result}; +use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result}; use crate::memtable::BoxedRecordBatchIterator; use crate::metrics::READ_STAGE_ELAPSED; use crate::read::BoxedRecordBatchStream; @@ -126,9 +127,12 @@ impl BatchBuilder { let arrays: Vec<_> = self .batches .iter() - .map(|(_, batch)| batch.column(column_idx).as_ref()) + .map(|(_, batch)| batch.column(column_idx).clone()) .collect(); - interleave(&arrays, &self.indices).context(ComputeArrowSnafu) + let aligned = maybe_align_json_array_with_schema(&self.schema, arrays) + .context(RecordBatchSnafu)?; + let aligned = aligned.iter().map(|x| x.as_ref()).collect::<Vec<_>>(); + interleave(&aligned, &self.indices).context(ComputeArrowSnafu) }) .collect::<Result<Vec<_>>>()?; diff --git 
a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs index ff4fe498d4..8552c04f7d 100644 --- a/src/mito2/src/read/flat_projection.rs +++ b/src/mito2/src/read/flat_projection.rs @@ -39,7 +39,6 @@ use store_api::storage::ColumnId; use crate::cache::CacheStrategy; use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result}; use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache}; -use crate::read::scan_region::ScanInput; use crate::sst::parquet::flat_format::sst_column_id_indices; use crate::sst::parquet::format::FormatProjection; use crate::sst::{ @@ -223,20 +222,15 @@ impl FlatProjectionMapper { &self.batch_schema } - #[cfg(test)] - pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef { - self.input_arrow_schema.clone() - } - /// Returns the input arrow schema from sources. /// /// The merge reader can use this schema. - pub(crate) fn arrow_schema_by_compaction( + pub(crate) fn input_arrow_schema( &self, - scan_input: &ScanInput, + compaction: bool, json_concretized_schema: Option<datatypes::arrow::datatypes::SchemaRef>, ) -> datatypes::arrow::datatypes::SchemaRef { - if !scan_input.compaction { + if !compaction { self.input_arrow_schema.clone() } else { // For compaction, we need to build a different schema from encoding. diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 1a78428564..60c523a732 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -216,7 +216,8 @@ impl SeqScan { } let mapper = stream_ctx.input.mapper.as_flat().unwrap(); - let schema = mapper.arrow_schema_by_compaction(&stream_ctx.input, json_concretized_schema); + let schema = + mapper.input_arrow_schema(stream_ctx.input.compaction, json_concretized_schema); let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter()); let reader =