Signed-off-by: luofucong <luofc@foxmail.com>
This commit is contained in:
luofucong
2026-04-07 15:41:27 +08:00
parent f0adf296d6
commit fadb71298d
4 changed files with 13 additions and 14 deletions

View File

@@ -682,7 +682,7 @@ mod tests {
let rb = adapter.into_iter().next().unwrap().unwrap();
let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap();
assert_eq!(rb.schema(), mapper.input_arrow_schema());
assert_eq!(rb.schema(), mapper.input_arrow_schema(false, None));
// tag_0 + field_1 + ts + 3 internal columns.
assert_eq!(6, rb.num_columns());
assert_eq!(3, rb.num_rows());

View File

@@ -18,6 +18,7 @@ use std::sync::Arc;
use std::time::Instant;
use async_stream::try_stream;
use common_recordbatch::recordbatch::maybe_align_json_array_with_schema;
use common_telemetry::debug;
use datatypes::arrow::array::{Int64Array, UInt64Array};
use datatypes::arrow::compute::interleave;
@@ -29,7 +30,7 @@ use futures::{Stream, TryStreamExt};
use snafu::ResultExt;
use store_api::storage::SequenceNumber;
use crate::error::{ComputeArrowSnafu, Result};
use crate::error::{ComputeArrowSnafu, RecordBatchSnafu, Result};
use crate::memtable::BoxedRecordBatchIterator;
use crate::metrics::READ_STAGE_ELAPSED;
use crate::read::BoxedRecordBatchStream;
@@ -126,9 +127,12 @@ impl BatchBuilder {
let arrays: Vec<_> = self
.batches
.iter()
.map(|(_, batch)| batch.column(column_idx).as_ref())
.map(|(_, batch)| batch.column(column_idx).clone())
.collect();
interleave(&arrays, &self.indices).context(ComputeArrowSnafu)
let aligned = maybe_align_json_array_with_schema(&self.schema, arrays)
.context(RecordBatchSnafu)?;
let aligned = aligned.iter().map(|x| x.as_ref()).collect::<Vec<_>>();
interleave(&aligned, &self.indices).context(ComputeArrowSnafu)
})
.collect::<Result<Vec<_>>>()?;

View File

@@ -39,7 +39,6 @@ use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
use crate::read::scan_region::ScanInput;
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
use crate::sst::{
@@ -223,20 +222,15 @@ impl FlatProjectionMapper {
&self.batch_schema
}
/// Returns a clone of the mapper's stored `input_arrow_schema`.
///
/// Compiled only for test builds (`#[cfg(test)]`).
// NOTE(review): a same-named method taking `compaction` and
// `json_concretized_schema` follows directly below in this view; presumably
// this zero-argument form is the pre-change version - confirm before relying on it.
#[cfg(test)]
pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
self.input_arrow_schema.clone()
}
/// Returns the input arrow schema from sources.
///
/// The merge reader can use this schema.
pub(crate) fn arrow_schema_by_compaction(
pub(crate) fn input_arrow_schema(
&self,
scan_input: &ScanInput,
compaction: bool,
json_concretized_schema: Option<ArrowSchemaRef>,
) -> datatypes::arrow::datatypes::SchemaRef {
if !scan_input.compaction {
if !compaction {
self.input_arrow_schema.clone()
} else {
// For compaction, we need to build a different schema from encoding.

View File

@@ -216,7 +216,8 @@ impl SeqScan {
}
let mapper = stream_ctx.input.mapper.as_flat().unwrap();
let schema = mapper.arrow_schema_by_compaction(&stream_ctx.input, json_concretized_schema);
let schema =
mapper.input_arrow_schema(stream_ctx.input.compaction, json_concretized_schema);
let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
let reader =