diff --git a/src/partition/src/error.rs b/src/partition/src/error.rs
index 00c18695dc..a78e82f5b1 100644
--- a/src/partition/src/error.rs
+++ b/src/partition/src/error.rs
@@ -173,6 +173,13 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed to convert to vector"))]
+    ConvertToVector {
+        source: datatypes::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }
 
 impl ErrorExt for Error {
@@ -180,26 +187,23 @@ impl ErrorExt for Error {
         match self {
             Error::GetCache { .. } | Error::FindLeader { .. } => StatusCode::StorageUnavailable,
             Error::FindRegionRoutes { .. } => StatusCode::RegionNotReady,
-
             Error::ConjunctExprWithNonExpr { .. }
             | Error::UnclosedValue { .. }
             | Error::InvalidExpr { .. }
             | Error::UndefinedColumn { .. } => StatusCode::InvalidArguments,
-
             Error::RegionKeysSize { .. }
             | Error::InvalidInsertRequest { .. }
             | Error::InvalidDeleteRequest { .. } => StatusCode::InvalidArguments,
-
             Error::ConvertScalarValue { .. }
             | Error::SerializeJson { .. }
             | Error::DeserializeJson { .. } => StatusCode::Internal,
-
             Error::Unexpected { .. } => StatusCode::Unexpected,
             Error::InvalidTableRouteData { .. } => StatusCode::TableUnavailable,
             Error::FindTableRoutes { .. } => StatusCode::TableUnavailable,
             Error::TableRouteNotFound { .. } => StatusCode::TableNotFound,
             Error::TableRouteManager { source, .. } => source.status_code(),
             Error::UnexpectedLogicalRouteTable { source, .. } => source.status_code(),
+            Error::ConvertToVector { source, .. } => source.status_code(),
         }
     }
 
diff --git a/src/partition/src/multi_dim.rs b/src/partition/src/multi_dim.rs
index f47d71f98b..ae4044fe7e 100644
--- a/src/partition/src/multi_dim.rs
+++ b/src/partition/src/multi_dim.rs
@@ -16,9 +16,12 @@ use std::any::Any;
 use std::cmp::Ordering;
 use std::collections::HashMap;
 
+use datatypes::arrow::array::{BooleanArray, BooleanBufferBuilder, RecordBatch};
+use datatypes::arrow::buffer::BooleanBuffer;
 use datatypes::prelude::Value;
+use datatypes::vectors::{Helper, VectorRef};
 use serde::{Deserialize, Serialize};
-use snafu::{ensure, OptionExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use store_api::storage::RegionNumber;
 
 use crate::error::{
@@ -134,6 +137,81 @@ impl MultiDimPartitionRule {
 
         Ok(result)
     }
+
+    /// Copies the values at row `index` of every column in `cols` into `row`.
+    ///
+    /// `row` must hold at least `cols.len()` slots; indexing panics otherwise.
+    fn row_at(&self, cols: &[VectorRef], index: usize, row: &mut [Value]) -> Result<()> {
+        for (col_idx, col) in cols.iter().enumerate() {
+            row[col_idx] = col.get(index);
+        }
+        Ok(())
+    }
+
+    /// Looks up every partition column in `record_batch` by name and converts it to a vector.
+    ///
+    /// Fails with `UndefinedColumn` when a partition column is absent from the batch and with
+    /// `ConvertToVector` when the array cannot be converted.
+    fn record_batch_to_cols(&self, record_batch: &RecordBatch) -> Result<Vec<VectorRef>> {
+        self.partition_columns
+            .iter()
+            .map(|col_name| {
+                record_batch
+                    .column_by_name(col_name)
+                    .context(error::UndefinedColumnSnafu { column: col_name })
+                    .and_then(|array| {
+                        Helper::try_into_vector(array).context(error::ConvertToVectorSnafu)
+                    })
+            })
+            .collect::<Result<Vec<_>>>()
+    }
+
+    /// Splits `record_batch` row by row, returning one selection mask per region.
+    ///
+    /// Every mask has `num_rows` entries; entry `i` is set in the mask of the region that
+    /// `find_region` picks for row `i`. Panics if that region is not in `self.regions`.
+    fn split_record_batch_naive(
+        &self,
+        record_batch: &RecordBatch,
+    ) -> Result<HashMap<RegionNumber, BooleanArray>> {
+        let num_rows = record_batch.num_rows();
+
+        let mut result = self
+            .regions
+            .iter()
+            .map(|region| {
+                // `new` only reserves capacity; the builder must be filled with `num_rows`
+                // unset bits before `set_bit` may address them.
+                let mut mask = BooleanBufferBuilder::new(num_rows);
+                mask.append_n(num_rows, false);
+                (*region, mask)
+            })
+            .collect::<HashMap<_, _>>();
+
+        let cols = self.record_batch_to_cols(record_batch)?;
+        let mut current_row = vec![Value::Null; self.partition_columns.len()];
+        for row_idx in 0..num_rows {
+            self.row_at(&cols, row_idx, &mut current_row)?;
+            let current_region = self.find_region(&current_row)?;
+            let region_mask = result
+                .get_mut(&current_region)
+                .unwrap_or_else(|| panic!("Region {} must be initialized", current_region));
+            region_mask.set_bit(row_idx, true);
+        }
+
+        Ok(result
+            .into_iter()
+            .map(|(region, mut mask)| (region, BooleanArray::new(mask.finish(), None)))
+            .collect())
+    }
+
+    /// Not implemented yet; calling it panics via `todo!()`.
+    fn split_record_batch(
+        &self,
+        record_batch: &RecordBatch,
+    ) -> Result<HashMap<RegionNumber, BooleanArray>> {
+        todo!()
+    }
 }
 
 impl PartitionRule for MultiDimPartitionRule {