mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 13:52:59 +00:00
feat: create tables in batch on prom write (#3246)
* feat: create tables in batch on prom write * feat: add logic table ids to log * fix: miss tabble ids in response
This commit is contained in:
@@ -252,20 +252,36 @@ impl Inserter {
|
||||
on_physical_table: Option<String>,
|
||||
statement_executor: &StatementExecutor,
|
||||
) -> Result<()> {
|
||||
// TODO(jeremy): create and alter in batch? (from `handle_metric_row_inserts`)
|
||||
let mut create_tables = vec![];
|
||||
for req in &requests.inserts {
|
||||
let catalog = ctx.current_catalog();
|
||||
let schema = ctx.current_schema();
|
||||
let table = self.get_table(catalog, schema, &req.table_name).await?;
|
||||
match table {
|
||||
Some(table) => {
|
||||
// TODO(jeremy): alter in batch? (from `handle_metric_row_inserts`)
|
||||
validate_request_with_table(req, &table)?;
|
||||
self.alter_table_on_demand(req, table, ctx, statement_executor)
|
||||
.await?
|
||||
}
|
||||
None => {
|
||||
self.create_table(req, ctx, &on_physical_table, statement_executor)
|
||||
.await?
|
||||
create_tables.push(req);
|
||||
}
|
||||
}
|
||||
}
|
||||
if !create_tables.is_empty() {
|
||||
if let Some(on_physical_table) = on_physical_table {
|
||||
// Creates logical tables in batch.
|
||||
self.create_logical_tables(
|
||||
create_tables,
|
||||
ctx,
|
||||
&on_physical_table,
|
||||
statement_executor,
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
for req in create_tables {
|
||||
self.create_table(req, ctx, statement_executor).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -403,7 +419,6 @@ impl Inserter {
|
||||
&self,
|
||||
req: &RowInsertRequest,
|
||||
ctx: &QueryContextRef,
|
||||
on_physical_table: &Option<String>,
|
||||
statement_executor: &StatementExecutor,
|
||||
) -> Result<()> {
|
||||
let table_ref =
|
||||
@@ -412,15 +427,7 @@ impl Inserter {
|
||||
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
|
||||
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
|
||||
|
||||
if let Some(physical_table) = on_physical_table {
|
||||
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
|
||||
create_table_expr.table_options.insert(
|
||||
LOGICAL_TABLE_METADATA_KEY.to_string(),
|
||||
physical_table.clone(),
|
||||
);
|
||||
}
|
||||
|
||||
info!("Table `{table_ref}` does not exist, try creating table",);
|
||||
info!("Table `{table_ref}` does not exist, try creating table");
|
||||
|
||||
// TODO(weny): multiple regions table.
|
||||
let res = statement_executor
|
||||
@@ -444,6 +451,65 @@ impl Inserter {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn create_logical_tables(
|
||||
&self,
|
||||
create_tables: Vec<&RowInsertRequest>,
|
||||
ctx: &QueryContextRef,
|
||||
physical_table: &str,
|
||||
statement_executor: &StatementExecutor,
|
||||
) -> Result<()> {
|
||||
let create_table_exprs = create_tables
|
||||
.iter()
|
||||
.map(|req| {
|
||||
let table_ref = TableReference::full(
|
||||
ctx.current_catalog(),
|
||||
ctx.current_schema(),
|
||||
&req.table_name,
|
||||
);
|
||||
|
||||
info!("Logical table `{table_ref}` does not exist, try creating table");
|
||||
|
||||
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
|
||||
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;
|
||||
|
||||
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
|
||||
create_table_expr.table_options.insert(
|
||||
LOGICAL_TABLE_METADATA_KEY.to_string(),
|
||||
physical_table.to_string(),
|
||||
);
|
||||
|
||||
Ok(create_table_expr)
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let res = statement_executor
|
||||
.create_logical_tables(&create_table_exprs)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(_) => {
|
||||
info!("Successfully created logical tables");
|
||||
Ok(())
|
||||
}
|
||||
Err(err) => {
|
||||
let failed_tables = create_table_exprs
|
||||
.into_iter()
|
||||
.map(|expr| {
|
||||
format!(
|
||||
"{}.{}.{}",
|
||||
expr.catalog_name, expr.schema_name, expr.table_name
|
||||
)
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
error!(
|
||||
"Failed to create logical tables {:?}: {}",
|
||||
failed_tables, err
|
||||
);
|
||||
Err(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
|
||||
|
||||
Reference in New Issue
Block a user