refactor: use SchemaHelper in Inserter

Remove the dependency on StatementExecutor from Inserter

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
evenyag
2025-06-19 22:08:38 +08:00
committed by Lei, HUANG
parent 14f3a4ab05
commit 2edd861ce9
11 changed files with 70 additions and 137 deletions

View File

@@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_meta::cache::{LayeredCacheRegistryRef, TableFlownodeSetCacheRef, TableRouteCacheRef};
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::flow::FlowMetadataManagerRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::{Flownode, NodeManagerRef};
use common_query::Output;
@@ -37,6 +37,7 @@ use greptime_proto::v1::flow::{flow_server, FlowRequest, FlowResponse, InsertReq
use itertools::Itertools;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::schema_helper::SchemaHelper;
use operator::statement::StatementExecutor;
use partition::manager::PartitionRuleManager;
use query::{QueryEngine, QueryEngineFactory};
@@ -546,8 +547,14 @@ impl FrontendInvoker {
name: TABLE_FLOWNODE_SET_CACHE_NAME,
})?;
let inserter = Arc::new(Inserter::new(
let schema_helper = SchemaHelper::new(
catalog_manager.clone(),
Arc::new(TableMetadataManager::new(kv_backend.clone())),
procedure_executor.clone(),
layered_cache_registry.clone(),
);
let inserter = Arc::new(Inserter::new(
schema_helper,
partition_manager.clone(),
node_manager.clone(),
table_flownode_cache,
@@ -588,7 +595,7 @@ impl FrontendInvoker {
.start_timer();
self.inserter
.handle_row_inserts(requests, ctx, &self.statement_executor, false, false)
.handle_row_inserts(requests, ctx, false, false)
.await
.map_err(BoxedError::new)
.context(common_frontend::error::ExternalSnafu)

View File

@@ -116,8 +116,7 @@ impl Frontend {
if let Some(t) = self.export_metrics_task.as_ref() {
if t.send_by_handler {
let inserter = self.instance.inserter().clone();
let statement_executor = self.instance.statement_executor().clone();
let handler = ExportMetricHandler::new_handler(inserter, statement_executor);
let handler = ExportMetricHandler::new_handler(inserter);
t.start(Some(handler)).context(error::StartServerSnafu)?
} else {
t.start(None).context(error::StartServerSnafu)?;

View File

@@ -30,6 +30,7 @@ use operator::flow::FlowServiceOperator;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::request::Requester;
use operator::schema_helper::SchemaHelper;
use operator::statement::{StatementExecutor, StatementExecutorRef};
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
@@ -130,8 +131,15 @@ impl FrontendBuilder {
name: TABLE_FLOWNODE_SET_CACHE_NAME,
})?;
let inserter = Arc::new(Inserter::new(
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let schema_helper = SchemaHelper::new(
self.catalog_manager.clone(),
table_metadata_manager.clone(),
self.procedure_executor.clone(),
local_cache_invalidator.clone(),
);
let inserter = Arc::new(Inserter::new(
schema_helper,
partition_manager.clone(),
node_manager.clone(),
table_flownode_cache,
@@ -176,7 +184,7 @@ impl FrontendBuilder {
self.catalog_manager.clone(),
query_engine.clone(),
self.procedure_executor,
kv_backend.clone(),
kv_backend,
local_cache_invalidator,
inserter.clone(),
table_route_cache,
@@ -219,7 +227,7 @@ impl FrontendBuilder {
plugins,
inserter,
deleter,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
table_metadata_manager,
slow_query_recorder,
limiter,
process_manager,

View File

@@ -408,7 +408,7 @@ impl Instance {
ctx: QueryContextRef,
) -> Result<Output> {
self.inserter
.handle_column_inserts(requests, ctx, self.statement_executor.as_ref())
.handle_column_inserts(requests, ctx)
.await
.context(TableOperationSnafu)
}
@@ -422,13 +422,7 @@ impl Instance {
is_single_value: bool,
) -> Result<Output> {
self.inserter
.handle_row_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
accommodate_existing_schema,
is_single_value,
)
.handle_row_inserts(requests, ctx, accommodate_existing_schema, is_single_value)
.await
.context(TableOperationSnafu)
}
@@ -441,10 +435,7 @@ impl Instance {
) -> Result<Output> {
self.inserter
.handle_last_non_null_inserts(
requests,
ctx,
self.statement_executor.as_ref(),
true,
requests, ctx, true,
// Influx protocol may write multiple fields (values).
false,
)
@@ -460,7 +451,7 @@ impl Instance {
physical_table: String,
) -> Result<Output> {
self.inserter
.handle_metric_row_inserts(requests, ctx, &self.statement_executor, physical_table)
.handle_metric_row_inserts(requests, ctx, physical_table)
.await
.context(TableOperationSnafu)
}

View File

@@ -135,7 +135,7 @@ impl Instance {
};
self.inserter
.handle_log_inserts(log, ctx, self.statement_executor.as_ref())
.handle_log_inserts(log, ctx)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)
@@ -157,7 +157,7 @@ impl Instance {
};
self.inserter
.handle_trace_inserts(rows, ctx, self.statement_executor.as_ref())
.handle_trace_inserts(rows, ctx)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)

View File

@@ -28,7 +28,6 @@ use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::{debug, tracing};
use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, InFlightWriteBytesExceededSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
@@ -271,18 +270,11 @@ impl PromStoreProtocolHandler for Instance {
/// so only implement `PromStoreProtocolHandler::write` method.
pub struct ExportMetricHandler {
inserter: InserterRef,
statement_executor: Arc<StatementExecutor>,
}
impl ExportMetricHandler {
pub fn new_handler(
inserter: InserterRef,
statement_executor: Arc<StatementExecutor>,
) -> PromStoreProtocolHandlerRef {
Arc::new(Self {
inserter,
statement_executor,
})
pub fn new_handler(inserter: InserterRef) -> PromStoreProtocolHandlerRef {
Arc::new(Self { inserter })
}
}
@@ -295,12 +287,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
_: bool,
) -> ServerResult<Output> {
self.inserter
.handle_metric_row_inserts(
request,
ctx,
&self.statement_executor,
GREPTIME_PHYSICAL_TABLE.to_string(),
)
.handle_metric_row_inserts(request, ctx, GREPTIME_PHYSICAL_TABLE.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)

View File

@@ -233,7 +233,7 @@ impl SlowQueryEventHandler {
.into();
self.inserter
.handle_row_inserts(requests, query_ctx, &self.statement_executor, false, false)
.handle_row_inserts(requests, query_ctx, false, false)
.await
.context(TableOperationSnafu)?;

View File

@@ -25,7 +25,6 @@ use api::v1::{
AlterTableExpr, ColumnDataType, ColumnSchema, CreateTableExpr, InsertRequests,
RowInsertRequest, RowInsertRequests, SemanticType,
};
use catalog::CatalogManagerRef;
use client::{OutputData, OutputMeta};
use common_catalog::consts::{
default_engine, trace_services_table_name, PARENT_SPAN_ID_COLUMN, SERVICE_NAME_COLUMN,
@@ -63,7 +62,7 @@ use table::table_reference::TableReference;
use table::TableRef;
use crate::error::{
CatalogSnafu, ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
ColumnOptionsSnafu, CreatePartitionRulesSnafu, FindRegionLeaderSnafu,
InvalidInsertRequestSnafu, JoinTaskSnafu, RequestInsertsSnafu, Result, TableNotFoundSnafu,
};
use crate::expr_helper;
@@ -72,10 +71,10 @@ use crate::req_convert::common::preprocess_row_insert_requests;
use crate::req_convert::insert::{
fill_reqs_with_impure_default, ColumnToRow, RowToRegion, StatementToRegion, TableToRegion,
};
use crate::statement::StatementExecutor;
use crate::schema_helper::SchemaHelper;
pub struct Inserter {
catalog_manager: CatalogManagerRef,
schema_helper: SchemaHelper,
pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef,
pub(crate) table_flownode_set_cache: TableFlownodeSetCacheRef,
@@ -127,13 +126,13 @@ pub struct InstantAndNormalInsertRequests {
impl Inserter {
pub fn new(
catalog_manager: CatalogManagerRef,
schema_helper: SchemaHelper,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
table_flownode_set_cache: TableFlownodeSetCacheRef,
) -> Self {
Self {
catalog_manager,
schema_helper,
partition_manager,
node_manager,
table_flownode_set_cache,
@@ -144,10 +143,9 @@ impl Inserter {
&self,
requests: InsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
let row_inserts = ColumnToRow::convert(requests)?;
self.handle_row_inserts(row_inserts, ctx, statement_executor, false, false)
self.handle_row_inserts(row_inserts, ctx, false, false)
.await
}
@@ -156,7 +154,6 @@ impl Inserter {
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
@@ -164,7 +161,6 @@ impl Inserter {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Physical,
accommodate_existing_schema,
is_single_value,
@@ -177,12 +173,10 @@ impl Inserter {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Log,
false,
false,
@@ -194,12 +188,10 @@ impl Inserter {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::Trace,
false,
false,
@@ -212,14 +204,12 @@ impl Inserter {
&self,
requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<Output> {
self.handle_row_inserts_with_create_type(
requests,
ctx,
statement_executor,
AutoCreateTableType::LastNonNull,
accommodate_existing_schema,
is_single_value,
@@ -232,7 +222,6 @@ impl Inserter {
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
accommodate_existing_schema: bool,
is_single_value: bool,
@@ -254,7 +243,6 @@ impl Inserter {
&mut requests,
&ctx,
create_type,
statement_executor,
accommodate_existing_schema,
is_single_value,
)
@@ -280,7 +268,6 @@ impl Inserter {
&self,
mut requests: RowInsertRequests,
ctx: QueryContextRef,
statement_executor: &StatementExecutor,
physical_table: String,
) -> Result<Output> {
// remove empty requests
@@ -293,7 +280,7 @@ impl Inserter {
validate_column_count_match(&requests)?;
// check and create physical table
self.create_physical_table_on_demand(&ctx, physical_table.clone(), statement_executor)
self.create_physical_table_on_demand(&ctx, physical_table.clone())
.await?;
// check and create logical tables
@@ -305,7 +292,6 @@ impl Inserter {
&mut requests,
&ctx,
AutoCreateTableType::Logical(physical_table.to_string()),
statement_executor,
true,
true,
)
@@ -350,10 +336,13 @@ impl Inserter {
insert: &Insert,
ctx: &QueryContextRef,
) -> Result<Output> {
let (inserts, table_info) =
StatementToRegion::new(self.catalog_manager.as_ref(), &self.partition_manager, ctx)
.convert(insert, ctx)
.await?;
let (inserts, table_info) = StatementToRegion::new(
self.schema_helper.catalog_manager().as_ref(),
&self.partition_manager,
ctx,
)
.convert(insert, ctx)
.await?;
let table_infos =
HashMap::from_iter([(table_info.table_id(), table_info.clone())].into_iter());
@@ -482,7 +471,6 @@ impl Inserter {
requests: &mut RowInsertRequests,
ctx: &QueryContextRef,
auto_create_table_type: AutoCreateTableType,
statement_executor: &StatementExecutor,
accommodate_existing_schema: bool,
is_single_value: bool,
) -> Result<CreateAlterTableResult> {
@@ -543,7 +531,7 @@ impl Inserter {
instant_table_ids.insert(table_info.table_id());
}
table_infos.insert(table_info.table_id(), table.table_info());
if let Some(alter_expr) = self.get_alter_table_expr_on_demand(
if let Some(alter_expr) = Self::get_alter_table_expr_on_demand(
req,
&table,
ctx,
@@ -565,9 +553,7 @@ impl Inserter {
AutoCreateTableType::Logical(_) => {
if !create_tables.is_empty() {
// Creates logical tables in batch.
let tables = self
.create_logical_tables(create_tables, ctx, statement_executor)
.await?;
let tables = self.create_logical_tables(create_tables, ctx).await?;
for table in tables {
let table_info = table.table_info();
@@ -579,7 +565,7 @@ impl Inserter {
}
if !alter_tables.is_empty() {
// Alter logical tables in batch.
statement_executor
self.schema_helper
.alter_logical_tables(alter_tables, ctx.clone())
.await?;
}
@@ -590,9 +576,7 @@ impl Inserter {
// Note that an auto-created table shouldn't be a TTL instant table:
// that would be very unexpected behavior, so it should be set explicitly by the user.
for create_table in create_tables {
let table = self
.create_physical_table(create_table, None, ctx, statement_executor)
.await?;
let table = self.create_physical_table(create_table, None, ctx).await?;
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
@@ -600,8 +584,8 @@ impl Inserter {
table_infos.insert(table_info.table_id(), table.table_info());
}
for alter_expr in alter_tables.into_iter() {
statement_executor
.alter_table_inner(alter_expr, ctx.clone())
self.schema_helper
.alter_table_by_expr(alter_expr, ctx.clone())
.await?;
}
}
@@ -619,9 +603,7 @@ impl Inserter {
create_table
.table_options
.insert(APPEND_MODE_KEY.to_string(), "false".to_string());
let table = self
.create_physical_table(create_table, None, ctx, statement_executor)
.await?;
let table = self.create_physical_table(create_table, None, ctx).await?;
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
instant_table_ids.insert(table_info.table_id());
@@ -662,12 +644,7 @@ impl Inserter {
);
let table = self
.create_physical_table(
create_table,
Some(partitions),
ctx,
statement_executor,
)
.create_physical_table(create_table, Some(partitions), ctx)
.await?;
let table_info = table.table_info();
if table_info.is_ttl_instant_table() {
@@ -677,8 +654,8 @@ impl Inserter {
}
}
for alter_expr in alter_tables.into_iter() {
statement_executor
.alter_table_inner(alter_expr, ctx.clone())
self.schema_helper
.alter_table_by_expr(alter_expr, ctx.clone())
.await?;
}
}
@@ -694,7 +671,6 @@ impl Inserter {
&self,
ctx: &QueryContextRef,
physical_table: String,
statement_executor: &StatementExecutor,
) -> Result<()> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
@@ -737,8 +713,9 @@ impl Inserter {
.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
// create physical table
let res = statement_executor
.create_table_inner(create_table_expr, None, ctx.clone())
let res = self
.schema_helper
.create_table_by_expr(create_table_expr, None, ctx.clone())
.await;
match res {
@@ -759,10 +736,7 @@ impl Inserter {
schema: &str,
table: &str,
) -> Result<Option<TableRef>> {
self.catalog_manager
.table(catalog, schema, table, None)
.await
.context(CatalogSnafu)
self.schema_helper.get_table(catalog, schema, table).await
}
fn get_create_table_expr_on_demand(
@@ -830,7 +804,6 @@ impl Inserter {
/// When `accommodate_existing_schema` is true and `is_single_value` is true, it also considers fields when modifying the
/// input `req`.
fn get_alter_table_expr_on_demand(
&self,
req: &mut RowInsertRequest,
table: &TableRef,
ctx: &QueryContextRef,
@@ -918,7 +891,6 @@ impl Inserter {
mut create_table_expr: CreateTableExpr,
partitions: Option<Partitions>,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<TableRef> {
{
let table_ref = TableReference::full(
@@ -929,8 +901,9 @@ impl Inserter {
info!("Table `{table_ref}` does not exist, try creating table");
}
let res = statement_executor
.create_table_inner(&mut create_table_expr, partitions, ctx.clone())
let res = self
.schema_helper
.create_table_by_expr(&mut create_table_expr, partitions, ctx.clone())
.await;
let table_ref = TableReference::full(
@@ -958,9 +931,9 @@ impl Inserter {
&self,
create_table_exprs: Vec<CreateTableExpr>,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Vec<TableRef>> {
let res = statement_executor
let res = self
.schema_helper
.create_logical_tables(&create_table_exprs, ctx.clone())
.await;
@@ -1145,19 +1118,14 @@ mod tests {
use api::v1::{ColumnSchema as GrpcColumnSchema, RowInsertRequest, Rows, SemanticType, Value};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::cache::new_table_flownode_set_cache;
use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
use common_meta::test_util::MockDatanodeManager;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use moka::future::Cache;
use session::context::QueryContext;
use table::dist_table::DummyDataSource;
use table::metadata::{TableInfoBuilder, TableMetaBuilder, TableType};
use table::TableRef;
use super::*;
use crate::tests::{create_partition_rule_manager, prepare_mocked_backend};
fn make_table_ref_with_schema(ts_name: &str, field_name: &str) -> TableRef {
let schema = datatypes::schema::SchemaBuilder::try_from_columns(vec![
@@ -1237,20 +1205,8 @@ mod tests {
DEFAULT_SCHEMA_NAME,
));
let kv_backend = prepare_mocked_backend().await;
let inserter = Inserter::new(
catalog::memory::MemoryCatalogManager::new(),
create_partition_rule_manager(kv_backend.clone()).await,
Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)),
Arc::new(new_table_flownode_set_cache(
String::new(),
Cache::new(100),
kv_backend.clone(),
)),
);
let alter_expr = inserter
.get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true)
.unwrap();
let alter_expr =
Inserter::get_alter_table_expr_on_demand(&mut req, &table, &ctx, true, true).unwrap();
assert!(alter_expr.is_none());
// The request's schema should have updated names for timestamp and field columns

View File

@@ -62,7 +62,7 @@ lazy_static! {
static ref NAME_PATTERN_REG: Regex = Regex::new(&format!("^{NAME_PATTERN}$")).unwrap();
}
/// Helper to query and manipulate table schemas.
/// Helper to query and manipulate (CREATE/ALTER) table schemas.
#[derive(Clone)]
pub struct SchemaHelper {
catalog_manager: CatalogManagerRef,

View File

@@ -88,7 +88,6 @@ impl PipelineOperator {
catalog.to_string(),
Arc::new(PipelineTable::new(
self.inserter.clone(),
self.statement_executor.clone(),
table,
self.query_engine.clone(),
)),

View File

@@ -30,7 +30,6 @@ use datatypes::timestamp::TimestampNanosecond;
use datatypes::vectors::{StringVector, TimestampNanosecondVector, Vector};
use itertools::Itertools;
use operator::insert::InserterRef;
use operator::statement::StatementExecutorRef;
use query::dataframe::DataFrame;
use query::QueryEngineRef;
use session::context::{QueryContextBuilder, QueryContextRef};
@@ -61,7 +60,6 @@ pub(crate) const EMPTY_SCHEMA_NAME: &str = "";
/// Every catalog has its own pipeline table.
pub struct PipelineTable {
inserter: InserterRef,
statement_executor: StatementExecutorRef,
table: TableRef,
query_engine: QueryEngineRef,
cache: PipelineCache,
@@ -69,15 +67,9 @@ pub struct PipelineTable {
impl PipelineTable {
/// Create a new PipelineTable.
pub fn new(
inserter: InserterRef,
statement_executor: StatementExecutorRef,
table: TableRef,
query_engine: QueryEngineRef,
) -> Self {
pub fn new(inserter: InserterRef, table: TableRef, query_engine: QueryEngineRef) -> Self {
Self {
inserter,
statement_executor,
table,
query_engine,
cache: PipelineCache::new(),
@@ -232,13 +224,7 @@ impl PipelineTable {
let output = self
.inserter
.handle_row_inserts(
requests,
Self::query_ctx(&table_info),
&self.statement_executor,
false,
false,
)
.handle_row_inserts(requests, Self::query_ctx(&table_info), false, false)
.await
.context(InsertPipelineSnafu)?;