mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-17 13:30:38 +00:00
refactor: unify the styling in create_or_alter_tables_on_demand (#4756)
* refactor: refactor `create_or_alter_tables_on_demand` * chore: apply suggestions from CR
This commit is contained in:
@@ -505,28 +505,25 @@ impl Inserter {
|
||||
let table_info = table.table_info();
|
||||
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
|
||||
if let Some(alter_expr) =
|
||||
self.get_alter_table_expr_on_demand(req, table, ctx)?
|
||||
self.get_alter_table_expr_on_demand(req, &table, ctx)?
|
||||
{
|
||||
alter_tables.push(alter_expr);
|
||||
}
|
||||
}
|
||||
None => {
|
||||
create_tables.push(req);
|
||||
let create_expr =
|
||||
self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
|
||||
create_tables.push(create_expr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match auto_create_table_type {
|
||||
AutoCreateTableType::Logical(on_physical_table) => {
|
||||
AutoCreateTableType::Logical(_) => {
|
||||
if !create_tables.is_empty() {
|
||||
// Creates logical tables in batch.
|
||||
let tables = self
|
||||
.create_logical_tables(
|
||||
create_tables,
|
||||
ctx,
|
||||
&on_physical_table,
|
||||
statement_executor,
|
||||
)
|
||||
.create_logical_tables(create_tables, ctx, statement_executor)
|
||||
.await?;
|
||||
|
||||
for table in tables {
|
||||
@@ -544,14 +541,9 @@ impl Inserter {
|
||||
AutoCreateTableType::Physical
|
||||
| AutoCreateTableType::Log
|
||||
| AutoCreateTableType::LastNonNull => {
|
||||
for req in create_tables {
|
||||
for create_table in create_tables {
|
||||
let table = self
|
||||
.create_non_logical_table(
|
||||
req,
|
||||
ctx,
|
||||
statement_executor,
|
||||
auto_create_table_type.clone(),
|
||||
)
|
||||
.create_physical_table(create_table, ctx, statement_executor)
|
||||
.await?;
|
||||
let table_info = table.table_info();
|
||||
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
|
||||
@@ -605,7 +597,8 @@ impl Inserter {
|
||||
options: None,
|
||||
},
|
||||
];
|
||||
let create_table_expr = &mut build_create_table_expr(&table_reference, &default_schema)?;
|
||||
let create_table_expr =
|
||||
&mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;
|
||||
|
||||
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
|
||||
create_table_expr
|
||||
@@ -641,10 +634,61 @@ impl Inserter {
|
||||
.context(CatalogSnafu)
|
||||
}
|
||||
|
||||
fn get_create_table_expr_on_demand(
|
||||
&self,
|
||||
req: &RowInsertRequest,
|
||||
create_type: &AutoCreateTableType,
|
||||
ctx: &QueryContextRef,
|
||||
) -> Result<CreateTableExpr> {
|
||||
let mut table_options = Vec::with_capacity(4);
|
||||
if let Some(ttl) = ctx.extension(TTL_KEY) {
|
||||
table_options.push((TTL_KEY, ttl));
|
||||
}
|
||||
|
||||
let mut engine_name = default_engine();
|
||||
match create_type {
|
||||
AutoCreateTableType::Logical(physical_table) => {
|
||||
engine_name = METRIC_ENGINE_NAME;
|
||||
table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
|
||||
}
|
||||
AutoCreateTableType::Physical => {
|
||||
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
|
||||
table_options.push((APPEND_MODE_KEY, append_mode));
|
||||
}
|
||||
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
|
||||
table_options.push((MERGE_MODE_KEY, merge_mode));
|
||||
}
|
||||
}
|
||||
// Set append_mode to true for log table.
|
||||
// because log tables should keep rows with the same ts and tags.
|
||||
AutoCreateTableType::Log => {
|
||||
table_options.push((APPEND_MODE_KEY, "true"));
|
||||
}
|
||||
AutoCreateTableType::LastNonNull => {
|
||||
table_options.push((MERGE_MODE_KEY, "last_non_null"));
|
||||
}
|
||||
}
|
||||
|
||||
let schema = ctx.current_schema();
|
||||
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
|
||||
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
|
||||
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
|
||||
let mut create_table_expr =
|
||||
build_create_table_expr(&table_ref, request_schema, engine_name)?;
|
||||
info!("Table `{table_ref}` does not exist, try creating table");
|
||||
for (k, v) in table_options {
|
||||
create_table_expr
|
||||
.table_options
|
||||
.insert(k.to_string(), v.to_string());
|
||||
}
|
||||
|
||||
Ok(create_table_expr)
|
||||
}
|
||||
|
||||
fn get_alter_table_expr_on_demand(
|
||||
&self,
|
||||
req: &RowInsertRequest,
|
||||
table: TableRef,
|
||||
table: &TableRef,
|
||||
ctx: &QueryContextRef,
|
||||
) -> Result<Option<AlterExpr>> {
|
||||
let catalog_name = ctx.current_catalog();
|
||||
@@ -667,76 +711,37 @@ impl Inserter {
|
||||
}))
|
||||
}
|
||||
|
||||
/// Creates a non-logical table by create type.
|
||||
/// # Panics
|
||||
/// Panics if `create_type` is `AutoCreateTableType::Logical`.
|
||||
async fn create_non_logical_table(
|
||||
&self,
|
||||
req: &RowInsertRequest,
|
||||
ctx: &QueryContextRef,
|
||||
statement_executor: &StatementExecutor,
|
||||
create_type: AutoCreateTableType,
|
||||
) -> Result<TableRef> {
|
||||
let mut hint_options = vec![];
|
||||
|
||||
if let Some(ttl) = ctx.extension(TTL_KEY) {
|
||||
hint_options.push((TTL_KEY, ttl));
|
||||
}
|
||||
|
||||
match create_type {
|
||||
AutoCreateTableType::Logical(_) => unreachable!(),
|
||||
AutoCreateTableType::Physical => {
|
||||
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
|
||||
hint_options.push((APPEND_MODE_KEY, append_mode));
|
||||
}
|
||||
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
|
||||
hint_options.push((MERGE_MODE_KEY, merge_mode));
|
||||
}
|
||||
}
|
||||
// Set append_mode to true for log table.
|
||||
// because log tables should keep rows with the same ts and tags.
|
||||
AutoCreateTableType::Log => {
|
||||
hint_options.push((APPEND_MODE_KEY, "true"));
|
||||
}
|
||||
AutoCreateTableType::LastNonNull => {
|
||||
hint_options.push((MERGE_MODE_KEY, "last_non_null"));
|
||||
}
|
||||
}
|
||||
let options: &[(&str, &str)] = hint_options.as_slice();
|
||||
|
||||
self.create_table_with_options(req, ctx, statement_executor, options)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Creates a table with options.
|
||||
async fn create_table_with_options(
|
||||
async fn create_physical_table(
|
||||
&self,
|
||||
req: &RowInsertRequest,
|
||||
mut create_table_expr: CreateTableExpr,
|
||||
ctx: &QueryContextRef,
|
||||
statement_executor: &StatementExecutor,
|
||||
options: &[(&str, &str)],
|
||||
) -> Result<TableRef> {
|
||||
let schema = ctx.current_schema();
|
||||
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
|
||||
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
|
||||
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
|
||||
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
|
||||
{
|
||||
let table_ref = TableReference::full(
|
||||
&create_table_expr.catalog_name,
|
||||
&create_table_expr.schema_name,
|
||||
&create_table_expr.table_name,
|
||||
);
|
||||
|
||||
info!("Table `{table_ref}` does not exist, try creating table");
|
||||
for (k, v) in options {
|
||||
create_table_expr
|
||||
.table_options
|
||||
.insert(k.to_string(), v.to_string());
|
||||
info!("Table `{table_ref}` does not exist, try creating table");
|
||||
}
|
||||
let res = statement_executor
|
||||
.create_table_inner(create_table_expr, None, ctx.clone())
|
||||
.create_table_inner(&mut create_table_expr, None, ctx.clone())
|
||||
.await;
|
||||
|
||||
let table_ref = TableReference::full(
|
||||
&create_table_expr.catalog_name,
|
||||
&create_table_expr.schema_name,
|
||||
&create_table_expr.table_name,
|
||||
);
|
||||
|
||||
match res {
|
||||
Ok(table) => {
|
||||
info!(
|
||||
"Successfully created table {} with options: {:?}",
|
||||
table_ref, options
|
||||
table_ref, create_table_expr.table_options,
|
||||
);
|
||||
Ok(table)
|
||||
}
|
||||
@@ -749,30 +754,12 @@ impl Inserter {
|
||||
|
||||
async fn create_logical_tables(
|
||||
&self,
|
||||
create_tables: Vec<&RowInsertRequest>,
|
||||
create_table_exprs: Vec<CreateTableExpr>,
|
||||
ctx: &QueryContextRef,
|
||||
physical_table: &str,
|
||||
statement_executor: &StatementExecutor,
|
||||
) -> Result<Vec<TableRef>> {
|
||||
let catalog_name = ctx.current_catalog();
|
||||
let schema_name = ctx.current_schema();
|
||||
let create_table_exprs = create_tables
|
||||
.iter()
|
||||
.map(|req| {
|
||||
let table_ref = TableReference::full(catalog_name, &schema_name, &req.table_name);
|
||||
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
|
||||
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;
|
||||
|
||||
create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
|
||||
create_table_expr.table_options.insert(
|
||||
LOGICAL_TABLE_METADATA_KEY.to_string(),
|
||||
physical_table.to_string(),
|
||||
);
|
||||
|
||||
Ok(create_table_expr)
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let res = statement_executor
|
||||
.create_logical_tables(catalog_name, &schema_name, &create_table_exprs, ctx.clone())
|
||||
.await;
|
||||
@@ -827,6 +814,7 @@ fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
|
||||
fn build_create_table_expr(
|
||||
table: &TableReference,
|
||||
request_schema: &[ColumnSchema],
|
||||
engine: &str,
|
||||
) -> Result<CreateTableExpr> {
|
||||
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, default_engine())
|
||||
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user