refactor: region follower management with unified interface (#6986)

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-09-17 18:01:03 +08:00
committed by GitHub
parent edf4b3f7f8
commit c35407fdce
9 changed files with 76 additions and 390 deletions

View File

@@ -12,23 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod add_region_follower;
mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;
mod reconcile_catalog;
mod reconcile_database;
mod reconcile_table;
mod remove_region_follower;
use add_region_follower::AddRegionFollowerFunction;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use reconcile_catalog::ReconcileCatalogFunction;
use reconcile_database::ReconcileDatabaseFunction;
use reconcile_table::ReconcileTableFunction;
use remove_region_follower::RemoveRegionFollowerFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
@@ -40,8 +36,6 @@ impl AdminFunction {
/// Register all admin functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
registry.register(MigrateRegionFunction::factory());
registry.register(AddRegionFollowerFunction::factory());
registry.register(RemoveRegionFollowerFunction::factory());
registry.register(FlushRegionFunction::factory());
registry.register(CompactRegionFunction::factory());
registry.register(FlushTableFunction::factory());

View File

@@ -1,155 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_meta::rpc::procedure::AddRegionFollowerRequest;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
/// A function to add a follower to a region.
/// Only available in cluster mode.
///
/// - `add_region_follower(region_id, peer_id)`.
///
/// The parameters:
/// - `region_id`: the region id
/// - `peer_id`: the peer id
#[admin_fn(
name = AddRegionFollowerFunction,
display_name = add_region_follower,
sig_fn = signature,
ret = uint64
)]
pub(crate) async fn add_region_follower(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let Some(peer_id) = cast_u64(&params[1])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
procedure_service_handler
.add_region_follower(AddRegionFollowerRequest { region_id, peer_id })
.await?;
Ok(Value::from(0u64))
}
fn signature() -> Signature {
Signature::one_of(
vec![
// add_region_follower(region_id, peer)
TypeSignature::Uniform(
2,
ConcreteDataType::numerics()
.into_iter()
.map(|dt| dt.as_arrow_type())
.collect(),
),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::UInt64Array;
use arrow::datatypes::{DataType, Field};
use datafusion_expr::ColumnarValue;
use super::*;
use crate::function::FunctionContext;
use crate::function_factory::ScalarFunctionFactory;
#[test]
fn test_add_region_follower_misc() {
let factory: ScalarFunctionFactory = AddRegionFollowerFunction::factory().into();
let f = factory.provide(FunctionContext::mock());
assert_eq!("add_region_follower", f.name());
assert_eq!(DataType::UInt64, f.return_type(&[]).unwrap());
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::OneOf(sigs),
volatility: datafusion_expr::Volatility::Immutable
} if sigs.len() == 1));
}
#[tokio::test]
async fn test_add_region_follower() {
let factory: ScalarFunctionFactory = AddRegionFollowerFunction::factory().into();
let provider = factory.provide(FunctionContext::mock());
let f = provider.as_async().unwrap();
let func_args = datafusion::logical_expr::ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))),
ColumnarValue::Array(Arc::new(UInt64Array::from(vec![2]))),
],
arg_fields: vec![
Arc::new(Field::new("arg_0", DataType::UInt64, false)),
Arc::new(Field::new("arg_1", DataType::UInt64, false)),
],
return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
number_rows: 1,
config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
};
let result = f.invoke_async_with_args(func_args).await.unwrap();
match result {
ColumnarValue::Array(array) => {
let result_array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
assert_eq!(result_array.value(0), 0u64);
}
ColumnarValue::Scalar(scalar) => {
assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(0)));
}
}
}
}

View File

@@ -1,155 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_meta::rpc::procedure::RemoveRegionFollowerRequest;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use datafusion_expr::{Signature, TypeSignature, Volatility};
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
/// A function to remove a follower from a region.
//// Only available in cluster mode.
///
/// - `remove_region_follower(region_id, peer_id)`.
///
/// The parameters:
/// - `region_id`: the region id
/// - `peer_id`: the peer id
#[admin_fn(
name = RemoveRegionFollowerFunction,
display_name = remove_region_follower,
sig_fn = signature,
ret = uint64
)]
pub(crate) async fn remove_region_follower(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let Some(peer_id) = cast_u64(&params[1])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
procedure_service_handler
.remove_region_follower(RemoveRegionFollowerRequest { region_id, peer_id })
.await?;
Ok(Value::from(0u64))
}
fn signature() -> Signature {
Signature::one_of(
vec![
// remove_region_follower(region_id, peer_id)
TypeSignature::Uniform(
2,
ConcreteDataType::numerics()
.into_iter()
.map(|dt| dt.as_arrow_type())
.collect(),
),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::array::UInt64Array;
use arrow::datatypes::{DataType, Field};
use datafusion_expr::ColumnarValue;
use super::*;
use crate::function::FunctionContext;
use crate::function_factory::ScalarFunctionFactory;
#[test]
fn test_remove_region_follower_misc() {
let factory: ScalarFunctionFactory = RemoveRegionFollowerFunction::factory().into();
let f = factory.provide(FunctionContext::mock());
assert_eq!("remove_region_follower", f.name());
assert_eq!(DataType::UInt64, f.return_type(&[]).unwrap());
assert!(matches!(f.signature(),
datafusion_expr::Signature {
type_signature: datafusion_expr::TypeSignature::OneOf(sigs),
volatility: datafusion_expr::Volatility::Immutable
} if sigs.len() == 1));
}
#[tokio::test]
async fn test_remove_region_follower() {
let factory: ScalarFunctionFactory = RemoveRegionFollowerFunction::factory().into();
let provider = factory.provide(FunctionContext::mock());
let f = provider.as_async().unwrap();
let func_args = datafusion::logical_expr::ScalarFunctionArgs {
args: vec![
ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))),
ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))),
],
arg_fields: vec![
Arc::new(Field::new("arg_0", DataType::UInt64, false)),
Arc::new(Field::new("arg_1", DataType::UInt64, false)),
],
return_field: Arc::new(Field::new("result", DataType::UInt64, true)),
number_rows: 1,
config_options: Arc::new(datafusion_common::config::ConfigOptions::default()),
};
let result = f.invoke_async_with_args(func_args).await.unwrap();
match result {
ColumnarValue::Array(array) => {
let result_array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
assert_eq!(result_array.value(0), 0u64);
}
ColumnarValue::Scalar(scalar) => {
assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(0)));
}
}
}
}

View File

@@ -19,8 +19,7 @@ use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
};
use common_query::Output;
use common_query::error::Result;
@@ -72,11 +71,8 @@ pub trait ProcedureServiceHandler: Send + Sync {
/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
/// Add a region follower to a region.
async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
/// Remove a region follower from a region.
async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
/// Manage a region follower to a region.
async fn manage_region_follower(&self, request: ManageRegionFollowerRequest) -> Result<()>;
/// Get the catalog manager
fn catalog_manager(&self) -> &CatalogManagerRef;

View File

@@ -37,8 +37,7 @@ impl FunctionState {
use catalog::CatalogManagerRef;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
};
use common_query::Output;
use common_query::error::Result;
@@ -75,13 +74,9 @@ impl FunctionState {
})
}
async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> {
Ok(())
}
async fn remove_region_follower(
async fn manage_region_follower(
&self,
_request: RemoveRegionFollowerRequest,
_request: ManageRegionFollowerRequest,
) -> Result<()> {
Ok(())
}

View File

@@ -25,8 +25,8 @@ use crate::error::{
};
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{
self, AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
ProcedureStateResponse, RemoveRegionFollowerRequest,
self, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse,
ProcedureStateResponse,
};
/// The context of procedure executor.
@@ -45,26 +45,14 @@ pub trait ProcedureExecutor: Send + Sync {
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
/// Add a region follower
async fn add_region_follower(
/// Submit ad manage region follower task
async fn manage_region_follower(
&self,
_ctx: &ExecutorContext,
_request: AddRegionFollowerRequest,
_request: ManageRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "add_region_follower",
}
.fail()
}
/// Remove a region follower
async fn remove_region_follower(
&self,
_ctx: &ExecutorContext,
_request: RemoveRegionFollowerRequest,
) -> Result<()> {
UnsupportedSnafu {
operation: "remove_region_follower",
operation: "manage_region_follower",
}
.fail()
}

View File

@@ -23,6 +23,7 @@ use api::v1::meta::{
use common_error::ext::ErrorExt;
use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState};
use snafu::ResultExt;
use table::metadata::TableId;
use crate::error::{ParseProcedureIdSnafu, Result};
@@ -44,6 +45,30 @@ pub struct AddRegionFollowerRequest {
pub peer_id: u64,
}
#[derive(Debug, Clone)]
pub struct AddTableFollowerRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
}
#[derive(Debug, Clone)]
pub struct RemoveTableFollowerRequest {
pub catalog_name: String,
pub schema_name: String,
pub table_name: String,
pub table_id: TableId,
}
#[derive(Debug, Clone)]
pub enum ManageRegionFollowerRequest {
AddRegionFollower(AddRegionFollowerRequest),
RemoveRegionFollower(RemoveRegionFollowerRequest),
AddTableFollower(AddTableFollowerRequest),
RemoveTableFollower(RemoveTableFollowerRequest),
}
/// A request to remove region follower.
#[derive(Debug, Clone)]
pub struct RemoveRegionFollowerRequest {

View File

@@ -44,8 +44,9 @@ use common_meta::range_stream::PaginationStream;
use common_meta::rpc::KeyValue;
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
RemoveRegionFollowerRequest,
AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest,
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
RemoveRegionFollowerRequest, RemoveTableFollowerRequest,
};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
@@ -246,6 +247,10 @@ pub trait RegionFollowerClient: Sync + Send + Debug {
async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>;
async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>;
async fn start(&self, urls: &[&str]) -> Result<()>;
async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>;
@@ -286,39 +291,41 @@ impl ProcedureExecutor for MetaClient {
.context(meta_error::ExternalSnafu)
}
async fn add_region_follower(
async fn manage_region_follower(
&self,
_ctx: &ExecutorContext,
request: AddRegionFollowerRequest,
request: ManageRegionFollowerRequest,
) -> MetaResult<()> {
if let Some(region_follower) = &self.region_follower {
region_follower
.add_region_follower(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
} else {
UnsupportedSnafu {
operation: "add_region_follower",
match request {
ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => {
region_follower
.add_region_follower(add_region_follower_request)
.await
}
ManageRegionFollowerRequest::RemoveRegionFollower(
remove_region_follower_request,
) => {
region_follower
.remove_region_follower(remove_region_follower_request)
.await
}
ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => {
region_follower
.add_table_follower(add_table_follower_request)
.await
}
ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => {
region_follower
.remove_table_follower(remove_table_follower_request)
.await
}
}
.fail()
}
}
async fn remove_region_follower(
&self,
_ctx: &ExecutorContext,
request: RemoveRegionFollowerRequest,
) -> MetaResult<()> {
if let Some(region_follower) = &self.region_follower {
region_follower
.remove_region_follower(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
} else {
UnsupportedSnafu {
operation: "remove_region_follower",
operation: "manage_region_follower",
}
.fail()
}

View File

@@ -19,8 +19,7 @@ use common_error::ext::BoxedError;
use common_function::handlers::ProcedureServiceHandler;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
@@ -77,20 +76,12 @@ impl ProcedureServiceHandler for ProcedureServiceOperator {
.context(query_error::ProcedureServiceSnafu)
}
async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> QueryResult<()> {
self.procedure_executor
.add_region_follower(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)
}
async fn remove_region_follower(
async fn manage_region_follower(
&self,
request: RemoveRegionFollowerRequest,
request: ManageRegionFollowerRequest,
) -> QueryResult<()> {
self.procedure_executor
.remove_region_follower(&ExecutorContext::default(), request)
.manage_region_follower(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)