From 0e05f85a9dad8f107599e5f20641638520bc75fe Mon Sep 17 00:00:00 2001
From: Weny Xu
Date: Thu, 9 May 2024 13:57:33 +0900
Subject: [PATCH] feat: pass `QueryContext` to `FlowRequestHeader` (#3878)

* feat: pass `QueryContext` to `DdlTaskRequest`

* feat: pass `QueryContext` to `FlowRequestHeader`

* chore: fmt toml
---
 Cargo.lock                             |   3 +-
 Cargo.toml                             |   2 +-
 src/common/meta/Cargo.toml             |   1 +
 src/common/meta/src/ddl/create_flow.rs |  18 ++++-
 src/common/meta/src/ddl_manager.rs     |  16 +++-
 src/common/meta/src/rpc/ddl.rs         |  42 +++++++++-
 src/frontend/src/instance/grpc.rs      |  16 ++--
 src/meta-srv/src/service/procedure.rs  |  18 ++++-
 src/operator/src/expr_factory.rs       |  31 ++++----
 src/operator/src/insert.rs             |  12 +--
 src/operator/src/statement.rs          |   8 +-
 src/operator/src/statement/ddl.rs      | 106 ++++++++++++++++++------
 src/query/src/dist_plan/merge_scan.rs  |   3 +-
 src/session/src/context.rs             |  30 +++++++
 14 files changed, 238 insertions(+), 68 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 2560897f61..656600d553 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1942,6 +1942,7 @@ dependencies = [
  "serde",
  "serde_json",
  "serde_with",
+ "session",
  "snafu",
  "store-api",
  "strum 0.25.0",
@@ -3893,7 +3894,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=07de9a47ed46eae0fab4cbc5c32a17c67ed5cb38#07de9a47ed46eae0fab4cbc5c32a17c67ed5cb38"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=47af36e2b218bdb09fa3a84a31999245152db2ee#47af36e2b218bdb09fa3a84a31999245152db2ee"
 dependencies = [
  "prost 0.12.4",
  "serde",

diff --git a/Cargo.toml b/Cargo.toml
index 2dd4cbb8fc..6579614ab3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -116,7 +116,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "07de9a47ed46eae0fab4cbc5c32a17c67ed5cb38" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "47af36e2b218bdb09fa3a84a31999245152db2ee" }
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"

diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml
index 40df9d6180..ddc47f7f8c 100644
--- a/src/common/meta/Cargo.toml
+++ b/src/common/meta/Cargo.toml
@@ -46,6 +46,7 @@ rskafka.workspace = true
 serde.workspace = true
 serde_json.workspace = true
 serde_with.workspace = true
+session.workspace = true
 snafu.workspace = true
 store-api.workspace = true
 strum.workspace = true
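Note: the greptime-proto bump above is what carries the wire-level change — the pinned revision adds a query-context field to the request headers, and `session` becomes a dependency of `common-meta` so the snapshot type can be built from a session context. A rough sketch of the generated Rust types, inferred only from how the rest of this patch uses them (the exact prost output is an assumption, not taken from the proto sources):

    // Hypothetical shapes of the generated message types (see usage below).
    pub struct FlowRequestHeader {
        /// W3C trace propagation data, filled from TracingContext::to_w3c().
        pub tracing_context: std::collections::HashMap<String, String>,
        /// Session context the DDL statement was issued under.
        pub query_context: Option<QueryContext>,
    }

    pub struct QueryContext {
        pub current_catalog: String,
        pub current_schema: String,
        pub timezone: String,
        pub extensions: std::collections::HashMap<String, String>,
    }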
diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs
index 195bb3a567..20e46bfaa8 100644
--- a/src/common/meta/src/ddl/create_flow.rs
+++ b/src/common/meta/src/ddl/create_flow.rs
@@ -18,13 +18,14 @@ mod metadata;
 use std::collections::BTreeMap;
 
 use api::v1::flow::flow_request::Body as PbFlowRequest;
-use api::v1::flow::{CreateRequest, FlowRequest};
+use api::v1::flow::{CreateRequest, FlowRequest, FlowRequestHeader};
 use async_trait::async_trait;
 use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
 use common_procedure::{
     Context as ProcedureContext, LockKey, Procedure, Result as ProcedureResult, Status,
 };
 use common_telemetry::info;
+use common_telemetry::tracing_context::TracingContext;
 use futures::future::join_all;
 use itertools::Itertools;
 use serde::{Deserialize, Serialize};
@@ -40,7 +41,7 @@ use crate::key::flow::flow_info::FlowInfoValue;
 use crate::key::FlowId;
 use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
 use crate::peer::Peer;
-use crate::rpc::ddl::CreateFlowTask;
+use crate::rpc::ddl::{CreateFlowTask, QueryContext};
 use crate::{metrics, ClusterId};
 
 /// The procedure of flow creation.
@@ -53,7 +54,12 @@ impl CreateFlowProcedure {
     pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateFlow";
 
     /// Returns a new [CreateFlowProcedure].
-    pub fn new(cluster_id: ClusterId, task: CreateFlowTask, context: DdlContext) -> Self {
+    pub fn new(
+        cluster_id: ClusterId,
+        task: CreateFlowTask,
+        query_context: QueryContext,
+        context: DdlContext,
+    ) -> Self {
         Self {
             context,
             data: CreateFlowData {
@@ -63,6 +69,7 @@ impl CreateFlowProcedure {
                 peers: vec![],
                 source_table_ids: vec![],
                 state: CreateFlowState::CreateMetadata,
+                query_context,
             },
         }
     }
@@ -88,6 +95,10 @@ impl CreateFlowProcedure {
         for peer in &self.data.peers {
             let requester = self.context.node_manager.flownode(peer).await;
             let request = FlowRequest {
+                header: Some(FlowRequestHeader {
+                    tracing_context: TracingContext::from_current_span().to_w3c(),
+                    query_context: Some(self.data.query_context.clone().into()),
+                }),
                 body: Some(PbFlowRequest::Create((&self.data).into())),
             };
             create_flow.push(async move {
@@ -187,6 +198,7 @@ pub struct CreateFlowData {
     pub(crate) flow_id: Option<FlowId>,
     pub(crate) peers: Vec<Peer>,
     pub(crate) source_table_ids: Vec<TableId>,
+    pub(crate) query_context: QueryContext,
 }
 
 impl From<&CreateFlowData> for CreateRequest {
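Note: `CreateFlowData` is procedure state — it is persisted as JSON between steps (see the `FromJsonSnafu`/`ToJsonSnafu` imports above), which is why the new field is the plain, serde-friendly `QueryContext` snapshot rather than a `QueryContextRef`. A minimal self-contained sketch of the property the snapshot must satisfy; `QueryContextSnapshot` is a hypothetical stand-in for `rpc::ddl::QueryContext`, and the example assumes the serde/serde_json crates:

    use std::collections::HashMap;

    use serde::{Deserialize, Serialize};

    // Stand-in mirroring the snapshot stored in CreateFlowData.
    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct QueryContextSnapshot {
        current_catalog: String,
        current_schema: String,
        timezone: String,
        extensions: HashMap<String, String>,
    }

    fn main() -> serde_json::Result<()> {
        let ctx = QueryContextSnapshot {
            current_catalog: "greptime".into(),
            current_schema: "public".into(),
            timezone: "UTC".into(),
            extensions: HashMap::new(),
        };
        // Procedure state must survive a lossless JSON round-trip.
        let json = serde_json::to_string(&ctx)?;
        let restored: QueryContextSnapshot = serde_json::from_str(&json)?;
        assert_eq!(ctx, restored);
        Ok(())
    }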
diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs
index 8af7211210..50b7a7ddf1 100644
--- a/src/common/meta/src/ddl_manager.rs
+++ b/src/common/meta/src/ddl_manager.rs
@@ -48,7 +48,7 @@ use crate::rpc::ddl::DdlTask::{
 };
 use crate::rpc::ddl::{
     AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask,
-    DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
+    DropTableTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, TruncateTableTask,
 };
 use crate::rpc::procedure;
 use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
@@ -328,9 +328,10 @@ impl DdlManager {
         &self,
         cluster_id: ClusterId,
         create_flow: CreateFlowTask,
+        query_context: QueryContext,
     ) -> Result<(ProcedureId, Option<Output>)> {
         let context = self.create_context();
-        let procedure = CreateFlowProcedure::new(cluster_id, create_flow, context);
+        let procedure = CreateFlowProcedure::new(cluster_id, create_flow, query_context, context);
         let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
 
         self.submit_procedure(procedure_with_id).await
@@ -600,9 +601,10 @@ async fn handle_create_flow_task(
     ddl_manager: &DdlManager,
     cluster_id: ClusterId,
     create_flow_task: CreateFlowTask,
+    query_context: QueryContext,
 ) -> Result<SubmitDdlTaskResponse> {
     let (id, output) = ddl_manager
-        .submit_create_flow_task(cluster_id, create_flow_task.clone())
+        .submit_create_flow_task(cluster_id, create_flow_task.clone(), query_context)
        .await?;
 
     let procedure_id = id.to_string();
@@ -705,7 +707,13 @@ impl ProcedureExecutor for DdlManager {
                 handle_drop_database_task(self, cluster_id, drop_database_task).await
             }
             CreateFlow(create_flow_task) => {
-                handle_create_flow_task(self, cluster_id, create_flow_task).await
+                handle_create_flow_task(
+                    self,
+                    cluster_id,
+                    create_flow_task,
+                    request.query_context.into(),
+                )
+                .await
             }
         }
     }
diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs
index 9b75bd6c39..1493d864c0 100644
--- a/src/common/meta/src/rpc/ddl.rs
+++ b/src/common/meta/src/rpc/ddl.rs
@@ -27,12 +27,13 @@ use api::v1::meta::{
 };
 use api::v1::{
     AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, DropDatabaseExpr, DropFlowExpr,
-    DropTableExpr, TruncateTableExpr,
+    DropTableExpr, QueryContext as PbQueryContext, TruncateTableExpr,
 };
 use base64::engine::general_purpose;
 use base64::Engine as _;
 use prost::Message;
 use serde::{Deserialize, Serialize};
+use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
 use table::metadata::{RawTableInfo, TableId};
 use table::table_reference::TableReference;
@@ -195,6 +196,7 @@ impl TryFrom<Task> for DdlTask {
 
 #[derive(Clone)]
 pub struct SubmitDdlTaskRequest {
+    pub query_context: QueryContextRef,
     pub task: DdlTask,
 }
 
@@ -238,6 +240,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
 
         Ok(Self {
             header: None,
+            query_context: Some((*request.query_context).clone().into()),
             task: Some(task),
         })
     }
@@ -853,6 +856,43 @@ impl From<DropFlowTask> for PbDropFlowTask {
     }
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct QueryContext {
+    current_catalog: String,
+    current_schema: String,
+    timezone: String,
+    extensions: HashMap<String, String>,
+}
+
+impl From<QueryContextRef> for QueryContext {
+    fn from(query_context: QueryContextRef) -> Self {
+        QueryContext {
+            current_catalog: query_context.current_catalog().to_string(),
+            current_schema: query_context.current_schema().to_string(),
+            timezone: query_context.timezone().to_string(),
+            extensions: query_context.extensions(),
+        }
+    }
+}
+
+impl From<QueryContext> for PbQueryContext {
+    fn from(
+        QueryContext {
+            current_catalog,
+            current_schema,
+            timezone,
+            extensions,
+        }: QueryContext,
+    ) -> Self {
+        PbQueryContext {
+            current_catalog,
+            current_schema,
+            timezone,
+            extensions,
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
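Note: this file establishes a two-hop conversion — `QueryContextRef` (the live session, an `Arc`, not serializable) into `rpc::ddl::QueryContext` (a plain snapshot the procedure framework can persist), and from there into the protobuf `PbQueryContext` for the wire. A compressed, self-contained sketch of the same pattern; every name here is an illustrative stand-in, not the real type:

    use std::sync::Arc;

    struct Session { timezone: String }
    type SessionRef = Arc<Session>; // stand-in for QueryContextRef

    #[derive(Clone)]
    struct Snapshot { timezone: String } // stand-in for rpc::ddl::QueryContext

    struct PbContext { timezone: String } // stand-in for api::v1::QueryContext

    impl From<SessionRef> for Snapshot {
        fn from(s: SessionRef) -> Self {
            // Copy out only what the wire format needs; the Arc stays behind.
            Snapshot { timezone: s.timezone.clone() }
        }
    }

    impl From<Snapshot> for PbContext {
        fn from(s: Snapshot) -> Self {
            PbContext { timezone: s.timezone }
        }
    }

    fn main() {
        let session: SessionRef = Arc::new(Session { timezone: "+08:00".into() });
        let pb: PbContext = Snapshot::from(session.clone()).into();
        assert_eq!(pb.timezone, "+08:00");
    }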
diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs
index 2009bc5638..a9eee1527e 100644
--- a/src/frontend/src/instance/grpc.rs
+++ b/src/frontend/src/instance/grpc.rs
@@ -111,17 +111,21 @@ impl GrpcQueryHandler for Instance {
             DdlExpr::CreateTable(mut expr) => {
                 let _ = self
                     .statement_executor
-                    .create_table_inner(&mut expr, None, &ctx)
+                    .create_table_inner(&mut expr, None, ctx.clone())
                     .await?;
                 Output::new_with_affected_rows(0)
             }
-            DdlExpr::Alter(expr) => self.statement_executor.alter_table_inner(expr).await?,
+            DdlExpr::Alter(expr) => {
+                self.statement_executor
+                    .alter_table_inner(expr, ctx.clone())
+                    .await?
+            }
             DdlExpr::CreateDatabase(expr) => {
                 self.statement_executor
                     .create_database(
-                        ctx.current_catalog(),
                         &expr.schema_name,
                         expr.create_if_not_exists,
+                        ctx.clone(),
                     )
                     .await?
             }
@@ -129,13 +133,15 @@ impl GrpcQueryHandler for Instance {
                 let table_name =
                     TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
                 self.statement_executor
-                    .drop_table(table_name, expr.drop_if_exists)
+                    .drop_table(table_name, expr.drop_if_exists, ctx.clone())
                     .await?
             }
             DdlExpr::TruncateTable(expr) => {
                 let table_name =
                     TableName::new(&expr.catalog_name, &expr.schema_name, &expr.table_name);
-                self.statement_executor.truncate_table(table_name).await?
+                self.statement_executor
+                    .truncate_table(table_name, ctx.clone())
+                    .await?
             }
             DdlExpr::CreateFlow(_) => {
                 unimplemented!()
diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs
index 0a410875fa..05e6d98fa4 100644
--- a/src/meta-srv/src/service/procedure.rs
+++ b/src/meta-srv/src/service/procedure.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::sync::Arc;
 use std::time::Duration;
 
 use api::v1::meta::{
@@ -56,10 +57,20 @@ impl procedure_service_server::ProcedureService for Metasrv {
     }
 
     async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<DdlTaskResponse> {
-        let PbDdlTaskRequest { header, task, .. } = request.into_inner();
+        let PbDdlTaskRequest {
+            header,
+            query_context,
+            task,
+            ..
+        } = request.into_inner();
 
         let header = header.context(error::MissingRequestHeaderSnafu)?;
         let cluster_id = header.cluster_id;
+        let query_context = query_context
+            .context(error::MissingRequiredParameterSnafu {
+                param: "query_context",
+            })?
+            .into();
         let task: DdlTask = task
             .context(error::MissingRequiredParameterSnafu { param: "task" })?
             .try_into()
@@ -72,7 +83,10 @@ impl procedure_service_server::ProcedureService for Metasrv {
                 cluster_id: Some(cluster_id),
                 tracing_context: Some(header.tracing_context),
             },
-            SubmitDdlTaskRequest { task },
+            SubmitDdlTaskRequest {
+                query_context: Arc::new(query_context),
+                task,
+            },
        )
        .await
        .context(error::SubmitDdlTaskSnafu)?
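Note: the metasrv side treats a missing `query_context` as a hard error rather than defaulting it, so a stale client cannot silently run DDL under the wrong catalog. On the receiving end, a flownode can rebuild a session-level context from the header via the `From<api::v1::QueryContext>` impl added in src/session/src/context.rs at the end of this patch. A hypothetical receiver-side helper (not part of the patch):

    use api::v1::flow::FlowRequestHeader;
    use session::context::QueryContext;

    /// Rebuilds the issuing session's context, if the peer sent one.
    fn context_from_header(header: FlowRequestHeader) -> Option<QueryContext> {
        header.query_context.map(QueryContext::from)
    }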
diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs
index 91c374889e..d4ff07fe4f 100644
--- a/src/operator/src/expr_factory.rs
+++ b/src/operator/src/expr_factory.rs
@@ -125,10 +125,10 @@ impl CreateExprFactory {
 // constraint.
 pub(crate) async fn create_external_expr(
     create: CreateExternalTable,
-    query_ctx: QueryContextRef,
+    query_ctx: &QueryContextRef,
 ) -> Result<CreateTableExpr> {
     let (catalog_name, schema_name, table_name) =
-        table_idents_to_full_name(&create.name, &query_ctx)
+        table_idents_to_full_name(&create.name, query_ctx)
             .map_err(BoxedError::new)
             .context(ExternalSnafu)?;
 
@@ -187,9 +187,12 @@ pub(crate) async fn create_external_expr(
 }
 
 /// Convert `CreateTable` statement to [`CreateTableExpr`] gRPC request.
-pub fn create_to_expr(create: &CreateTable, query_ctx: QueryContextRef) -> Result<CreateTableExpr> {
+pub fn create_to_expr(
+    create: &CreateTable,
+    query_ctx: &QueryContextRef,
+) -> Result<CreateTableExpr> {
     let (catalog_name, schema_name, table_name) =
-        table_idents_to_full_name(&create.name, &query_ctx)
+        table_idents_to_full_name(&create.name, query_ctx)
             .map_err(BoxedError::new)
             .context(ExternalSnafu)?;
 
@@ -449,10 +452,10 @@ pub fn column_schemas_to_defs(
 
 pub(crate) fn to_alter_expr(
     alter_table: AlterTable,
-    query_ctx: QueryContextRef,
+    query_ctx: &QueryContextRef,
 ) -> Result<AlterExpr> {
     let (catalog_name, schema_name, table_name) =
-        table_idents_to_full_name(alter_table.table_name(), &query_ctx)
+        table_idents_to_full_name(alter_table.table_name(), query_ctx)
             .map_err(BoxedError::new)
             .context(ExternalSnafu)?;
 
@@ -513,7 +516,7 @@ pub(crate) fn to_alter_expr(
 
 pub fn to_create_flow_task_expr(
     create_flow: CreateFlow,
-    query_ctx: QueryContextRef,
+    query_ctx: &QueryContextRef,
 ) -> Result<CreateFlowTask> {
     // retrieve sink table name
     let sink_table_ref =
@@ -598,7 +601,7 @@ mod tests {
         let Statement::CreateTable(create_table) = stmt else {
             unreachable!()
         };
-        let expr = create_to_expr(&create_table, QueryContext::arc()).unwrap();
+        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
         assert_eq!("3days", expr.table_options.get("ttl").unwrap());
         assert_eq!(
             "1.0MiB",
@@ -629,7 +632,7 @@ mod tests {
             let Statement::CreateTable(create_table) = stmt else {
                 unreachable!()
             };
-            create_to_expr(&create_table, QueryContext::arc()).unwrap_err();
+            create_to_expr(&create_table, &QueryContext::arc()).unwrap_err();
         }
     }
 
@@ -647,7 +650,7 @@ mod tests {
         };
 
         // query context with system timezone UTC.
-        let expr = create_to_expr(&create_table, QueryContext::arc()).unwrap();
+        let expr = create_to_expr(&create_table, &QueryContext::arc()).unwrap();
         let ts_column = &expr.column_defs[1];
         let constraint = assert_ts_column(ts_column);
         assert!(
@@ -660,7 +663,7 @@ mod tests {
             .timezone(Timezone::from_tz_string("+08:00").unwrap().into())
             .build()
             .into();
-        let expr = create_to_expr(&create_table, ctx).unwrap();
+        let expr = create_to_expr(&create_table, &ctx).unwrap();
         let ts_column = &expr.column_defs[1];
         let constraint = assert_ts_column(ts_column);
         assert!(
@@ -694,7 +697,7 @@ mod tests {
         };
 
         // query context with system timezone UTC.
-        let expr = to_alter_expr(alter_table.clone(), QueryContext::arc()).unwrap();
+        let expr = to_alter_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
         let kind = expr.kind.unwrap();
 
         let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
@@ -715,7 +718,7 @@ mod tests {
             .timezone(Timezone::from_tz_string("+08:00").unwrap().into())
             .build()
             .into();
-        let expr = to_alter_expr(alter_table, ctx).unwrap();
+        let expr = to_alter_expr(alter_table, &ctx).unwrap();
         let kind = expr.kind.unwrap();
 
         let Kind::AddColumns(AddColumns { add_columns, .. }) = kind else {
@@ -745,7 +748,7 @@ mod tests {
         };
 
         // query context with system timezone UTC.
-        let expr = to_alter_expr(alter_table.clone(), QueryContext::arc()).unwrap();
+        let expr = to_alter_expr(alter_table.clone(), &QueryContext::arc()).unwrap();
         let kind = expr.kind.unwrap();
 
         let Kind::ChangeColumnTypes(ChangeColumnTypes {
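Note: the `expr_factory` functions switch from taking `QueryContextRef` by value to `&QueryContextRef`. The callers now need to keep the context alive after building the expression (it is later moved into the DDL request), so borrowing removes a refcount bump and the `ctx.clone()` noise at every call site. A minimal illustration with a stand-in type:

    use std::sync::Arc;

    // Stand-in for Arc<QueryContext>; illustrative only.
    type CtxRef = Arc<String>;

    // Read-only access: borrow the Arc instead of consuming a clone.
    fn build_expr(ctx: &CtxRef) -> usize {
        ctx.len()
    }

    fn main() {
        let ctx: CtxRef = Arc::new("greptime/public".to_string());
        let _len = build_expr(&ctx); // before this patch: build_expr(ctx.clone())
        let _moved = ctx; // the caller still owns the Arc and can move it on
    }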
diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs
index d2d2cb9e9f..cf9f44b30e 100644
--- a/src/operator/src/insert.rs
+++ b/src/operator/src/insert.rs
@@ -406,7 +406,7 @@ impl Inserter {
             if !alter_tables.is_empty() {
                 // Alter logical tables in batch.
                 statement_executor
-                    .alter_logical_tables(alter_tables)
+                    .alter_logical_tables(alter_tables, ctx.clone())
                     .await?;
             }
         } else {
@@ -414,7 +414,9 @@ impl Inserter {
                 self.create_table(req, ctx, statement_executor).await?;
             }
             for alter_expr in alter_tables.into_iter() {
-                statement_executor.alter_table_inner(alter_expr).await?;
+                statement_executor
+                    .alter_table_inner(alter_expr, ctx.clone())
+                    .await?;
             }
         }
 
@@ -466,7 +468,7 @@ impl Inserter {
 
         // create physical table
         let res = statement_executor
-            .create_table_inner(create_table_expr, None, ctx)
+            .create_table_inner(create_table_expr, None, ctx.clone())
             .await;
 
         match res {
@@ -538,7 +540,7 @@ impl Inserter {
         // TODO(weny): multiple regions table.
         let res = statement_executor
-            .create_table_inner(create_table_expr, None, ctx)
+            .create_table_inner(create_table_expr, None, ctx.clone())
             .await;
 
         match res {
@@ -589,7 +591,7 @@ impl Inserter {
             .collect::<Result<Vec<_>>>()?;
 
         let res = statement_executor
-            .create_logical_tables(&create_table_exprs)
+            .create_logical_tables(&create_table_exprs, ctx.clone())
             .await;
 
         match res {

diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs
index ad1e3de802..b8fcc4111e 100644
--- a/src/operator/src/statement.rs
+++ b/src/operator/src/statement.rs
@@ -175,13 +175,15 @@ impl StatementExecutor {
                     .map_err(BoxedError::new)
                     .context(error::ExternalSnafu)?;
                 let table_name = TableName::new(catalog, schema, table);
-                self.drop_table(table_name, stmt.drop_if_exists()).await
+                self.drop_table(table_name, stmt.drop_if_exists(), query_ctx)
+                    .await
             }
             Statement::DropDatabase(stmt) => {
                 self.drop_database(
                     query_ctx.current_catalog().to_string(),
                     format_raw_object_name(stmt.name()),
                     stmt.drop_if_exists(),
+                    query_ctx,
                 )
                 .await
             }
@@ -191,13 +193,13 @@ impl StatementExecutor {
                     .map_err(BoxedError::new)
                     .context(error::ExternalSnafu)?;
                 let table_name = TableName::new(catalog, schema, table);
-                self.truncate_table(table_name).await
+                self.truncate_table(table_name, query_ctx).await
             }
             Statement::CreateDatabase(stmt) => {
                 self.create_database(
-                    query_ctx.current_catalog(),
                     &format_raw_object_name(&stmt.name),
                     stmt.if_not_exists,
+                    query_ctx,
                 )
                 .await
             }
diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs
index 767b443eb4..599dbe5138 100644
--- a/src/operator/src/statement/ddl.rs
+++ b/src/operator/src/statement/ddl.rs
@@ -79,8 +79,8 @@ impl StatementExecutor {
     #[tracing::instrument(skip_all)]
     pub async fn create_table(&self, stmt: CreateTable, ctx: QueryContextRef) -> Result<Output> {
-        let create_expr = &mut expr_factory::create_to_expr(&stmt, ctx.clone())?;
-        self.create_table_inner(create_expr, stmt.partitions, &ctx)
+        let create_expr = &mut expr_factory::create_to_expr(&stmt, &ctx)?;
+        self.create_table_inner(create_expr, stmt.partitions, ctx)
             .await
     }
 
@@ -120,8 +120,8 @@ impl StatementExecutor {
             }
         });
 
-        let create_expr = &mut expr_factory::create_to_expr(&create_stmt, ctx.clone())?;
-        self.create_table_inner(create_expr, partitions, &ctx).await
+        let create_expr = &mut expr_factory::create_to_expr(&create_stmt, &ctx)?;
+        self.create_table_inner(create_expr, partitions, ctx).await
     }
 
     #[tracing::instrument(skip_all)]
@@ -130,8 +130,8 @@ impl StatementExecutor {
         create_expr: CreateExternalTable,
         ctx: QueryContextRef,
     ) -> Result<Output> {
-        let create_expr = &mut expr_factory::create_external_expr(create_expr, ctx.clone()).await?;
-        self.create_table_inner(create_expr, None, &ctx).await
+        let create_expr = &mut expr_factory::create_external_expr(create_expr, &ctx).await?;
+        self.create_table_inner(create_expr, None, ctx).await
     }
 
     #[tracing::instrument(skip_all)]
@@ -139,7 +139,7 @@ impl StatementExecutor {
         &self,
         create_table: &mut CreateTableExpr,
         partitions: Option<Partitions>,
-        query_ctx: &QueryContextRef,
+        query_ctx: QueryContextRef,
     ) -> Result<Output> {
         // Check if is creating logical table
         if create_table.engine == METRIC_ENGINE_NAME
             && create_table
                 .table_options
                 .contains_key(LOGICAL_TABLE_METADATA_KEY)
         {
             return self
-                .create_logical_tables(&[create_table.clone()])
+                .create_logical_tables(&[create_table.clone()], query_ctx)
                 .await?
                 .into_iter()
                 .next()
@@ -213,14 +213,19 @@ impl StatementExecutor {
             &create_table.table_name,
         );
 
-        let (partitions, partition_cols) = parse_partitions(create_table, partitions, query_ctx)?;
+        let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
         validate_partition_columns(create_table, &partition_cols)?;
         let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?;
 
         let resp = self
-            .create_table_procedure(create_table.clone(), partitions, table_info.clone())
+            .create_table_procedure(
+                create_table.clone(),
+                partitions,
+                table_info.clone(),
+                query_ctx,
+            )
             .await?;
 
         let table_id = resp.table_id.context(error::UnexpectedSnafu {
@@ -242,6 +247,7 @@ impl StatementExecutor {
     pub async fn create_logical_tables(
         &self,
         create_table_exprs: &[CreateTableExpr],
+        query_context: QueryContextRef,
     ) -> Result<Vec<Output>> {
         let _timer = crate::metrics::DIST_CREATE_TABLES.start_timer();
         ensure!(
@@ -300,7 +306,9 @@ impl StatementExecutor {
             .zip(raw_tables_info.iter().cloned())
             .collect::<Vec<_>>();
 
-        let resp = self.create_logical_tables_procedure(tables_data).await?;
+        let resp = self
+            .create_logical_tables_procedure(tables_data, query_context)
+            .await?;
 
         let table_ids = resp.table_ids;
         ensure!(table_ids.len() == raw_tables_info.len(), CreateLogicalTablesSnafu {
@@ -326,17 +334,22 @@ impl StatementExecutor {
     pub async fn create_flow(
         &self,
         stmt: CreateFlow,
-        query_ctx: QueryContextRef,
+        query_context: QueryContextRef,
     ) -> Result<Output> {
         // TODO(ruihang): do some verification
-        let expr = expr_factory::to_create_flow_task_expr(stmt, query_ctx)?;
+        let expr = expr_factory::to_create_flow_task_expr(stmt, &query_context)?;
 
-        self.create_flow_procedure(expr).await?;
+        self.create_flow_procedure(expr, query_context).await?;
 
         Ok(Output::new_with_affected_rows(0))
     }
 
-    async fn create_flow_procedure(&self, expr: CreateFlowTask) -> Result<SubmitDdlTaskResponse> {
+    async fn create_flow_procedure(
+        &self,
+        expr: CreateFlowTask,
+        query_context: QueryContextRef,
+    ) -> Result<SubmitDdlTaskResponse> {
         let request = SubmitDdlTaskRequest {
+            query_context,
             task: DdlTask::new_create_flow(expr),
         };
 
@@ -347,7 +360,11 @@ impl StatementExecutor {
     }
 
     #[tracing::instrument(skip_all)]
-    pub async fn alter_logical_tables(&self, alter_table_exprs: Vec<AlterExpr>) -> Result<Output> {
+    pub async fn alter_logical_tables(
+        &self,
+        alter_table_exprs: Vec<AlterExpr>,
+        query_context: QueryContextRef,
+    ) -> Result<Output> {
         let _timer = crate::metrics::DIST_ALTER_TABLES.start_timer();
         ensure!(
             !alter_table_exprs.is_empty(),
@@ -356,14 +373,19 @@ impl StatementExecutor {
             }
         );
 
-        self.alter_logical_tables_procedure(alter_table_exprs)
+        self.alter_logical_tables_procedure(alter_table_exprs, query_context)
             .await?;
 
         Ok(Output::new_with_affected_rows(0))
     }
 
     #[tracing::instrument(skip_all)]
-    pub async fn drop_table(&self, table_name: TableName, drop_if_exists: bool) -> Result<Output> {
+    pub async fn drop_table(
+        &self,
+        table_name: TableName,
+        drop_if_exists: bool,
+        query_context: QueryContextRef,
+    ) -> Result<Output> {
         if let Some(table) = self
             .catalog_manager
             .table(
@@ -375,7 +397,7 @@ impl StatementExecutor {
             .await
             .context(CatalogSnafu)?
         {
             let table_id = table.table_info().table_id();
-            self.drop_table_procedure(&table_name, table_id, drop_if_exists)
+            self.drop_table_procedure(&table_name, table_id, drop_if_exists, query_context)
                 .await?;
 
             // Invalidates local cache ASAP.
@@ -408,6 +430,7 @@ impl StatementExecutor {
         catalog: String,
         schema: String,
         drop_if_exists: bool,
+        query_context: QueryContextRef,
     ) -> Result<Output> {
         if self
             .catalog_manager
@@ -415,7 +438,7 @@ impl StatementExecutor {
             .await
             .context(CatalogSnafu)?
         {
-            self.drop_database_procedure(catalog, schema, drop_if_exists)
+            self.drop_database_procedure(catalog, schema, drop_if_exists, query_context)
                 .await?;
 
             Ok(Output::new_with_affected_rows(0))
@@ -431,7 +454,11 @@ impl StatementExecutor {
     }
 
     #[tracing::instrument(skip_all)]
-    pub async fn truncate_table(&self, table_name: TableName) -> Result<Output> {
+    pub async fn truncate_table(
+        &self,
+        table_name: TableName,
+        query_context: QueryContextRef,
+    ) -> Result<Output> {
         let table = self
             .catalog_manager
             .table(
@@ -445,7 +472,8 @@ impl StatementExecutor {
                 table_name: table_name.to_string(),
             })?;
         let table_id = table.table_info().table_id();
-        self.truncate_table_procedure(&table_name, table_id).await?;
+        self.truncate_table_procedure(&table_name, table_id, query_context)
+            .await?;
 
         Ok(Output::new_with_affected_rows(0))
     }
@@ -488,14 +516,18 @@ impl StatementExecutor {
     pub async fn alter_table(
         &self,
         alter_table: AlterTable,
-        query_ctx: QueryContextRef,
+        query_context: QueryContextRef,
     ) -> Result<Output> {
-        let expr = expr_factory::to_alter_expr(alter_table, query_ctx)?;
-        self.alter_table_inner(expr).await
+        let expr = expr_factory::to_alter_expr(alter_table, &query_context)?;
+        self.alter_table_inner(expr, query_context).await
     }
 
     #[tracing::instrument(skip_all)]
-    pub async fn alter_table_inner(&self, expr: AlterExpr) -> Result<Output> {
+    pub async fn alter_table_inner(
+        &self,
+        expr: AlterExpr,
+        query_context: QueryContextRef,
+    ) -> Result<Output> {
         let catalog_name = if expr.catalog_name.is_empty() {
             DEFAULT_CATALOG_NAME.to_string()
         } else {
@@ -538,6 +570,7 @@ impl StatementExecutor {
         let (req, invalidate_keys) = if physical_table_id == table_id {
             // This is physical table
             let req = SubmitDdlTaskRequest {
+                query_context,
                 task: DdlTask::new_alter_table(expr),
             };
 
@@ -550,6 +583,7 @@ impl StatementExecutor {
         } else {
             // This is logical table
             let req = SubmitDdlTaskRequest {
+                query_context,
                 task: DdlTask::new_alter_logical_tables(vec![expr]),
             };
 
@@ -597,10 +631,12 @@ impl StatementExecutor {
         create_table: CreateTableExpr,
         partitions: Vec<Partition>,
         table_info: RawTableInfo,
+        query_context: QueryContextRef,
     ) -> Result<SubmitDdlTaskResponse> {
         let partitions = partitions.into_iter().map(Into::into).collect();
 
         let request = SubmitDdlTaskRequest {
+            query_context,
             task: DdlTask::new_create_table(create_table, partitions, table_info),
         };
 
@@ -613,8 +649,10 @@ impl StatementExecutor {
     async fn create_logical_tables_procedure(
         &self,
         tables_data: Vec<(CreateTableExpr, RawTableInfo)>,
+        query_context: QueryContextRef,
     ) -> Result<SubmitDdlTaskResponse> {
         let request = SubmitDdlTaskRequest {
+            query_context,
             task: DdlTask::new_create_logical_tables(tables_data),
         };
 
@@ -627,8 +665,10 @@ impl StatementExecutor {
     async fn alter_logical_tables_procedure(
         &self,
         tables_data: Vec<AlterExpr>,
+        query_context: QueryContextRef,
     ) -> Result<SubmitDdlTaskResponse> {
         let request = SubmitDdlTaskRequest {
+            query_context,
             task: DdlTask::new_alter_logical_tables(tables_data),
         };
 
@@ -643,8 +683,10 @@ impl StatementExecutor {
         table_name: &TableName,
         table_id: TableId,
         drop_if_exists: bool,
+        query_context: QueryContextRef,
     ) -> Result<SubmitDdlTaskResponse> {
         let request = SubmitDdlTaskRequest {
+            query_context,
             task: DdlTask::new_drop_table(
                 table_name.catalog_name.to_string(),
                 table_name.schema_name.to_string(),
@@ -665,8 +707,10 @@ impl StatementExecutor {
         catalog: String,
         schema: String,
         drop_if_exists: bool,
+        query_context: QueryContextRef,
     ) -> Result<SubmitDdlTaskResponse> {
         let request = SubmitDdlTaskRequest {
+            query_context,
             task: DdlTask::new_drop_database(catalog, schema, drop_if_exists),
         };
 
@@ -680,8 +724,10 @@ impl StatementExecutor {
         &self,
         table_name: &TableName,
         table_id: TableId,
+        query_context: QueryContextRef,
     ) -> Result<SubmitDdlTaskResponse> {
         let request = SubmitDdlTaskRequest {
+            query_context,
             task: DdlTask::new_truncate_table(
                 table_name.catalog_name.to_string(),
                 table_name.schema_name.to_string(),
@@ -699,10 +745,11 @@ impl StatementExecutor {
     #[tracing::instrument(skip_all)]
     pub async fn create_database(
         &self,
-        catalog: &str,
         database: &str,
         create_if_not_exists: bool,
+        query_context: QueryContextRef,
     ) -> Result<Output> {
+        let catalog = query_context.current_catalog();
         ensure!(
             NAME_PATTERN_REG.is_match(catalog),
             error::UnexpectedSnafu {
@@ -727,6 +774,7 @@ impl StatementExecutor {
                 catalog.to_string(),
                 database.to_string(),
                 create_if_not_exists,
+                query_context,
             )
             .await?;
 
@@ -743,8 +791,10 @@ impl StatementExecutor {
         catalog: String,
         database: String,
         create_if_not_exists: bool,
+        query_context: QueryContextRef,
     ) -> Result<SubmitDdlTaskResponse> {
         let request = SubmitDdlTaskRequest {
+            query_context,
             task: DdlTask::new_create_database(catalog, database, create_if_not_exists, None),
         };
 
@@ -1099,7 +1149,7 @@ ENGINE=mito",
         .unwrap();
         match &result[0] {
             Statement::CreateTable(c) => {
-                let expr = expr_factory::create_to_expr(c, QueryContext::arc()).unwrap();
+                let expr = expr_factory::create_to_expr(c, &QueryContext::arc()).unwrap();
                 let (partitions, _) = parse_partitions(&expr, c.partitions.clone(), &ctx).unwrap();
                 let json = serde_json::to_string(&partitions).unwrap();
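Note: `create_database` no longer takes a separate `catalog: &str`; the catalog is now derived from the query context itself (`query_context.current_catalog()`), so a caller can no longer pass a catalog that disagrees with the session it claims to act for. The same shape, reduced to a stand-alone sketch — the types and the produced SQL string are illustrative stand-ins, not the real API:

    use std::sync::Arc;

    // Illustrative stand-ins for session::context types.
    struct QueryContext {
        current_catalog: String,
    }
    type QueryContextRef = Arc<QueryContext>;

    fn create_database(database: &str, create_if_not_exists: bool, ctx: QueryContextRef) -> String {
        // Was an independent `catalog: &str` parameter; now derived from the session.
        let catalog = &ctx.current_catalog;
        format!(
            "create database{} {}.{}",
            if create_if_not_exists { " if not exists" } else { "" },
            catalog,
            database
        )
    }

    fn main() {
        let ctx = Arc::new(QueryContext { current_catalog: "greptime".to_string() });
        assert_eq!(
            create_database("public", true, ctx),
            "create database if not exists greptime.public"
        );
    }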
diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs
index fbc88da3b4..6845e26bc1 100644
--- a/src/query/src/dist_plan/merge_scan.rs
+++ b/src/query/src/dist_plan/merge_scan.rs
@@ -41,7 +41,8 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
 use datafusion_physical_expr::EquivalenceProperties;
 use datatypes::schema::{Schema, SchemaRef};
 use futures_util::StreamExt;
-use greptime_proto::v1::region::{QueryContext, QueryRequest, RegionRequestHeader};
+use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
+use greptime_proto::v1::QueryContext;
 use meter_core::data::ReadItem;
 use meter_macros::read_meter;
 use session::context::QueryContextRef;

diff --git a/src/session/src/context.rs b/src/session/src/context.rs
index 37f7e39c5f..592b56b2fb 100644
--- a/src/session/src/context.rs
+++ b/src/session/src/context.rs
@@ -96,6 +96,36 @@ impl From<&RegionRequestHeader> for QueryContext {
     }
 }
 
+impl From<api::v1::QueryContext> for QueryContext {
+    fn from(ctx: api::v1::QueryContext) -> Self {
+        QueryContextBuilder::default()
+            .current_catalog(ctx.current_catalog)
+            .current_schema(ctx.current_schema)
+            .timezone(Arc::new(parse_timezone(Some(&ctx.timezone))))
+            .extensions(ctx.extensions)
+            .build()
+    }
+}
+
+impl From<QueryContext> for api::v1::QueryContext {
+    fn from(
+        QueryContext {
+            current_catalog,
+            current_schema,
+            timezone,
+            extensions,
+            ..
+        }: QueryContext,
+    ) -> Self {
+        api::v1::QueryContext {
+            current_catalog,
+            current_schema,
+            timezone: timezone.to_string(),
+            extensions,
+        }
+    }
+}
+
 impl QueryContext {
     pub fn arc() -> QueryContextRef {
         Arc::new(QueryContextBuilder::default().build())