diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs index c31b608967..4bf89f465f 100644 --- a/src/common/function/src/admin.rs +++ b/src/common/function/src/admin.rs @@ -16,6 +16,7 @@ mod build_index_table; mod flush_compact_region; mod flush_compact_table; mod gc; +mod migrate_flow; mod migrate_region; mod reconcile_catalog; mod reconcile_database; @@ -24,6 +25,7 @@ mod reconcile_table; use flush_compact_region::{CompactRegionFunction, FlushRegionFunction}; use flush_compact_table::{CompactTableFunction, FlushTableFunction}; use gc::{GcRegionsFunction, GcTableFunction}; +use migrate_flow::MigrateFlowFunction; use migrate_region::MigrateRegionFunction; use reconcile_catalog::ReconcileCatalogFunction; use reconcile_database::ReconcileDatabaseFunction; @@ -48,6 +50,7 @@ impl AdminFunction { registry.register(GcTableFunction::factory()); registry.register(BuildIndexFunction::factory()); registry.register(FlushFlowFunction::factory()); + registry.register(MigrateFlowFunction::factory()); registry.register(ReconcileCatalogFunction::factory()); registry.register(ReconcileDatabaseFunction::factory()); registry.register(ReconcileTableFunction::factory()); diff --git a/src/common/function/src/admin/migrate_flow.rs b/src/common/function/src/admin/migrate_flow.rs new file mode 100644 index 0000000000..eb0d6cef2c --- /dev/null +++ b/src/common/function/src/admin/migrate_flow.rs @@ -0,0 +1,325 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use common_macro::admin_fn; +use common_meta::rpc::procedure::MigrateFlowRequest; +use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result}; +use datafusion_expr::{Signature, TypeSignature, Volatility}; +use datatypes::arrow::datatypes::DataType as ArrowDataType; +use datatypes::data_type::DataType; +use datatypes::prelude::ConcreteDataType; +use datatypes::value::{Value, ValueRef}; +use session::context::QueryContextRef; +use snafu::ensure; + +use crate::handlers::ProcedureServiceHandlerRef; +use crate::helper::cast_u64; + +/// The default timeout for migrate flow procedure. +const DEFAULT_TIMEOUT_SECS: u64 = 300; + +/// A function to migrate a flow partition from source flownode to target flownode. +/// Returns the submitted procedure id if success. +/// +/// - `migrate_flow(catalog, flow_id, partition_id, from_flownode, to_flownode)`, +/// with timeout(300 seconds). +/// - `migrate_flow(catalog, flow_id, partition_id, from_flownode, to_flownode, timeout(secs))`. +/// +/// The parameters: +/// - `catalog`: the catalog name of the flow +/// - `flow_id`: the flow id +/// - `partition_id`: the flow partition id +/// - `from_flownode`: the source flownode id +/// - `to_flownode`: the target flownode id +#[admin_fn( + name = MigrateFlowFunction, + display_name = migrate_flow, + sig_fn = signature, + ret = string +)] +pub(crate) async fn migrate_flow( + procedure_service_handler: &ProcedureServiceHandlerRef, + _ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + let Some(request) = parse_migrate_flow_request(params)? else { + return Ok(Value::Null); + }; + + let pid = procedure_service_handler.migrate_flow(request).await?; + + match pid { + Some(pid) => Ok(Value::from(pid)), + None => Ok(Value::Null), + } +} + +fn parse_migrate_flow_request(params: &[ValueRef<'_>]) -> Result> { + ensure!( + matches!(params.len(), 5 | 6), + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly 5 or 6, have: {}", + params.len() + ), + } + ); + + let ValueRef::String(catalog) = params[0] else { + return common_query::error::UnsupportedInputDataTypeSnafu { + function: "migrate_flow", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + + let flow_id = cast_u32_arg(params, 1, "flow_id")?; + let partition_id = cast_u32_arg(params, 2, "partition_id")?; + let from_flownode = cast_u64(¶ms[3])?; + let to_flownode = cast_u64(¶ms[4])?; + let timeout = match params.len() { + 5 => Some(DEFAULT_TIMEOUT_SECS), + 6 => cast_u64(¶ms[5])?, + _ => unreachable!(), + }; + + match (flow_id, partition_id, from_flownode, to_flownode, timeout) { + ( + Some(flow_id), + Some(partition_id), + Some(from_flownode), + Some(to_flownode), + Some(timeout), + ) => Ok(Some(MigrateFlowRequest { + catalog: catalog.to_string(), + flow_id, + partition_id, + from_flownode, + to_flownode, + timeout: Duration::from_secs(timeout), + })), + _ => Ok(None), + } +} + +fn cast_u32_arg(params: &[ValueRef<'_>], index: usize, name: &str) -> Result> { + let Some(value) = cast_u64(¶ms[index])? else { + return Ok(None); + }; + + ensure!( + u32::try_from(value).is_ok(), + InvalidFuncArgsSnafu { + err_msg: format!("The {} is out of range for uint32: {}", name, value), + } + ); + + Ok(Some(value as u32)) +} + +fn signature() -> Signature { + let numerics = ConcreteDataType::numerics() + .into_iter() + .map(|dt| dt.as_arrow_type()) + .collect::>(); + let mut signatures = Vec::with_capacity(numerics.len() * 2); + + for data_type in numerics { + signatures.push(TypeSignature::Exact(vec![ + ArrowDataType::Utf8, + data_type.clone(), + data_type.clone(), + data_type.clone(), + data_type.clone(), + ])); + signatures.push(TypeSignature::Exact(vec![ + ArrowDataType::Utf8, + data_type.clone(), + data_type.clone(), + data_type.clone(), + data_type.clone(), + data_type, + ])); + } + + Signature::one_of(signatures, Volatility::Immutable) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow::array::{StringArray, UInt64Array}; + use arrow::datatypes::{DataType, Field}; + use datafusion_expr::ColumnarValue; + + use super::*; + use crate::function::FunctionContext; + use crate::function_factory::ScalarFunctionFactory; + + #[test] + fn test_migrate_flow_misc() { + let factory: ScalarFunctionFactory = MigrateFlowFunction::factory().into(); + let f = factory.provide(FunctionContext::mock()); + + assert_eq!("migrate_flow", f.name()); + assert_eq!(DataType::Utf8, f.return_type(&[]).unwrap()); + assert!(matches!(f.signature(), + datafusion_expr::Signature { + type_signature: datafusion_expr::TypeSignature::OneOf(sigs), + volatility: datafusion_expr::Volatility::Immutable, + .. + } if !sigs.is_empty())); + } + + #[test] + fn test_parse_migrate_flow_request() { + let params = vec![ + ValueRef::String("greptime"), + ValueRef::UInt32(1024), + ValueRef::UInt32(0), + ValueRef::UInt64(1), + ValueRef::UInt64(2), + ]; + + let request = parse_migrate_flow_request(¶ms).unwrap().unwrap(); + + assert_eq!("greptime", request.catalog); + assert_eq!(1024, request.flow_id); + assert_eq!(0, request.partition_id); + assert_eq!(1, request.from_flownode); + assert_eq!(2, request.to_flownode); + assert_eq!(Duration::from_secs(DEFAULT_TIMEOUT_SECS), request.timeout); + } + + #[test] + fn test_parse_migrate_flow_request_with_timeout() { + let params = vec![ + ValueRef::String("greptime"), + ValueRef::UInt32(1024), + ValueRef::UInt32(0), + ValueRef::UInt64(1), + ValueRef::UInt64(2), + ValueRef::UInt64(60), + ]; + + let request = parse_migrate_flow_request(¶ms).unwrap().unwrap(); + + assert_eq!(Duration::from_secs(60), request.timeout); + } + + #[test] + fn test_parse_migrate_flow_request_with_nullable_arg() { + let params = vec![ + ValueRef::String("greptime"), + ValueRef::Null, + ValueRef::UInt32(0), + ValueRef::UInt64(1), + ValueRef::UInt64(2), + ]; + + assert!(parse_migrate_flow_request(¶ms).unwrap().is_none()); + } + + #[test] + fn test_parse_migrate_flow_request_rejects_bad_args() { + let params = vec![ValueRef::String("greptime")]; + + let err = parse_migrate_flow_request(¶ms).unwrap_err(); + assert!(err.to_string().contains("expect exactly 5 or 6")); + + let params = vec![ + ValueRef::String("greptime"), + ValueRef::UInt64(u64::from(u32::MAX) + 1), + ValueRef::UInt32(0), + ValueRef::UInt64(1), + ValueRef::UInt64(2), + ]; + let err = parse_migrate_flow_request(¶ms).unwrap_err(); + assert!(err.to_string().contains("flow_id is out of range")); + + let params = vec![ + ValueRef::UInt64(1), + ValueRef::UInt32(1024), + ValueRef::UInt32(0), + ValueRef::UInt64(1), + ValueRef::UInt64(2), + ]; + let err = parse_migrate_flow_request(¶ms).unwrap_err(); + assert!(err.to_string().contains("Unsupported input datatypes")); + } + + #[tokio::test] + async fn test_missing_procedure_service() { + let factory: ScalarFunctionFactory = MigrateFlowFunction::factory().into(); + let provider = factory.provide(FunctionContext::default()); + let f = provider.as_async().unwrap(); + + let result = f + .invoke_async_with_args(build_func_args()) + .await + .unwrap_err(); + + assert_eq!( + "Execution error: Missing ProcedureServiceHandler, not expected", + result.to_string() + ); + } + + #[tokio::test] + async fn test_migrate_flow() { + let factory: ScalarFunctionFactory = MigrateFlowFunction::factory().into(); + let provider = factory.provide(FunctionContext::mock()); + let f = provider.as_async().unwrap(); + + let result = f.invoke_async_with_args(build_func_args()).await.unwrap(); + + match result { + ColumnarValue::Array(array) => { + let result_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(result_array.value(0), "test_pid"); + } + ColumnarValue::Scalar(scalar) => { + assert_eq!( + scalar, + datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string())) + ); + } + } + } + + fn build_func_args() -> datafusion::logical_expr::ScalarFunctionArgs { + datafusion::logical_expr::ScalarFunctionArgs { + args: vec![ + ColumnarValue::Array(Arc::new(StringArray::from(vec!["greptime"]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1024]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![0]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), + ColumnarValue::Array(Arc::new(UInt64Array::from(vec![2]))), + ], + arg_fields: vec![ + Arc::new(Field::new("arg_0", DataType::Utf8, false)), + Arc::new(Field::new("arg_1", DataType::UInt64, false)), + Arc::new(Field::new("arg_2", DataType::UInt64, false)), + Arc::new(Field::new("arg_3", DataType::UInt64, false)), + Arc::new(Field::new("arg_4", DataType::UInt64, false)), + ], + return_field: Arc::new(Field::new("result", DataType::Utf8, true)), + number_rows: 1, + config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), + } + } +} diff --git a/src/common/function/src/function_registry.rs b/src/common/function/src/function_registry.rs index a3beca5403..185b709dff 100644 --- a/src/common/function/src/function_registry.rs +++ b/src/common/function/src/function_registry.rs @@ -236,4 +236,13 @@ mod tests { let _ = registry.get_function("test_and").unwrap(); assert_eq!(1, registry.scalar_functions().len()); } + + #[test] + fn test_admin_registers_migrate_flow() { + let registry = FunctionRegistry::default(); + + AdminFunction::register(®istry); + + assert!(registry.get_function("migrate_flow").is_some()); + } } diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index 50318fd14f..713003d9cd 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -20,8 +20,8 @@ use catalog::CatalogManagerRef; use common_base::AffectedRows; use common_meta::rpc::procedure::{ GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse, - GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, - ProcedureStateResponse, + GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateFlowRequest, + MigrateRegionRequest, ProcedureStateResponse, }; use common_query::Output; use common_query::error::Result; @@ -76,6 +76,9 @@ pub trait ProcedureServiceHandler: Send + Sync { /// Migrate a region from source peer to target peer, returns the procedure id if success. async fn migrate_region(&self, request: MigrateRegionRequest) -> Result>; + /// Migrate a flow partition from source flownode to target flownode. + async fn migrate_flow(&self, request: MigrateFlowRequest) -> Result>; + /// Reconcile a table, database or catalog, returns the procedure id if success. async fn reconcile(&self, request: ReconcileRequest) -> Result>; diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index 06dff44e79..c106a377f5 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -38,7 +38,7 @@ impl FunctionState { use common_base::AffectedRows; use common_meta::rpc::procedure::{ GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest, - MigrateRegionRequest, ProcedureStateResponse, + MigrateFlowRequest, MigrateRegionRequest, ProcedureStateResponse, }; use common_query::Output; use common_query::error::Result; @@ -64,6 +64,10 @@ impl FunctionState { Ok(Some("test_pid".to_string())) } + async fn migrate_flow(&self, _request: MigrateFlowRequest) -> Result> { + Ok(Some("test_pid".to_string())) + } + async fn reconcile(&self, _request: ReconcileRequest) -> Result> { Ok(Some("test_pid".to_string())) } diff --git a/src/common/meta/src/procedure_executor.rs b/src/common/meta/src/procedure_executor.rs index c0165dd80f..9477131989 100644 --- a/src/common/meta/src/procedure_executor.rs +++ b/src/common/meta/src/procedure_executor.rs @@ -26,7 +26,8 @@ use crate::error::{ use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::procedure::{ self, GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest, - MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, + MigrateFlowRequest, MigrateFlowResponse, MigrateRegionRequest, MigrateRegionResponse, + ProcedureStateResponse, }; /// The context of procedure executor. @@ -64,6 +65,18 @@ pub trait ProcedureExecutor: Send + Sync { request: MigrateRegionRequest, ) -> Result; + /// Submit a flow migration task. + async fn migrate_flow( + &self, + _ctx: &ExecutorContext, + _request: MigrateFlowRequest, + ) -> Result { + UnsupportedSnafu { + operation: "migrate_flow", + } + .fail() + } + /// Submit a reconcile task. async fn reconcile( &self, diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs index baab254fef..856d4b6f2a 100644 --- a/src/common/meta/src/rpc/procedure.rs +++ b/src/common/meta/src/rpc/procedure.rs @@ -25,7 +25,9 @@ use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState}; use snafu::ResultExt; use table::metadata::TableId; +use crate::FlownodeId; use crate::error::{ParseProcedureIdSnafu, Result}; +use crate::key::{FlowId, FlowPartitionId}; /// A request to migrate region. #[derive(Clone)] @@ -36,6 +38,23 @@ pub struct MigrateRegionRequest { pub timeout: Duration, } +/// A request to migrate a flow partition from one flownode to another. +#[derive(Debug, Clone)] +pub struct MigrateFlowRequest { + pub catalog: String, + pub flow_id: FlowId, + pub partition_id: FlowPartitionId, + pub from_flownode: FlownodeId, + pub to_flownode: FlownodeId, + pub timeout: Duration, +} + +/// A response for flow migration procedure submission. +#[derive(Debug, Clone, Default)] +pub struct MigrateFlowResponse { + pub pid: Option, +} + /// A request to add region follower. #[derive(Debug, Clone)] pub struct AddRegionFollowerRequest { diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 4a79105615..90bc4710c9 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -49,8 +49,9 @@ use common_meta::rpc::KeyValue; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::procedure::{ AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse, - GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, - ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest, + GcTableRequest, ManageRegionFollowerRequest, MigrateFlowRequest, MigrateFlowResponse, + MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, + RemoveRegionFollowerRequest, RemoveTableFollowerRequest, }; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, @@ -90,6 +91,7 @@ pub struct MetaClientBuilder { enable_procedure: bool, enable_access_cluster_info: bool, region_follower: Option, + flow_migration: Option, channel_manager: Option, ddl_channel_manager: Option, /// The default ddl timeout for each request. @@ -195,6 +197,13 @@ impl MetaClientBuilder { } } + pub fn with_flow_migration(self, flow_migration: FlowMigrationClientRef) -> Self { + Self { + flow_migration: Some(flow_migration), + ..self + } + } + pub fn build(self) -> MetaClient { let mgr = self.channel_manager.unwrap_or_default(); let heartbeat_channel_manager = self @@ -229,6 +238,7 @@ impl MetaClientBuilder { .enable_access_cluster_info .then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY)); let region_follower = self.region_follower.clone(); + let flow_migration = self.flow_migration.clone(); MetaClient { id: self.id, @@ -245,6 +255,7 @@ impl MetaClientBuilder { procedure, cluster, region_follower, + flow_migration, } } } @@ -260,6 +271,7 @@ pub struct MetaClient { procedure: Option, cluster: Option, region_follower: Option, + flow_migration: Option, } impl MetaClient { @@ -279,6 +291,7 @@ impl MetaClient { procedure: None, cluster: None, region_follower: None, + flow_migration: None, } } } @@ -301,6 +314,18 @@ pub trait RegionFollowerClient: Sync + Send + Debug { async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>; } +pub type FlowMigrationClientRef = Arc; + +/// A trait for clients that can migrate flow partitions. +#[async_trait::async_trait] +pub trait FlowMigrationClient: Sync + Send + Debug { + async fn migrate_flow(&self, request: MigrateFlowRequest) -> Result; + + async fn start(&self, urls: &[&str]) -> Result<()>; + + async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>; +} + #[async_trait::async_trait] impl ProcedureExecutor for MetaClient { async fn submit_ddl_task( @@ -376,6 +401,25 @@ impl ProcedureExecutor for MetaClient { } } + async fn migrate_flow( + &self, + _ctx: &ExecutorContext, + request: MigrateFlowRequest, + ) -> MetaResult { + if let Some(flow_migration) = &self.flow_migration { + flow_migration + .migrate_flow(request) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } else { + UnsupportedSnafu { + operation: "migrate_flow", + } + .fail() + } + } + async fn query_procedure_state( &self, _ctx: &ExecutorContext, @@ -588,6 +632,11 @@ impl MetaClient { client.start_with(leader_provider.clone()).await?; } + if let Some(client) = &self.flow_migration { + info!("Starting flow migration client ..."); + client.start_with(leader_provider.clone()).await?; + } + if let Some(client) = &self.heartbeat { info!("Starting heartbeat client ..."); client.start_with(leader_provider.clone()).await?; @@ -804,9 +853,12 @@ impl MetaClient { #[cfg(test)] mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Mutex}; use api::v1::meta::{HeartbeatRequest, Peer}; + use common_meta::error as meta_error; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; + use common_meta::rpc::procedure::pb_pid_to_pid; use rand::Rng; use super::*; @@ -815,6 +867,41 @@ mod tests { const TEST_KEY_PREFIX: &str = "__unit_test__meta__"; + #[derive(Debug)] + struct MockFlowMigrationClient { + calls: Arc, + last_request: Arc>>, + response: MigrateFlowResponse, + } + + #[async_trait::async_trait] + impl FlowMigrationClient for MockFlowMigrationClient { + async fn migrate_flow(&self, request: MigrateFlowRequest) -> Result { + self.calls.fetch_add(1, Ordering::Relaxed); + *self.last_request.lock().unwrap() = Some(request); + Ok(self.response.clone()) + } + + async fn start(&self, _urls: &[&str]) -> Result<()> { + Ok(()) + } + + async fn start_with(&self, _leader_provider: LeaderProviderRef) -> Result<()> { + Ok(()) + } + } + + fn mock_migrate_flow_request() -> MigrateFlowRequest { + MigrateFlowRequest { + catalog: "greptime".to_string(), + flow_id: 1, + partition_id: 0, + from_flownode: 1, + to_flownode: 2, + timeout: Duration::from_secs(300), + } + } + struct TestClient { ns: String, client: MetaClient, @@ -904,6 +991,61 @@ mod tests { meta_client.start(urls).await.unwrap(); } + #[tokio::test] + async fn test_migrate_flow_returns_unsupported_without_flow_client() { + let meta_client = MetaClientBuilder::frontend_default_options().build(); + + let err = ProcedureExecutor::migrate_flow( + &meta_client, + &ExecutorContext::default(), + mock_migrate_flow_request(), + ) + .await + .unwrap_err(); + + assert!(matches!( + err, + meta_error::Error::Unsupported { operation, .. } if operation == "migrate_flow" + )); + } + + #[tokio::test] + async fn test_migrate_flow_forwards_to_flow_client() { + let calls = Arc::new(AtomicUsize::new(0)); + let last_request = Arc::new(Mutex::new(None)); + let pid = pb_pid_to_pid(&api::v1::meta::ProcedureId { + key: b"00000000-0000-0000-0000-000000000001".to_vec(), + }) + .unwrap(); + let flow_migration: FlowMigrationClientRef = Arc::new(MockFlowMigrationClient { + calls: calls.clone(), + last_request: last_request.clone(), + response: MigrateFlowResponse { pid: Some(pid) }, + }); + let meta_client = MetaClientBuilder::frontend_default_options() + .with_flow_migration(flow_migration) + .build(); + let request = mock_migrate_flow_request(); + + let response = ProcedureExecutor::migrate_flow( + &meta_client, + &ExecutorContext::default(), + request.clone(), + ) + .await + .unwrap(); + + assert_eq!(Some(pid), response.pid); + assert_eq!(1, calls.load(Ordering::Relaxed)); + let forwarded_request = last_request.lock().unwrap().clone().unwrap(); + assert_eq!(request.catalog, forwarded_request.catalog); + assert_eq!(request.flow_id, forwarded_request.flow_id); + assert_eq!(request.partition_id, forwarded_request.partition_id); + assert_eq!(request.from_flownode, forwarded_request.from_flownode); + assert_eq!(request.to_flownode, forwarded_request.to_flownode); + assert_eq!(request.timeout, forwarded_request.timeout); + } + #[tokio::test] async fn test_not_start_heartbeat_client() { let urls = &["127.0.0.1:3001", "127.0.0.1:3002"]; diff --git a/src/meta-client/src/lib.rs b/src/meta-client/src/lib.rs index 91c3e8e0b2..b8483e185d 100644 --- a/src/meta-client/src/lib.rs +++ b/src/meta-client/src/lib.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use std::time::Duration; -use client::RegionFollowerClientRef; +use client::{FlowMigrationClientRef, RegionFollowerClientRef}; use common_base::Plugins; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::distributed_time_constants::{ @@ -117,6 +117,12 @@ pub async fn create_meta_client( debug!("Region follower client found in plugins"); builder = builder.with_region_follower(region_follower); } + + let flow_migration = plugins.get::(); + if let Some(flow_migration) = flow_migration { + debug!("Flow migration client found in plugins"); + builder = builder.with_flow_migration(flow_migration); + } } } diff --git a/src/operator/src/procedure.rs b/src/operator/src/procedure.rs index d98c09b77c..d7b7cc3dca 100644 --- a/src/operator/src/procedure.rs +++ b/src/operator/src/procedure.rs @@ -20,8 +20,8 @@ use common_function::handlers::ProcedureServiceHandler; use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef}; use common_meta::rpc::procedure::{ GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse, - GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, - ProcedureStateResponse, + GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateFlowRequest, + MigrateRegionRequest, ProcedureStateResponse, }; use common_query::error as query_error; use common_query::error::Result as QueryResult; @@ -59,6 +59,17 @@ impl ProcedureServiceHandler for ProcedureServiceOperator { .map(|pid| String::from_utf8_lossy(&pid.key).to_string())) } + async fn migrate_flow(&self, request: MigrateFlowRequest) -> QueryResult> { + Ok(self + .procedure_executor + .migrate_flow(&ExecutorContext::default(), request) + .await + .map_err(BoxedError::new) + .context(query_error::ProcedureServiceSnafu)? + .pid + .map(|pid| pid.to_string())) + } + async fn reconcile(&self, request: ReconcileRequest) -> QueryResult> { Ok(self .procedure_executor diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 7d8c01f048..28bb55a200 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -7,7 +7,7 @@ license.workspace = true [features] default = [] dashboard = ["dep:rust-embed"] -enterprise = ["sql/enterprise"] +enterprise = ["operator/enterprise", "sql/enterprise"] mem-prof = ["dep:common-mem-prof"] pprof = ["dep:common-pprof"] testing = []