feat: add AddRegionFollower and RemoveRegionFollower admin fn (#5780)

This commit is contained in:
Weny Xu
2025-03-27 14:30:50 +08:00
committed by GitHub
parent 0cd219a5d2
commit 8f2ae4e136
6 changed files with 312 additions and 3 deletions

View File

@@ -12,15 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod add_region_follower;
mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;
mod remove_region_follower;
use std::sync::Arc;
use add_region_follower::AddRegionFollowerFunction;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use remove_region_follower::RemoveRegionFollowerFunction;
use crate::flush_flow::FlushFlowFunction;
use crate::function_registry::FunctionRegistry;
@@ -32,6 +36,8 @@ impl AdminFunction {
/// Register all table functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
registry.register_async(Arc::new(MigrateRegionFunction));
registry.register_async(Arc::new(AddRegionFollowerFunction));
registry.register_async(Arc::new(RemoveRegionFollowerFunction));
registry.register_async(Arc::new(FlushRegionFunction));
registry.register_async(Arc::new(CompactRegionFunction));
registry.register_async(Arc::new(FlushTableFunction));

View File

@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_meta::rpc::procedure::AddRegionFollowerRequest;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
/// A function to add a follower to a region.
/// Only available in cluster mode.
///
/// - `add_region_follower(region_id, peer_id)`.
///
/// The parameters:
/// - `region_id`: the region id
/// - `peer_id`: the peer id
#[admin_fn(
name = AddRegionFollowerFunction,
display_name = add_region_follower,
sig_fn = signature,
ret = uint64
)]
pub(crate) async fn add_region_follower(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let Some(peer_id) = cast_u64(&params[1])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
procedure_service_handler
.add_region_follower(AddRegionFollowerRequest { region_id, peer_id })
.await?;
Ok(Value::from(0u64))
}
fn signature() -> Signature {
Signature::one_of(
vec![
// add_region_follower(region_id, peer)
TypeSignature::Uniform(2, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{UInt64Vector, VectorRef};
use super::*;
use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_add_region_follower_misc() {
let f = AddRegionFollowerFunction;
assert_eq!("add_region_follower", f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 1));
}
#[tokio::test]
async fn test_add_region_follower() {
let f = AddRegionFollowerFunction;
let args = vec![1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64]));
assert_eq!(result, expect);
}
}

View File

@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_macro::admin_fn;
use common_meta::rpc::procedure::RemoveRegionFollowerRequest;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use session::context::QueryContextRef;
use snafu::ensure;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
/// A function to remove a follower from a region.
//// Only available in cluster mode.
///
/// - `remove_region_follower(region_id, peer_id)`.
///
/// The parameters:
/// - `region_id`: the region id
/// - `peer_id`: the peer id
#[admin_fn(
name = RemoveRegionFollowerFunction,
display_name = remove_region_follower,
sig_fn = signature,
ret = uint64
)]
pub(crate) async fn remove_region_follower(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 2,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 2, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let Some(peer_id) = cast_u64(&params[1])? else {
return UnsupportedInputDataTypeSnafu {
function: "add_region_follower",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
procedure_service_handler
.remove_region_follower(RemoveRegionFollowerRequest { region_id, peer_id })
.await?;
Ok(Value::from(0u64))
}
fn signature() -> Signature {
Signature::one_of(
vec![
// remove_region_follower(region_id, peer_id)
TypeSignature::Uniform(2, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{UInt64Vector, VectorRef};
use super::*;
use crate::function::{AsyncFunction, FunctionContext};
#[test]
fn test_remove_region_follower_misc() {
let f = RemoveRegionFollowerFunction;
assert_eq!("remove_region_follower", f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 1));
}
#[tokio::test]
async fn test_remove_region_follower() {
let f = RemoveRegionFollowerFunction;
let args = vec![1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64]));
assert_eq!(result, expect);
}
}

View File

@@ -16,7 +16,10 @@ use std::sync::Arc;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
@@ -63,6 +66,12 @@ pub trait ProcedureServiceHandler: Send + Sync {
/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
/// Add a region follower to a region.
async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>;
/// Remove a region follower from a region.
async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>;
}
/// This flow service handler is only use for flush flow for now.

View File

@@ -35,7 +35,10 @@ impl FunctionState {
use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
@@ -66,6 +69,17 @@ impl FunctionState {
..Default::default()
})
}
async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> {
Ok(())
}
async fn remove_region_follower(
&self,
_request: RemoveRegionFollowerRequest,
) -> Result<()> {
Ok(())
}
}
#[async_trait]

View File

@@ -16,7 +16,10 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::handlers::ProcedureServiceHandler;
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_meta::rpc::procedure::{
AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse,
RemoveRegionFollowerRequest,
};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use snafu::ResultExt;
@@ -53,4 +56,23 @@ impl ProcedureServiceHandler for ProcedureServiceOperator {
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)
}
async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> QueryResult<()> {
self.procedure_executor
.add_region_follower(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)
}
async fn remove_region_follower(
&self,
request: RemoveRegionFollowerRequest,
) -> QueryResult<()> {
self.procedure_executor
.remove_region_follower(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)
}
}