apply gemini's sugg

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2026-03-20 06:25:02 +08:00
parent 483daeebe9
commit ac6aeb319e
2 changed files with 9 additions and 56 deletions

View File

@@ -18,21 +18,21 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu, NewDfRecordBatchSnafu};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use datatypes::arrow::array::Array;
use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field};
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
use datatypes::vectors::Helper;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;
use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, RecordBatchSnafu, Result};
use crate::read::projection::read_column_ids_from_projection;
use crate::read::projection::{read_column_ids_from_projection, repeated_vector_with_cache};
use crate::sst::parquet::flat_format::sst_column_id_indices;
use crate::sst::parquet::format::FormatProjection;
use crate::sst::{
@@ -66,9 +66,6 @@ pub struct FlatProjectionMapper {
input_arrow_schema: datatypes::arrow::datatypes::SchemaRef,
}
/// Max length of a repeated vector we will store in cache.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
impl FlatProjectionMapper {
/// Returns a new mapper with projection.
/// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
@@ -307,13 +304,7 @@ impl FlatProjectionMapper {
let df_record_batch =
DfRecordBatch::try_new(self.output_schema.arrow_schema().clone(), arrays)
.map_err(|e| {
BoxedError::new(common_error::ext::PlainError::new(
e.to_string(),
common_error::status_code::StatusCode::Internal,
))
})
.context(ExternalSnafu)?;
.context(NewDfRecordBatchSnafu)?;
Ok(RecordBatch::from_df_record_batch(
self.output_schema.clone(),
df_record_batch,
@@ -345,43 +336,6 @@ impl FlatProjectionMapper {
}
}
fn repeated_vector_with_cache(
data_type: &ConcreteDataType,
value: &Value,
num_rows: usize,
cache_strategy: &CacheStrategy,
) -> common_recordbatch::error::Result<VectorRef> {
if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
// If the cached vector doesn't have enough length, create a new one.
match vector.len().cmp(&num_rows) {
std::cmp::Ordering::Less => {}
std::cmp::Ordering::Equal => return Ok(vector),
std::cmp::Ordering::Greater => return Ok(vector.slice(0, num_rows)),
}
}
let vector = new_repeated_vector(data_type, value, num_rows)?;
if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
cache_strategy.put_repeated_vector(value.clone(), vector.clone());
}
Ok(vector)
}
fn new_repeated_vector(
data_type: &ConcreteDataType,
value: &Value,
num_rows: usize,
) -> common_recordbatch::error::Result<VectorRef> {
let mut mutable_vector = data_type.create_mutable_vector(1);
mutable_vector
.try_push_value_ref(&value.as_value_ref())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let base_vector = mutable_vector.to_vector();
Ok(base_vector.replicate(&[num_rows]))
}
/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
///
/// It adds the time index column if it doesn't present in the projection.

View File

@@ -21,7 +21,7 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::RecordBatch;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::error::{DataTypesSnafu, ExternalSnafu};
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
@@ -37,7 +37,7 @@ use crate::read::Batch;
use crate::read::flat_projection::FlatProjectionMapper;
/// Only cache vector when its length `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
/// Wrapper enum for different projection mapper implementations.
pub enum ProjectionMapper {
@@ -423,7 +423,7 @@ enum BatchIndex {
}
/// Gets a vector with repeated values from specific cache or creates a new one.
fn repeated_vector_with_cache(
pub(crate) fn repeated_vector_with_cache(
data_type: &ConcreteDataType,
value: &Value,
num_rows: usize,
@@ -450,7 +450,7 @@ fn repeated_vector_with_cache(
}
/// Returns a vector with repeated values.
fn new_repeated_vector(
pub(crate) fn new_repeated_vector(
data_type: &ConcreteDataType,
value: &Value,
num_rows: usize,
@@ -458,8 +458,7 @@ fn new_repeated_vector(
let mut mutable_vector = data_type.create_mutable_vector(1);
mutable_vector
.try_push_value_ref(&value.as_value_ref())
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
.context(DataTypesSnafu)?;
// This requires an additional allocation.
let base_vector = mutable_vector.to_vector();
Ok(base_vector.replicate(&[num_rows]))