From 6c72dc8e5754d0980247031bece373258ec2fd8b Mon Sep 17 00:00:00 2001
From: Yingwen
Date: Wed, 8 Apr 2026 05:59:29 +0800
Subject: [PATCH] fix: add overflow check before interleave() (#7921)

* fix: add overflow check before interleave()

Signed-off-by: evenyag

* refactor: pass batches and column index to check_interleave_bytes_overflow

Refactor check_interleave_bytes_overflow to accept batches and a column
index directly, avoiding the intermediate Vec collection of arrays.

Signed-off-by: evenyag

---------

Signed-off-by: evenyag
---
 src/mito2/src/read/flat_merge.rs | 63 +++++++++++++++++++++++++++++++-
 1 file changed, 61 insertions(+), 2 deletions(-)

diff --git a/src/mito2/src/read/flat_merge.rs b/src/mito2/src/read/flat_merge.rs
index 90df227ae9..946f2a610c 100644
--- a/src/mito2/src/read/flat_merge.rs
+++ b/src/mito2/src/read/flat_merge.rs
@@ -19,9 +19,10 @@ use std::time::Instant;
 
 use async_stream::try_stream;
 use common_telemetry::debug;
-use datatypes::arrow::array::{Int64Array, UInt64Array};
+use datatypes::arrow::array::{Array, AsArray, Int64Array, UInt64Array};
 use datatypes::arrow::compute::interleave;
-use datatypes::arrow::datatypes::SchemaRef;
+use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaRef, Utf8Type};
+use datatypes::arrow::error::ArrowError;
 use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::arrow_array::BinaryArray;
 use datatypes::timestamp::timestamp_array_to_primitive;
@@ -39,6 +40,62 @@ use crate::sst::parquet::flat_format::{
 };
 use crate::sst::parquet::format::PrimaryKeyArray;
 
+/// Checks whether interleaving the selected rows from byte columns would overflow
+/// i32 offsets. Similar to arrow-rs `interleave_bytes()`, accumulates offsets and
+/// returns an error if the capacity exceeds `i32::MAX`.
+///
+/// TODO(yingwen): Remove this after upgrading to arrow >= 58.1.0, which handles
+/// offset overflow in `interleave_bytes()` natively.
+///
+/// See:
+fn check_interleave_bytes_overflow<T: ByteArrayType>(
+    batches: &[(usize, RecordBatch)],
+    col_idx: usize,
+    indices: &[(usize, usize)],
+) -> std::result::Result<(), ArrowError> {
+    // Quick check: if concatenating all value data won't overflow, interleaving
+    // a subset of rows definitely won't either.
+    let total: usize = batches
+        .iter()
+        .map(|(_, batch)| batch.column(col_idx).as_bytes::<T>().value_data().len())
+        .sum();
+    if T::Offset::from_usize(total).is_some() {
+        return Ok(());
+    }
+    // Total exceeds the offset limit, do the precise per-row check.
+    let mut capacity: usize = 0;
+    for &(a, b) in indices {
+        let array = batches[a].1.column(col_idx).as_bytes::<T>();
+        let o = array.value_offsets();
+        let element_len = o[b + 1].as_usize() - o[b].as_usize();
+        capacity += element_len;
+        T::Offset::from_usize(capacity).ok_or(ArrowError::OffsetOverflowError(capacity))?;
+    }
+    Ok(())
+}
+
+/// Checks whether `interleave()` would overflow i32 offsets for `Utf8` or `Binary` columns.
+fn check_interleave_overflow(
+    batches: &[(usize, RecordBatch)],
+    schema: &SchemaRef,
+    indices: &[(usize, usize)],
+) -> Result<()> {
+    for (col_idx, field) in schema.fields.iter().enumerate() {
+        match field.data_type() {
+            DataType::Utf8 => {
+                check_interleave_bytes_overflow::<Utf8Type>(batches, col_idx, indices)
+                    .context(ComputeArrowSnafu)?;
+            }
+            DataType::Binary => {
+                check_interleave_bytes_overflow::<BinaryType>(batches, col_idx, indices)
+                    .context(ComputeArrowSnafu)?;
+            }
+            _ => continue,
+        }
+    }
+    Ok(())
+}
+
 /// Keeps track of the current position in a batch
 #[derive(Debug, Copy, Clone, Default)]
 struct BatchCursor {
@@ -121,6 +178,8 @@ impl BatchBuilder {
             return Ok(None);
         }
 
+        check_interleave_overflow(&self.batches, &self.schema, &self.indices)?;
+
         let columns = (0..self.schema.fields.len())
            .map(|column_idx| {
                let arrays: Vec<_> = self