fix: add overflow check before interleave() (#7921)

* fix: add overflow check before interleave()

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: pass batches and column index to check_interleave_bytes_overflow

Refactor check_interleave_bytes_overflow to accept batches and a column
index directly, avoiding the intermediate Vec collection of arrays.

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-04-08 05:59:29 +08:00
committed by GitHub
parent 1df9837538
commit 6c72dc8e57

View File

@@ -19,9 +19,10 @@ use std::time::Instant;
use async_stream::try_stream;
use common_telemetry::debug;
use datatypes::arrow::array::{Int64Array, UInt64Array};
use datatypes::arrow::array::{Array, AsArray, Int64Array, UInt64Array};
use datatypes::arrow::compute::interleave;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::datatypes::{ArrowNativeType, BinaryType, DataType, SchemaRef, Utf8Type};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow_array::BinaryArray;
use datatypes::timestamp::timestamp_array_to_primitive;
@@ -39,6 +40,62 @@ use crate::sst::parquet::flat_format::{
};
use crate::sst::parquet::format::PrimaryKeyArray;
/// Checks whether interleaving the selected rows from byte columns would overflow
/// i32 offsets. Similar to arrow-rs `interleave_bytes()`, accumulates offsets and
/// returns an error if the capacity exceeds `i32::MAX`.
///
/// TODO(yingwen): Remove this after upgrading to arrow >= 58.1.0, which handles
/// offset overflow in `interleave_bytes()` natively.
///
/// See: <https://github.com/apache/arrow-rs/blob/65ad652f2410fc51ad77da1805e85c0a76d9a7ea/arrow-select/src/interleave.rs#L208-L225>
///
/// # Errors
/// Returns [`ArrowError::OffsetOverflowError`] with the offending accumulated
/// capacity when the selected rows' total byte length cannot be represented by
/// `T::Offset`.
fn check_interleave_bytes_overflow<T: datatypes::arrow::datatypes::ByteArrayType>(
    batches: &[(usize, RecordBatch)],
    col_idx: usize,
    indices: &[(usize, usize)],
) -> std::result::Result<(), ArrowError> {
    // Quick check: if concatenating all value data won't overflow, interleaving
    // a subset of rows definitely won't either.
    let total: usize = batches
        .iter()
        .map(|(_, batch)| batch.column(col_idx).as_bytes::<T>().value_data().len())
        .sum();
    if T::Offset::from_usize(total).is_some() {
        return Ok(());
    }

    // Total exceeds the offset limit, do the precise per-row check.
    let mut capacity: usize = 0;
    for &(batch_idx, row_idx) in indices {
        let array = batches[batch_idx].1.column(col_idx).as_bytes::<T>();
        let offsets = array.value_offsets();
        // Byte length of this row's value, derived from adjacent offsets.
        let element_len = offsets[row_idx + 1].as_usize() - offsets[row_idx].as_usize();
        capacity += element_len;
        // `ok_or_else` defers constructing the error so the happy path of this
        // potentially long loop does not build an error value per iteration.
        T::Offset::from_usize(capacity)
            .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?;
    }
    Ok(())
}
/// Checks whether `interleave()` would overflow i32 offsets for `Utf8` or `Binary` columns.
fn check_interleave_overflow(
batches: &[(usize, RecordBatch)],
schema: &SchemaRef,
indices: &[(usize, usize)],
) -> Result<()> {
for (col_idx, field) in schema.fields.iter().enumerate() {
match field.data_type() {
DataType::Utf8 => {
check_interleave_bytes_overflow::<Utf8Type>(batches, col_idx, indices)
.context(ComputeArrowSnafu)?;
}
DataType::Binary => {
check_interleave_bytes_overflow::<BinaryType>(batches, col_idx, indices)
.context(ComputeArrowSnafu)?;
}
_ => continue,
}
}
Ok(())
}
/// Keeps track of the current position in a batch
#[derive(Debug, Copy, Clone, Default)]
struct BatchCursor {
@@ -121,6 +178,8 @@ impl BatchBuilder {
return Ok(None);
}
check_interleave_overflow(&self.batches, &self.schema, &self.indices)?;
let columns = (0..self.schema.fields.len())
.map(|column_idx| {
let arrays: Vec<_> = self