diff --git a/Cargo.lock b/Cargo.lock index 5a27c3dd2b..ae69448c6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2416,7 +2416,6 @@ dependencies = [ "humantime", "meta-client", "serde", - "session", "snafu 0.8.6", "tokio", "tonic 0.14.2", @@ -2653,7 +2652,6 @@ dependencies = [ "serde", "serde_json", "serde_with", - "session", "snafu 0.8.6", "sqlx", "store-api", @@ -8867,9 +8865,9 @@ checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad" [[package]] name = "oneshot" -version = "0.1.11" +version = "0.1.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4ce411919553d3f9fa53a0880544cda985a112117a0444d5ff1e870a893d6ea" +checksum = "269bca4c2591a28585d6bf10d9ed0332b7d76900a1b02bec41bdc3a2cdcda107" [[package]] name = "onig" @@ -9466,7 +9464,6 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "session", "snafu 0.8.6", "sql", "sqlparser", diff --git a/flake.nix b/flake.nix index 58c10465a0..5da5edef00 100644 --- a/flake.nix +++ b/flake.nix @@ -43,6 +43,7 @@ ]) cargo-nextest cargo-llvm-cov + cargo-udeps taplo curl gnuplot ## for cargo bench diff --git a/src/common/frontend/Cargo.toml b/src/common/frontend/Cargo.toml index de2224f36b..c775944286 100644 --- a/src/common/frontend/Cargo.toml +++ b/src/common/frontend/Cargo.toml @@ -17,7 +17,6 @@ greptime-proto.workspace = true humantime.workspace = true meta-client.workspace = true serde.workspace = true -session.workspace = true snafu.workspace = true tonic.workspace = true diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 7438a237b5..ec000c710d 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -75,7 +75,6 @@ rustls-pemfile = { version = "2.0", optional = true } serde.workspace = true serde_json.workspace = true serde_with.workspace = true -session.workspace = true snafu.workspace = true sqlx = { workspace = true, features = [ "mysql", diff --git a/src/common/meta/src/ddl/tests/create_flow.rs b/src/common/meta/src/ddl/tests/create_flow.rs index d073f5aa38..8803f98e0d 100644 --- a/src/common/meta/src/ddl/tests/create_flow.rs +++ b/src/common/meta/src/ddl/tests/create_flow.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_procedure_test::execute_procedure_until_done; -use session::context::QueryContext as SessionQueryContext; use table::table_name::TableName; use crate::ddl::DdlContext; @@ -31,6 +30,16 @@ use crate::key::table_route::TableRouteValue; use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext}; use crate::test_util::{MockFlownodeManager, new_ddl_context}; +fn test_query_context() -> QueryContext { + QueryContext { + current_catalog: DEFAULT_CATALOG_NAME.to_string(), + current_schema: DEFAULT_SCHEMA_NAME.to_string(), + timezone: "UTC".to_string(), + extensions: HashMap::new(), + channel: 0, + } +} + pub(crate) fn test_create_flow_task( name: &str, source_table_names: Vec, @@ -64,7 +73,7 @@ async fn test_create_flow_source_table_not_found() { let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false); let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler)); let ddl_context = new_ddl_context(node_manager); - let query_ctx = SessionQueryContext::arc().into(); + let query_ctx = test_query_context(); let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context); let err = procedure.on_prepare().await.unwrap_err(); assert_matches!(err, error::Error::TableNotFound { .. }); @@ -82,7 +91,7 @@ pub(crate) async fn create_test_flow( sink_table_name.clone(), false, ); - let query_ctx = SessionQueryContext::arc().into(); + let query_ctx = test_query_context(); let mut procedure = CreateFlowProcedure::new(task.clone(), query_ctx, ddl_context.clone()); let output = execute_procedure_until_done(&mut procedure).await.unwrap(); let flow_id = output.downcast_ref::().unwrap(); @@ -129,7 +138,7 @@ async fn test_create_flow() { sink_table_name.clone(), true, ); - let query_ctx = SessionQueryContext::arc().into(); + let query_ctx = test_query_context(); let mut procedure = CreateFlowProcedure::new(task.clone(), query_ctx, ddl_context.clone()); let output = execute_procedure_until_done(&mut procedure).await.unwrap(); let flow_id = output.downcast_ref::().unwrap(); @@ -137,7 +146,7 @@ async fn test_create_flow() { // Creates again let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false); - let query_ctx = SessionQueryContext::arc().into(); + let query_ctx = test_query_context(); let mut procedure = CreateFlowProcedure::new(task.clone(), query_ctx, ddl_context); let err = procedure.on_prepare().await.unwrap_err(); assert_matches!(err, error::Error::FlowAlreadyExists { .. }); @@ -169,7 +178,7 @@ async fn test_create_flow_same_source_and_sink_table() { // Try to create a flow with same source and sink table - should fail let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false); - let query_ctx = SessionQueryContext::arc().into(); + let query_ctx = test_query_context(); let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context); let err = procedure.on_prepare().await.unwrap_err(); assert_matches!(err, error::Error::Unsupported { .. }); @@ -219,7 +228,7 @@ fn test_create_flow_data_serialization_backward_compatibility() { "source_table_ids": [], "query_context": { "current_catalog": "old_catalog", - "current_schema": "old_schema", + "current_schema": "old_schema", "timezone": "UTC", "extensions": {}, "channel": 0 diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 4104f211f1..0106add32f 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -626,8 +626,7 @@ impl DdlManager { handle_alter_database_task(self, alter_database_task).await } CreateFlow(create_flow_task) => { - handle_create_flow_task(self, create_flow_task, request.query_context.into()) - .await + handle_create_flow_task(self, create_flow_task, request.query_context).await } DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await, CreateView(create_view_task) => { @@ -637,17 +636,12 @@ impl DdlManager { CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await, #[cfg(feature = "enterprise")] CreateTrigger(create_trigger_task) => { - handle_create_trigger_task( - self, - create_trigger_task, - request.query_context.into(), - ) - .await + handle_create_trigger_task(self, create_trigger_task, request.query_context) + .await } #[cfg(feature = "enterprise")] DropTrigger(drop_trigger_task) => { - handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into()) - .await + handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await } } } diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index f95eae7e56..c6613af828 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -867,13 +867,6 @@ pub enum Error { source: common_procedure::error::Error, }, - #[snafu(display("Failed to parse timezone"))] - InvalidTimeZone { - #[snafu(implicit)] - location: Location, - #[snafu(source)] - error: common_time::error::Error, - }, #[snafu(display("Invalid file path: {}", file_path))] InvalidFilePath { #[snafu(implicit)] @@ -1140,7 +1133,6 @@ impl ErrorExt for Error { | InvalidSetDatabaseOption { .. } | InvalidUnsetDatabaseOption { .. } | InvalidTopicNamePrefix { .. } - | InvalidTimeZone { .. } | InvalidFileExtension { .. } | InvalidFileName { .. } | InvalidFlowRequestBody { .. } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index e5293a78a3..1e94a8f092 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -42,11 +42,10 @@ use api::v1::{ use base64::Engine as _; use base64::engine::general_purpose; use common_error::ext::BoxedError; -use common_time::{DatabaseTimeToLive, Timestamp, Timezone}; +use common_time::{DatabaseTimeToLive, Timestamp}; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{DefaultOnNull, serde_as}; -use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use table::metadata::{TableId, TableInfo}; use table::requests::validate_database_option; @@ -55,7 +54,7 @@ use table::table_reference::TableReference; use crate::error::{ self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu, - InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result, + InvalidUnsetDatabaseOptionSnafu, Result, }; use crate::key::FlowId; @@ -293,7 +292,7 @@ impl TryFrom for DdlTask { #[derive(Clone)] pub struct SubmitDdlTaskRequest { - pub query_context: QueryContextRef, + pub query_context: QueryContext, pub wait: bool, pub timeout: Duration, pub task: DdlTask, @@ -301,7 +300,7 @@ pub struct SubmitDdlTaskRequest { impl SubmitDdlTaskRequest { /// The default constructor for [`SubmitDdlTaskRequest`]. - pub fn new(query_context: QueryContextRef, task: DdlTask) -> Self { + pub fn new(query_context: QueryContext, task: DdlTask) -> Self { Self { query_context, wait: Self::default_wait(), @@ -370,7 +369,7 @@ impl TryFrom for PbDdlTaskRequest { Ok(Self { header: None, - query_context: Some((*request.query_context).clone().into()), + query_context: Some(request.query_context.into()), timeout_secs: request.timeout.as_secs() as u32, wait: request.wait, task: Some(task), @@ -1426,13 +1425,13 @@ impl From for PbCommentOnTask { } } -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)] pub struct QueryContext { - pub(crate) current_catalog: String, - pub(crate) current_schema: String, - pub(crate) timezone: String, - pub(crate) extensions: HashMap, - pub(crate) channel: u8, + pub current_catalog: String, + pub current_schema: String, + pub timezone: String, + pub extensions: HashMap, + pub channel: u8, } impl QueryContext { @@ -1468,11 +1467,11 @@ impl QueryContext { #[derive(Debug, Clone, Serialize, PartialEq)] pub struct FlowQueryContext { /// Current catalog name - needed for flow metadata and recovery - pub(crate) catalog: String, + pub catalog: String, /// Current schema name - needed for table resolution during flow execution - pub(crate) schema: String, + pub schema: String, /// Timezone for timestamp operations in the flow - pub(crate) timezone: String, + pub timezone: String, } impl<'de> Deserialize<'de> for FlowQueryContext { @@ -1506,31 +1505,18 @@ impl<'de> Deserialize<'de> for FlowQueryContext { } } -impl From for QueryContext { - fn from(query_context: QueryContextRef) -> Self { - QueryContext { - current_catalog: query_context.current_catalog().to_string(), - current_schema: query_context.current_schema().clone(), - timezone: query_context.timezone().to_string(), - extensions: query_context.extensions(), - channel: query_context.channel() as u8, +impl From for QueryContext { + fn from(pb_ctx: PbQueryContext) -> Self { + Self { + current_catalog: pb_ctx.current_catalog, + current_schema: pb_ctx.current_schema, + timezone: pb_ctx.timezone, + extensions: pb_ctx.extensions, + channel: pb_ctx.channel as u8, } } } -impl TryFrom for session::context::QueryContext { - type Error = error::Error; - fn try_from(value: QueryContext) -> std::result::Result { - Ok(QueryContextBuilder::default() - .current_catalog(value.current_catalog) - .current_schema(value.current_schema) - .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?) - .extensions(value.extensions) - .channel((value.channel as u32).into()) - .build()) - } -} - impl From for PbQueryContext { fn from( QueryContext { @@ -1563,16 +1549,6 @@ impl From for FlowQueryContext { } } -impl From for FlowQueryContext { - fn from(ctx: QueryContextRef) -> Self { - Self { - catalog: ctx.current_catalog().to_string(), - schema: ctx.current_schema().clone(), - timezone: ctx.timezone().to_string(), - } - } -} - impl From for QueryContext { fn from(flow_ctx: FlowQueryContext) -> Self { Self { @@ -1774,25 +1750,6 @@ mod tests { assert_eq!(flow_ctx, flow_ctx_roundtrip); } - #[test] - fn test_flow_query_context_conversion_from_query_context_ref() { - use common_time::Timezone; - use session::context::QueryContextBuilder; - - let session_ctx = QueryContextBuilder::default() - .current_catalog("session_catalog".to_string()) - .current_schema("session_schema".to_string()) - .timezone(Timezone::from_tz_string("Europe/London").unwrap()) - .build(); - - let session_ctx_ref = Arc::new(session_ctx); - let flow_ctx: FlowQueryContext = session_ctx_ref.into(); - - assert_eq!(flow_ctx.catalog, "session_catalog"); - assert_eq!(flow_ctx.schema, "session_schema"); - assert_eq!(flow_ctx.timezone, "Europe/London"); - } - #[test] fn test_flow_query_context_serialization() { let flow_ctx = FlowQueryContext { diff --git a/src/flow/src/adapter/flownode_impl.rs b/src/flow/src/adapter/flownode_impl.rs index b876c0c9e5..1391bf2374 100644 --- a/src/flow/src/adapter/flownode_impl.rs +++ b/src/flow/src/adapter/flownode_impl.rs @@ -33,6 +33,7 @@ use common_telemetry::{error, info, trace, warn}; use datatypes::value::Value; use futures::TryStreamExt; use itertools::Itertools; +use operator::utils::try_to_session_query_context; use session::context::QueryContextBuilder; use snafu::{IntoError, OptionExt, ResultExt, ensure}; use store_api::storage::{RegionId, TableId}; @@ -325,7 +326,7 @@ impl FlowDualEngine { .query_context() .clone() .map(|ctx| { - ctx.try_into() + try_to_session_query_context(ctx) .map_err(BoxedError::new) .context(ExternalSnafu) }) diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index 0a4c6bde88..f813aebe20 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -340,10 +340,11 @@ mod tests { }; use async_trait::async_trait; use common_error::status_code::StatusCode; - use common_meta::rpc::ddl::{CommentObjectType, CommentOnTask, DdlTask, SubmitDdlTaskRequest}; + use common_meta::rpc::ddl::{ + CommentObjectType, CommentOnTask, DdlTask, QueryContext, SubmitDdlTaskRequest, + }; use common_telemetry::common_error::ext::ErrorExt; use common_telemetry::info; - use session::context::QueryContext; use tokio::net::TcpListener; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use tonic::codec::CompressionEncoding; @@ -468,7 +469,7 @@ mod tests { client.start(&[addr_str.as_str()]).await.unwrap(); let mut request = SubmitDdlTaskRequest::new( - QueryContext::arc(), + QueryContext::default(), DdlTask::new_comment_on(CommentOnTask { catalog_name: "greptime".to_string(), schema_name: "public".to_string(), diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index 20dd0ed1ae..90fb66f39c 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; use std::time::Duration; use api::v1::meta::reconcile_request::Target; @@ -97,7 +96,7 @@ impl procedure_service_server::ProcedureService for Metasrv { tracing_context: Some(header.tracing_context), }, SubmitDdlTaskRequest { - query_context: Arc::new(query_context), + query_context, wait, timeout: Duration::from_secs(timeout_secs.into()), task, diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index b4acc0f363..a5a3e9b0be 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -849,6 +849,15 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid timezone: {}", timezone))] + InvalidTimezone { + timezone: String, + #[snafu(source)] + source: common_time::error::Error, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Invalid process id: {}", id))] InvalidProcessId { id: String }, @@ -1031,7 +1040,9 @@ impl ErrorExt for Error { Error::ColumnOptions { source, .. } => source.status_code(), Error::DecodeFlightData { source, .. } => source.status_code(), Error::ComputeArrow { .. } => StatusCode::Internal, - Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments, + Error::InvalidTimeIndexType { .. } | Error::InvalidTimezone { .. } => { + StatusCode::InvalidArguments + } Error::InvalidProcessId { .. } => StatusCode::InvalidArguments, Error::ProcessManagerMissing { .. } => StatusCode::Unexpected, Error::PathNotFound { .. } => StatusCode::InvalidArguments, diff --git a/src/operator/src/flow.rs b/src/operator/src/flow.rs index 306dd83df1..1642b704b6 100644 --- a/src/operator/src/flow.rs +++ b/src/operator/src/flow.rs @@ -25,6 +25,8 @@ use futures::stream::FuturesUnordered; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; +use crate::utils::to_meta_query_context; + /// The operator for flow service which implements [`FlowServiceHandler`]. pub struct FlowServiceOperator { flow_metadata_manager: FlowMetadataManagerRef, @@ -105,9 +107,7 @@ impl FlowServiceOperator { let flush_req = FlowRequest { header: Some(FlowRequestHeader { tracing_context: TracingContext::from_current_span().to_w3c(), - query_context: Some( - common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(), - ), + query_context: Some(to_meta_query_context(ctx.clone()).into()), }), body: Some(flow_request::Body::Flush(FlushFlow { flow_id: Some(api::v1::FlowId { id }), diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs index 783501c6f0..e31ad87ca1 100644 --- a/src/operator/src/lib.rs +++ b/src/operator/src/lib.rs @@ -30,3 +30,4 @@ pub mod statement; pub mod table; #[cfg(test)] pub(crate) mod tests; +pub mod utils; diff --git a/src/operator/src/statement/comment.rs b/src/operator/src/statement/comment.rs index bf894a50c8..2a5c48ffbb 100644 --- a/src/operator/src/statement/comment.rs +++ b/src/operator/src/statement/comment.rs @@ -25,6 +25,7 @@ use sql::statements::comment::{Comment, CommentObject}; use crate::error::{ExecuteDdlSnafu, ExternalSnafu, InvalidSqlSnafu, Result}; use crate::statement::StatementExecutor; +use crate::utils::to_meta_query_context; impl StatementExecutor { /// Adds a comment to a database object (table, column, or flow). @@ -40,8 +41,10 @@ impl StatementExecutor { pub async fn comment(&self, stmt: Comment, query_ctx: QueryContextRef) -> Result { let comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?; - let request = - SubmitDdlTaskRequest::new(query_ctx, DdlTask::new_comment_on(comment_on_task)); + let request = SubmitDdlTaskRequest::new( + to_meta_query_context(query_ctx), + DdlTask::new_comment_on(comment_on_task), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -57,8 +60,10 @@ impl StatementExecutor { ) -> Result { let comment_on_task = self.create_comment_on_task_from_expr(expr)?; - let request = - SubmitDdlTaskRequest::new(query_ctx, DdlTask::new_comment_on(comment_on_task)); + let request = SubmitDdlTaskRequest::new( + to_meta_query_context(query_ctx), + DdlTask::new_comment_on(comment_on_task), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index 3384aaa097..2114ccd5e1 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -104,6 +104,7 @@ use crate::error::{ use crate::expr_helper::{self, RepartitionRequest}; use crate::statement::StatementExecutor; use crate::statement::show::create_partitions_stmt; +use crate::utils::to_meta_query_context; #[derive(Debug, Clone, Copy)] struct DdlSubmitOptions { @@ -590,7 +591,10 @@ impl StatementExecutor { }) .context(error::InvalidExprSnafu)?; - let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_trigger(task)); + let request = SubmitDdlTaskRequest::new( + to_meta_query_context(query_context), + DdlTask::new_create_trigger(task), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -640,7 +644,10 @@ impl StatementExecutor { create_flow: Some(expr), }) .context(error::InvalidExprSnafu)?; - let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_flow(task)); + let request = SubmitDdlTaskRequest::new( + to_meta_query_context(query_context), + DdlTask::new_create_flow(task), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -935,8 +942,10 @@ impl StatementExecutor { table_type: TableType::View, }; - let request = - SubmitDdlTaskRequest::new(ctx, DdlTask::new_create_view(expr, view_info.clone())); + let request = SubmitDdlTaskRequest::new( + to_meta_query_context(ctx), + DdlTask::new_create_view(expr, view_info.clone()), + ); let resp = self .procedure_executor @@ -1019,7 +1028,10 @@ impl StatementExecutor { expr: DropFlowTask, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_flow(expr)); + let request = SubmitDdlTaskRequest::new( + to_meta_query_context(query_context), + DdlTask::new_drop_flow(expr), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1051,7 +1063,10 @@ impl StatementExecutor { expr: DropTriggerTask, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_trigger(expr)); + let request = SubmitDdlTaskRequest::new( + to_meta_query_context(query_context), + DdlTask::new_drop_trigger(expr), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1116,7 +1131,10 @@ impl StatementExecutor { expr: DropViewTask, query_context: QueryContextRef, ) -> Result { - let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_view(expr)); + let request = SubmitDdlTaskRequest::new( + to_meta_query_context(query_context), + DdlTask::new_drop_view(expr), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1499,7 +1517,7 @@ impl StatementExecutor { let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?; let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?; let mut req = SubmitDdlTaskRequest::new( - query_context.clone(), + to_meta_query_context(query_context.clone()), DdlTask::new_alter_table(AlterTableExpr { catalog_name: request.catalog_name.clone(), schema_name: request.schema_name.clone(), @@ -1613,7 +1631,10 @@ impl StatementExecutor { let (req, invalidate_keys) = if physical_table_id == table_id { // This is physical table - let req = SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_table(expr)); + let req = SubmitDdlTaskRequest::new( + to_meta_query_context(query_context), + DdlTask::new_alter_table(expr), + ); let invalidate_keys = vec![ CacheIdent::TableId(table_id), @@ -1624,7 +1645,7 @@ impl StatementExecutor { } else { // This is logical table let req = SubmitDdlTaskRequest::new( - query_context, + to_meta_query_context(query_context), DdlTask::new_alter_logical_tables(vec![expr]), ); @@ -1745,7 +1766,7 @@ impl StatementExecutor { .collect::>>()?; let request = SubmitDdlTaskRequest::new( - query_context, + to_meta_query_context(query_context), DdlTask::new_create_table(create_table, partitions, table_info), ); @@ -1761,7 +1782,7 @@ impl StatementExecutor { query_context: QueryContextRef, ) -> Result { let request = SubmitDdlTaskRequest::new( - query_context, + to_meta_query_context(query_context), DdlTask::new_create_logical_tables(tables_data), ); @@ -1777,7 +1798,7 @@ impl StatementExecutor { query_context: QueryContextRef, ) -> Result { let request = SubmitDdlTaskRequest::new( - query_context, + to_meta_query_context(query_context), DdlTask::new_alter_logical_tables(tables_data), ); @@ -1795,7 +1816,7 @@ impl StatementExecutor { query_context: QueryContextRef, ) -> Result { let request = SubmitDdlTaskRequest::new( - query_context, + to_meta_query_context(query_context), DdlTask::new_drop_table( table_name.catalog_name.clone(), table_name.schema_name.clone(), @@ -1819,7 +1840,7 @@ impl StatementExecutor { query_context: QueryContextRef, ) -> Result { let request = SubmitDdlTaskRequest::new( - query_context, + to_meta_query_context(query_context), DdlTask::new_drop_database(catalog, schema, drop_if_exists), ); @@ -1834,8 +1855,10 @@ impl StatementExecutor { alter_expr: AlterDatabaseExpr, query_context: QueryContextRef, ) -> Result { - let request = - SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_database(alter_expr)); + let request = SubmitDdlTaskRequest::new( + to_meta_query_context(query_context), + DdlTask::new_alter_database(alter_expr), + ); self.procedure_executor .submit_ddl_task(&ExecutorContext::default(), request) @@ -1851,7 +1874,7 @@ impl StatementExecutor { query_context: QueryContextRef, ) -> Result { let request = SubmitDdlTaskRequest::new( - query_context, + to_meta_query_context(query_context), DdlTask::new_truncate_table( table_name.catalog_name.clone(), table_name.schema_name.clone(), @@ -1923,7 +1946,7 @@ impl StatementExecutor { query_context: QueryContextRef, ) -> Result { let request = SubmitDdlTaskRequest::new( - query_context, + to_meta_query_context(query_context), DdlTask::new_create_database(catalog, database, create_if_not_exists, options), ); diff --git a/src/operator/src/utils.rs b/src/operator/src/utils.rs new file mode 100644 index 0000000000..93da5f028e --- /dev/null +++ b/src/operator/src/utils.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_time::Timezone; +use session::context::{QueryContextBuilder, QueryContextRef}; +use snafu::ResultExt; + +use crate::error::{Error, InvalidTimezoneSnafu}; + +pub fn to_meta_query_context( + query_context: QueryContextRef, +) -> common_meta::rpc::ddl::QueryContext { + common_meta::rpc::ddl::QueryContext { + current_catalog: query_context.current_catalog().to_string(), + current_schema: query_context.current_schema().clone(), + timezone: query_context.timezone().to_string(), + extensions: query_context.extensions(), + channel: query_context.channel() as u8, + } +} + +pub fn try_to_session_query_context( + value: common_meta::rpc::ddl::QueryContext, +) -> Result { + Ok(QueryContextBuilder::default() + .current_catalog(value.current_catalog) + .current_schema(value.current_schema) + .timezone( + Timezone::from_tz_string(&value.timezone).context(InvalidTimezoneSnafu { + timezone: value.timezone, + })?, + ) + .extensions(value.extensions) + .channel((value.channel as u32).into()) + .build()) +} diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index 048b96ca9e..4979f655ca 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -23,7 +23,6 @@ itertools.workspace = true moka = { workspace = true, features = ["future"] } serde.workspace = true serde_json.workspace = true -session.workspace = true snafu.workspace = true sql.workspace = true sqlparser.workspace = true