mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 04:12:55 +00:00
refactor: optimize out partition split insert requests (#5298)
* test: optimize out partition split insert requests if there is only one region * Now that the optimization for single region insert has been lifted up, the original "fast path" can be obsoleted. * resolve PR comments
This commit is contained in:
@@ -47,6 +47,7 @@ use store_api::metric_engine_consts::{
|
||||
};
|
||||
use store_api::mito_engine_options::{APPEND_MODE_KEY, MERGE_MODE_KEY};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
use table::metadata::TableInfoRef;
|
||||
use table::requests::{InsertRequest as TableInsertRequest, AUTO_CREATE_TABLE_KEY, TTL_KEY};
|
||||
use table::table_reference::TableReference;
|
||||
use table::TableRef;
|
||||
@@ -200,11 +201,11 @@ impl Inserter {
|
||||
});
|
||||
validate_column_count_match(&requests)?;
|
||||
|
||||
let (table_name_to_ids, instant_table_ids) = self
|
||||
let (tables_info, instant_table_ids) = self
|
||||
.create_or_alter_tables_on_demand(&requests, &ctx, create_type, statement_executor)
|
||||
.await?;
|
||||
let inserts = RowToRegion::new(
|
||||
table_name_to_ids,
|
||||
tables_info,
|
||||
instant_table_ids,
|
||||
self.partition_manager.as_ref(),
|
||||
)
|
||||
@@ -236,7 +237,7 @@ impl Inserter {
|
||||
.await?;
|
||||
|
||||
// check and create logical tables
|
||||
let (table_name_to_ids, instant_table_ids) = self
|
||||
let (tables_info, instant_table_ids) = self
|
||||
.create_or_alter_tables_on_demand(
|
||||
&requests,
|
||||
&ctx,
|
||||
@@ -244,13 +245,9 @@ impl Inserter {
|
||||
statement_executor,
|
||||
)
|
||||
.await?;
|
||||
let inserts = RowToRegion::new(
|
||||
table_name_to_ids,
|
||||
instant_table_ids,
|
||||
&self.partition_manager,
|
||||
)
|
||||
.convert(requests)
|
||||
.await?;
|
||||
let inserts = RowToRegion::new(tables_info, instant_table_ids, &self.partition_manager)
|
||||
.convert(requests)
|
||||
.await?;
|
||||
|
||||
self.do_request(inserts, &ctx).await
|
||||
}
|
||||
@@ -499,14 +496,14 @@ impl Inserter {
|
||||
ctx: &QueryContextRef,
|
||||
auto_create_table_type: AutoCreateTableType,
|
||||
statement_executor: &StatementExecutor,
|
||||
) -> Result<(HashMap<String, TableId>, HashSet<TableId>)> {
|
||||
) -> Result<(HashMap<String, TableInfoRef>, HashSet<TableId>)> {
|
||||
let _timer = crate::metrics::CREATE_ALTER_ON_DEMAND
|
||||
.with_label_values(&[auto_create_table_type.as_str()])
|
||||
.start_timer();
|
||||
|
||||
let catalog = ctx.current_catalog();
|
||||
let schema = ctx.current_schema();
|
||||
let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len());
|
||||
let mut tables_info = HashMap::with_capacity(requests.inserts.len());
|
||||
// If `auto_create_table` hint is disabled, skip creating/altering tables.
|
||||
let auto_create_table_hint = ctx
|
||||
.extension(AUTO_CREATE_TABLE_KEY)
|
||||
@@ -535,9 +532,9 @@ impl Inserter {
|
||||
if table_info.is_ttl_instant_table() {
|
||||
instant_table_ids.insert(table_info.table_id());
|
||||
}
|
||||
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
|
||||
tables_info.insert(table_info.name.clone(), table_info);
|
||||
}
|
||||
return Ok((table_name_to_ids, instant_table_ids));
|
||||
return Ok((tables_info, instant_table_ids));
|
||||
}
|
||||
|
||||
let mut create_tables = vec![];
|
||||
@@ -551,7 +548,7 @@ impl Inserter {
|
||||
if table_info.is_ttl_instant_table() {
|
||||
instant_table_ids.insert(table_info.table_id());
|
||||
}
|
||||
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
|
||||
tables_info.insert(table_info.name.clone(), table_info);
|
||||
if let Some(alter_expr) =
|
||||
self.get_alter_table_expr_on_demand(req, &table, ctx)?
|
||||
{
|
||||
@@ -579,7 +576,7 @@ impl Inserter {
|
||||
if table_info.is_ttl_instant_table() {
|
||||
instant_table_ids.insert(table_info.table_id());
|
||||
}
|
||||
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
|
||||
tables_info.insert(table_info.name.clone(), table_info);
|
||||
}
|
||||
}
|
||||
if !alter_tables.is_empty() {
|
||||
@@ -602,7 +599,7 @@ impl Inserter {
|
||||
if table_info.is_ttl_instant_table() {
|
||||
instant_table_ids.insert(table_info.table_id());
|
||||
}
|
||||
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
|
||||
tables_info.insert(table_info.name.clone(), table_info);
|
||||
}
|
||||
for alter_expr in alter_tables.into_iter() {
|
||||
statement_executor
|
||||
@@ -612,7 +609,7 @@ impl Inserter {
|
||||
}
|
||||
}
|
||||
|
||||
Ok((table_name_to_ids, instant_table_ids))
|
||||
Ok((tables_info, instant_table_ids))
|
||||
}
|
||||
|
||||
async fn create_physical_table_on_demand(
|
||||
|
||||
@@ -13,30 +13,31 @@
|
||||
// limitations under the License.
|
||||
|
||||
use ahash::{HashMap, HashSet};
|
||||
use api::v1::region::InsertRequests as RegionInsertRequests;
|
||||
use api::v1::region::{InsertRequest, InsertRequests as RegionInsertRequests};
|
||||
use api::v1::RowInsertRequests;
|
||||
use partition::manager::PartitionRuleManager;
|
||||
use snafu::OptionExt;
|
||||
use table::metadata::TableId;
|
||||
use store_api::storage::{RegionId, RegionNumber};
|
||||
use table::metadata::{TableId, TableInfoRef};
|
||||
|
||||
use crate::error::{Result, TableNotFoundSnafu};
|
||||
use crate::insert::InstantAndNormalInsertRequests;
|
||||
use crate::req_convert::common::partitioner::Partitioner;
|
||||
|
||||
pub struct RowToRegion<'a> {
|
||||
table_name_to_ids: HashMap<String, TableId>,
|
||||
tables_info: HashMap<String, TableInfoRef>,
|
||||
instant_table_ids: HashSet<TableId>,
|
||||
partition_manager: &'a PartitionRuleManager,
|
||||
}
|
||||
|
||||
impl<'a> RowToRegion<'a> {
|
||||
pub fn new(
|
||||
table_name_to_ids: HashMap<String, TableId>,
|
||||
tables_info: HashMap<String, TableInfoRef>,
|
||||
instant_table_ids: HashSet<TableId>,
|
||||
partition_manager: &'a PartitionRuleManager,
|
||||
) -> Self {
|
||||
Self {
|
||||
table_name_to_ids,
|
||||
tables_info,
|
||||
instant_table_ids,
|
||||
partition_manager,
|
||||
}
|
||||
@@ -49,10 +50,24 @@ impl<'a> RowToRegion<'a> {
|
||||
let mut region_request = Vec::with_capacity(requests.inserts.len());
|
||||
let mut instant_request = Vec::with_capacity(requests.inserts.len());
|
||||
for request in requests.inserts {
|
||||
let Some(rows) = request.rows else { continue };
|
||||
|
||||
let table_id = self.get_table_id(&request.table_name)?;
|
||||
let requests = Partitioner::new(self.partition_manager)
|
||||
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
|
||||
.await?;
|
||||
let region_numbers = self.region_numbers(&request.table_name)?;
|
||||
let requests = if let Some(region_id) = match region_numbers[..] {
|
||||
[singular] => Some(RegionId::new(table_id, singular)),
|
||||
_ => None,
|
||||
} {
|
||||
vec![InsertRequest {
|
||||
region_id: region_id.as_u64(),
|
||||
rows: Some(rows),
|
||||
}]
|
||||
} else {
|
||||
Partitioner::new(self.partition_manager)
|
||||
.partition_insert_requests(table_id, rows)
|
||||
.await?
|
||||
};
|
||||
|
||||
if self.instant_table_ids.contains(&table_id) {
|
||||
instant_request.extend(requests);
|
||||
} else {
|
||||
@@ -71,9 +86,16 @@ impl<'a> RowToRegion<'a> {
|
||||
}
|
||||
|
||||
fn get_table_id(&self, table_name: &str) -> Result<TableId> {
|
||||
self.table_name_to_ids
|
||||
self.tables_info
|
||||
.get(table_name)
|
||||
.cloned()
|
||||
.map(|x| x.table_id())
|
||||
.context(TableNotFoundSnafu { table_name })
|
||||
}
|
||||
|
||||
fn region_numbers(&self, table_name: &str) -> Result<&Vec<RegionNumber>> {
|
||||
self.tables_info
|
||||
.get(table_name)
|
||||
.map(|x| &x.meta.region_numbers)
|
||||
.context(TableNotFoundSnafu { table_name })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,35 +80,20 @@ impl<'a> SplitReadRowHelper<'a> {
|
||||
|
||||
fn split_rows(mut self) -> Result<HashMap<RegionNumber, Rows>> {
|
||||
let regions = self.split_to_regions()?;
|
||||
let request_splits = if regions.len() == 1 {
|
||||
// fast path, zero copy
|
||||
regions
|
||||
.into_keys()
|
||||
.map(|region_number| {
|
||||
let rows = std::mem::take(&mut self.rows);
|
||||
let rows = Rows {
|
||||
schema: self.schema.clone(),
|
||||
rows,
|
||||
};
|
||||
(region_number, rows)
|
||||
})
|
||||
.collect::<HashMap<_, _>>()
|
||||
} else {
|
||||
regions
|
||||
.into_iter()
|
||||
.map(|(region_number, row_indexes)| {
|
||||
let rows = row_indexes
|
||||
.into_iter()
|
||||
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
|
||||
.collect();
|
||||
let rows = Rows {
|
||||
schema: self.schema.clone(),
|
||||
rows,
|
||||
};
|
||||
(region_number, rows)
|
||||
})
|
||||
.collect::<HashMap<_, _>>()
|
||||
};
|
||||
let request_splits = regions
|
||||
.into_iter()
|
||||
.map(|(region_number, row_indexes)| {
|
||||
let rows = row_indexes
|
||||
.into_iter()
|
||||
.map(|row_idx| std::mem::take(&mut self.rows[row_idx]))
|
||||
.collect();
|
||||
let rows = Rows {
|
||||
schema: self.schema.clone(),
|
||||
rows,
|
||||
};
|
||||
(region_number, rows)
|
||||
})
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
Ok(request_splits)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user