fix: store flow query ctx on creation (#5963)

* fix: store flow schema on creation

* chore: update sqlness

* refactor: save the entire query context to flow info

* chore: sqlness update

* chore: rm pub

* fix: keep old version compatibility
Author: discord9
Committed: 2025-04-23 17:59:09 +08:00 (by GitHub)
Parent: 02e9a66d7a
Commit: 79ed7bbc44

9 changed files with 243 additions and 10 deletions


@@ -187,6 +187,7 @@ mod tests {
             },
             flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
             catalog_name: DEFAULT_CATALOG_NAME.to_string(),
+            query_context: None,
             flow_name: "my_flow".to_string(),
             raw_sql: "sql".to_string(),
             expire_after: Some(300),


@@ -437,6 +437,7 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
             sink_table_name,
             flownode_ids,
             catalog_name,
+            query_context: Some(value.query_context.clone()),
             flow_name,
             raw_sql: sql,
             expire_after,


@@ -783,6 +783,14 @@ pub enum Error {
         #[snafu(source)]
         source: common_procedure::error::Error,
     },
+
+    #[snafu(display("Failed to parse timezone"))]
+    InvalidTimeZone {
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        error: common_time::error::Error,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -853,7 +861,8 @@ impl ErrorExt for Error {
             | TlsConfig { .. }
             | InvalidSetDatabaseOption { .. }
             | InvalidUnsetDatabaseOption { .. }
-            | InvalidTopicNamePrefix { .. } => StatusCode::InvalidArguments,
+            | InvalidTopicNamePrefix { .. }
+            | InvalidTimeZone { .. } => StatusCode::InvalidArguments,

             FlowNotFound { .. } => StatusCode::FlowNotFound,
             FlowRouteNotFound { .. } => StatusCode::Unexpected,
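
The new `InvalidTimeZone` variant follows the crate's usual snafu pattern: `#[snafu(implicit)]` fills in the error `Location` automatically, and `#[snafu(source)]` marks the wrapped lower-level error even though the field is named `error`. Below is a minimal, self-contained sketch of that pattern, assuming snafu 0.7+, with `std::num::ParseIntError` standing in for `common_time::error::Error`:

    use snafu::{Location, ResultExt, Snafu};

    #[derive(Debug, Snafu)]
    enum Error {
        #[snafu(display("Failed to parse timezone"))]
        InvalidTimeZone {
            // Captured automatically at the `.context(...)` call site.
            #[snafu(implicit)]
            location: Location,
            // Stand-in for common_time::error::Error.
            #[snafu(source)]
            error: std::num::ParseIntError,
        },
    }

    fn parse_minutes(s: &str) -> Result<i32, Error> {
        // `.context(...)` converts the inner error into our variant,
        // filling `location` implicitly, just as the diff above does.
        s.parse::<i32>().context(InvalidTimeZoneSnafu)
    }

    fn main() {
        assert!(parse_minutes("abc").is_err());
        assert_eq!(parse_minutes("480").unwrap(), 480);
    }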


@@ -452,6 +452,7 @@ mod tests {
         };
         FlowInfoValue {
             catalog_name: catalog_name.to_string(),
+            query_context: None,
             flow_name: flow_name.to_string(),
             source_table_ids,
             sink_table_name,
@@ -625,6 +626,7 @@ mod tests {
         };
         let flow_value = FlowInfoValue {
             catalog_name: "greptime".to_string(),
+            query_context: None,
             flow_name: "flow".to_string(),
             source_table_ids: vec![1024, 1025, 1026],
             sink_table_name: another_sink_table_name,
@@ -864,6 +866,7 @@ mod tests {
         };
         let flow_value = FlowInfoValue {
             catalog_name: "greptime".to_string(),
+            query_context: None,
             flow_name: "flow".to_string(),
             source_table_ids: vec![1024, 1025, 1026],
             sink_table_name: another_sink_table_name,


@@ -121,6 +121,13 @@ pub struct FlowInfoValue {
     pub(crate) flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
     /// The catalog name.
     pub(crate) catalog_name: String,
+    /// The query context used when creating the flow.
+    /// Although a flow doesn't belong to any schema, the query context is
+    /// recorded so that the flow can be recovered with the same SQL and
+    /// query context after a database restart.
+    /// If `None`, the default query context should be used.
+    #[serde(default)]
+    pub(crate) query_context: Option<crate::rpc::ddl::QueryContext>,
     /// The flow name.
     pub(crate) flow_name: String,
     /// The raw sql.
@@ -155,6 +162,10 @@ impl FlowInfoValue {
         &self.catalog_name
     }

+    pub fn query_context(&self) -> &Option<crate::rpc::ddl::QueryContext> {
+        &self.query_context
+    }
+
     pub fn flow_name(&self) -> &String {
         &self.flow_name
     }
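
A note on the `#[serde(default)]` attribute above: serde's derived `Deserialize` already treats a missing `Option` field as `None`, and the attribute makes that fallback explicit, so `FlowInfoValue` records written before this change still deserialize cleanly. A minimal sketch of the behavior, assuming `serde` and `serde_json` as dependencies; the struct here is a stand-in, not the real `FlowInfoValue`:

    use serde::{Deserialize, Serialize};

    #[derive(Debug, Serialize, Deserialize)]
    struct FlowInfo {
        catalog_name: String,
        #[serde(default)]
        query_context: Option<String>, // stand-in for rpc::ddl::QueryContext
        flow_name: String,
    }

    fn main() {
        // JSON written by an old version: no `query_context` key at all.
        let old = r#"{"catalog_name":"greptime","flow_name":"my_flow"}"#;
        let v: FlowInfo = serde_json::from_str(old).unwrap();
        // The new field simply falls back to `None`.
        assert_eq!(v.query_context, None);
        assert_eq!(v.flow_name, "my_flow");
    }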


@@ -35,17 +35,20 @@ use api::v1::{
 };
 use base64::engine::general_purpose;
 use base64::Engine as _;
-use common_time::DatabaseTimeToLive;
+use common_time::{DatabaseTimeToLive, Timezone};
 use prost::Message;
 use serde::{Deserialize, Serialize};
 use serde_with::{serde_as, DefaultOnNull};
-use session::context::QueryContextRef;
+use session::context::{QueryContextBuilder, QueryContextRef};
 use snafu::{OptionExt, ResultExt};
 use table::metadata::{RawTableInfo, TableId};
 use table::table_name::TableName;
 use table::table_reference::TableReference;

-use crate::error::{self, InvalidSetDatabaseOptionSnafu, InvalidUnsetDatabaseOptionSnafu, Result};
+use crate::error::{
+    self, InvalidSetDatabaseOptionSnafu, InvalidTimeZoneSnafu, InvalidUnsetDatabaseOptionSnafu,
+    Result,
+};
 use crate::key::FlowId;

 /// DDL tasks
@@ -1202,7 +1205,7 @@ impl From<DropFlowTask> for PbDropFlowTask {
     }
 }

-#[derive(Debug, Clone, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
 pub struct QueryContext {
     current_catalog: String,
     current_schema: String,
@@ -1223,6 +1226,19 @@ impl From<QueryContextRef> for QueryContext {
     }
 }

+impl TryFrom<QueryContext> for session::context::QueryContext {
+    type Error = error::Error;
+    fn try_from(value: QueryContext) -> std::result::Result<Self, Self::Error> {
+        Ok(QueryContextBuilder::default()
+            .current_catalog(value.current_catalog)
+            .current_schema(value.current_schema)
+            .timezone(Timezone::from_tz_string(&value.timezone).context(InvalidTimeZoneSnafu)?)
+            .extensions(value.extensions)
+            .channel((value.channel as u32).into())
+            .build())
+    }
+}
+
 impl From<QueryContext> for PbQueryContext {
     fn from(
         QueryContext {
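
The conversion back to a session `QueryContext` is `TryFrom` rather than `From` because the stored timezone is a string that must be re-parsed, and parsing can fail. Here is a self-contained sketch of that design choice, using illustrative stand-in types (not the real greptimedb ones) and a toy `+HH:MM` offset parser in place of `Timezone::from_tz_string`:

    struct StoredContext {
        timezone: String,
    }

    struct SessionContext {
        offset_minutes: i32,
    }

    impl TryFrom<StoredContext> for SessionContext {
        type Error = String;

        // Fallible conversion: the timezone string may be garbage,
        // so the error must be surfaced instead of swallowed.
        fn try_from(value: StoredContext) -> Result<Self, Self::Error> {
            let offset_minutes = parse_offset(&value.timezone)
                .ok_or_else(|| format!("invalid timezone: {}", value.timezone))?;
            Ok(SessionContext { offset_minutes })
        }
    }

    // Toy stand-in for Timezone::from_tz_string: accepts "+HH:MM"/"-HH:MM".
    fn parse_offset(tz: &str) -> Option<i32> {
        let (sign, rest) = match tz.strip_prefix('+') {
            Some(rest) => (1, rest),
            None => (-1, tz.strip_prefix('-')?),
        };
        let (h, m) = rest.split_once(':')?;
        Some(sign * (h.parse::<i32>().ok()? * 60 + m.parse::<i32>().ok()?))
    }

    fn main() {
        let ok = SessionContext::try_from(StoredContext { timezone: "+08:00".into() });
        assert_eq!(ok.unwrap().offset_minutes, 480);
        let bad = SessionContext::try_from(StoredContext { timezone: "whatever".into() });
        assert!(bad.is_err());
    }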


@@ -401,6 +401,7 @@ impl FlownodeBuilder {
         let cnt = to_be_recovered.len();
         // TODO(discord9): recover in parallel
         info!("Recovering {} flows: {:?}", cnt, to_be_recovered);
+
         for flow_id in to_be_recovered {
             let info = self
                 .flow_metadata_manager
@@ -416,6 +417,7 @@ impl FlownodeBuilder {
                 info.sink_table_name().schema_name.clone(),
                 info.sink_table_name().table_name.clone(),
             ];
+
             let args = CreateFlowArgs {
                 flow_id: flow_id as _,
                 sink_table_name,
@@ -429,11 +431,24 @@ impl FlownodeBuilder {
                 comment: Some(info.comment().clone()),
                 sql: info.raw_sql().clone(),
                 flow_options: info.options().clone(),
-                query_ctx: Some(
-                    QueryContextBuilder::default()
-                        .current_catalog(info.catalog_name().clone())
-                        .build(),
-                ),
+                query_ctx: info
+                    .query_context()
+                    .clone()
+                    .map(|ctx| {
+                        ctx.try_into()
+                            .map_err(BoxedError::new)
+                            .context(ExternalSnafu)
+                    })
+                    .transpose()?
+                    // Fall back to a default QueryContext carrying the catalog
+                    // name, for compatibility with older versions' metadata.
+                    .or_else(|| {
+                        Some(
+                            QueryContextBuilder::default()
+                                .current_catalog(info.catalog_name().to_string())
+                                .build(),
+                        )
+                    }),
             };
             manager
                 .create_flow_inner(args)
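
The recovery path above converts the stored context when one exists, propagating conversion errors through `transpose()?`, and otherwise falls back to a default context carrying only the catalog name, which is what older versions did. A minimal sketch of that `map`/`transpose`/`or_else` shape, with stand-in types and a toy validity check in place of the real conversion:

    #[derive(Debug)]
    struct Ctx {
        catalog: String,
        timezone: String,
    }

    fn recover_ctx(stored: Option<String>, catalog: &str) -> Result<Option<Ctx>, String> {
        let ctx = stored
            .map(|tz| {
                // Stand-in for the fallible TryFrom conversion above.
                if tz.is_empty() {
                    Err("empty timezone".to_string())
                } else {
                    Ok(Ctx { catalog: catalog.to_string(), timezone: tz })
                }
            })
            // Option<Result<_>> -> Result<Option<_>>, so `?` propagates errors.
            .transpose()?
            // Old metadata stored no context: fall back to a default one that
            // only carries the catalog, matching the pre-change behavior.
            .or_else(|| Some(Ctx {
                catalog: catalog.to_string(),
                timezone: "UTC".to_string(),
            }));
        Ok(ctx)
    }

    fn main() {
        // A stored-but-broken context is an error, not a silent fallback.
        assert!(recover_ctx(Some(String::new()), "greptime").is_err());
        // A stored context is used as-is.
        let kept = recover_ctx(Some("+08:00".into()), "greptime").unwrap().unwrap();
        assert_eq!(kept.timezone, "+08:00");
        // No stored context: default with the catalog name.
        let fallback = recover_ctx(None, "greptime").unwrap().unwrap();
        assert_eq!(fallback.catalog, "greptime");
        assert_eq!(fallback.timezone, "UTC");
    }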


@@ -576,3 +576,117 @@ DROP TABLE out_basic;
Affected Rows: 0
-- check that a flow in a different schema works as expected
CREATE DATABASE jsdp_log;
Affected Rows: 1
USE jsdp_log;
Affected Rows: 0
CREATE TABLE IF NOT EXISTS `api_log` (
`time` TIMESTAMP(9) NOT NULL,
`key` STRING NULL SKIPPING INDEX WITH(granularity = '1024', type = 'BLOOM'),
`status_code` TINYINT NULL,
`method` STRING NULL,
`path` STRING NULL,
`raw_query` STRING NULL,
`user_agent` STRING NULL,
`client_ip` STRING NULL,
`duration` INT NULL,
`count` INT NULL,
TIME INDEX (`time`)
) ENGINE=mito WITH(
append_mode = 'true'
);
Affected Rows: 0
CREATE TABLE IF NOT EXISTS `api_stats` (
`time` TIMESTAMP(0) NOT NULL,
`key` STRING NULL,
`qpm` BIGINT NULL,
`rpm` BIGINT NULL,
`update_at` TIMESTAMP(3) NULL,
TIME INDEX (`time`),
PRIMARY KEY (`key`)
) ENGINE=mito WITH(
append_mode = 'false',
merge_mode = 'last_row'
);
Affected Rows: 0
CREATE FLOW IF NOT EXISTS api_stats_flow
SINK TO api_stats EXPIRE AFTER '10 minute'::INTERVAL AS
SELECT date_trunc('minute', `time`::TimestampSecond) AS `time1`, `key`, count(*), sum(`count`)
FROM api_log
GROUP BY `time1`, `key`;
Affected Rows: 0
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '1', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');
+------------------------------------+
| ADMIN FLUSH_FLOW('api_stats_flow') |
+------------------------------------+
| FLOW_FLUSHED |
+------------------------------------+
SELECT key FROM api_stats;
+-----+
| key |
+-----+
| 1 |
+-----+
-- SQLNESS ARG restart=true
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
Affected Rows: 1
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');
+------------------------------------+
| ADMIN FLUSH_FLOW('api_stats_flow') |
+------------------------------------+
| FLOW_FLUSHED |
+------------------------------------+
SELECT key FROM api_stats;
+-----+
| key |
+-----+
| 1 |
| 2 |
+-----+
DROP FLOW api_stats_flow;
Affected Rows: 0
DROP TABLE api_log;
Affected Rows: 0
DROP TABLE api_stats;
Affected Rows: 0
USE public;
Affected Rows: 0
DROP DATABASE jsdp_log;
Affected Rows: 0


@@ -317,3 +317,66 @@ DROP FLOW test_wildcard_basic;
DROP TABLE input_basic;
DROP TABLE out_basic;
-- check that a flow in a different schema works as expected
CREATE DATABASE jsdp_log;
USE jsdp_log;
CREATE TABLE IF NOT EXISTS `api_log` (
`time` TIMESTAMP(9) NOT NULL,
`key` STRING NULL SKIPPING INDEX WITH(granularity = '1024', type = 'BLOOM'),
`status_code` TINYINT NULL,
`method` STRING NULL,
`path` STRING NULL,
`raw_query` STRING NULL,
`user_agent` STRING NULL,
`client_ip` STRING NULL,
`duration` INT NULL,
`count` INT NULL,
TIME INDEX (`time`)
) ENGINE=mito WITH(
append_mode = 'true'
);
CREATE TABLE IF NOT EXISTS `api_stats` (
`time` TIMESTAMP(0) NOT NULL,
`key` STRING NULL,
`qpm` BIGINT NULL,
`rpm` BIGINT NULL,
`update_at` TIMESTAMP(3) NULL,
TIME INDEX (`time`),
PRIMARY KEY (`key`)
) ENGINE=mito WITH(
append_mode = 'false',
merge_mode = 'last_row'
);
CREATE FLOW IF NOT EXISTS api_stats_flow
SINK TO api_stats EXPIRE AFTER '10 minute'::INTERVAL AS
SELECT date_trunc('minute', `time`::TimestampSecond) AS `time1`, `key`, count(*), sum(`count`)
FROM api_log
GROUP BY `time1`, `key`;
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '1', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');
SELECT key FROM api_stats;
-- SQLNESS ARG restart=true
INSERT INTO `api_log` (`time`, `key`, `status_code`, `method`, `path`, `raw_query`, `user_agent`, `client_ip`, `duration`, `count`) VALUES (now(), '2', 0, 'GET', '/lightning/v1/query', 'key=1&since=600', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36', '1', 21, 1);
-- SQLNESS REPLACE (ADMIN\sFLUSH_FLOW\('\w+'\)\s+\|\n\+-+\+\n\|\s+)[0-9]+\s+\| $1 FLOW_FLUSHED |
ADMIN FLUSH_FLOW('api_stats_flow');
SELECT key FROM api_stats;
DROP FLOW api_stats_flow;
DROP TABLE api_log;
DROP TABLE api_stats;
USE public;
DROP DATABASE jsdp_log;