diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 2dc5d98e41..a62fcee536 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -47,6 +47,7 @@ use store_api::metric_engine_consts::{ }; use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY}; use store_api::storage::{RegionId, TableId}; +use table::metadata::TableInfoRef; use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY}; use table::table_reference::TableReference; use table::TableRef; @@ -200,11 +201,11 @@ impl Inserter { }); validate_column_count_match(&requests)?; - let (table_name_to_ids, instant_table_ids) = self + let (tables_info, instant_table_ids) = self .create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor) .await?; let inserts = RowToRegion::new( - table_name_to_ids, + tables_info, instant_table_ids, self.partition_manager.as_ref(), ) @@ -236,7 +237,7 @@ impl Inserter { .await?; // check and create logical tables - let (table_name_to_ids, instant_table_ids) = self + let (tables_info, instant_table_ids) = self .create_or_alter_tables_on_demand( &requests, &ctx, @@ -244,13 +245,9 @@ impl Inserter { statement_executor, ) .await?; - let inserts = RowToRegion::new( - table_name_to_ids, - instant_table_ids, - &self.partition_manager, - ) - .convert(requests) - .await?; + let inserts = RowToRegion::new(tables_info, instant_table_ids, &self.partition_manager) + .convert(requests) + .await?; self.do_request(inserts, &ctx).await } @@ -499,14 +496,14 @@ impl Inserter { ctx: &QueryContextRef, auto_create_table_type: AutoCreateTableType, statement_executor: &StatementExecutor, - ) -> Result<(HashMap, HashSet)> { + ) -> Result<(HashMap, HashSet)> { let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND .with_label_values(&[auto_create_table_type.as_str()]) .start_timer(); let catalog = ctx.current_catalog(); let schema = ctx.current_schema(); - let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len()); + let mut tables_info = HashMap::with_capacity(requests.inserts.len()); // If `auto_create_table` hint is disabled, skip creating/altering tables. let auto_create_table_hint = ctx .extension(AUTO_CREATE_TABLE_KEY) @@ -535,9 +532,9 @@ impl Inserter { if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); } - table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); + tables_info.insert(table_info.name.clone(), table_info); } - return Ok((table_name_to_ids, instant_table_ids)); + return Ok((tables_info, instant_table_ids)); } let mut create_tables = vec![]; @@ -551,7 +548,7 @@ impl Inserter { if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); } - table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); + tables_info.insert(table_info.name.clone(), table_info); if let Some(alter_expr) = self.get_alter_table_expr_on_demand(req, &table, ctx)? { @@ -579,7 +576,7 @@ impl Inserter { if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); } - table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); + tables_info.insert(table_info.name.clone(), table_info); } } if !alter_tables.is_empty() { @@ -602,7 +599,7 @@ impl Inserter { if table_info.is_ttl_instant_table() { instant_table_ids.insert(table_info.table_id()); } - table_name_to_ids.insert(table_info.name.clone(), table_info.table_id()); + tables_info.insert(table_info.name.clone(), table_info); } for alter_expr in alter_tables.into_iter() { statement_executor @@ -612,7 +609,7 @@ impl Inserter { } } - Ok((table_name_to_ids, instant_table_ids)) + Ok((tables_info, instant_table_ids)) } async fn create_physical_table_on_demand( diff --git a/src/operator/src/req_convert/insert/row_to_region.rs b/src/operator/src/req_convert/insert/row_to_region.rs index 125910ba45..1e74704825 100644 --- a/src/operator/src/req_convert/insert/row_to_region.rs +++ b/src/operator/src/req_convert/insert/row_to_region.rs @@ -13,30 +13,31 @@ // limitations under the License. use ahash::{HashMap, HashSet}; -use api::v1::region::InsertRequests as RegionInsertRequests; +use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests}; use api::v1::RowInsertRequests; use partition::manager::PartitionRuleManager; use snafu::OptionExt; -use table::metadata::TableId; +use store_api::storage::{RegionId, RegionNumber}; +use table::metadata::{TableId, TableInfoRef}; use crate::error::{Result, TableNotFoundSnafu}; use crate::insert::InstantAndNormalInsertRequests; use crate::req_convert::common::partitioner::Partitioner; pub struct RowToRegion<'a> { - table_name_to_ids: HashMap, + tables_info: HashMap, instant_table_ids: HashSet, partition_manager: &'a PartitionRuleManager, } impl<'a> RowToRegion<'a> { pub fn new( - table_name_to_ids: HashMap, + tables_info: HashMap, instant_table_ids: HashSet, partition_manager: &'a PartitionRuleManager, ) -> Self { Self { - table_name_to_ids, + tables_info, instant_table_ids, partition_manager, } @@ -49,10 +50,24 @@ impl<'a> RowToRegion<'a> { let mut region_request = Vec::with_capacity(requests.inserts.len()); let mut instant_request = Vec::with_capacity(requests.inserts.len()); for request in requests.inserts { + let Some(rows) = request.rows else { continue }; + let table_id = self.get_table_id(&request.table_name)?; - let requests = Partitioner::new(self.partition_manager) - .partition_insert_requests(table_id, request.rows.unwrap_or_default()) - .await?; + let region_numbers = self.region_numbers(&request.table_name)?; + let requests = if let Some(region_id) = match region_numbers[..] { + [singular] => Some(RegionId::new(table_id, singular)), + _ => None, + } { + vec![InsertRequest { + region_id: region_id.as_u64(), + rows: Some(rows), + }] + } else { + Partitioner::new(self.partition_manager) + .partition_insert_requests(table_id, rows) + .await? + }; + if self.instant_table_ids.contains(&table_id) { instant_request.extend(requests); } else { @@ -71,9 +86,16 @@ impl<'a> RowToRegion<'a> { } fn get_table_id(&self, table_name: &str) -> Result { - self.table_name_to_ids + self.tables_info .get(table_name) - .cloned() + .map(|x| x.table_id()) + .context(TableNotFoundSnafu { table_name }) + } + + fn region_numbers(&self, table_name: &str) -> Result<&Vec> { + self.tables_info + .get(table_name) + .map(|x| &x.meta.region_numbers) .context(TableNotFoundSnafu { table_name }) } } diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index f2feef68ed..73a5a01892 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -80,35 +80,20 @@ impl<'a> SplitReadRowHelper<'a> { fn split_rows(mut self) -> Result> { let regions = self.split_to_regions()?; - let request_splits = if regions.len() == 1 { - // fast path, zero copy - regions - .into_keys() - .map(|region_number| { - let rows = std::mem::take(&mut self.rows); - let rows = Rows { - schema: self.schema.clone(), - rows, - }; - (region_number, rows) - }) - .collect::>() - } else { - regions - .into_iter() - .map(|(region_number, row_indexes)| { - let rows = row_indexes - .into_iter() - .map(|row_idx| std::mem::take(&mut self.rows[row_idx])) - .collect(); - let rows = Rows { - schema: self.schema.clone(), - rows, - }; - (region_number, rows) - }) - .collect::>() - }; + let request_splits = regions + .into_iter() + .map(|(region_number, row_indexes)| { + let rows = row_indexes + .into_iter() + .map(|row_idx| std::mem::take(&mut self.rows[row_idx])) + .collect(); + let rows = Rows { + schema: self.schema.clone(), + rows, + }; + (region_number, rows) + }) + .collect::>(); Ok(request_splits) }