wip: naive impl

This commit is contained in:
Lei, HUANG
2025-03-27 14:03:12 +00:00
parent 09ef24fd75
commit 11c5cb44d8
2 changed files with 69 additions and 5 deletions

View File

@@ -173,6 +173,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to convert to vector"))]
ConvertToVector {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for Error {
@@ -180,26 +187,23 @@ impl ErrorExt for Error {
match self {
Error::GetCache { .. } | Error::FindLeader { .. } => StatusCode::StorageUnavailable,
Error::FindRegionRoutes { .. } => StatusCode::RegionNotReady,
Error::ConjunctExprWithNonExpr { .. }
| Error::UnclosedValue { .. }
| Error::InvalidExpr { .. }
| Error::UndefinedColumn { .. } => StatusCode::InvalidArguments,
Error::RegionKeysSize { .. }
| Error::InvalidInsertRequest { .. }
| Error::InvalidDeleteRequest { .. } => StatusCode::InvalidArguments,
Error::ConvertScalarValue { .. }
| Error::SerializeJson { .. }
| Error::DeserializeJson { .. } => StatusCode::Internal,
Error::Unexpected { .. } => StatusCode::Unexpected,
Error::InvalidTableRouteData { .. } => StatusCode::TableUnavailable,
Error::FindTableRoutes { .. } => StatusCode::TableUnavailable,
Error::TableRouteNotFound { .. } => StatusCode::TableNotFound,
Error::TableRouteManager { source, .. } => source.status_code(),
Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),
Error::ConvertToVector { source, .. } => source.status_code(),
}
}

View File

@@ -16,9 +16,12 @@ use std::any::Any;
use std::cmp::Ordering;
use std::collections::HashMap;
use datatypes::arrow::array::{BooleanArray, BooleanBufferBuilder, RecordBatch};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::prelude::Value;
use datatypes::vectors::{Helper, VectorRef};
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use crate::error::{
@@ -134,6 +137,63 @@ impl MultiDimPartitionRule {
Ok(result)
}
fn row_at(&self, cols: &[VectorRef], index: usize, row: &mut [Value]) -> Result<()> {
for (col_idx, col) in cols.iter().enumerate() {
row[col_idx] = col.get(index);
}
Ok(())
}
fn record_batch_to_cols(&self, record_batch: &RecordBatch) -> Result<Vec<VectorRef>> {
self.partition_columns
.iter()
.map(|col_name| {
record_batch
.column_by_name(col_name)
.context(error::UndefinedColumnSnafu { column: col_name })
.and_then(|array| {
Helper::try_into_vector(array).context(error::ConvertToVectorSnafu)
})
})
.collect::<Result<Vec<_>>>()
}
fn split_record_batch_naive(
&self,
record_batch: &RecordBatch,
) -> Result<HashMap<RegionNumber, BooleanArray>> {
let num_rows = record_batch.num_rows();
let mut result = self
.regions
.iter()
.map(|region| (*region, BooleanBufferBuilder::new(num_rows)))
.collect::<HashMap<_, _>>();
let cols = self.record_batch_to_cols(record_batch)?;
let mut current_row = vec![Value::Null; self.partition_columns.len()];
for row_idx in 0..num_rows {
self.row_at(&cols, row_idx, &mut current_row)?;
let current_region = self.find_region(&current_row)?;
let region_mask = result
.get_mut(&current_region)
.expect(&format!("Region {} must be initialized", current_region));
region_mask.set_bit(row_idx, true);
}
Ok(result
.into_iter()
.map(|(region, mut mask)| (region, BooleanArray::new(mask.finish(), None)))
.collect())
}
fn split_record_batch(
&self,
record_batch: &RecordBatch,
) -> Result<HashMap<RegionNumber, BooleanArray>> {
todo!()
}
}
impl PartitionRule for MultiDimPartitionRule {