perf(operator): reuse table info from table creation (#3945)

perf(operator): reuse table info from creating

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-05-15 17:18:17 +08:00
committed by GitHub
parent 97eb196699
commit 2a169f9364
2 changed files with 64 additions and 66 deletions

View File

@@ -108,15 +108,12 @@ impl Inserter {
});
validate_column_count_match(&requests)?;
self.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor)
let table_name_to_ids = self
.create_or_alter_tables_on_demand(&requests, &ctx, None, statement_executor)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, self.partition_manager.as_ref())
.convert(requests)
.await?;
let inserts = RowToRegion::new(
self.catalog_manager.as_ref(),
self.partition_manager.as_ref(),
&ctx,
)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
}
@@ -143,17 +140,17 @@ impl Inserter {
.await?;
// check and create logical tables
self.create_or_alter_tables_on_demand(
&requests,
&ctx,
Some(physical_table.to_string()),
statement_executor,
)
.await?;
let inserts =
RowToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, &ctx)
.convert(requests)
.await?;
let table_name_to_ids = self
.create_or_alter_tables_on_demand(
&requests,
&ctx,
Some(physical_table.to_string()),
statement_executor,
)
.await?;
let inserts = RowToRegion::new(table_name_to_ids, &self.partition_manager)
.convert(requests)
.await?;
self.do_request(inserts, &ctx).await
}
@@ -359,16 +356,20 @@ impl Inserter {
Ok(inserts)
}
// check if tables already exist:
// - if table does not exist, create table by inferred CreateExpr
// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
/// Creates or alter tables on demand:
/// - if table does not exist, create table by inferred CreateExpr
/// - if table exist, check if schema matches. If any new column found, alter table by inferred `AlterExpr`
///
/// Returns a mapping from table name to table id, where table name is the table name involved in the requests.
/// This mapping is used in the conversion of RowToRegion.
async fn create_or_alter_tables_on_demand(
&self,
requests: &RowInsertRequests,
ctx: &QueryContextRef,
on_physical_table: Option<String>,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<HashMap<String, TableId>> {
let mut table_name_to_ids = HashMap::with_capacity(requests.inserts.len());
let mut create_tables = vec![];
let mut alter_tables = vec![];
for req in &requests.inserts {
@@ -377,6 +378,9 @@ impl Inserter {
let table = self.get_table(catalog, schema, &req.table_name).await?;
match table {
Some(table) => {
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
// TODO(jeremy): alter in batch? (from `handle_metric_row_inserts`)
validate_request_with_table(req, &table)?;
let alter_expr = self.get_alter_table_expr_on_demand(req, table, ctx)?;
@@ -393,13 +397,19 @@ impl Inserter {
if let Some(on_physical_table) = on_physical_table {
if !create_tables.is_empty() {
// Creates logical tables in batch.
self.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.await?;
let tables = self
.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.await?;
for table in tables {
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
}
if !alter_tables.is_empty() {
// Alter logical tables in batch.
@@ -409,7 +419,9 @@ impl Inserter {
}
} else {
for req in create_tables {
self.create_table(req, ctx, statement_executor).await?;
let table = self.create_table(req, ctx, statement_executor).await?;
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
@@ -418,7 +430,7 @@ impl Inserter {
}
}
Ok(())
Ok(table_name_to_ids)
}
async fn create_physical_table_on_demand(
@@ -527,7 +539,7 @@ impl Inserter {
req: &RowInsertRequest,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<TableRef> {
let table_ref =
TableReference::full(ctx.current_catalog(), ctx.current_schema(), &req.table_name);
@@ -542,12 +554,12 @@ impl Inserter {
.await;
match res {
Ok(_) => {
Ok(table) => {
info!(
"Successfully created table {}.{}.{}",
table_ref.catalog, table_ref.schema, table_ref.table,
);
Ok(())
Ok(table)
}
Err(err) => {
error!(
@@ -565,7 +577,7 @@ impl Inserter {
ctx: &QueryContextRef,
physical_table: &str,
statement_executor: &StatementExecutor,
) -> Result<()> {
) -> Result<Vec<TableRef>> {
let create_table_exprs = create_tables
.iter()
.map(|req| {
@@ -593,9 +605,9 @@ impl Inserter {
.await;
match res {
Ok(_) => {
Ok(res) => {
info!("Successfully created logical tables");
Ok(())
Ok(res)
}
Err(err) => {
let failed_tables = create_table_exprs

View File

@@ -12,42 +12,37 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::region::InsertRequests as RegionInsertRequests;
use api::v1::RowInsertRequests;
use catalog::CatalogManager;
use partition::manager::PartitionRuleManager;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use table::TableRef;
use snafu::OptionExt;
use table::metadata::TableId;
use crate::error::{CatalogSnafu, Result, TableNotFoundSnafu};
use crate::error::{Result, TableNotFoundSnafu};
use crate::req_convert::common::partitioner::Partitioner;
pub struct RowToRegion<'a> {
catalog_manager: &'a dyn CatalogManager,
table_name_to_ids: HashMap<String, TableId>,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
}
impl<'a> RowToRegion<'a> {
pub fn new(
catalog_manager: &'a dyn CatalogManager,
table_name_to_ids: HashMap<String, TableId>,
partition_manager: &'a PartitionRuleManager,
ctx: &'a QueryContext,
) -> Self {
Self {
catalog_manager,
table_name_to_ids,
partition_manager,
ctx,
}
}
pub async fn convert(&self, requests: RowInsertRequests) -> Result<RegionInsertRequests> {
let mut region_request = Vec::with_capacity(requests.inserts.len());
for request in requests.inserts {
let table = self.get_table(&request.table_name).await?;
let table_id = table.table_info().table_id();
let table_id = self.get_table_id(&request.table_name)?;
let requests = Partitioner::new(self.partition_manager)
.partition_insert_requests(table_id, request.rows.unwrap_or_default())
.await?;
@@ -60,19 +55,10 @@ impl<'a> RowToRegion<'a> {
})
}
async fn get_table(&self, table_name: &str) -> Result<TableRef> {
let catalog_name = self.ctx.current_catalog();
let schema_name = self.ctx.current_schema();
self.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: common_catalog::format_full_table_name(
catalog_name,
schema_name,
table_name,
),
})
fn get_table_id(&self, table_name: &str) -> Result<TableId> {
self.table_name_to_ids
.get(table_name)
.cloned()
.context(TableNotFoundSnafu { table_name })
}
}