diff --git a/src/common/meta/src/cache/flow/table_flownode.rs b/src/common/meta/src/cache/flow/table_flownode.rs index c9eb883b76..b285088822 100644 --- a/src/common/meta/src/cache/flow/table_flownode.rs +++ b/src/common/meta/src/cache/flow/table_flownode.rs @@ -187,6 +187,7 @@ mod tests { }, flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]), catalog_name: DEFAULT_CATALOG_NAME.to_string(), + query_context: None, flow_name: "my_flow".to_string(), raw_sql: "sql".to_string(), expire_after: Some(300), diff --git a/src/common/meta/src/ddl/create_flow.rs b/src/common/meta/src/ddl/create_flow.rs index 8b1c0354d4..1d751746d3 100644 --- a/src/common/meta/src/ddl/create_flow.rs +++ b/src/common/meta/src/ddl/create_flow.rs @@ -437,6 +437,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa sink_table_name, flownode_ids, catalog_name, + query_context: Some(value.query_context.clone()), flow_name, raw_sql: sql, expire_after, diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 1bdb3d0857..5242698458 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -783,6 +783,14 @@ pub enum Error { #[snafu(source)] source: common_procedure::error::Error, }, + + #[snafu(display("Failed to parse timezone"))] + InvalidTimeZone { + #[snafu(implicit)] + location: Location, + #[snafu(source)] + error: common_time::error::Error, + }, } pub type Result = std::result::Result; @@ -853,7 +861,8 @@ impl ErrorExt for Error { | TlsConfig { .. } | InvalidSetDatabaseOption { .. } | InvalidUnsetDatabaseOption { .. } - | InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments, + | InvalidTopicNamePrefix { .. } + | InvalidTimeZone { .. } => StatusCode::InvalidArguments, FlowNotFound { .. } => StatusCode::FlowNotFound, FlowRouteNotFound { .. 
} => StatusCode::Unexpected, diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index 6af910b2fc..bc66c08a9d 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -452,6 +452,7 @@ mod tests { }; FlowInfoValue { catalog_name: catalog_name.to_string(), + query_context: None, flow_name: flow_name.to_string(), source_table_ids, sink_table_name, @@ -625,6 +626,7 @@ mod tests { }; let flow_value = FlowInfoValue { catalog_name: "greptime".to_string(), + query_context: None, flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name: another_sink_table_name, @@ -864,6 +866,7 @@ mod tests { }; let flow_value = FlowInfoValue { catalog_name: "greptime".to_string(), + query_context: None, flow_name: "flow".to_string(), source_table_ids: vec![1024, 1025, 1026], sink_table_name: another_sink_table_name, diff --git a/src/common/meta/src/key/flow/flow_info.rs b/src/common/meta/src/key/flow/flow_info.rs index eeb37da81d..1ed3f1e6f4 100644 --- a/src/common/meta/src/key/flow/flow_info.rs +++ b/src/common/meta/src/key/flow/flow_info.rs @@ -121,6 +121,13 @@ pub struct FlowInfoValue { pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlowNodeId>, /// The catalog name. pub(crate) catalog_name: String, + /// The query context used when creating the flow. + /// Although a flow doesn't belong to any schema, this query context is needed to remember + /// the query context in effect when `create_flow` was executed, + /// so the flow can be recovered with the same SQL and query context after a database restart. + /// If `None`, the default query context should be used. + #[serde(default)] + pub(crate) query_context: Option<QueryContext>, /// The flow name. pub(crate) flow_name: String, /// The raw sql.
@@ -155,6 +162,10 @@ impl FlowInfoValue { &self.catalog_name } + pub fn query_context(&self) -> &Option<QueryContext> { + &self.query_context + } + pub fn flow_name(&self) -> &String { &self.flow_name } diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index ae7794d9bd..2797c6bee0 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -35,17 +35,20 @@ use api::v1::{ }; use base64::engine::general_purpose; use base64::Engine as _; -use common_time::DatabaseTimeToLive; +use common_time::{DatabaseTimeToLive, Timezone}; use prost::Message; use serde::{Deserialize, Serialize}; use serde_with::{serde_as, DefaultOnNull}; -use session::context::QueryContextRef; +use session::context::{QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; use table::metadata::{RawTableInfo, TableId}; use table::table_name::TableName; use table::table_reference::TableReference; -use crate::error::{self, InvalidSetDatabaseOptionSnafu, InvalidUnsetDatabaseOptionSnafu, Result}; +use crate::error::{ + self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu, + Result, +}; use crate::key::FlowId; /// DDL tasks @@ -1202,7 +1205,7 @@ impl From for PbDropFlowTask { } } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct QueryContext { current_catalog: String, current_schema: String, @@ -1223,6 +1226,19 @@ impl From for QueryContext { } } +impl TryFrom<QueryContext> for session::context::QueryContext { + type Error = error::Error; + fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> { + Ok(QueryContextBuilder::default() + .current_catalog(value.current_catalog) + .current_schema(value.current_schema) + .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
+ .extensions(value.extensions) + .channel((value.channel as u32).into()) + .build()) + } +} + impl From for PbQueryContext { fn from( QueryContext { diff --git a/src/flow/src/server.rs b/src/flow/src/server.rs index 53712ffb67..99f7a7c089 100644 --- a/src/flow/src/server.rs +++ b/src/flow/src/server.rs @@ -401,6 +401,7 @@ impl FlownodeBuilder { let cnt = to_be_recovered.len(); // TODO(discord9): recover in parallel + info!("Recovering {} flows: {:?}", cnt, to_be_recovered); for flow_id in to_be_recovered { let info = self .flow_metadata_manager @@ -416,6 +417,7 @@ impl FlownodeBuilder { info.sink_table_name().schema_name.clone(), info.sink_table_name().table_name.clone(), ]; + let args = CreateFlowArgs { flow_id: flow_id as _, sink_table_name, @@ -429,11 +431,24 @@ impl FlownodeBuilder { comment: Some(info.comment().clone()), sql: info.raw_sql().clone(), flow_options: info.options().clone(), - query_ctx: Some( - QueryContextBuilder::default() - .current_catalog(info.catalog_name().clone()) - .build(), - ), + query_ctx: info + .query_context() + .clone() + .map(|ctx| { + ctx.try_into() + .map_err(BoxedError::new) + .context(ExternalSnafu) + }) + .transpose()? 
+ // or use default QueryContext with catalog_name from info + // to keep compatibility with old version + .or_else(|| { + Some( + QueryContextBuilder::default() + .current_catalog(info.catalog_name().to_string()) + .build(), + ) + }), }; manager .create_flow_inner(args) diff --git a/tests/cases/standalone/common/flow/flow_rebuild.result b/tests/cases/standalone/common/flow/flow_rebuild.result index 67fd43a032..0cc5f1ce9c 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.result +++ b/tests/cases/standalone/common/flow/flow_rebuild.result @@ -576,3 +576,117 @@ DROP TABLE out_basic; Affected Rows: 0 +-- check if different schema is working as expected +CREATE DATABASE jsdp_log; + +Affected Rows: 1 + +USE jsdp_log; + +Affected Rows: 0 + +CREATE TABLE IF NOT EXISTS `api_log` ( + `time` TIMESTAMP(9) NOT NULL, + `key` STRING NULL SKIPPING INDEX WITH(granularity = '1024', type = 'BLOOM'), + `status_code` TINYINT NULL, + `method` STRING NULL, + `path` STRING NULL, + `raw_query` STRING NULL, + `user_agent` STRING NULL, + `client_ip` STRING NULL, + `duration` INT NULL, + `count` INT NULL, + TIME INDEX (`time`) +) ENGINE=mito WITH( + append_mode = 'true' +); + +Affected Rows: 0 + +CREATE TABLE IF NOT EXISTS `api_stats` ( + `time` TIMESTAMP(0) NOT NULL, + `key` STRING NULL, + `qpm` BIGINT NULL, + `rpm` BIGINT NULL, + `update_at` TIMESTAMP(3) NULL, + TIME INDEX (`time`), + PRIMARY KEY (`key`) +) ENGINE=mito WITH( + append_mode = 'false', + merge_mode = 'last_row' +); + +Affected Rows: 0 + +CREATE FLOW IF NOT EXISTS api_stats_flow +SINK TO api_stats EXPIRE AFTER '10 minute'::INTERVAL AS +SELECT date_trunc('minute', `time`::TimestampSecond) AS `time1`, `key`, count(*), sum(`count`) +FROM api_log +GROUP BY `time1`, `key`; + +Affected Rows: 0 + +INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '1', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 
10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1); + +Affected Rows: 1 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('api_stats_flow'); + ++------------------------------------+ +| ADMIN FLUSH_FLOW('api_stats_flow') | ++------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------+ + +SELECT key FROM api_stats; + ++-----+ +| key | ++-----+ +| 1 | ++-----+ + +-- SQLNESS ARG restart=true +INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1); + +Affected Rows: 1 + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('api_stats_flow'); + ++------------------------------------+ +| ADMIN FLUSH_FLOW('api_stats_flow') | ++------------------------------------+ +| FLOW_FLUSHED | ++------------------------------------+ + +SELECT key FROM api_stats; + ++-----+ +| key | ++-----+ +| 1 | +| 2 | ++-----+ + +DROP FLOW api_stats_flow; + +Affected Rows: 0 + +DROP TABLE api_log; + +Affected Rows: 0 + +DROP TABLE api_stats; + +Affected Rows: 0 + +USE public; + +Affected Rows: 0 + +DROP DATABASE jsdp_log; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/flow/flow_rebuild.sql b/tests/cases/standalone/common/flow/flow_rebuild.sql index 288d6f1f03..ad07ea4a40 100644 --- a/tests/cases/standalone/common/flow/flow_rebuild.sql +++ b/tests/cases/standalone/common/flow/flow_rebuild.sql @@ -317,3 +317,66 @@ DROP FLOW test_wildcard_basic; DROP TABLE input_basic; DROP TABLE out_basic; + +-- check if different schema is working as expected + +CREATE DATABASE jsdp_log; +USE jsdp_log; + +CREATE 
TABLE IF NOT EXISTS `api_log` ( + `time` TIMESTAMP(9) NOT NULL, + `key` STRING NULL SKIPPING INDEX WITH(granularity = '1024', type = 'BLOOM'), + `status_code` TINYINT NULL, + `method` STRING NULL, + `path` STRING NULL, + `raw_query` STRING NULL, + `user_agent` STRING NULL, + `client_ip` STRING NULL, + `duration` INT NULL, + `count` INT NULL, + TIME INDEX (`time`) +) ENGINE=mito WITH( + append_mode = 'true' +); + +CREATE TABLE IF NOT EXISTS `api_stats` ( + `time` TIMESTAMP(0) NOT NULL, + `key` STRING NULL, + `qpm` BIGINT NULL, + `rpm` BIGINT NULL, + `update_at` TIMESTAMP(3) NULL, + TIME INDEX (`time`), + PRIMARY KEY (`key`) +) ENGINE=mito WITH( + append_mode = 'false', + merge_mode = 'last_row' +); + +CREATE FLOW IF NOT EXISTS api_stats_flow +SINK TO api_stats EXPIRE AFTER '10 minute'::INTERVAL AS +SELECT date_trunc('minute', `time`::TimestampSecond) AS `time1`, `key`, count(*), sum(`count`) +FROM api_log +GROUP BY `time1`, `key`; + +INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '1', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('api_stats_flow'); + +SELECT key FROM api_stats; + +-- SQLNESS ARG restart=true +INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1); + +-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED | +ADMIN FLUSH_FLOW('api_stats_flow'); + +SELECT key FROM api_stats; + +DROP FLOW 
api_stats_flow; + +DROP TABLE api_log; +DROP TABLE api_stats; + +USE public; +DROP DATABASE jsdp_log;