mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-21 07:20:41 +00:00
feat: re-support query engine execute dml (#2484)
* feat: re-support query engine execute dml Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: remove region_number in InsertRequest Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> * chore: add doc comments Signed-off-by: Zhenchi <zhongzc_arch@outlook.com> --------- Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
@@ -261,6 +261,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
|
||||
let state = Arc::new(QueryEngineState::new(
|
||||
catalog_list,
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
plugins.clone(),
|
||||
));
|
||||
|
||||
@@ -338,6 +338,7 @@ impl DatanodeBuilder {
|
||||
// query engine in datanode only executes plan with resolved table source.
|
||||
MemoryCatalogManager::with_default_setup(),
|
||||
None,
|
||||
None,
|
||||
false,
|
||||
plugins,
|
||||
);
|
||||
|
||||
@@ -53,7 +53,7 @@ use meta_client::client::{MetaClient, MetaClientBuilder};
|
||||
use operator::delete::{Deleter, DeleterRef};
|
||||
use operator::insert::{Inserter, InserterRef};
|
||||
use operator::statement::StatementExecutor;
|
||||
use operator::table::table_idents_to_full_name;
|
||||
use operator::table::{table_idents_to_full_name, TableMutationOperator};
|
||||
use partition::manager::PartitionRuleManager;
|
||||
use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
|
||||
use query::plan::LogicalPlan;
|
||||
@@ -163,14 +163,6 @@ impl Instance {
|
||||
catalog_manager.datanode_manager().clone(),
|
||||
);
|
||||
|
||||
let query_engine = QueryEngineFactory::new_with_plugins(
|
||||
catalog_manager.clone(),
|
||||
Some(region_query_handler.clone()),
|
||||
true,
|
||||
plugins.clone(),
|
||||
)
|
||||
.query_engine();
|
||||
|
||||
let inserter = Arc::new(Inserter::new(
|
||||
catalog_manager.clone(),
|
||||
partition_manager.clone(),
|
||||
@@ -182,6 +174,20 @@ impl Instance {
|
||||
datanode_clients,
|
||||
));
|
||||
|
||||
let table_mutation_handler = Arc::new(TableMutationOperator::new(
|
||||
inserter.clone(),
|
||||
deleter.clone(),
|
||||
));
|
||||
|
||||
let query_engine = QueryEngineFactory::new_with_plugins(
|
||||
catalog_manager.clone(),
|
||||
Some(region_query_handler.clone()),
|
||||
Some(table_mutation_handler),
|
||||
true,
|
||||
plugins.clone(),
|
||||
)
|
||||
.query_engine();
|
||||
|
||||
let statement_executor = Arc::new(StatementExecutor::new(
|
||||
catalog_manager.clone(),
|
||||
query_engine.clone(),
|
||||
@@ -189,7 +195,6 @@ impl Instance {
|
||||
meta_backend.clone(),
|
||||
catalog_manager.clone(),
|
||||
inserter.clone(),
|
||||
deleter.clone(),
|
||||
));
|
||||
|
||||
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
|
||||
@@ -301,9 +306,25 @@ impl Instance {
|
||||
let region_query_handler =
|
||||
FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone());
|
||||
|
||||
let inserter = Arc::new(Inserter::new(
|
||||
catalog_manager.clone(),
|
||||
partition_manager.clone(),
|
||||
datanode_manager.clone(),
|
||||
));
|
||||
let deleter = Arc::new(Deleter::new(
|
||||
catalog_manager.clone(),
|
||||
partition_manager,
|
||||
datanode_manager.clone(),
|
||||
));
|
||||
let table_mutation_handler = Arc::new(TableMutationOperator::new(
|
||||
inserter.clone(),
|
||||
deleter.clone(),
|
||||
));
|
||||
|
||||
let query_engine = QueryEngineFactory::new_with_plugins(
|
||||
catalog_manager.clone(),
|
||||
Some(region_query_handler),
|
||||
Some(table_mutation_handler),
|
||||
true,
|
||||
plugins.clone(),
|
||||
)
|
||||
@@ -317,25 +338,12 @@ impl Instance {
|
||||
let cache_invalidator = Arc::new(DummyCacheInvalidator);
|
||||
let ddl_executor = Arc::new(DdlManager::new(
|
||||
procedure_manager,
|
||||
datanode_manager.clone(),
|
||||
datanode_manager,
|
||||
cache_invalidator.clone(),
|
||||
table_metadata_manager.clone(),
|
||||
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
|
||||
));
|
||||
|
||||
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));
|
||||
|
||||
let inserter = Arc::new(Inserter::new(
|
||||
catalog_manager.clone(),
|
||||
partition_manager.clone(),
|
||||
datanode_manager.clone(),
|
||||
));
|
||||
let deleter = Arc::new(Deleter::new(
|
||||
catalog_manager.clone(),
|
||||
partition_manager,
|
||||
datanode_manager,
|
||||
));
|
||||
|
||||
let statement_executor = Arc::new(StatementExecutor::new(
|
||||
catalog_manager.clone(),
|
||||
query_engine.clone(),
|
||||
@@ -343,7 +351,6 @@ impl Instance {
|
||||
kv_backend.clone(),
|
||||
cache_invalidator,
|
||||
inserter.clone(),
|
||||
deleter.clone(),
|
||||
));
|
||||
|
||||
Ok(Instance {
|
||||
|
||||
@@ -98,7 +98,7 @@ impl Deleter {
|
||||
&self,
|
||||
request: TableDeleteRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
) -> Result<usize> {
|
||||
let catalog = request.catalog_name.as_str();
|
||||
let schema = request.schema_name.as_str();
|
||||
let table = request.table_name.as_str();
|
||||
@@ -108,7 +108,9 @@ impl Deleter {
|
||||
let deletes = TableToRegion::new(&table_info, &self.partition_manager)
|
||||
.convert(request)
|
||||
.await?;
|
||||
self.do_request(deletes, ctx.trace_id(), 0).await
|
||||
|
||||
let affected_rows = self.do_request(deletes, ctx.trace_id(), 0).await?;
|
||||
Ok(affected_rows as _)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -145,7 +145,6 @@ mod tests {
|
||||
schema_name: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
table_name: "table_1".to_string(),
|
||||
columns_values: HashMap::from([("a".to_string(), vector)]),
|
||||
region_number: 0,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,7 +48,6 @@ use table::engine::TableReference;
|
||||
use table::requests::{CopyDatabaseRequest, CopyDirection, CopyTableRequest};
|
||||
use table::TableRef;
|
||||
|
||||
use crate::delete::DeleterRef;
|
||||
use crate::error::{
|
||||
self, CatalogSnafu, ExecLogicalPlanSnafu, ExternalSnafu, InvalidSqlSnafu, PlanStatementSnafu,
|
||||
Result, TableNotFoundSnafu,
|
||||
@@ -66,7 +65,6 @@ pub struct StatementExecutor {
|
||||
partition_manager: PartitionRuleManagerRef,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
inserter: InserterRef,
|
||||
deleter: DeleterRef,
|
||||
}
|
||||
|
||||
impl StatementExecutor {
|
||||
@@ -77,7 +75,6 @@ impl StatementExecutor {
|
||||
kv_backend: KvBackendRef,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
inserter: InserterRef,
|
||||
deleter: DeleterRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
catalog_manager,
|
||||
@@ -87,7 +84,6 @@ impl StatementExecutor {
|
||||
partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)),
|
||||
cache_invalidator,
|
||||
inserter,
|
||||
deleter,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,14 +100,12 @@ impl StatementExecutor {
|
||||
|
||||
pub async fn execute_sql(&self, stmt: Statement, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
match stmt {
|
||||
Statement::Query(_) | Statement::Explain(_) => {
|
||||
Statement::Query(_) | Statement::Explain(_) | Statement::Delete(_) => {
|
||||
self.plan_exec(QueryStatement::Sql(stmt), query_ctx).await
|
||||
}
|
||||
|
||||
Statement::Insert(insert) => self.insert(insert, query_ctx).await,
|
||||
|
||||
Statement::Delete(delete) => self.delete(delete, query_ctx).await,
|
||||
|
||||
Statement::Tql(tql) => self.execute_tql(tql, query_ctx).await,
|
||||
|
||||
Statement::DescribeTable(stmt) => self.describe_table(stmt, query_ctx).await,
|
||||
|
||||
@@ -330,7 +330,6 @@ impl StatementExecutor {
|
||||
schema_name: req.schema_name.to_string(),
|
||||
table_name: req.table_name.to_string(),
|
||||
columns_values,
|
||||
region_number: 0,
|
||||
},
|
||||
query_ctx.clone(),
|
||||
));
|
||||
|
||||
@@ -12,30 +12,14 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
use common_query::Output;
|
||||
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
|
||||
use datafusion_expr::{DmlStatement, LogicalPlan as DfLogicalPlan, WriteOp};
|
||||
use datatypes::schema::SchemaRef;
|
||||
use futures_util::StreamExt;
|
||||
use query::parser::QueryStatement;
|
||||
use query::plan::LogicalPlan;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{ensure, OptionExt, ResultExt};
|
||||
use sql::statements::delete::Delete;
|
||||
use sql::statements::insert::Insert;
|
||||
use sql::statements::statement::Statement;
|
||||
use table::engine::TableReference;
|
||||
use table::metadata::TableInfoRef;
|
||||
use table::requests::{DeleteRequest, InsertRequest};
|
||||
use table::TableRef;
|
||||
|
||||
use super::StatementExecutor;
|
||||
use crate::error::{
|
||||
BuildColumnVectorsSnafu, ExecLogicalPlanSnafu, MissingTimeIndexColumnSnafu,
|
||||
ReadRecordBatchSnafu, Result, UnexpectedSnafu,
|
||||
};
|
||||
use crate::error::Result;
|
||||
|
||||
impl StatementExecutor {
|
||||
pub async fn insert(&self, insert: Box<Insert>, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
@@ -45,178 +29,9 @@ impl StatementExecutor {
|
||||
.handle_statement_insert(insert.as_ref(), &query_ctx)
|
||||
.await
|
||||
} else {
|
||||
// Slow path: insert with subquery. Execute the subquery first, via query engine. Then
|
||||
// insert the results by sending insert requests.
|
||||
|
||||
// 1. Plan the whole insert statement into a logical plan, then a wrong insert statement
|
||||
// will be caught and a plan error will be returned.
|
||||
// Slow path: insert with subquery. Execute using query engine.
|
||||
let statement = QueryStatement::Sql(Statement::Insert(insert));
|
||||
let logical_plan = self.plan(statement, query_ctx.clone()).await?;
|
||||
|
||||
// 2. Execute the subquery, get the results as a record batch stream.
|
||||
let dml_statement = extract_dml_statement(logical_plan)?;
|
||||
ensure!(
|
||||
dml_statement.op == WriteOp::Insert,
|
||||
UnexpectedSnafu {
|
||||
violated: "expected an INSERT plan"
|
||||
}
|
||||
);
|
||||
let mut stream = self
|
||||
.execute_dml_subquery(&dml_statement, query_ctx.clone())
|
||||
.await?;
|
||||
|
||||
// 3. Send insert requests.
|
||||
let mut affected_rows = 0;
|
||||
let table = self.get_table_from_dml(dml_statement, &query_ctx).await?;
|
||||
let table_info = table.table_info();
|
||||
while let Some(batch) = stream.next().await {
|
||||
let record_batch = batch.context(ReadRecordBatchSnafu)?;
|
||||
let insert_request =
|
||||
build_insert_request(record_batch, table.schema(), &table_info)?;
|
||||
affected_rows += self
|
||||
.inserter
|
||||
.handle_table_insert(insert_request, query_ctx.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(Output::AffectedRows(affected_rows))
|
||||
self.plan_exec(statement, query_ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn delete(&self, delete: Box<Delete>, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
// 1. Plan the whole delete statement into a logical plan, then a wrong delete statement
|
||||
// will be caught and a plan error will be returned.
|
||||
let statement = QueryStatement::Sql(Statement::Delete(delete));
|
||||
let logical_plan = self.plan(statement, query_ctx.clone()).await?;
|
||||
|
||||
// 2. Execute the subquery, get the results as a record batch stream.
|
||||
let dml_statement = extract_dml_statement(logical_plan)?;
|
||||
ensure!(
|
||||
dml_statement.op == WriteOp::Delete,
|
||||
UnexpectedSnafu {
|
||||
violated: "expected a DELETE plan"
|
||||
}
|
||||
);
|
||||
let mut stream = self
|
||||
.execute_dml_subquery(&dml_statement, query_ctx.clone())
|
||||
.await?;
|
||||
|
||||
// 3. Send delete requests.
|
||||
let mut affected_rows = 0;
|
||||
let table = self.get_table_from_dml(dml_statement, &query_ctx).await?;
|
||||
let table_info = table.table_info();
|
||||
while let Some(batch) = stream.next().await {
|
||||
let record_batch = batch.context(ReadRecordBatchSnafu)?;
|
||||
let request = build_delete_request(record_batch, table.schema(), &table_info)?;
|
||||
affected_rows += self
|
||||
.deleter
|
||||
.handle_table_delete(request, query_ctx.clone())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(Output::AffectedRows(affected_rows as _))
|
||||
}
|
||||
|
||||
async fn execute_dml_subquery(
|
||||
&self,
|
||||
dml_statement: &DmlStatement,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<SendableRecordBatchStream> {
|
||||
let subquery_plan = LogicalPlan::from(dml_statement.input.as_ref().clone());
|
||||
let output = self
|
||||
.query_engine
|
||||
.execute(subquery_plan, query_ctx)
|
||||
.await
|
||||
.context(ExecLogicalPlanSnafu)?;
|
||||
match output {
|
||||
Output::Stream(stream) => Ok(stream),
|
||||
Output::RecordBatches(record_batches) => Ok(record_batches.as_stream()),
|
||||
_ => UnexpectedSnafu {
|
||||
violated: "expected a stream",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_table_from_dml(
|
||||
&self,
|
||||
dml_statement: DmlStatement,
|
||||
query_ctx: &QueryContextRef,
|
||||
) -> Result<TableRef> {
|
||||
let default_catalog = query_ctx.current_catalog().to_owned();
|
||||
let default_schema = query_ctx.current_schema().to_owned();
|
||||
let resolved_table_ref = dml_statement
|
||||
.table_name
|
||||
.resolve(&default_catalog, &default_schema);
|
||||
let table_ref = TableReference::full(
|
||||
&resolved_table_ref.catalog,
|
||||
&resolved_table_ref.schema,
|
||||
&resolved_table_ref.table,
|
||||
);
|
||||
self.get_table(&table_ref).await
|
||||
}
|
||||
}
|
||||
|
||||
fn extract_dml_statement(logical_plan: LogicalPlan) -> Result<DmlStatement> {
|
||||
let LogicalPlan::DfPlan(df_plan) = logical_plan;
|
||||
match df_plan {
|
||||
DfLogicalPlan::Dml(dml) => Ok(dml),
|
||||
_ => UnexpectedSnafu {
|
||||
violated: "expected a DML plan",
|
||||
}
|
||||
.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_insert_request(
|
||||
record_batch: RecordBatch,
|
||||
table_schema: SchemaRef,
|
||||
table_info: &TableInfoRef,
|
||||
) -> Result<InsertRequest> {
|
||||
let columns_values = record_batch
|
||||
.column_vectors(&table_info.name, table_schema)
|
||||
.context(BuildColumnVectorsSnafu)?;
|
||||
|
||||
Ok(InsertRequest {
|
||||
catalog_name: table_info.catalog_name.clone(),
|
||||
schema_name: table_info.schema_name.clone(),
|
||||
table_name: table_info.name.clone(),
|
||||
columns_values,
|
||||
region_number: 0,
|
||||
})
|
||||
}
|
||||
|
||||
fn build_delete_request(
|
||||
record_batch: RecordBatch,
|
||||
table_schema: SchemaRef,
|
||||
table_info: &TableInfoRef,
|
||||
) -> Result<DeleteRequest> {
|
||||
let ts_column = table_schema
|
||||
.timestamp_column()
|
||||
.map(|x| x.name.clone())
|
||||
.with_context(|| table::error::MissingTimeIndexColumnSnafu {
|
||||
table_name: table_info.name.clone(),
|
||||
})
|
||||
.context(MissingTimeIndexColumnSnafu)?;
|
||||
|
||||
let column_vectors = record_batch
|
||||
.column_vectors(&table_info.name, table_schema)
|
||||
.context(BuildColumnVectorsSnafu)?;
|
||||
|
||||
let rowkey_columns = table_info
|
||||
.meta
|
||||
.row_key_column_names()
|
||||
.collect::<Vec<&String>>();
|
||||
|
||||
let key_column_values = column_vectors
|
||||
.into_iter()
|
||||
.filter(|x| x.0 == ts_column || rowkey_columns.contains(&&x.0))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
Ok(DeleteRequest {
|
||||
catalog_name: table_info.catalog_name.clone(),
|
||||
schema_name: table_info.schema_name.clone(),
|
||||
table_name: table_info.name.clone(),
|
||||
key_column_values,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -12,10 +12,19 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use query::error as query_error;
|
||||
use query::error::Result as QueryResult;
|
||||
use query::table_mutation::{AffectedRows, TableMutationHandler};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::ResultExt;
|
||||
use sqlparser::ast::ObjectName;
|
||||
use table::requests::{DeleteRequest as TableDeleteRequest, InsertRequest as TableInsertRequest};
|
||||
|
||||
use crate::delete::DeleterRef;
|
||||
use crate::error::{InvalidSqlSnafu, Result};
|
||||
use crate::insert::InserterRef;
|
||||
|
||||
// TODO(LFC): Refactor consideration: move this function to some helper mod,
|
||||
// could be done together or after `TableReference`'s refactoring, when issue #559 is resolved.
|
||||
@@ -47,3 +56,41 @@ pub fn table_idents_to_full_name(
|
||||
}.fail(),
|
||||
}
|
||||
}
|
||||
|
||||
pub struct TableMutationOperator {
|
||||
inserter: InserterRef,
|
||||
deleter: DeleterRef,
|
||||
}
|
||||
|
||||
impl TableMutationOperator {
|
||||
pub fn new(inserter: InserterRef, deleter: DeleterRef) -> Self {
|
||||
Self { inserter, deleter }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl TableMutationHandler for TableMutationOperator {
|
||||
async fn insert(
|
||||
&self,
|
||||
request: TableInsertRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> QueryResult<AffectedRows> {
|
||||
self.inserter
|
||||
.handle_table_insert(request, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(query_error::TableMutationSnafu)
|
||||
}
|
||||
|
||||
async fn delete(
|
||||
&self,
|
||||
request: TableDeleteRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> QueryResult<AffectedRows> {
|
||||
self.deleter
|
||||
.handle_table_delete(request, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(query_error::TableMutationSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -435,7 +435,6 @@ fn test_meter_insert_request() {
|
||||
schema_name: "public".to_string(),
|
||||
table_name: "numbers".to_string(),
|
||||
columns_values: Default::default(),
|
||||
region_number: 0,
|
||||
};
|
||||
meter_insert_request!(req);
|
||||
|
||||
|
||||
@@ -54,8 +54,8 @@ use crate::dataframe::DataFrame;
|
||||
pub use crate::datafusion::planner::DfContextProviderAdapter;
|
||||
use crate::error::{
|
||||
CatalogSnafu, CreateRecordBatchSnafu, CreateSchemaSnafu, DataFusionSnafu,
|
||||
MissingTimestampColumnSnafu, QueryExecutionSnafu, Result, TableNotFoundSnafu,
|
||||
UnimplementedSnafu, UnsupportedExprSnafu,
|
||||
MissingTableMutationHandlerSnafu, MissingTimestampColumnSnafu, QueryExecutionSnafu, Result,
|
||||
TableNotFoundSnafu, UnimplementedSnafu, UnsupportedExprSnafu,
|
||||
};
|
||||
use crate::executor::QueryExecutor;
|
||||
use crate::logical_optimizer::LogicalOptimizer;
|
||||
@@ -115,7 +115,7 @@ impl DatafusionQueryEngine {
|
||||
let table = self.find_table(&table_name).await?;
|
||||
|
||||
let output = self
|
||||
.exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx)
|
||||
.exec_query_plan(LogicalPlan::DfPlan((*dml.input).clone()), query_ctx.clone())
|
||||
.await?;
|
||||
let mut stream = match output {
|
||||
Output::RecordBatches(batches) => batches.as_stream(),
|
||||
@@ -132,8 +132,14 @@ impl DatafusionQueryEngine {
|
||||
.context(QueryExecutionSnafu)?;
|
||||
|
||||
let rows = match dml.op {
|
||||
WriteOp::Insert => Self::insert(&table_name, &table, column_vectors).await?,
|
||||
WriteOp::Delete => Self::delete(&table_name, &table, column_vectors).await?,
|
||||
WriteOp::Insert => {
|
||||
self.insert(&table_name, column_vectors, query_ctx.clone())
|
||||
.await?
|
||||
}
|
||||
WriteOp::Delete => {
|
||||
self.delete(&table_name, &table, column_vectors, query_ctx.clone())
|
||||
.await?
|
||||
}
|
||||
_ => unreachable!("guarded by the 'ensure!' at the beginning"),
|
||||
};
|
||||
affected_rows += rows;
|
||||
@@ -142,9 +148,11 @@ impl DatafusionQueryEngine {
|
||||
}
|
||||
|
||||
async fn delete<'a>(
|
||||
&self,
|
||||
table_name: &ResolvedTableReference<'a>,
|
||||
table: &TableRef,
|
||||
column_vectors: HashMap<String, VectorRef>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<usize> {
|
||||
let catalog_name = table_name.catalog.to_string();
|
||||
let schema_name = table_name.schema.to_string();
|
||||
@@ -174,31 +182,31 @@ impl DatafusionQueryEngine {
|
||||
key_column_values: column_vectors,
|
||||
};
|
||||
|
||||
table
|
||||
.delete(request)
|
||||
self.state
|
||||
.table_mutation_handler()
|
||||
.context(MissingTableMutationHandlerSnafu)?
|
||||
.delete(request, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(QueryExecutionSnafu)
|
||||
}
|
||||
|
||||
async fn insert<'a>(
|
||||
&self,
|
||||
table_name: &ResolvedTableReference<'a>,
|
||||
table: &TableRef,
|
||||
column_vectors: HashMap<String, VectorRef>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<usize> {
|
||||
let request = InsertRequest {
|
||||
catalog_name: table_name.catalog.to_string(),
|
||||
schema_name: table_name.schema.to_string(),
|
||||
table_name: table_name.table.to_string(),
|
||||
columns_values: column_vectors,
|
||||
region_number: 0,
|
||||
};
|
||||
|
||||
table
|
||||
.insert(request)
|
||||
self.state
|
||||
.table_mutation_handler()
|
||||
.context(MissingTableMutationHandlerSnafu)?
|
||||
.insert(request, query_ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(QueryExecutionSnafu)
|
||||
}
|
||||
|
||||
async fn find_table(&self, table_name: &ResolvedTableReference<'_>) -> Result<TableRef> {
|
||||
@@ -517,7 +525,7 @@ mod tests {
|
||||
};
|
||||
catalog_manager.register_table_sync(req).unwrap();
|
||||
|
||||
QueryEngineFactory::new(catalog_manager, None, false).query_engine()
|
||||
QueryEngineFactory::new(catalog_manager, None, None, false).query_engine()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
||||
@@ -254,11 +254,20 @@ pub enum Error {
|
||||
#[snafu(display("Column schema has no default value, column: {}", column))]
|
||||
ColumnSchemaNoDefault { column: String, location: Location },
|
||||
|
||||
#[snafu(display("Region query error, source: {}", source))]
|
||||
#[snafu(display("Region query error"))]
|
||||
RegionQuery {
|
||||
source: BoxedError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Table mutation error"))]
|
||||
TableMutation {
|
||||
source: BoxedError,
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Missing table mutation handler"))]
|
||||
MissingTableMutationHandler { location: Location },
|
||||
}
|
||||
|
||||
impl ErrorExt for Error {
|
||||
@@ -305,7 +314,10 @@ impl ErrorExt for Error {
|
||||
RemoteRequest { source, .. } => source.status_code(),
|
||||
UnexpectedOutputKind { .. } => StatusCode::Unexpected,
|
||||
CreateSchema { source, .. } => source.status_code(),
|
||||
|
||||
RegionQuery { source, .. } => source.status_code(),
|
||||
TableMutation { source, .. } => source.status_code(),
|
||||
MissingTableMutationHandler { .. } => StatusCode::Unexpected,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ pub mod query_engine;
|
||||
mod range_select;
|
||||
pub mod region_query;
|
||||
pub mod sql;
|
||||
pub mod table_mutation;
|
||||
|
||||
pub use crate::datafusion::DfContextProviderAdapter;
|
||||
pub use crate::query_engine::{
|
||||
|
||||
@@ -38,6 +38,7 @@ use crate::planner::LogicalPlanner;
|
||||
pub use crate::query_engine::context::QueryEngineContext;
|
||||
pub use crate::query_engine::state::QueryEngineState;
|
||||
use crate::region_query::RegionQueryHandlerRef;
|
||||
use crate::table_mutation::TableMutationHandlerRef;
|
||||
|
||||
/// Describe statement result
|
||||
#[derive(Debug)]
|
||||
@@ -80,11 +81,13 @@ impl QueryEngineFactory {
|
||||
pub fn new(
|
||||
catalog_manager: CatalogManagerRef,
|
||||
region_query_handler: Option<RegionQueryHandlerRef>,
|
||||
table_mutation_handler: Option<TableMutationHandlerRef>,
|
||||
with_dist_planner: bool,
|
||||
) -> Self {
|
||||
Self::new_with_plugins(
|
||||
catalog_manager,
|
||||
region_query_handler,
|
||||
table_mutation_handler,
|
||||
with_dist_planner,
|
||||
Default::default(),
|
||||
)
|
||||
@@ -93,12 +96,14 @@ impl QueryEngineFactory {
|
||||
pub fn new_with_plugins(
|
||||
catalog_manager: CatalogManagerRef,
|
||||
region_query_handler: Option<RegionQueryHandlerRef>,
|
||||
table_mutation_handler: Option<TableMutationHandlerRef>,
|
||||
with_dist_planner: bool,
|
||||
plugins: Arc<Plugins>,
|
||||
) -> Self {
|
||||
let state = Arc::new(QueryEngineState::new(
|
||||
catalog_manager,
|
||||
region_query_handler,
|
||||
table_mutation_handler,
|
||||
with_dist_planner,
|
||||
plugins.clone(),
|
||||
));
|
||||
@@ -131,7 +136,7 @@ mod tests {
|
||||
#[test]
|
||||
fn test_query_engine_factory() {
|
||||
let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
|
||||
let factory = QueryEngineFactory::new(catalog_list, None, false);
|
||||
let factory = QueryEngineFactory::new(catalog_list, None, None, false);
|
||||
|
||||
let engine = factory.query_engine();
|
||||
|
||||
|
||||
@@ -48,6 +48,7 @@ use crate::optimizer::type_conversion::TypeConversionRule;
|
||||
use crate::query_engine::options::QueryOptions;
|
||||
use crate::range_select::planner::RangeSelectPlanner;
|
||||
use crate::region_query::RegionQueryHandlerRef;
|
||||
use crate::table_mutation::TableMutationHandlerRef;
|
||||
|
||||
/// Query engine global state
|
||||
// TODO(yingwen): This QueryEngineState still relies on datafusion, maybe we can define a trait for it,
|
||||
@@ -57,6 +58,7 @@ use crate::region_query::RegionQueryHandlerRef;
|
||||
pub struct QueryEngineState {
|
||||
df_context: SessionContext,
|
||||
catalog_manager: CatalogManagerRef,
|
||||
table_mutation_handler: Option<TableMutationHandlerRef>,
|
||||
aggregate_functions: Arc<RwLock<HashMap<String, AggregateFunctionMetaRef>>>,
|
||||
plugins: Arc<Plugins>,
|
||||
}
|
||||
@@ -73,6 +75,7 @@ impl QueryEngineState {
|
||||
pub fn new(
|
||||
catalog_list: CatalogManagerRef,
|
||||
region_query_handler: Option<RegionQueryHandlerRef>,
|
||||
table_mutation_handler: Option<TableMutationHandlerRef>,
|
||||
with_dist_planner: bool,
|
||||
plugins: Arc<Plugins>,
|
||||
) -> Self {
|
||||
@@ -123,6 +126,7 @@ impl QueryEngineState {
|
||||
Self {
|
||||
df_context,
|
||||
catalog_manager: catalog_list,
|
||||
table_mutation_handler,
|
||||
aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
|
||||
plugins,
|
||||
}
|
||||
@@ -184,6 +188,11 @@ impl QueryEngineState {
|
||||
&self.catalog_manager
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn table_mutation_handler(&self) -> Option<&TableMutationHandlerRef> {
|
||||
self.table_mutation_handler.as_ref()
|
||||
}
|
||||
|
||||
pub(crate) fn disallow_cross_schema_query(&self) -> bool {
|
||||
self.plugins
|
||||
.map::<QueryOptions, _, _>(|x| x.disallow_cross_schema_query)
|
||||
|
||||
@@ -388,7 +388,7 @@ mod test {
|
||||
table,
|
||||
})
|
||||
.is_ok());
|
||||
QueryEngineFactory::new(catalog_list, None, false).query_engine()
|
||||
QueryEngineFactory::new(catalog_list, None, None, false).query_engine()
|
||||
}
|
||||
|
||||
async fn query_plan_compare(sql: &str, expected: String) {
|
||||
|
||||
@@ -22,7 +22,6 @@ use crate::error::Result;
|
||||
|
||||
#[async_trait]
|
||||
pub trait RegionQueryHandler: Send + Sync {
|
||||
// TODO(ruihang): add trace id and span id in the request.
|
||||
async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
|
||||
}
|
||||
|
||||
|
||||
35
src/query/src/table_mutation.rs
Normal file
35
src/query/src/table_mutation.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use session::context::QueryContextRef;
|
||||
use table::requests::{DeleteRequest, InsertRequest};
|
||||
|
||||
use crate::error::Result;
|
||||
|
||||
pub type AffectedRows = usize;
|
||||
|
||||
/// A trait for handling table mutations in `QueryEngine`.
|
||||
#[async_trait]
|
||||
pub trait TableMutationHandler: Send + Sync {
|
||||
/// Inserts rows into the table.
|
||||
async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
|
||||
|
||||
/// Delete rows from the table.
|
||||
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
|
||||
}
|
||||
|
||||
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
|
||||
@@ -51,5 +51,5 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
|
||||
pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
|
||||
let catalog_manager = MemoryCatalogManager::new_with_table(table);
|
||||
|
||||
QueryEngineFactory::new(catalog_manager, None, false).query_engine()
|
||||
QueryEngineFactory::new(catalog_manager, None, None, false).query_engine()
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
|
||||
let catalog_list = catalog::memory::new_memory_catalog_manager()
|
||||
.map_err(BoxedError::new)
|
||||
.context(QueryExecutionSnafu)?;
|
||||
let factory = QueryEngineFactory::new(catalog_list, None, false);
|
||||
let factory = QueryEngineFactory::new(catalog_list, None, None, false);
|
||||
let engine = factory.query_engine();
|
||||
|
||||
let column_schemas = vec![ColumnSchema::new(
|
||||
@@ -129,7 +129,7 @@ async fn test_query_validate() -> Result<()> {
|
||||
});
|
||||
let plugins = Arc::new(plugins);
|
||||
|
||||
let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, false, plugins);
|
||||
let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, None, false, plugins);
|
||||
let engine = factory.query_engine();
|
||||
|
||||
let stmt = QueryLanguageParser::parse_sql("select number from public.numbers").unwrap();
|
||||
@@ -153,7 +153,7 @@ async fn test_udf() -> Result<()> {
|
||||
common_telemetry::init_default_ut_logging();
|
||||
let catalog_list = catalog_manager()?;
|
||||
|
||||
let factory = QueryEngineFactory::new(catalog_list, None, false);
|
||||
let factory = QueryEngineFactory::new(catalog_list, None, None, false);
|
||||
let engine = factory.query_engine();
|
||||
|
||||
let pow = make_scalar_function(pow);
|
||||
|
||||
@@ -106,7 +106,7 @@ fn create_test_engine() -> TimeRangeTester {
|
||||
};
|
||||
let _ = catalog_manager.register_table_sync(req).unwrap();
|
||||
|
||||
let engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
|
||||
let engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
|
||||
TimeRangeTester { engine, filter }
|
||||
}
|
||||
|
||||
|
||||
@@ -52,7 +52,7 @@ where
|
||||
pub(crate) fn sample_script_engine() -> PyEngine {
|
||||
let catalog_manager =
|
||||
MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
|
||||
let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
|
||||
let query_engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
|
||||
|
||||
PyEngine::new(query_engine.clone())
|
||||
}
|
||||
|
||||
@@ -375,7 +375,8 @@ mod tests {
|
||||
pub(crate) fn sample_script_engine() -> PyEngine {
|
||||
let catalog_manager =
|
||||
MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
|
||||
let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
|
||||
let query_engine =
|
||||
QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
|
||||
|
||||
PyEngine::new(query_engine.clone())
|
||||
}
|
||||
|
||||
@@ -56,7 +56,7 @@ pub async fn setup_scripts_manager(
|
||||
|
||||
let catalog_manager = MemoryCatalogManager::new_with_table(table.clone());
|
||||
|
||||
let factory = QueryEngineFactory::new(catalog_manager.clone(), None, false);
|
||||
let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, false);
|
||||
let query_engine = factory.query_engine();
|
||||
let mgr = ScriptManager::new(Arc::new(MockGrpcQueryHandler {}) as _, query_engine)
|
||||
.await
|
||||
|
||||
@@ -141,7 +141,6 @@ impl LineWriter {
|
||||
schema_name: self.db,
|
||||
table_name: self.table_name,
|
||||
columns_values,
|
||||
region_number: 0, // TODO(hl): Check if assign 0 region is ok?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -214,7 +214,7 @@ impl GrpcQueryHandler for DummyInstance {
|
||||
|
||||
fn create_testing_instance(table: TableRef) -> DummyInstance {
|
||||
let catalog_manager = MemoryCatalogManager::new_with_table(table);
|
||||
let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
|
||||
let query_engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
|
||||
DummyInstance::new(query_engine)
|
||||
}
|
||||
|
||||
|
||||
@@ -246,7 +246,6 @@ pub struct InsertRequest {
|
||||
pub schema_name: String,
|
||||
pub table_name: String,
|
||||
pub columns_values: HashMap<String, VectorRef>,
|
||||
pub region_number: RegionNumber,
|
||||
}
|
||||
|
||||
/// Delete (by primary key) request
|
||||
@@ -327,7 +326,6 @@ macro_rules! meter_insert_request {
|
||||
$req.catalog_name.to_string(),
|
||||
$req.schema_name.to_string(),
|
||||
$req.table_name.to_string(),
|
||||
$req.region_number,
|
||||
$req
|
||||
);
|
||||
};
|
||||
|
||||
@@ -24,14 +24,10 @@ use async_trait::async_trait;
|
||||
use common_query::logical_plan::Expr;
|
||||
use common_recordbatch::SendableRecordBatchStream;
|
||||
use datatypes::schema::SchemaRef;
|
||||
use store_api::storage::{RegionNumber, ScanRequest};
|
||||
use store_api::storage::ScanRequest;
|
||||
|
||||
use crate::error::{Result, UnsupportedSnafu};
|
||||
use crate::error::Result;
|
||||
use crate::metadata::{FilterPushDownType, TableId, TableInfoRef, TableType};
|
||||
use crate::requests::{AlterTableRequest, DeleteRequest, InsertRequest};
|
||||
use crate::stats::TableStatistics;
|
||||
|
||||
pub type AlterContext = anymap::Map<dyn Any + Send + Sync>;
|
||||
|
||||
/// Table abstraction.
|
||||
#[async_trait]
|
||||
@@ -49,16 +45,6 @@ pub trait Table: Send + Sync {
|
||||
/// Get the type of this table for metadata/catalog purposes.
|
||||
fn table_type(&self) -> TableType;
|
||||
|
||||
/// Insert values into table.
|
||||
///
|
||||
/// Returns number of inserted rows.
|
||||
async fn insert(&self, _request: InsertRequest) -> Result<usize> {
|
||||
UnsupportedSnafu {
|
||||
operation: "INSERT",
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
|
||||
async fn scan_to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream>;
|
||||
|
||||
/// Tests whether the table provider can make use of any or all filter expressions
|
||||
@@ -66,67 +52,6 @@ pub trait Table: Send + Sync {
|
||||
fn supports_filters_pushdown(&self, filters: &[&Expr]) -> Result<Vec<FilterPushDownType>> {
|
||||
Ok(vec![FilterPushDownType::Unsupported; filters.len()])
|
||||
}
|
||||
|
||||
/// Alter table.
|
||||
async fn alter(&self, _context: AlterContext, _request: &AlterTableRequest) -> Result<()> {
|
||||
UnsupportedSnafu {
|
||||
operation: "ALTER TABLE",
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
|
||||
/// Delete rows in the table.
|
||||
///
|
||||
/// Returns number of deleted rows.
|
||||
async fn delete(&self, _request: DeleteRequest) -> Result<usize> {
|
||||
UnsupportedSnafu {
|
||||
operation: "DELETE",
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
|
||||
/// Flush table.
|
||||
///
|
||||
/// Options:
|
||||
/// - region_number: specify region to flush.
|
||||
/// - wait: Whether to wait until flush is done.
|
||||
async fn flush(&self, region_number: Option<RegionNumber>, wait: Option<bool>) -> Result<()> {
|
||||
let _ = (region_number, wait);
|
||||
UnsupportedSnafu { operation: "FLUSH" }.fail()?
|
||||
}
|
||||
|
||||
/// Close the table.
|
||||
async fn close(&self, _regions: &[RegionNumber]) -> Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Return true if contains the region
|
||||
fn contains_region(&self, _region: RegionNumber) -> Result<bool> {
|
||||
UnsupportedSnafu {
|
||||
operation: "contain_region",
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
|
||||
/// Get statistics for this table, if available
|
||||
fn statistics(&self) -> Option<TableStatistics> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn compact(&self, region_number: Option<RegionNumber>, wait: Option<bool>) -> Result<()> {
|
||||
let _ = (region_number, wait);
|
||||
UnsupportedSnafu {
|
||||
operation: "COMPACTION",
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
|
||||
async fn truncate(&self) -> Result<()> {
|
||||
UnsupportedSnafu {
|
||||
operation: "TRUNCATE",
|
||||
}
|
||||
.fail()?
|
||||
}
|
||||
}
|
||||
|
||||
pub type TableRef = Arc<dyn Table>;
|
||||
|
||||
Reference in New Issue
Block a user