mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-15 20:40:39 +00:00
refactor: remove session from common meta (#7698)
* refactor: remove session dependency from common-meta * chore: add udeps * chore: format * fix: lint issues * chore: update oneshot * chore: update unused deps
This commit is contained in:
7
Cargo.lock
generated
7
Cargo.lock
generated
@@ -2416,7 +2416,6 @@ dependencies = [
|
||||
"humantime",
|
||||
"meta-client",
|
||||
"serde",
|
||||
"session",
|
||||
"snafu 0.8.6",
|
||||
"tokio",
|
||||
"tonic 0.14.2",
|
||||
@@ -2653,7 +2652,6 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_with",
|
||||
"session",
|
||||
"snafu 0.8.6",
|
||||
"sqlx",
|
||||
"store-api",
|
||||
@@ -8867,9 +8865,9 @@ checksum = "a4895175b425cb1f87721b59f0f286c2092bd4af812243672510e1ac53e2e0ad"
|
||||
|
||||
[[package]]
|
||||
name = "oneshot"
|
||||
version = "0.1.11"
|
||||
version = "0.1.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b4ce411919553d3f9fa53a0880544cda985a112117a0444d5ff1e870a893d6ea"
|
||||
checksum = "269bca4c2591a28585d6bf10d9ed0332b7d76900a1b02bec41bdc3a2cdcda107"
|
||||
|
||||
[[package]]
|
||||
name = "onig"
|
||||
@@ -9466,7 +9464,6 @@ dependencies = [
|
||||
"rand 0.8.5",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"session",
|
||||
"snafu 0.8.6",
|
||||
"sql",
|
||||
"sqlparser",
|
||||
|
||||
@@ -43,6 +43,7 @@
|
||||
])
|
||||
cargo-nextest
|
||||
cargo-llvm-cov
|
||||
cargo-udeps
|
||||
taplo
|
||||
curl
|
||||
gnuplot ## for cargo bench
|
||||
|
||||
@@ -17,7 +17,6 @@ greptime-proto.workspace = true
|
||||
humantime.workspace = true
|
||||
meta-client.workspace = true
|
||||
serde.workspace = true
|
||||
session.workspace = true
|
||||
snafu.workspace = true
|
||||
tonic.workspace = true
|
||||
|
||||
|
||||
@@ -75,7 +75,6 @@ rustls-pemfile = { version = "2.0", optional = true }
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_with.workspace = true
|
||||
session.workspace = true
|
||||
snafu.workspace = true
|
||||
sqlx = { workspace = true, features = [
|
||||
"mysql",
|
||||
|
||||
@@ -18,7 +18,6 @@ use std::sync::Arc;
|
||||
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
|
||||
use common_procedure_test::execute_procedure_until_done;
|
||||
use session::context::QueryContext as SessionQueryContext;
|
||||
use table::table_name::TableName;
|
||||
|
||||
use crate::ddl::DdlContext;
|
||||
@@ -31,6 +30,16 @@ use crate::key::table_route::TableRouteValue;
|
||||
use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
|
||||
use crate::test_util::{MockFlownodeManager, new_ddl_context};
|
||||
|
||||
fn test_query_context() -> QueryContext {
|
||||
QueryContext {
|
||||
current_catalog: DEFAULT_CATALOG_NAME.to_string(),
|
||||
current_schema: DEFAULT_SCHEMA_NAME.to_string(),
|
||||
timezone: "UTC".to_string(),
|
||||
extensions: HashMap::new(),
|
||||
channel: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn test_create_flow_task(
|
||||
name: &str,
|
||||
source_table_names: Vec<TableName>,
|
||||
@@ -64,7 +73,7 @@ async fn test_create_flow_source_table_not_found() {
|
||||
let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
|
||||
let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
|
||||
let ddl_context = new_ddl_context(node_manager);
|
||||
let query_ctx = SessionQueryContext::arc().into();
|
||||
let query_ctx = test_query_context();
|
||||
let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, error::Error::TableNotFound { .. });
|
||||
@@ -82,7 +91,7 @@ pub(crate) async fn create_test_flow(
|
||||
sink_table_name.clone(),
|
||||
false,
|
||||
);
|
||||
let query_ctx = SessionQueryContext::arc().into();
|
||||
let query_ctx = test_query_context();
|
||||
let mut procedure = CreateFlowProcedure::new(task.clone(), query_ctx, ddl_context.clone());
|
||||
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
|
||||
let flow_id = output.downcast_ref::<FlowId>().unwrap();
|
||||
@@ -129,7 +138,7 @@ async fn test_create_flow() {
|
||||
sink_table_name.clone(),
|
||||
true,
|
||||
);
|
||||
let query_ctx = SessionQueryContext::arc().into();
|
||||
let query_ctx = test_query_context();
|
||||
let mut procedure = CreateFlowProcedure::new(task.clone(), query_ctx, ddl_context.clone());
|
||||
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
|
||||
let flow_id = output.downcast_ref::<FlowId>().unwrap();
|
||||
@@ -137,7 +146,7 @@ async fn test_create_flow() {
|
||||
|
||||
// Creates again
|
||||
let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
|
||||
let query_ctx = SessionQueryContext::arc().into();
|
||||
let query_ctx = test_query_context();
|
||||
let mut procedure = CreateFlowProcedure::new(task.clone(), query_ctx, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, error::Error::FlowAlreadyExists { .. });
|
||||
@@ -169,7 +178,7 @@ async fn test_create_flow_same_source_and_sink_table() {
|
||||
|
||||
// Try to create a flow with same source and sink table - should fail
|
||||
let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
|
||||
let query_ctx = SessionQueryContext::arc().into();
|
||||
let query_ctx = test_query_context();
|
||||
let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context);
|
||||
let err = procedure.on_prepare().await.unwrap_err();
|
||||
assert_matches!(err, error::Error::Unsupported { .. });
|
||||
@@ -219,7 +228,7 @@ fn test_create_flow_data_serialization_backward_compatibility() {
|
||||
"source_table_ids": [],
|
||||
"query_context": {
|
||||
"current_catalog": "old_catalog",
|
||||
"current_schema": "old_schema",
|
||||
"current_schema": "old_schema",
|
||||
"timezone": "UTC",
|
||||
"extensions": {},
|
||||
"channel": 0
|
||||
|
||||
@@ -626,8 +626,7 @@ impl DdlManager {
|
||||
handle_alter_database_task(self, alter_database_task).await
|
||||
}
|
||||
CreateFlow(create_flow_task) => {
|
||||
handle_create_flow_task(self, create_flow_task, request.query_context.into())
|
||||
.await
|
||||
handle_create_flow_task(self, create_flow_task, request.query_context).await
|
||||
}
|
||||
DropFlow(drop_flow_task) => handle_drop_flow_task(self, drop_flow_task).await,
|
||||
CreateView(create_view_task) => {
|
||||
@@ -637,17 +636,12 @@ impl DdlManager {
|
||||
CommentOn(comment_on_task) => handle_comment_on_task(self, comment_on_task).await,
|
||||
#[cfg(feature = "enterprise")]
|
||||
CreateTrigger(create_trigger_task) => {
|
||||
handle_create_trigger_task(
|
||||
self,
|
||||
create_trigger_task,
|
||||
request.query_context.into(),
|
||||
)
|
||||
.await
|
||||
handle_create_trigger_task(self, create_trigger_task, request.query_context)
|
||||
.await
|
||||
}
|
||||
#[cfg(feature = "enterprise")]
|
||||
DropTrigger(drop_trigger_task) => {
|
||||
handle_drop_trigger_task(self, drop_trigger_task, request.query_context.into())
|
||||
.await
|
||||
handle_drop_trigger_task(self, drop_trigger_task, request.query_context).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -867,13 +867,6 @@ pub enum Error {
|
||||
source: common_procedure::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse timezone"))]
|
||||
InvalidTimeZone {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: common_time::error::Error,
|
||||
},
|
||||
#[snafu(display("Invalid file path: {}", file_path))]
|
||||
InvalidFilePath {
|
||||
#[snafu(implicit)]
|
||||
@@ -1140,7 +1133,6 @@ impl ErrorExt for Error {
|
||||
| InvalidSetDatabaseOption { .. }
|
||||
| InvalidUnsetDatabaseOption { .. }
|
||||
| InvalidTopicNamePrefix { .. }
|
||||
| InvalidTimeZone { .. }
|
||||
| InvalidFileExtension { .. }
|
||||
| InvalidFileName { .. }
|
||||
| InvalidFlowRequestBody { .. }
|
||||
|
||||
@@ -42,11 +42,10 @@ use api::v1::{
|
||||
use base64::Engine as _;
|
||||
use base64::engine::general_purpose;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_time::{DatabaseTimeToLive, Timestamp, Timezone};
|
||||
use common_time::{DatabaseTimeToLive, Timestamp};
|
||||
use prost::Message;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_with::{DefaultOnNull, serde_as};
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use table::metadata::{TableId, TableInfo};
|
||||
use table::requests::validate_database_option;
|
||||
@@ -55,7 +54,7 @@ use table::table_reference::TableReference;
|
||||
|
||||
use crate::error::{
|
||||
self, ConvertTimeRangesSnafu, ExternalSnafu, InvalidSetDatabaseOptionSnafu,
|
||||
InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, Result,
|
||||
InvalidUnsetDatabaseOptionSnafu, Result,
|
||||
};
|
||||
use crate::key::FlowId;
|
||||
|
||||
@@ -293,7 +292,7 @@ impl TryFrom<Task> for DdlTask {
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SubmitDdlTaskRequest {
|
||||
pub query_context: QueryContextRef,
|
||||
pub query_context: QueryContext,
|
||||
pub wait: bool,
|
||||
pub timeout: Duration,
|
||||
pub task: DdlTask,
|
||||
@@ -301,7 +300,7 @@ pub struct SubmitDdlTaskRequest {
|
||||
|
||||
impl SubmitDdlTaskRequest {
|
||||
/// The default constructor for [`SubmitDdlTaskRequest`].
|
||||
pub fn new(query_context: QueryContextRef, task: DdlTask) -> Self {
|
||||
pub fn new(query_context: QueryContext, task: DdlTask) -> Self {
|
||||
Self {
|
||||
query_context,
|
||||
wait: Self::default_wait(),
|
||||
@@ -370,7 +369,7 @@ impl TryFrom<SubmitDdlTaskRequest> for PbDdlTaskRequest {
|
||||
|
||||
Ok(Self {
|
||||
header: None,
|
||||
query_context: Some((*request.query_context).clone().into()),
|
||||
query_context: Some(request.query_context.into()),
|
||||
timeout_secs: request.timeout.as_secs() as u32,
|
||||
wait: request.wait,
|
||||
task: Some(task),
|
||||
@@ -1426,13 +1425,13 @@ impl From<CommentOnTask> for PbCommentOnTask {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
|
||||
pub struct QueryContext {
|
||||
pub(crate) current_catalog: String,
|
||||
pub(crate) current_schema: String,
|
||||
pub(crate) timezone: String,
|
||||
pub(crate) extensions: HashMap<String, String>,
|
||||
pub(crate) channel: u8,
|
||||
pub current_catalog: String,
|
||||
pub current_schema: String,
|
||||
pub timezone: String,
|
||||
pub extensions: HashMap<String, String>,
|
||||
pub channel: u8,
|
||||
}
|
||||
|
||||
impl QueryContext {
|
||||
@@ -1468,11 +1467,11 @@ impl QueryContext {
|
||||
#[derive(Debug, Clone, Serialize, PartialEq)]
|
||||
pub struct FlowQueryContext {
|
||||
/// Current catalog name - needed for flow metadata and recovery
|
||||
pub(crate) catalog: String,
|
||||
pub catalog: String,
|
||||
/// Current schema name - needed for table resolution during flow execution
|
||||
pub(crate) schema: String,
|
||||
pub schema: String,
|
||||
/// Timezone for timestamp operations in the flow
|
||||
pub(crate) timezone: String,
|
||||
pub timezone: String,
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for FlowQueryContext {
|
||||
@@ -1506,31 +1505,18 @@ impl<'de> Deserialize<'de> for FlowQueryContext {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<QueryContextRef> for QueryContext {
|
||||
fn from(query_context: QueryContextRef) -> Self {
|
||||
QueryContext {
|
||||
current_catalog: query_context.current_catalog().to_string(),
|
||||
current_schema: query_context.current_schema().clone(),
|
||||
timezone: query_context.timezone().to_string(),
|
||||
extensions: query_context.extensions(),
|
||||
channel: query_context.channel() as u8,
|
||||
impl From<PbQueryContext> for QueryContext {
|
||||
fn from(pb_ctx: PbQueryContext) -> Self {
|
||||
Self {
|
||||
current_catalog: pb_ctx.current_catalog,
|
||||
current_schema: pb_ctx.current_schema,
|
||||
timezone: pb_ctx.timezone,
|
||||
extensions: pb_ctx.extensions,
|
||||
channel: pb_ctx.channel as u8,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<QueryContext> for session::context::QueryContext {
|
||||
type Error = error::Error;
|
||||
fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
|
||||
Ok(QueryContextBuilder::default()
|
||||
.current_catalog(value.current_catalog)
|
||||
.current_schema(value.current_schema)
|
||||
.timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
|
||||
.extensions(value.extensions)
|
||||
.channel((value.channel as u32).into())
|
||||
.build())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<QueryContext> for PbQueryContext {
|
||||
fn from(
|
||||
QueryContext {
|
||||
@@ -1563,16 +1549,6 @@ impl From<QueryContext> for FlowQueryContext {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<QueryContextRef> for FlowQueryContext {
|
||||
fn from(ctx: QueryContextRef) -> Self {
|
||||
Self {
|
||||
catalog: ctx.current_catalog().to_string(),
|
||||
schema: ctx.current_schema().clone(),
|
||||
timezone: ctx.timezone().to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<FlowQueryContext> for QueryContext {
|
||||
fn from(flow_ctx: FlowQueryContext) -> Self {
|
||||
Self {
|
||||
@@ -1774,25 +1750,6 @@ mod tests {
|
||||
assert_eq!(flow_ctx, flow_ctx_roundtrip);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_flow_query_context_conversion_from_query_context_ref() {
|
||||
use common_time::Timezone;
|
||||
use session::context::QueryContextBuilder;
|
||||
|
||||
let session_ctx = QueryContextBuilder::default()
|
||||
.current_catalog("session_catalog".to_string())
|
||||
.current_schema("session_schema".to_string())
|
||||
.timezone(Timezone::from_tz_string("Europe/London").unwrap())
|
||||
.build();
|
||||
|
||||
let session_ctx_ref = Arc::new(session_ctx);
|
||||
let flow_ctx: FlowQueryContext = session_ctx_ref.into();
|
||||
|
||||
assert_eq!(flow_ctx.catalog, "session_catalog");
|
||||
assert_eq!(flow_ctx.schema, "session_schema");
|
||||
assert_eq!(flow_ctx.timezone, "Europe/London");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_flow_query_context_serialization() {
|
||||
let flow_ctx = FlowQueryContext {
|
||||
|
||||
@@ -33,6 +33,7 @@ use common_telemetry::{error, info, trace, warn};
|
||||
use datatypes::value::Value;
|
||||
use futures::TryStreamExt;
|
||||
use itertools::Itertools;
|
||||
use operator::utils::try_to_session_query_context;
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{IntoError, OptionExt, ResultExt, ensure};
|
||||
use store_api::storage::{RegionId, TableId};
|
||||
@@ -325,7 +326,7 @@ impl FlowDualEngine {
|
||||
.query_context()
|
||||
.clone()
|
||||
.map(|ctx| {
|
||||
ctx.try_into()
|
||||
try_to_session_query_context(ctx)
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)
|
||||
})
|
||||
|
||||
@@ -340,10 +340,11 @@ mod tests {
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_meta::rpc::ddl::{CommentObjectType, CommentOnTask, DdlTask, SubmitDdlTaskRequest};
|
||||
use common_meta::rpc::ddl::{
|
||||
CommentObjectType, CommentOnTask, DdlTask, QueryContext, SubmitDdlTaskRequest,
|
||||
};
|
||||
use common_telemetry::common_error::ext::ErrorExt;
|
||||
use common_telemetry::info;
|
||||
use session::context::QueryContext;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
|
||||
use tonic::codec::CompressionEncoding;
|
||||
@@ -468,7 +469,7 @@ mod tests {
|
||||
client.start(&[addr_str.as_str()]).await.unwrap();
|
||||
|
||||
let mut request = SubmitDdlTaskRequest::new(
|
||||
QueryContext::arc(),
|
||||
QueryContext::default(),
|
||||
DdlTask::new_comment_on(CommentOnTask {
|
||||
catalog_name: "greptime".to_string(),
|
||||
schema_name: "public".to_string(),
|
||||
|
||||
@@ -12,7 +12,6 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::meta::reconcile_request::Target;
|
||||
@@ -97,7 +96,7 @@ impl procedure_service_server::ProcedureService for Metasrv {
|
||||
tracing_context: Some(header.tracing_context),
|
||||
},
|
||||
SubmitDdlTaskRequest {
|
||||
query_context: Arc::new(query_context),
|
||||
query_context,
|
||||
wait,
|
||||
timeout: Duration::from_secs(timeout_secs.into()),
|
||||
task,
|
||||
|
||||
@@ -849,6 +849,15 @@ pub enum Error {
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid timezone: {}", timezone))]
|
||||
InvalidTimezone {
|
||||
timezone: String,
|
||||
#[snafu(source)]
|
||||
source: common_time::error::Error,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid process id: {}", id))]
|
||||
InvalidProcessId { id: String },
|
||||
|
||||
@@ -1031,7 +1040,9 @@ impl ErrorExt for Error {
|
||||
Error::ColumnOptions { source, .. } => source.status_code(),
|
||||
Error::DecodeFlightData { source, .. } => source.status_code(),
|
||||
Error::ComputeArrow { .. } => StatusCode::Internal,
|
||||
Error::InvalidTimeIndexType { .. } => StatusCode::InvalidArguments,
|
||||
Error::InvalidTimeIndexType { .. } | Error::InvalidTimezone { .. } => {
|
||||
StatusCode::InvalidArguments
|
||||
}
|
||||
Error::InvalidProcessId { .. } => StatusCode::InvalidArguments,
|
||||
Error::ProcessManagerMissing { .. } => StatusCode::Unexpected,
|
||||
Error::PathNotFound { .. } => StatusCode::InvalidArguments,
|
||||
|
||||
@@ -25,6 +25,8 @@ use futures::stream::FuturesUnordered;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use crate::utils::to_meta_query_context;
|
||||
|
||||
/// The operator for flow service which implements [`FlowServiceHandler`].
|
||||
pub struct FlowServiceOperator {
|
||||
flow_metadata_manager: FlowMetadataManagerRef,
|
||||
@@ -105,9 +107,7 @@ impl FlowServiceOperator {
|
||||
let flush_req = FlowRequest {
|
||||
header: Some(FlowRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
query_context: Some(
|
||||
common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
|
||||
),
|
||||
query_context: Some(to_meta_query_context(ctx.clone()).into()),
|
||||
}),
|
||||
body: Some(flow_request::Body::Flush(FlushFlow {
|
||||
flow_id: Some(api::v1::FlowId { id }),
|
||||
|
||||
@@ -30,3 +30,4 @@ pub mod statement;
|
||||
pub mod table;
|
||||
#[cfg(test)]
|
||||
pub(crate) mod tests;
|
||||
pub mod utils;
|
||||
|
||||
@@ -25,6 +25,7 @@ use sql::statements::comment::{Comment, CommentObject};
|
||||
|
||||
use crate::error::{ExecuteDdlSnafu, ExternalSnafu, InvalidSqlSnafu, Result};
|
||||
use crate::statement::StatementExecutor;
|
||||
use crate::utils::to_meta_query_context;
|
||||
|
||||
impl StatementExecutor {
|
||||
/// Adds a comment to a database object (table, column, or flow).
|
||||
@@ -40,8 +41,10 @@ impl StatementExecutor {
|
||||
pub async fn comment(&self, stmt: Comment, query_ctx: QueryContextRef) -> Result<Output> {
|
||||
let comment_on_task = self.create_comment_on_task_from_stmt(stmt, &query_ctx)?;
|
||||
|
||||
let request =
|
||||
SubmitDdlTaskRequest::new(query_ctx, DdlTask::new_comment_on(comment_on_task));
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(query_ctx),
|
||||
DdlTask::new_comment_on(comment_on_task),
|
||||
);
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
@@ -57,8 +60,10 @@ impl StatementExecutor {
|
||||
) -> Result<Output> {
|
||||
let comment_on_task = self.create_comment_on_task_from_expr(expr)?;
|
||||
|
||||
let request =
|
||||
SubmitDdlTaskRequest::new(query_ctx, DdlTask::new_comment_on(comment_on_task));
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(query_ctx),
|
||||
DdlTask::new_comment_on(comment_on_task),
|
||||
);
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
|
||||
@@ -104,6 +104,7 @@ use crate::error::{
|
||||
use crate::expr_helper::{self, RepartitionRequest};
|
||||
use crate::statement::StatementExecutor;
|
||||
use crate::statement::show::create_partitions_stmt;
|
||||
use crate::utils::to_meta_query_context;
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct DdlSubmitOptions {
|
||||
@@ -590,7 +591,10 @@ impl StatementExecutor {
|
||||
})
|
||||
.context(error::InvalidExprSnafu)?;
|
||||
|
||||
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_trigger(task));
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_create_trigger(task),
|
||||
);
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
@@ -640,7 +644,10 @@ impl StatementExecutor {
|
||||
create_flow: Some(expr),
|
||||
})
|
||||
.context(error::InvalidExprSnafu)?;
|
||||
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_create_flow(task));
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_create_flow(task),
|
||||
);
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
@@ -935,8 +942,10 @@ impl StatementExecutor {
|
||||
table_type: TableType::View,
|
||||
};
|
||||
|
||||
let request =
|
||||
SubmitDdlTaskRequest::new(ctx, DdlTask::new_create_view(expr, view_info.clone()));
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(ctx),
|
||||
DdlTask::new_create_view(expr, view_info.clone()),
|
||||
);
|
||||
|
||||
let resp = self
|
||||
.procedure_executor
|
||||
@@ -1019,7 +1028,10 @@ impl StatementExecutor {
|
||||
expr: DropFlowTask,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_flow(expr));
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_drop_flow(expr),
|
||||
);
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
@@ -1051,7 +1063,10 @@ impl StatementExecutor {
|
||||
expr: DropTriggerTask,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_trigger(expr));
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_drop_trigger(expr),
|
||||
);
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
@@ -1116,7 +1131,10 @@ impl StatementExecutor {
|
||||
expr: DropViewTask,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest::new(query_context, DdlTask::new_drop_view(expr));
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_drop_view(expr),
|
||||
);
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
@@ -1499,7 +1517,7 @@ impl StatementExecutor {
|
||||
let from_partition_exprs_json = serialize_exprs(from_partition_exprs)?;
|
||||
let into_partition_exprs_json = serialize_exprs(into_partition_exprs)?;
|
||||
let mut req = SubmitDdlTaskRequest::new(
|
||||
query_context.clone(),
|
||||
to_meta_query_context(query_context.clone()),
|
||||
DdlTask::new_alter_table(AlterTableExpr {
|
||||
catalog_name: request.catalog_name.clone(),
|
||||
schema_name: request.schema_name.clone(),
|
||||
@@ -1613,7 +1631,10 @@ impl StatementExecutor {
|
||||
|
||||
let (req, invalidate_keys) = if physical_table_id == table_id {
|
||||
// This is physical table
|
||||
let req = SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_table(expr));
|
||||
let req = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_alter_table(expr),
|
||||
);
|
||||
|
||||
let invalidate_keys = vec![
|
||||
CacheIdent::TableId(table_id),
|
||||
@@ -1624,7 +1645,7 @@ impl StatementExecutor {
|
||||
} else {
|
||||
// This is logical table
|
||||
let req = SubmitDdlTaskRequest::new(
|
||||
query_context,
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_alter_logical_tables(vec![expr]),
|
||||
);
|
||||
|
||||
@@ -1745,7 +1766,7 @@ impl StatementExecutor {
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
query_context,
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_create_table(create_table, partitions, table_info),
|
||||
);
|
||||
|
||||
@@ -1761,7 +1782,7 @@ impl StatementExecutor {
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
query_context,
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_create_logical_tables(tables_data),
|
||||
);
|
||||
|
||||
@@ -1777,7 +1798,7 @@ impl StatementExecutor {
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
query_context,
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_alter_logical_tables(tables_data),
|
||||
);
|
||||
|
||||
@@ -1795,7 +1816,7 @@ impl StatementExecutor {
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
query_context,
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_drop_table(
|
||||
table_name.catalog_name.clone(),
|
||||
table_name.schema_name.clone(),
|
||||
@@ -1819,7 +1840,7 @@ impl StatementExecutor {
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
query_context,
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_drop_database(catalog, schema, drop_if_exists),
|
||||
);
|
||||
|
||||
@@ -1834,8 +1855,10 @@ impl StatementExecutor {
|
||||
alter_expr: AlterDatabaseExpr,
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request =
|
||||
SubmitDdlTaskRequest::new(query_context, DdlTask::new_alter_database(alter_expr));
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_alter_database(alter_expr),
|
||||
);
|
||||
|
||||
self.procedure_executor
|
||||
.submit_ddl_task(&ExecutorContext::default(), request)
|
||||
@@ -1851,7 +1874,7 @@ impl StatementExecutor {
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
query_context,
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_truncate_table(
|
||||
table_name.catalog_name.clone(),
|
||||
table_name.schema_name.clone(),
|
||||
@@ -1923,7 +1946,7 @@ impl StatementExecutor {
|
||||
query_context: QueryContextRef,
|
||||
) -> Result<SubmitDdlTaskResponse> {
|
||||
let request = SubmitDdlTaskRequest::new(
|
||||
query_context,
|
||||
to_meta_query_context(query_context),
|
||||
DdlTask::new_create_database(catalog, database, create_if_not_exists, options),
|
||||
);
|
||||
|
||||
|
||||
47
src/operator/src/utils.rs
Normal file
47
src/operator/src/utils.rs
Normal file
@@ -0,0 +1,47 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_time::Timezone;
|
||||
use session::context::{QueryContextBuilder, QueryContextRef};
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::error::{Error, InvalidTimezoneSnafu};
|
||||
|
||||
pub fn to_meta_query_context(
|
||||
query_context: QueryContextRef,
|
||||
) -> common_meta::rpc::ddl::QueryContext {
|
||||
common_meta::rpc::ddl::QueryContext {
|
||||
current_catalog: query_context.current_catalog().to_string(),
|
||||
current_schema: query_context.current_schema().clone(),
|
||||
timezone: query_context.timezone().to_string(),
|
||||
extensions: query_context.extensions(),
|
||||
channel: query_context.channel() as u8,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_to_session_query_context(
|
||||
value: common_meta::rpc::ddl::QueryContext,
|
||||
) -> Result<session::context::QueryContext, Error> {
|
||||
Ok(QueryContextBuilder::default()
|
||||
.current_catalog(value.current_catalog)
|
||||
.current_schema(value.current_schema)
|
||||
.timezone(
|
||||
Timezone::from_tz_string(&value.timezone).context(InvalidTimezoneSnafu {
|
||||
timezone: value.timezone,
|
||||
})?,
|
||||
)
|
||||
.extensions(value.extensions)
|
||||
.channel((value.channel as u32).into())
|
||||
.build())
|
||||
}
|
||||
@@ -23,7 +23,6 @@ itertools.workspace = true
|
||||
moka = { workspace = true, features = ["future"] }
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
session.workspace = true
|
||||
snafu.workspace = true
|
||||
sql.workspace = true
|
||||
sqlparser.workspace = true
|
||||
|
||||
Reference in New Issue
Block a user