feat: optimize CreateFlowData with lightweight FlowQueryContext (#6780)

* feat: optimize CreateFlowData with lightweight FlowQueryContext

Replace the full QueryContext in the CreateFlowData struct with a FlowQueryContext that carries only the essential fields (catalog, schema, timezone). This improves serialization performance by dropping the unused `extensions` map and `channel` fields; a short sketch of the payload difference follows the change list below.

Key changes:
- Add FlowQueryContext struct with conversion implementations
- Update CreateFlowData to use FlowQueryContext with backward compatibility
- Add tests for serialization and conversions
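A minimal sketch of the payload difference, using local stand-in types rather than the upstream definitions (assumes `serde` with the derive feature and `serde_json` are available):

```rust
use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// Stand-in for the full context that procedure state used to persist.
#[derive(Serialize, Deserialize)]
struct QueryContext {
    current_catalog: String,
    current_schema: String,
    timezone: String,
    extensions: HashMap<String, String>,
    channel: u8,
}

// Stand-in for the lightweight context: only the fields flows actually use.
#[derive(Serialize, Deserialize)]
struct FlowQueryContext {
    catalog: String,
    schema: String,
    timezone: String,
}

fn main() {
    let full = QueryContext {
        current_catalog: "greptime".into(),
        current_schema: "public".into(),
        timezone: "UTC".into(),
        extensions: HashMap::new(),
        channel: 0,
    };
    let slim = FlowQueryContext {
        catalog: "greptime".into(),
        schema: "public".into(),
        timezone: "UTC".into(),
    };
    // The slim form omits the extensions map and channel entirely.
    println!("full: {} bytes", serde_json::to_string(&full).unwrap().len());
    println!("slim: {} bytes", serde_json::to_string(&slim).unwrap().len());
}
```

Even with an empty extensions map, the slim form serializes fewer bytes, since the two dropped fields never appear in the JSON at all.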

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>

* chore: run make fmt

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>

---------

Signed-off-by: Alex Araujo <alexaraujo@gmail.com>
Author: Alex Araujo
Date: 2025-08-21 07:02:57 -05:00
Committed by: GitHub
Parent: 6692957e08
Commit: a98c48a9b2
3 changed files with 396 additions and 19 deletions

File 1 of 3

@@ -47,7 +47,7 @@ use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId};
use crate::lock_key::{CatalogLock, FlowNameLock, TableNameLock};
use crate::metrics;
use crate::peer::Peer;
-use crate::rpc::ddl::{CreateFlowTask, QueryContext};
+use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
/// The procedure of flow creation.
pub struct CreateFlowProcedure {
@@ -67,7 +67,7 @@ impl CreateFlowProcedure {
flow_id: None,
peers: vec![],
source_table_ids: vec![],
-query_context,
+flow_context: query_context.into(), // Convert to FlowQueryContext
state: CreateFlowState::Prepare,
prev_flow_info_value: None,
did_replace: false,
@@ -204,7 +204,8 @@ impl CreateFlowProcedure {
let request = FlowRequest {
header: Some(FlowRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
-query_context: Some(self.data.query_context.clone().into()),
+// Convert FlowQueryContext to QueryContext
+query_context: Some(QueryContext::from(self.data.flow_context.clone()).into()),
}),
body: Some(PbFlowRequest::Create((&self.data).into())),
};
@@ -415,7 +416,9 @@ pub struct CreateFlowData {
pub(crate) flow_id: Option<FlowId>,
pub(crate) peers: Vec<Peer>,
pub(crate) source_table_ids: Vec<TableId>,
-pub(crate) query_context: QueryContext,
+/// Aliased so data serialized under the old `query_context` field name still deserializes.
+#[serde(alias = "query_context")]
+pub(crate) flow_context: FlowQueryContext,
/// For verify if prev value is consistent when need to update flow metadata.
/// only set when `or_replace` is true.
pub(crate) prev_flow_info_value: Option<DeserializedValueWithBytes<FlowInfoValue>>,
@@ -495,7 +498,8 @@ impl From<&CreateFlowData> for (FlowInfoValue, Vec<(FlowPartitionId, FlowRouteVa
sink_table_name,
flownode_ids,
catalog_name,
-query_context: Some(value.query_context.clone()),
+// Convert FlowQueryContext back to QueryContext for storage
+query_context: Some(QueryContext::from(value.flow_context.clone())),
flow_name,
raw_sql: sql,
expire_after,
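For reference, a hedged, self-contained sketch of the `#[serde(alias = ...)]` mechanism the renamed field above relies on. The types are simplified stand-ins (the real deserializer additionally converts the old full-QueryContext shape, shown in the third file), and it assumes `serde`/`serde_json` are available:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct FlowQueryContext {
    catalog: String,
    schema: String,
    timezone: String,
}

// Stand-in for CreateFlowData: the field was renamed, but the alias lets
// serde accept payloads written under the old `query_context` name.
#[derive(Serialize, Deserialize, Debug)]
struct CreateFlowDataLike {
    #[serde(alias = "query_context")]
    flow_context: FlowQueryContext,
}

fn main() {
    // Payload persisted before the rename (old field name)...
    let old = r#"{"query_context":{"catalog":"c","schema":"s","timezone":"UTC"}}"#;
    // ...and a payload written after the rename (new field name).
    let new = r#"{"flow_context":{"catalog":"c","schema":"s","timezone":"UTC"}}"#;
    let a: CreateFlowDataLike = serde_json::from_str(old).unwrap();
    let b: CreateFlowDataLike = serde_json::from_str(new).unwrap();
    assert_eq!(a.flow_context, b.flow_context);
}
```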

File 2 of 3

@@ -18,17 +18,17 @@ use std::sync::Arc;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_procedure_test::execute_procedure_until_done;
-use session::context::QueryContext;
+use session::context::QueryContext as SessionQueryContext;
use table::table_name::TableName;
-use crate::ddl::create_flow::CreateFlowProcedure;
+use crate::ddl::create_flow::{CreateFlowData, CreateFlowProcedure, CreateFlowState, FlowType};
use crate::ddl::test_util::create_table::test_create_table_task;
use crate::ddl::test_util::flownode_handler::NaiveFlownodeHandler;
use crate::ddl::DdlContext;
use crate::error;
use crate::key::table_route::TableRouteValue;
use crate::key::FlowId;
-use crate::rpc::ddl::CreateFlowTask;
+use crate::rpc::ddl::{CreateFlowTask, FlowQueryContext, QueryContext};
use crate::test_util::{new_ddl_context, MockFlownodeManager};
pub(crate) fn test_create_flow_task(
@@ -63,7 +63,7 @@ async fn test_create_flow_source_table_not_found() {
let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
let node_manager = Arc::new(MockFlownodeManager::new(NaiveFlownodeHandler));
let ddl_context = new_ddl_context(node_manager);
-let query_ctx = QueryContext::arc().into();
+let query_ctx = SessionQueryContext::arc().into();
let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, error::Error::TableNotFound { .. });
@@ -81,7 +81,7 @@ pub(crate) async fn create_test_flow(
sink_table_name.clone(),
false,
);
-let query_ctx = QueryContext::arc().into();
+let query_ctx = SessionQueryContext::arc().into();
let mut procedure = CreateFlowProcedure::new(task.clone(), query_ctx, ddl_context.clone());
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let flow_id = output.downcast_ref::<FlowId>().unwrap();
@@ -128,7 +128,7 @@ async fn test_create_flow() {
sink_table_name.clone(),
true,
);
-let query_ctx = QueryContext::arc().into();
+let query_ctx = SessionQueryContext::arc().into();
let mut procedure = CreateFlowProcedure::new(task.clone(), query_ctx, ddl_context.clone());
let output = execute_procedure_until_done(&mut procedure).await.unwrap();
let flow_id = output.downcast_ref::<FlowId>().unwrap();
@@ -136,7 +136,7 @@ async fn test_create_flow() {
// Creates again
let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
-let query_ctx = QueryContext::arc().into();
+let query_ctx = SessionQueryContext::arc().into();
let mut procedure = CreateFlowProcedure::new(task.clone(), query_ctx, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, error::Error::FlowAlreadyExists { .. });
@@ -168,7 +168,7 @@ async fn test_create_flow_same_source_and_sink_table() {
// Try to create a flow with same source and sink table - should fail
let task = test_create_flow_task("my_flow", source_table_names, sink_table_name, false);
-let query_ctx = QueryContext::arc().into();
+let query_ctx = SessionQueryContext::arc().into();
let mut procedure = CreateFlowProcedure::new(task, query_ctx, ddl_context);
let err = procedure.on_prepare().await.unwrap_err();
assert_matches!(err, error::Error::Unsupported { .. });
@@ -179,3 +179,165 @@ async fn test_create_flow_same_source_and_sink_table() {
assert!(operation.contains("same_table"));
}
}
fn create_test_flow_task_for_serialization() -> CreateFlowTask {
CreateFlowTask {
catalog_name: "test_catalog".to_string(),
flow_name: "test_flow".to_string(),
source_table_names: vec![TableName::new("catalog", "schema", "source_table")],
sink_table_name: TableName::new("catalog", "schema", "sink_table"),
or_replace: false,
create_if_not_exists: false,
expire_after: None,
comment: "test comment".to_string(),
sql: "SELECT * FROM source_table".to_string(),
flow_options: HashMap::new(),
}
}
#[test]
fn test_create_flow_data_serialization_backward_compatibility() {
// Test that old serialized data with query_context can be deserialized
let old_json = r#"{
"state": "Prepare",
"task": {
"catalog_name": "test_catalog",
"flow_name": "test_flow",
"source_table_names": [{"catalog_name": "catalog", "schema_name": "schema", "table_name": "source"}],
"sink_table_name": {"catalog_name": "catalog", "schema_name": "schema", "table_name": "sink"},
"or_replace": false,
"create_if_not_exists": false,
"expire_after": null,
"comment": "test",
"sql": "SELECT * FROM source",
"flow_options": {}
},
"flow_id": null,
"peers": [],
"source_table_ids": [],
"query_context": {
"current_catalog": "old_catalog",
"current_schema": "old_schema",
"timezone": "UTC",
"extensions": {},
"channel": 0
},
"prev_flow_info_value": null,
"did_replace": false,
"flow_type": null
}"#;
let data: CreateFlowData = serde_json::from_str(old_json).unwrap();
assert_eq!(data.flow_context.catalog, "old_catalog");
assert_eq!(data.flow_context.schema, "old_schema");
assert_eq!(data.flow_context.timezone, "UTC");
}
#[test]
fn test_create_flow_data_new_format_serialization() {
// Test new format serialization/deserialization
let flow_context = FlowQueryContext {
catalog: "new_catalog".to_string(),
schema: "new_schema".to_string(),
timezone: "America/New_York".to_string(),
};
let data = CreateFlowData {
state: CreateFlowState::Prepare,
task: create_test_flow_task_for_serialization(),
flow_id: None,
peers: vec![],
source_table_ids: vec![],
flow_context,
prev_flow_info_value: None,
did_replace: false,
flow_type: None,
};
let serialized = serde_json::to_string(&data).unwrap();
let deserialized: CreateFlowData = serde_json::from_str(&serialized).unwrap();
assert_eq!(data.flow_context, deserialized.flow_context);
assert_eq!(deserialized.flow_context.catalog, "new_catalog");
assert_eq!(deserialized.flow_context.schema, "new_schema");
assert_eq!(deserialized.flow_context.timezone, "America/New_York");
}
#[test]
fn test_flow_query_context_conversion_from_query_context() {
let query_context = QueryContext {
current_catalog: "prod_catalog".to_string(),
current_schema: "public".to_string(),
timezone: "America/Los_Angeles".to_string(),
extensions: [
("unused_key".to_string(), "unused_value".to_string()),
("another_key".to_string(), "another_value".to_string()),
]
.into(),
channel: 99,
};
let flow_context: FlowQueryContext = query_context.into();
assert_eq!(flow_context.catalog, "prod_catalog");
assert_eq!(flow_context.schema, "public");
assert_eq!(flow_context.timezone, "America/Los_Angeles");
}
#[test]
fn test_flow_info_conversion_with_flow_context() {
let flow_context = FlowQueryContext {
catalog: "info_catalog".to_string(),
schema: "info_schema".to_string(),
timezone: "Europe/Berlin".to_string(),
};
let data = CreateFlowData {
state: CreateFlowState::CreateMetadata,
task: create_test_flow_task_for_serialization(),
flow_id: Some(123),
peers: vec![],
source_table_ids: vec![456, 789],
flow_context,
prev_flow_info_value: None,
did_replace: false,
flow_type: Some(FlowType::Batching),
};
let (flow_info, _routes) = (&data).into();
assert!(flow_info.query_context.is_some());
let query_context = flow_info.query_context.unwrap();
assert_eq!(query_context.current_catalog(), "info_catalog");
assert_eq!(query_context.current_schema(), "info_schema");
assert_eq!(query_context.timezone(), "Europe/Berlin");
assert_eq!(query_context.channel(), 0);
assert!(query_context.extensions().is_empty());
}
#[test]
fn test_mixed_serialization_format_support() {
// Test that we can deserialize both old and new formats
// Test new FlowQueryContext format
let new_format = r#"{"catalog": "test", "schema": "test", "timezone": "UTC"}"#;
let ctx_from_new: FlowQueryContext = serde_json::from_str(new_format).unwrap();
assert_eq!(ctx_from_new.catalog, "test");
assert_eq!(ctx_from_new.schema, "test");
assert_eq!(ctx_from_new.timezone, "UTC");
// Test old QueryContext format conversion
let old_format = r#"{"current_catalog": "old_test", "current_schema": "old_schema", "timezone": "PST", "extensions": {}, "channel": 0}"#;
let ctx_from_old: FlowQueryContext = serde_json::from_str(old_format).unwrap();
assert_eq!(ctx_from_old.catalog, "old_test");
assert_eq!(ctx_from_old.schema, "old_schema");
assert_eq!(ctx_from_old.timezone, "PST");
// Test that they can be compared
let expected_new = FlowQueryContext {
catalog: "test".to_string(),
schema: "test".to_string(),
timezone: "UTC".to_string(),
};
assert_eq!(ctx_from_new, expected_new);
}

File 3 of 3

@@ -1257,11 +1257,82 @@ impl From<DropFlowTask> for PbDropFlowTask {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueryContext {
-current_catalog: String,
-current_schema: String,
-timezone: String,
-extensions: HashMap<String, String>,
-channel: u8,
+pub(crate) current_catalog: String,
+pub(crate) current_schema: String,
+pub(crate) timezone: String,
+pub(crate) extensions: HashMap<String, String>,
+pub(crate) channel: u8,
}
impl QueryContext {
/// Get the current catalog
pub fn current_catalog(&self) -> &str {
&self.current_catalog
}
/// Get the current schema
pub fn current_schema(&self) -> &str {
&self.current_schema
}
/// Get the timezone
pub fn timezone(&self) -> &str {
&self.timezone
}
/// Get the extensions
pub fn extensions(&self) -> &HashMap<String, String> {
&self.extensions
}
/// Get the channel
pub fn channel(&self) -> u8 {
self.channel
}
}
/// Lightweight query context for flow operations containing only essential fields.
/// This is a subset of QueryContext that includes only the fields actually needed
/// for flow creation and execution.
#[derive(Debug, Clone, Serialize, PartialEq)]
pub struct FlowQueryContext {
/// Current catalog name - needed for flow metadata and recovery
pub(crate) catalog: String,
/// Current schema name - needed for table resolution during flow execution
pub(crate) schema: String,
/// Timezone for timestamp operations in the flow
pub(crate) timezone: String,
}
impl<'de> Deserialize<'de> for FlowQueryContext {
fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
// Support both QueryContext format and FlowQueryContext format
#[derive(Deserialize)]
#[serde(untagged)]
enum ContextCompat {
Flow(FlowQueryContextHelper),
Full(QueryContext),
}
#[derive(Deserialize)]
struct FlowQueryContextHelper {
catalog: String,
schema: String,
timezone: String,
}
match ContextCompat::deserialize(deserializer)? {
ContextCompat::Flow(helper) => Ok(FlowQueryContext {
catalog: helper.catalog,
schema: helper.schema,
timezone: helper.timezone,
}),
ContextCompat::Full(full_ctx) => Ok(full_ctx.into()),
}
}
}
impl From<QueryContextRef> for QueryContext {
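The untagged enum in the Deserialize impl above is the crux of the compatibility story: serde tries each variant in order, so the lean shape is attempted first and the legacy full shape is the fallback. A self-contained sketch of that pattern under the same assumptions (simplified stand-in types, `serde`/`serde_json` available):

```rust
use std::collections::HashMap;

use serde::Deserialize;

#[derive(Deserialize, Debug, PartialEq)]
struct Lean {
    catalog: String,
    schema: String,
    timezone: String,
}

#[derive(Deserialize, Debug)]
#[allow(dead_code)]
struct Full {
    current_catalog: String,
    current_schema: String,
    timezone: String,
    extensions: HashMap<String, String>,
    channel: u8,
}

// Variant order matters: the lean shape is tried first; JSON lacking
// `catalog`/`schema` falls through to the legacy full shape.
#[derive(Deserialize)]
#[serde(untagged)]
enum Compat {
    Lean(Lean),
    Full(Full),
}

fn parse(json: &str) -> Lean {
    match serde_json::from_str::<Compat>(json).unwrap() {
        Compat::Lean(lean) => lean,
        Compat::Full(full) => Lean {
            catalog: full.current_catalog,
            schema: full.current_schema,
            timezone: full.timezone,
        },
    }
}

fn main() {
    let lean = parse(r#"{"catalog":"c","schema":"s","timezone":"UTC"}"#);
    let full = parse(
        r#"{"current_catalog":"c","current_schema":"s","timezone":"UTC","extensions":{},"channel":0}"#,
    );
    assert_eq!(lean, full); // both formats decode to the same lean context
}
```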
@@ -1311,6 +1382,45 @@ impl From<QueryContext> for PbQueryContext {
}
}
impl From<QueryContext> for FlowQueryContext {
fn from(ctx: QueryContext) -> Self {
Self {
catalog: ctx.current_catalog,
schema: ctx.current_schema,
timezone: ctx.timezone,
}
}
}
impl From<QueryContextRef> for FlowQueryContext {
fn from(ctx: QueryContextRef) -> Self {
Self {
catalog: ctx.current_catalog().to_string(),
schema: ctx.current_schema().to_string(),
timezone: ctx.timezone().to_string(),
}
}
}
impl From<FlowQueryContext> for QueryContext {
fn from(flow_ctx: FlowQueryContext) -> Self {
Self {
current_catalog: flow_ctx.catalog,
current_schema: flow_ctx.schema,
timezone: flow_ctx.timezone,
extensions: HashMap::new(),
channel: 0, // Use default channel for flows
}
}
}
impl From<FlowQueryContext> for PbQueryContext {
fn from(flow_ctx: FlowQueryContext) -> Self {
let query_ctx: QueryContext = flow_ctx.into();
query_ctx.into()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
@@ -1322,7 +1432,7 @@ mod tests {
use table::metadata::{RawTableInfo, RawTableMeta, TableType};
use table::test_util::table_info::test_table_info;
-use super::{AlterTableTask, CreateTableTask};
+use super::{AlterTableTask, CreateTableTask, *};
#[test]
fn test_basic_ser_de_create_table_task() {
@@ -1457,4 +1567,105 @@ mod tests {
);
assert_eq!(create_table_task.table_info.meta.value_indices, vec![0, 1]);
}
#[test]
fn test_flow_query_context_conversion_from_query_context() {
use std::collections::HashMap;
let mut extensions = HashMap::new();
extensions.insert("key1".to_string(), "value1".to_string());
extensions.insert("key2".to_string(), "value2".to_string());
let query_ctx = QueryContext {
current_catalog: "test_catalog".to_string(),
current_schema: "test_schema".to_string(),
timezone: "UTC".to_string(),
extensions,
channel: 5,
};
let flow_ctx: FlowQueryContext = query_ctx.into();
assert_eq!(flow_ctx.catalog, "test_catalog");
assert_eq!(flow_ctx.schema, "test_schema");
assert_eq!(flow_ctx.timezone, "UTC");
}
#[test]
fn test_flow_query_context_conversion_to_query_context() {
let flow_ctx = FlowQueryContext {
catalog: "prod_catalog".to_string(),
schema: "public".to_string(),
timezone: "America/New_York".to_string(),
};
let query_ctx: QueryContext = flow_ctx.clone().into();
assert_eq!(query_ctx.current_catalog, "prod_catalog");
assert_eq!(query_ctx.current_schema, "public");
assert_eq!(query_ctx.timezone, "America/New_York");
assert!(query_ctx.extensions.is_empty());
assert_eq!(query_ctx.channel, 0);
// Test roundtrip conversion
let flow_ctx_roundtrip: FlowQueryContext = query_ctx.into();
assert_eq!(flow_ctx, flow_ctx_roundtrip);
}
#[test]
fn test_flow_query_context_conversion_from_query_context_ref() {
use common_time::Timezone;
use session::context::QueryContextBuilder;
let session_ctx = QueryContextBuilder::default()
.current_catalog("session_catalog".to_string())
.current_schema("session_schema".to_string())
.timezone(Timezone::from_tz_string("Europe/London").unwrap())
.build();
let session_ctx_ref = Arc::new(session_ctx);
let flow_ctx: FlowQueryContext = session_ctx_ref.into();
assert_eq!(flow_ctx.catalog, "session_catalog");
assert_eq!(flow_ctx.schema, "session_schema");
assert_eq!(flow_ctx.timezone, "Europe/London");
}
#[test]
fn test_flow_query_context_serialization() {
let flow_ctx = FlowQueryContext {
catalog: "test_catalog".to_string(),
schema: "test_schema".to_string(),
timezone: "UTC".to_string(),
};
let serialized = serde_json::to_string(&flow_ctx).unwrap();
let deserialized: FlowQueryContext = serde_json::from_str(&serialized).unwrap();
assert_eq!(flow_ctx, deserialized);
// Verify JSON structure
let json_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
assert_eq!(json_value["catalog"], "test_catalog");
assert_eq!(json_value["schema"], "test_schema");
assert_eq!(json_value["timezone"], "UTC");
}
#[test]
fn test_flow_query_context_conversion_to_pb() {
let flow_ctx = FlowQueryContext {
catalog: "pb_catalog".to_string(),
schema: "pb_schema".to_string(),
timezone: "Asia/Tokyo".to_string(),
};
let pb_ctx: PbQueryContext = flow_ctx.into();
assert_eq!(pb_ctx.current_catalog, "pb_catalog");
assert_eq!(pb_ctx.current_schema, "pb_schema");
assert_eq!(pb_ctx.timezone, "Asia/Tokyo");
assert!(pb_ctx.extensions.is_empty());
assert_eq!(pb_ctx.channel, 0);
assert!(pb_ctx.snapshot_seqs.is_none());
assert!(pb_ctx.explain.is_none());
}
}