feat: add migrate flow admin plumbing

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-05-25 16:24:15 +08:00
parent dc2f2cbfae
commit a285e576b1
11 changed files with 545 additions and 10 deletions

View File

@@ -16,6 +16,7 @@ mod build_index_table;
mod flush_compact_region;
mod flush_compact_table;
mod gc;
mod migrate_flow;
mod migrate_region;
mod reconcile_catalog;
mod reconcile_database;
@@ -24,6 +25,7 @@ mod reconcile_table;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use gc::{GcRegionsFunction, GcTableFunction};
use migrate_flow::MigrateFlowFunction;
use migrate_region::MigrateRegionFunction;
use reconcile_catalog::ReconcileCatalogFunction;
use reconcile_database::ReconcileDatabaseFunction;
@@ -48,6 +50,7 @@ impl AdminFunction {
registry.register(GcTableFunction::factory());
registry.register(BuildIndexFunction::factory());
registry.register(FlushFlowFunction::factory());
registry.register(MigrateFlowFunction::factory());
registry.register(ReconcileCatalogFunction::factory());
registry.register(ReconcileDatabaseFunction::factory());
registry.register(ReconcileTableFunction::factory());

View File

@@ -0,0 +1,325 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use common_macro::admin_fn;
use common_meta::rpc::procedure::MigrateFlowRequest;
use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::arrow::datatypes::DataType as ArrowDataType;
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
/// The default timeout for migrate flow procedure.
const DEFAULT_TIMEOUT_SECS: u64 = 300;
/// A function to migrate a flow partition from source flownode to target flownode.
/// Returns the submitted procedure id if success.
///
/// - `migrate_flow(catalog, flow_id, partition_id, from_flownode, to_flownode)`,
/// with timeout(300 seconds).
/// - `migrate_flow(catalog, flow_id, partition_id, from_flownode, to_flownode, timeout(secs))`.
///
/// The parameters:
/// - `catalog`: the catalog name of the flow
/// - `flow_id`: the flow id
/// - `partition_id`: the flow partition id
/// - `from_flownode`: the source flownode id
/// - `to_flownode`: the target flownode id
#[admin_fn(
name = MigrateFlowFunction,
display_name = migrate_flow,
sig_fn = signature,
ret = string
)]
pub(crate) async fn migrate_flow(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let Some(request) = parse_migrate_flow_request(params)? else {
return Ok(Value::Null);
};
let pid = procedure_service_handler.migrate_flow(request).await?;
match pid {
Some(pid) => Ok(Value::from(pid)),
None => Ok(Value::Null),
}
}
fn parse_migrate_flow_request(params: &[ValueRef<'_>]) -> Result<Option<MigrateFlowRequest>> {
ensure!(
matches!(params.len(), 5 | 6),
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 5 or 6, have: {}",
params.len()
),
}
);
let ValueRef::String(catalog) = params[0] else {
return common_query::error::UnsupportedInputDataTypeSnafu {
function: "migrate_flow",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let flow_id = cast_u32_arg(params, 1, "flow_id")?;
let partition_id = cast_u32_arg(params, 2, "partition_id")?;
let from_flownode = cast_u64(&params[3])?;
let to_flownode = cast_u64(&params[4])?;
let timeout = match params.len() {
5 => Some(DEFAULT_TIMEOUT_SECS),
6 => cast_u64(&params[5])?,
_ => unreachable!(),
};
match (flow_id, partition_id, from_flownode, to_flownode, timeout) {
(
Some(flow_id),
Some(partition_id),
Some(from_flownode),
Some(to_flownode),
Some(timeout),
) => Ok(Some(MigrateFlowRequest {
catalog: catalog.to_string(),
flow_id,
partition_id,
from_flownode,
to_flownode,
timeout: Duration::from_secs(timeout),
})),
_ => Ok(None),
}
}
fn cast_u32_arg(params: &[ValueRef<'_>], index: usize, name: &str) -> Result<Option<u32>> {
let Some(value) = cast_u64(&params[index])? else {
return Ok(None);
};
ensure!(
u32::try_from(value).is_ok(),
InvalidFuncArgsSnafu {
err_msg: format!("The {} is out of range for uint32: {}", name, value),
}
);
Ok(Some(value as u32))
}
fn signature() -> Signature {
let numerics = ConcreteDataType::numerics()
.into_iter()
.map(|dt| dt.as_arrow_type())
.collect::<Vec<_>>();
let mut signatures = Vec::with_capacity(numerics.len() * 2);
for data_type in numerics {
signatures.push(TypeSignature::Exact(vec![
ArrowDataType::Utf8,
data_type.clone(),
data_type.clone(),
data_type.clone(),
data_type.clone(),
]));
signatures.push(TypeSignature::Exact(vec![
ArrowDataType::Utf8,
data_type.clone(),
data_type.clone(),
data_type.clone(),
data_type.clone(),
data_type,
]));
}
Signature::one_of(signatures, Volatility::Immutable)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::{StringArray, UInt64Array};
use arrow::datatypes::{DataType, Field};
use datafusion_expr::ColumnarValue;
use super::*;
use crate::function::FunctionContext;
use crate::function_factory::ScalarFunctionFactory;
#[test]
fn test_migrate_flow_misc() {
let factory: ScalarFunctionFactory = MigrateFlowFunction::factory().into();
let f = factory.provide(FunctionContext::mock());
assert_eq!("migrate_flow", f.name());
assert_eq!(DataType::Utf8, f.return_type(&[]).unwrap());
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::OneOf(sigs),
volatility: datafusion_expr::Volatility::Immutable,
..
} if !sigs.is_empty()));
}
#[test]
fn test_parse_migrate_flow_request() {
let params = vec![
ValueRef::String("greptime"),
ValueRef::UInt32(1024),
ValueRef::UInt32(0),
ValueRef::UInt64(1),
ValueRef::UInt64(2),
];
let request = parse_migrate_flow_request(&params).unwrap().unwrap();
assert_eq!("greptime", request.catalog);
assert_eq!(1024, request.flow_id);
assert_eq!(0, request.partition_id);
assert_eq!(1, request.from_flownode);
assert_eq!(2, request.to_flownode);
assert_eq!(Duration::from_secs(DEFAULT_TIMEOUT_SECS), request.timeout);
}
#[test]
fn test_parse_migrate_flow_request_with_timeout() {
let params = vec![
ValueRef::String("greptime"),
ValueRef::UInt32(1024),
ValueRef::UInt32(0),
ValueRef::UInt64(1),
ValueRef::UInt64(2),
ValueRef::UInt64(60),
];
let request = parse_migrate_flow_request(&params).unwrap().unwrap();
assert_eq!(Duration::from_secs(60), request.timeout);
}
#[test]
fn test_parse_migrate_flow_request_with_nullable_arg() {
let params = vec![
ValueRef::String("greptime"),
ValueRef::Null,
ValueRef::UInt32(0),
ValueRef::UInt64(1),
ValueRef::UInt64(2),
];
assert!(parse_migrate_flow_request(&params).unwrap().is_none());
}
#[test]
fn test_parse_migrate_flow_request_rejects_bad_args() {
let params = vec![ValueRef::String("greptime")];
let err = parse_migrate_flow_request(&params).unwrap_err();
assert!(err.to_string().contains("expect exactly 5 or 6"));
let params = vec![
ValueRef::String("greptime"),
ValueRef::UInt64(u64::from(u32::MAX) + 1),
ValueRef::UInt32(0),
ValueRef::UInt64(1),
ValueRef::UInt64(2),
];
let err = parse_migrate_flow_request(&params).unwrap_err();
assert!(err.to_string().contains("flow_id is out of range"));
let params = vec![
ValueRef::UInt64(1),
ValueRef::UInt32(1024),
ValueRef::UInt32(0),
ValueRef::UInt64(1),
ValueRef::UInt64(2),
];
let err = parse_migrate_flow_request(&params).unwrap_err();
assert!(err.to_string().contains("Unsupported input datatypes"));
}
#[tokio::test]
async fn test_missing_procedure_service() {
let factory: ScalarFunctionFactory = MigrateFlowFunction::factory().into();
let provider = factory.provide(FunctionContext::default());
let f = provider.as_async().unwrap();
let result = f
.invoke_async_with_args(build_func_args())
.await
.unwrap_err();
assert_eq!(
"Execution error: Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}
#[tokio::test]
async fn test_migrate_flow() {
let factory: ScalarFunctionFactory = MigrateFlowFunction::factory().into();
let provider = factory.provide(FunctionContext::mock());
let f = provider.as_async().unwrap();
let result = f.invoke_async_with_args(build_func_args()).await.unwrap();
match result {
ColumnarValue::Array(array) => {
let result_array = array.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(result_array.value(0), "test_pid");
}
ColumnarValue::Scalar(scalar) => {
assert_eq!(
scalar,
datafusion_common::ScalarValue::Utf8(Some("test_pid".to_string()))
);
}
}
}
fn build_func_args() -> datafusion::logical_expr::ScalarFunctionArgs {
datafusion::logical_expr::ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(StringArray::from(vec!["greptime"]))),
ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1024]))),
ColumnarValue::Array(Arc::new(UInt64Array::from(vec![0]))),
ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))),
ColumnarValue::Array(Arc::new(UInt64Array::from(vec![2]))),
],
arg_fields: vec![
Arc::new(Field::new("arg_0", DataType::Utf8, false)),
Arc::new(Field::new("arg_1", DataType::UInt64, false)),
Arc::new(Field::new("arg_2", DataType::UInt64, false)),
Arc::new(Field::new("arg_3", DataType::UInt64, false)),
Arc::new(Field::new("arg_4", DataType::UInt64, false)),
],
return_field: Arc::new(Field::new("result", DataType::Utf8, true)),
number_rows: 1,
config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
}
}
}

View File

@@ -236,4 +236,13 @@ mod tests {
let _ = registry.get_function("test_and").unwrap();
assert_eq!(1, registry.scalar_functions().len());
}
#[test]
fn test_admin_registers_migrate_flow() {
let registry = FunctionRegistry::default();
AdminFunction::register(&registry);
assert!(registry.get_function("migrate_flow").is_some());
}
}

View File

@@ -20,8 +20,8 @@ use catalog::CatalogManagerRef;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest,
ProcedureStateResponse,
GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateFlowRequest,
MigrateRegionRequest, ProcedureStateResponse,
};
use common_query::Output;
use common_query::error::Result;
@@ -76,6 +76,9 @@ pub trait ProcedureServiceHandler: Send + Sync {
/// Migrate a region from source peer to target peer, returns the procedure id if success.
async fn migrate_region(&self, request: MigrateRegionRequest) -> Result<Option<String>>;
/// Migrate a flow partition from source flownode to target flownode.
async fn migrate_flow(&self, request: MigrateFlowRequest) -> Result<Option<String>>;
/// Reconcile a table, database or catalog, returns the procedure id if success.
async fn reconcile(&self, request: ReconcileRequest) -> Result<Option<String>>;

View File

@@ -38,7 +38,7 @@ impl FunctionState {
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
MigrateRegionRequest, ProcedureStateResponse,
MigrateFlowRequest, MigrateRegionRequest, ProcedureStateResponse,
};
use common_query::Output;
use common_query::error::Result;
@@ -64,6 +64,10 @@ impl FunctionState {
Ok(Some("test_pid".to_string()))
}
async fn migrate_flow(&self, _request: MigrateFlowRequest) -> Result<Option<String>> {
Ok(Some("test_pid".to_string()))
}
async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
Ok(Some("test_pid".to_string()))
}

View File

@@ -26,7 +26,8 @@ use crate::error::{
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{
self, GcRegionsRequest, GcResponse, GcTableRequest, ManageRegionFollowerRequest,
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
MigrateFlowRequest, MigrateFlowResponse, MigrateRegionRequest, MigrateRegionResponse,
ProcedureStateResponse,
};
/// The context of procedure executor.
@@ -64,6 +65,18 @@ pub trait ProcedureExecutor: Send + Sync {
request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse>;
/// Submit a flow migration task.
async fn migrate_flow(
&self,
_ctx: &ExecutorContext,
_request: MigrateFlowRequest,
) -> Result<MigrateFlowResponse> {
UnsupportedSnafu {
operation: "migrate_flow",
}
.fail()
}
/// Submit a reconcile task.
async fn reconcile(
&self,

View File

@@ -25,7 +25,9 @@ use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState};
use snafu::ResultExt;
use table::metadata::TableId;
use crate::FlownodeId;
use crate::error::{ParseProcedureIdSnafu, Result};
use crate::key::{FlowId, FlowPartitionId};
/// A request to migrate region.
#[derive(Clone)]
@@ -36,6 +38,23 @@ pub struct MigrateRegionRequest {
pub timeout: Duration,
}
/// A request to migrate a flow partition from one flownode to another.
#[derive(Debug, Clone)]
pub struct MigrateFlowRequest {
pub catalog: String,
pub flow_id: FlowId,
pub partition_id: FlowPartitionId,
pub from_flownode: FlownodeId,
pub to_flownode: FlownodeId,
pub timeout: Duration,
}
/// A response for flow migration procedure submission.
#[derive(Debug, Clone, Default)]
pub struct MigrateFlowResponse {
pub pid: Option<ProcedureId>,
}
/// A request to add region follower.
#[derive(Debug, Clone)]
pub struct AddRegionFollowerRequest {

View File

@@ -49,8 +49,9 @@ use common_meta::rpc::KeyValue;
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, AddTableFollowerRequest, GcRegionsRequest, GcResponse,
GcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
ProcedureStateResponse, RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
GcTableRequest, ManageRegionFollowerRequest, MigrateFlowRequest, MigrateFlowResponse,
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -90,6 +91,7 @@ pub struct MetaClientBuilder {
enable_procedure: bool,
enable_access_cluster_info: bool,
region_follower: Option<RegionFollowerClientRef>,
flow_migration: Option<FlowMigrationClientRef>,
channel_manager: Option<ChannelManager>,
ddl_channel_manager: Option<ChannelManager>,
/// The default ddl timeout for each request.
@@ -195,6 +197,13 @@ impl MetaClientBuilder {
}
}
pub fn with_flow_migration(self, flow_migration: FlowMigrationClientRef) -> Self {
Self {
flow_migration: Some(flow_migration),
..self
}
}
pub fn build(self) -> MetaClient {
let mgr = self.channel_manager.unwrap_or_default();
let heartbeat_channel_manager = self
@@ -229,6 +238,7 @@ impl MetaClientBuilder {
.enable_access_cluster_info
.then(|| ClusterClient::new(mgr.clone(), DEFAULT_CLUSTER_CLIENT_MAX_RETRY));
let region_follower = self.region_follower.clone();
let flow_migration = self.flow_migration.clone();
MetaClient {
id: self.id,
@@ -245,6 +255,7 @@ impl MetaClientBuilder {
procedure,
cluster,
region_follower,
flow_migration,
}
}
}
@@ -260,6 +271,7 @@ pub struct MetaClient {
procedure: Option<ProcedureClient>,
cluster: Option<ClusterClient>,
region_follower: Option<RegionFollowerClientRef>,
flow_migration: Option<FlowMigrationClientRef>,
}
impl MetaClient {
@@ -279,6 +291,7 @@ impl MetaClient {
procedure: None,
cluster: None,
region_follower: None,
flow_migration: None,
}
}
}
@@ -301,6 +314,18 @@ pub trait RegionFollowerClient: Sync + Send + Debug {
async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
pub type FlowMigrationClientRef = Arc<dyn FlowMigrationClient>;
/// A trait for clients that can migrate flow partitions.
#[async_trait::async_trait]
pub trait FlowMigrationClient: Sync + Send + Debug {
async fn migrate_flow(&self, request: MigrateFlowRequest) -> Result<MigrateFlowResponse>;
async fn start(&self, urls: &[&str]) -> Result<()>;
async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
}
#[async_trait::async_trait]
impl ProcedureExecutor for MetaClient {
async fn submit_ddl_task(
@@ -376,6 +401,25 @@ impl ProcedureExecutor for MetaClient {
}
}
async fn migrate_flow(
&self,
_ctx: &ExecutorContext,
request: MigrateFlowRequest,
) -> MetaResult<MigrateFlowResponse> {
if let Some(flow_migration) = &self.flow_migration {
flow_migration
.migrate_flow(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
} else {
UnsupportedSnafu {
operation: "migrate_flow",
}
.fail()
}
}
async fn query_procedure_state(
&self,
_ctx: &ExecutorContext,
@@ -588,6 +632,11 @@ impl MetaClient {
client.start_with(leader_provider.clone()).await?;
}
if let Some(client) = &self.flow_migration {
info!("Starting flow migration client ...");
client.start_with(leader_provider.clone()).await?;
}
if let Some(client) = &self.heartbeat {
info!("Starting heartbeat client ...");
client.start_with(leader_provider.clone()).await?;
@@ -804,9 +853,12 @@ impl MetaClient {
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use api::v1::meta::{HeartbeatRequest, Peer};
use common_meta::error as meta_error;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::rpc::procedure::pb_pid_to_pid;
use rand::Rng;
use super::*;
@@ -815,6 +867,41 @@ mod tests {
const TEST_KEY_PREFIX: &str = "__unit_test__meta__";
#[derive(Debug)]
struct MockFlowMigrationClient {
calls: Arc<AtomicUsize>,
last_request: Arc<Mutex<Option<MigrateFlowRequest>>>,
response: MigrateFlowResponse,
}
#[async_trait::async_trait]
impl FlowMigrationClient for MockFlowMigrationClient {
async fn migrate_flow(&self, request: MigrateFlowRequest) -> Result<MigrateFlowResponse> {
self.calls.fetch_add(1, Ordering::Relaxed);
*self.last_request.lock().unwrap() = Some(request);
Ok(self.response.clone())
}
async fn start(&self, _urls: &[&str]) -> Result<()> {
Ok(())
}
async fn start_with(&self, _leader_provider: LeaderProviderRef) -> Result<()> {
Ok(())
}
}
fn mock_migrate_flow_request() -> MigrateFlowRequest {
MigrateFlowRequest {
catalog: "greptime".to_string(),
flow_id: 1,
partition_id: 0,
from_flownode: 1,
to_flownode: 2,
timeout: Duration::from_secs(300),
}
}
struct TestClient {
ns: String,
client: MetaClient,
@@ -904,6 +991,61 @@ mod tests {
meta_client.start(urls).await.unwrap();
}
#[tokio::test]
async fn test_migrate_flow_returns_unsupported_without_flow_client() {
let meta_client = MetaClientBuilder::frontend_default_options().build();
let err = ProcedureExecutor::migrate_flow(
&meta_client,
&ExecutorContext::default(),
mock_migrate_flow_request(),
)
.await
.unwrap_err();
assert!(matches!(
err,
meta_error::Error::Unsupported { operation, .. } if operation == "migrate_flow"
));
}
#[tokio::test]
async fn test_migrate_flow_forwards_to_flow_client() {
let calls = Arc::new(AtomicUsize::new(0));
let last_request = Arc::new(Mutex::new(None));
let pid = pb_pid_to_pid(&api::v1::meta::ProcedureId {
key: b"00000000-0000-0000-0000-000000000001".to_vec(),
})
.unwrap();
let flow_migration: FlowMigrationClientRef = Arc::new(MockFlowMigrationClient {
calls: calls.clone(),
last_request: last_request.clone(),
response: MigrateFlowResponse { pid: Some(pid) },
});
let meta_client = MetaClientBuilder::frontend_default_options()
.with_flow_migration(flow_migration)
.build();
let request = mock_migrate_flow_request();
let response = ProcedureExecutor::migrate_flow(
&meta_client,
&ExecutorContext::default(),
request.clone(),
)
.await
.unwrap();
assert_eq!(Some(pid), response.pid);
assert_eq!(1, calls.load(Ordering::Relaxed));
let forwarded_request = last_request.lock().unwrap().clone().unwrap();
assert_eq!(request.catalog, forwarded_request.catalog);
assert_eq!(request.flow_id, forwarded_request.flow_id);
assert_eq!(request.partition_id, forwarded_request.partition_id);
assert_eq!(request.from_flownode, forwarded_request.from_flownode);
assert_eq!(request.to_flownode, forwarded_request.to_flownode);
assert_eq!(request.timeout, forwarded_request.timeout);
}
#[tokio::test]
async fn test_not_start_heartbeat_client() {
let urls = &["127.0.0.1:3001", "127.0.0.1:3002"];

View File

@@ -15,7 +15,7 @@
use std::sync::Arc;
use std::time::Duration;
use client::RegionFollowerClientRef;
use client::{FlowMigrationClientRef, RegionFollowerClientRef};
use common_base::Plugins;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::distributed_time_constants::{
@@ -117,6 +117,12 @@ pub async fn create_meta_client(
debug!("Region follower client found in plugins");
builder = builder.with_region_follower(region_follower);
}
let flow_migration = plugins.get::<FlowMigrationClientRef>();
if let Some(flow_migration) = flow_migration {
debug!("Flow migration client found in plugins");
builder = builder.with_flow_migration(flow_migration);
}
}
}

View File

@@ -20,8 +20,8 @@ use common_function::handlers::ProcedureServiceHandler;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
use common_meta::rpc::procedure::{
GcRegionsRequest as MetaGcRegionsRequest, GcResponse as MetaGcResponse,
GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateRegionRequest,
ProcedureStateResponse,
GcTableRequest as MetaGcTableRequest, ManageRegionFollowerRequest, MigrateFlowRequest,
MigrateRegionRequest, ProcedureStateResponse,
};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
@@ -59,6 +59,17 @@ impl ProcedureServiceHandler for ProcedureServiceOperator {
.map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
}
async fn migrate_flow(&self, request: MigrateFlowRequest) -> QueryResult<Option<String>> {
Ok(self
.procedure_executor
.migrate_flow(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)?
.pid
.map(|pid| pid.to_string()))
}
async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
Ok(self
.procedure_executor

View File

@@ -7,7 +7,7 @@ license.workspace = true
[features]
default = []
dashboard = ["dep:rust-embed"]
enterprise = ["sql/enterprise"]
enterprise = ["operator/enterprise", "sql/enterprise"]
mem-prof = ["dep:common-mem-prof"]
pprof = ["dep:common-pprof"]
testing = []