diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs index b2f916d876..c06b28e7d5 100644 --- a/src/common/function/src/admin.rs +++ b/src/common/function/src/admin.rs @@ -12,15 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod add_region_follower; mod flush_compact_region; mod flush_compact_table; mod migrate_region; +mod remove_region_follower; use std::sync::Arc; +use add_region_follower::AddRegionFollowerFunction; use flush_compact_region::{CompactRegionFunction, FlushRegionFunction}; use flush_compact_table::{CompactTableFunction, FlushTableFunction}; use migrate_region::MigrateRegionFunction; +use remove_region_follower::RemoveRegionFollowerFunction; use crate::flush_flow::FlushFlowFunction; use crate::function_registry::FunctionRegistry; @@ -32,6 +36,8 @@ impl AdminFunction { /// Register all table functions to [`FunctionRegistry`]. pub fn register(registry: &FunctionRegistry) { registry.register_async(Arc::new(MigrateRegionFunction)); + registry.register_async(Arc::new(AddRegionFollowerFunction)); + registry.register_async(Arc::new(RemoveRegionFollowerFunction)); registry.register_async(Arc::new(FlushRegionFunction)); registry.register_async(Arc::new(CompactRegionFunction)); registry.register_async(Arc::new(FlushTableFunction)); diff --git a/src/common/function/src/admin/add_region_follower.rs b/src/common/function/src/admin/add_region_follower.rs new file mode 100644 index 0000000000..757c715ddd --- /dev/null +++ b/src/common/function/src/admin/add_region_follower.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_macro::admin_fn; +use common_meta::rpc::procedure::AddRegionFollowerRequest; +use common_query::error::{ + InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, + UnsupportedInputDataTypeSnafu, +}; +use common_query::prelude::{Signature, TypeSignature, Volatility}; +use datatypes::prelude::ConcreteDataType; +use datatypes::value::{Value, ValueRef}; +use session::context::QueryContextRef; +use snafu::ensure; + +use crate::handlers::ProcedureServiceHandlerRef; +use crate::helper::cast_u64; + +/// A function to add a follower to a region. +/// Only available in cluster mode. +/// +/// - `add_region_follower(region_id, peer_id)`. +/// +/// The parameters: +/// - `region_id`: the region id +/// - `peer_id`: the peer id +#[admin_fn( + name = AddRegionFollowerFunction, + display_name = add_region_follower, + sig_fn = signature, + ret = uint64 +)] +pub(crate) async fn add_region_follower( + procedure_service_handler: &ProcedureServiceHandlerRef, + _ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + ensure!( + params.len() == 2, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly 2, have: {}", + params.len() + ), + } + ); + + let Some(region_id) = cast_u64(¶ms[0])? else { + return UnsupportedInputDataTypeSnafu { + function: "add_region_follower", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + let Some(peer_id) = cast_u64(¶ms[1])? else { + return UnsupportedInputDataTypeSnafu { + function: "add_region_follower", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + + procedure_service_handler + .add_region_follower(AddRegionFollowerRequest { region_id, peer_id }) + .await?; + + Ok(Value::from(0u64)) +} + +fn signature() -> Signature { + Signature::one_of( + vec![ + // add_region_follower(region_id, peer) + TypeSignature::Uniform(2, ConcreteDataType::numerics()), + ], + Volatility::Immutable, + ) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::prelude::TypeSignature; + use datatypes::vectors::{UInt64Vector, VectorRef}; + + use super::*; + use crate::function::{AsyncFunction, FunctionContext}; + + #[test] + fn test_add_region_follower_misc() { + let f = AddRegionFollowerFunction; + assert_eq!("add_region_follower", f.name()); + assert_eq!( + ConcreteDataType::uint64_datatype(), + f.return_type(&[]).unwrap() + ); + assert!(matches!(f.signature(), + Signature { + type_signature: TypeSignature::OneOf(sigs), + volatility: Volatility::Immutable + } if sigs.len() == 1)); + } + + #[tokio::test] + async fn test_add_region_follower() { + let f = AddRegionFollowerFunction; + let args = vec![1, 1]; + let args = args + .into_iter() + .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) + .collect::>(); + + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64])); + assert_eq!(result, expect); + } +} diff --git a/src/common/function/src/admin/remove_region_follower.rs b/src/common/function/src/admin/remove_region_follower.rs new file mode 100644 index 0000000000..f63beec738 --- /dev/null +++ b/src/common/function/src/admin/remove_region_follower.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_macro::admin_fn; +use common_meta::rpc::procedure::RemoveRegionFollowerRequest; +use common_query::error::{ + InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, + UnsupportedInputDataTypeSnafu, +}; +use common_query::prelude::{Signature, TypeSignature, Volatility}; +use datatypes::prelude::ConcreteDataType; +use datatypes::value::{Value, ValueRef}; +use session::context::QueryContextRef; +use snafu::ensure; + +use crate::handlers::ProcedureServiceHandlerRef; +use crate::helper::cast_u64; + +/// A function to remove a follower from a region. +//// Only available in cluster mode. +/// +/// - `remove_region_follower(region_id, peer_id)`. +/// +/// The parameters: +/// - `region_id`: the region id +/// - `peer_id`: the peer id +#[admin_fn( + name = RemoveRegionFollowerFunction, + display_name = remove_region_follower, + sig_fn = signature, + ret = uint64 +)] +pub(crate) async fn remove_region_follower( + procedure_service_handler: &ProcedureServiceHandlerRef, + _ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + ensure!( + params.len() == 2, + InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect exactly 2, have: {}", + params.len() + ), + } + ); + + let Some(region_id) = cast_u64(¶ms[0])? else { + return UnsupportedInputDataTypeSnafu { + function: "add_region_follower", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + let Some(peer_id) = cast_u64(¶ms[1])? else { + return UnsupportedInputDataTypeSnafu { + function: "add_region_follower", + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + + procedure_service_handler + .remove_region_follower(RemoveRegionFollowerRequest { region_id, peer_id }) + .await?; + + Ok(Value::from(0u64)) +} + +fn signature() -> Signature { + Signature::one_of( + vec![ + // remove_region_follower(region_id, peer_id) + TypeSignature::Uniform(2, ConcreteDataType::numerics()), + ], + Volatility::Immutable, + ) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use common_query::prelude::TypeSignature; + use datatypes::vectors::{UInt64Vector, VectorRef}; + + use super::*; + use crate::function::{AsyncFunction, FunctionContext}; + + #[test] + fn test_remove_region_follower_misc() { + let f = RemoveRegionFollowerFunction; + assert_eq!("remove_region_follower", f.name()); + assert_eq!( + ConcreteDataType::uint64_datatype(), + f.return_type(&[]).unwrap() + ); + assert!(matches!(f.signature(), + Signature { + type_signature: TypeSignature::OneOf(sigs), + volatility: Volatility::Immutable + } if sigs.len() == 1)); + } + + #[tokio::test] + async fn test_remove_region_follower() { + let f = RemoveRegionFollowerFunction; + let args = vec![1, 1]; + let args = args + .into_iter() + .map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _) + .collect::>(); + + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(UInt64Vector::from_slice([0u64])); + assert_eq!(result, expect); + } +} diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index ddc233ed7d..1d994731d5 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -16,7 +16,10 @@ use std::sync::Arc; use async_trait::async_trait; use common_base::AffectedRows; -use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse}; +use common_meta::rpc::procedure::{ + AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, + RemoveRegionFollowerRequest, +}; use common_query::error::Result; use common_query::Output; use session::context::QueryContextRef; @@ -63,6 +66,12 @@ pub trait ProcedureServiceHandler: Send + Sync { /// Query the procedure' state by its id async fn query_procedure_state(&self, pid: &str) -> Result; + + /// Add a region follower to a region. + async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>; + + /// Remove a region follower from a region. + async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>; } /// This flow service handler is only use for flush flow for now. diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index 89f130a0bb..a2c63f75ac 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -35,7 +35,10 @@ impl FunctionState { use api::v1::meta::ProcedureStatus; use async_trait::async_trait; use common_base::AffectedRows; - use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse}; + use common_meta::rpc::procedure::{ + AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, + RemoveRegionFollowerRequest, + }; use common_query::error::Result; use common_query::Output; use session::context::QueryContextRef; @@ -66,6 +69,17 @@ impl FunctionState { ..Default::default() }) } + + async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> { + Ok(()) + } + + async fn remove_region_follower( + &self, + _request: RemoveRegionFollowerRequest, + ) -> Result<()> { + Ok(()) + } } #[async_trait] diff --git a/src/operator/src/procedure.rs b/src/operator/src/procedure.rs index d36e6b6b85..e2c27c024f 100644 --- a/src/operator/src/procedure.rs +++ b/src/operator/src/procedure.rs @@ -16,7 +16,10 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_function::handlers::ProcedureServiceHandler; use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef}; -use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse}; +use common_meta::rpc::procedure::{ + AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, + RemoveRegionFollowerRequest, +}; use common_query::error as query_error; use common_query::error::Result as QueryResult; use snafu::ResultExt; @@ -53,4 +56,23 @@ impl ProcedureServiceHandler for ProcedureServiceOperator { .map_err(BoxedError::new) .context(query_error::ProcedureServiceSnafu) } + + async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> QueryResult<()> { + self.procedure_executor + .add_region_follower(&ExecutorContext::default(), request) + .await + .map_err(BoxedError::new) + .context(query_error::ProcedureServiceSnafu) + } + + async fn remove_region_follower( + &self, + request: RemoveRegionFollowerRequest, + ) -> QueryResult<()> { + self.procedure_executor + .remove_region_follower(&ExecutorContext::default(), request) + .await + .map_err(BoxedError::new) + .context(query_error::ProcedureServiceSnafu) + } }