diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs index e311d99818..11270c3282 100644 --- a/src/common/function/src/admin.rs +++ b/src/common/function/src/admin.rs @@ -12,23 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod add_region_follower; mod flush_compact_region; mod flush_compact_table; mod migrate_region; mod reconcile_catalog; mod reconcile_database; mod reconcile_table; -mod remove_region_follower; -use add_region_follower::AddRegionFollowerFunction; use flush_compact_region::{CompactRegionFunction, FlushRegionFunction}; use flush_compact_table::{CompactTableFunction, FlushTableFunction}; use migrate_region::MigrateRegionFunction; use reconcile_catalog::ReconcileCatalogFunction; use reconcile_database::ReconcileDatabaseFunction; use reconcile_table::ReconcileTableFunction; -use remove_region_follower::RemoveRegionFollowerFunction; use crate::flush_flow::FlushFlowFunction; use crate::function_registry::FunctionRegistry; @@ -40,8 +36,6 @@ impl AdminFunction { /// Register all admin functions to [`FunctionRegistry`]. pub fn register(registry: &FunctionRegistry) { registry.register(MigrateRegionFunction::factory()); - registry.register(AddRegionFollowerFunction::factory()); - registry.register(RemoveRegionFollowerFunction::factory()); registry.register(FlushRegionFunction::factory()); registry.register(CompactRegionFunction::factory()); registry.register(FlushTableFunction::factory()); diff --git a/src/common/function/src/admin/add_region_follower.rs b/src/common/function/src/admin/add_region_follower.rs deleted file mode 100644 index 976c6848a3..0000000000 --- a/src/common/function/src/admin/add_region_follower.rs +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_macro::admin_fn; -use common_meta::rpc::procedure::AddRegionFollowerRequest; -use common_query::error::{ - InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, - UnsupportedInputDataTypeSnafu, -}; -use datafusion_expr::{Signature, TypeSignature, Volatility}; -use datatypes::data_type::DataType; -use datatypes::prelude::ConcreteDataType; -use datatypes::value::{Value, ValueRef}; -use session::context::QueryContextRef; -use snafu::ensure; - -use crate::handlers::ProcedureServiceHandlerRef; -use crate::helper::cast_u64; - -/// A function to add a follower to a region. -/// Only available in cluster mode. -/// -/// - `add_region_follower(region_id, peer_id)`. -/// -/// The parameters: -/// - `region_id`: the region id -/// - `peer_id`: the peer id -#[admin_fn( - name = AddRegionFollowerFunction, - display_name = add_region_follower, - sig_fn = signature, - ret = uint64 -)] -pub(crate) async fn add_region_follower( - procedure_service_handler: &ProcedureServiceHandlerRef, - _ctx: &QueryContextRef, - params: &[ValueRef<'_>], -) -> Result { - ensure!( - params.len() == 2, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly 2, have: {}", - params.len() - ), - } - ); - - let Some(region_id) = cast_u64(¶ms[0])? else { - return UnsupportedInputDataTypeSnafu { - function: "add_region_follower", - datatypes: params.iter().map(|v| v.data_type()).collect::>(), - } - .fail(); - }; - let Some(peer_id) = cast_u64(¶ms[1])? else { - return UnsupportedInputDataTypeSnafu { - function: "add_region_follower", - datatypes: params.iter().map(|v| v.data_type()).collect::>(), - } - .fail(); - }; - - procedure_service_handler - .add_region_follower(AddRegionFollowerRequest { region_id, peer_id }) - .await?; - - Ok(Value::from(0u64)) -} - -fn signature() -> Signature { - Signature::one_of( - vec![ - // add_region_follower(region_id, peer) - TypeSignature::Uniform( - 2, - ConcreteDataType::numerics() - .into_iter() - .map(|dt| dt.as_arrow_type()) - .collect(), - ), - ], - Volatility::Immutable, - ) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow::array::UInt64Array; - use arrow::datatypes::{DataType, Field}; - use datafusion_expr::ColumnarValue; - - use super::*; - use crate::function::FunctionContext; - use crate::function_factory::ScalarFunctionFactory; - - #[test] - fn test_add_region_follower_misc() { - let factory: ScalarFunctionFactory = AddRegionFollowerFunction::factory().into(); - let f = factory.provide(FunctionContext::mock()); - assert_eq!("add_region_follower", f.name()); - assert_eq!(DataType::UInt64, f.return_type(&[]).unwrap()); - assert!(matches!(f.signature(), - datafusion_expr::Signature { - type_signature: datafusion_expr::TypeSignature::OneOf(sigs), - volatility: datafusion_expr::Volatility::Immutable - } if sigs.len() == 1)); - } - - #[tokio::test] - async fn test_add_region_follower() { - let factory: ScalarFunctionFactory = AddRegionFollowerFunction::factory().into(); - let provider = factory.provide(FunctionContext::mock()); - let f = provider.as_async().unwrap(); - - let func_args = datafusion::logical_expr::ScalarFunctionArgs { - args: vec![ - ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), - ColumnarValue::Array(Arc::new(UInt64Array::from(vec![2]))), - ], - arg_fields: vec![ - Arc::new(Field::new("arg_0", DataType::UInt64, false)), - Arc::new(Field::new("arg_1", DataType::UInt64, false)), - ], - return_field: Arc::new(Field::new("result", DataType::UInt64, true)), - number_rows: 1, - config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), - }; - - let result = f.invoke_async_with_args(func_args).await.unwrap(); - - match result { - ColumnarValue::Array(array) => { - let result_array = array.as_any().downcast_ref::().unwrap(); - assert_eq!(result_array.value(0), 0u64); - } - ColumnarValue::Scalar(scalar) => { - assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(0))); - } - } - } -} diff --git a/src/common/function/src/admin/remove_region_follower.rs b/src/common/function/src/admin/remove_region_follower.rs deleted file mode 100644 index f461970b0c..0000000000 --- a/src/common/function/src/admin/remove_region_follower.rs +++ /dev/null @@ -1,155 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_macro::admin_fn; -use common_meta::rpc::procedure::RemoveRegionFollowerRequest; -use common_query::error::{ - InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, - UnsupportedInputDataTypeSnafu, -}; -use datafusion_expr::{Signature, TypeSignature, Volatility}; -use datatypes::data_type::DataType; -use datatypes::prelude::ConcreteDataType; -use datatypes::value::{Value, ValueRef}; -use session::context::QueryContextRef; -use snafu::ensure; - -use crate::handlers::ProcedureServiceHandlerRef; -use crate::helper::cast_u64; - -/// A function to remove a follower from a region. -//// Only available in cluster mode. -/// -/// - `remove_region_follower(region_id, peer_id)`. -/// -/// The parameters: -/// - `region_id`: the region id -/// - `peer_id`: the peer id -#[admin_fn( - name = RemoveRegionFollowerFunction, - display_name = remove_region_follower, - sig_fn = signature, - ret = uint64 -)] -pub(crate) async fn remove_region_follower( - procedure_service_handler: &ProcedureServiceHandlerRef, - _ctx: &QueryContextRef, - params: &[ValueRef<'_>], -) -> Result { - ensure!( - params.len() == 2, - InvalidFuncArgsSnafu { - err_msg: format!( - "The length of the args is not correct, expect exactly 2, have: {}", - params.len() - ), - } - ); - - let Some(region_id) = cast_u64(¶ms[0])? else { - return UnsupportedInputDataTypeSnafu { - function: "add_region_follower", - datatypes: params.iter().map(|v| v.data_type()).collect::>(), - } - .fail(); - }; - let Some(peer_id) = cast_u64(¶ms[1])? else { - return UnsupportedInputDataTypeSnafu { - function: "add_region_follower", - datatypes: params.iter().map(|v| v.data_type()).collect::>(), - } - .fail(); - }; - - procedure_service_handler - .remove_region_follower(RemoveRegionFollowerRequest { region_id, peer_id }) - .await?; - - Ok(Value::from(0u64)) -} - -fn signature() -> Signature { - Signature::one_of( - vec![ - // remove_region_follower(region_id, peer_id) - TypeSignature::Uniform( - 2, - ConcreteDataType::numerics() - .into_iter() - .map(|dt| dt.as_arrow_type()) - .collect(), - ), - ], - Volatility::Immutable, - ) -} - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow::array::UInt64Array; - use arrow::datatypes::{DataType, Field}; - use datafusion_expr::ColumnarValue; - - use super::*; - use crate::function::FunctionContext; - use crate::function_factory::ScalarFunctionFactory; - - #[test] - fn test_remove_region_follower_misc() { - let factory: ScalarFunctionFactory = RemoveRegionFollowerFunction::factory().into(); - let f = factory.provide(FunctionContext::mock()); - assert_eq!("remove_region_follower", f.name()); - assert_eq!(DataType::UInt64, f.return_type(&[]).unwrap()); - assert!(matches!(f.signature(), - datafusion_expr::Signature { - type_signature: datafusion_expr::TypeSignature::OneOf(sigs), - volatility: datafusion_expr::Volatility::Immutable - } if sigs.len() == 1)); - } - - #[tokio::test] - async fn test_remove_region_follower() { - let factory: ScalarFunctionFactory = RemoveRegionFollowerFunction::factory().into(); - let provider = factory.provide(FunctionContext::mock()); - let f = provider.as_async().unwrap(); - - let func_args = datafusion::logical_expr::ScalarFunctionArgs { - args: vec![ - ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), - ColumnarValue::Array(Arc::new(UInt64Array::from(vec![1]))), - ], - arg_fields: vec![ - Arc::new(Field::new("arg_0", DataType::UInt64, false)), - Arc::new(Field::new("arg_1", DataType::UInt64, false)), - ], - return_field: Arc::new(Field::new("result", DataType::UInt64, true)), - number_rows: 1, - config_options: Arc::new(datafusion_common::config::ConfigOptions::default()), - }; - - let result = f.invoke_async_with_args(func_args).await.unwrap(); - - match result { - ColumnarValue::Array(array) => { - let result_array = array.as_any().downcast_ref::().unwrap(); - assert_eq!(result_array.value(0), 0u64); - } - ColumnarValue::Scalar(scalar) => { - assert_eq!(scalar, datafusion_common::ScalarValue::UInt64(Some(0))); - } - } - } -} diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index c5ef9ad220..e7ab67e312 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -19,8 +19,7 @@ use async_trait::async_trait; use catalog::CatalogManagerRef; use common_base::AffectedRows; use common_meta::rpc::procedure::{ - AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, - RemoveRegionFollowerRequest, + ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, }; use common_query::Output; use common_query::error::Result; @@ -72,11 +71,8 @@ pub trait ProcedureServiceHandler: Send + Sync { /// Query the procedure' state by its id async fn query_procedure_state(&self, pid: &str) -> Result; - /// Add a region follower to a region. - async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> Result<()>; - - /// Remove a region follower from a region. - async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>; + /// Manage a region follower to a region. + async fn manage_region_follower(&self, request: ManageRegionFollowerRequest) -> Result<()>; /// Get the catalog manager fn catalog_manager(&self) -> &CatalogManagerRef; diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index ea331fd57b..f90479b923 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -37,8 +37,7 @@ impl FunctionState { use catalog::CatalogManagerRef; use common_base::AffectedRows; use common_meta::rpc::procedure::{ - AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, - RemoveRegionFollowerRequest, + ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, }; use common_query::Output; use common_query::error::Result; @@ -75,13 +74,9 @@ impl FunctionState { }) } - async fn add_region_follower(&self, _request: AddRegionFollowerRequest) -> Result<()> { - Ok(()) - } - - async fn remove_region_follower( + async fn manage_region_follower( &self, - _request: RemoveRegionFollowerRequest, + _request: ManageRegionFollowerRequest, ) -> Result<()> { Ok(()) } diff --git a/src/common/meta/src/procedure_executor.rs b/src/common/meta/src/procedure_executor.rs index 41567a02ca..f27f7ebf47 100644 --- a/src/common/meta/src/procedure_executor.rs +++ b/src/common/meta/src/procedure_executor.rs @@ -25,8 +25,8 @@ use crate::error::{ }; use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use crate::rpc::procedure::{ - self, AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, - ProcedureStateResponse, RemoveRegionFollowerRequest, + self, ManageRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, + ProcedureStateResponse, }; /// The context of procedure executor. @@ -45,26 +45,14 @@ pub trait ProcedureExecutor: Send + Sync { request: SubmitDdlTaskRequest, ) -> Result; - /// Add a region follower - async fn add_region_follower( + /// Submit ad manage region follower task + async fn manage_region_follower( &self, _ctx: &ExecutorContext, - _request: AddRegionFollowerRequest, + _request: ManageRegionFollowerRequest, ) -> Result<()> { UnsupportedSnafu { - operation: "add_region_follower", - } - .fail() - } - - /// Remove a region follower - async fn remove_region_follower( - &self, - _ctx: &ExecutorContext, - _request: RemoveRegionFollowerRequest, - ) -> Result<()> { - UnsupportedSnafu { - operation: "remove_region_follower", + operation: "manage_region_follower", } .fail() } diff --git a/src/common/meta/src/rpc/procedure.rs b/src/common/meta/src/rpc/procedure.rs index 65eaaca7c7..be41071b54 100644 --- a/src/common/meta/src/rpc/procedure.rs +++ b/src/common/meta/src/rpc/procedure.rs @@ -23,6 +23,7 @@ use api::v1::meta::{ use common_error::ext::ErrorExt; use common_procedure::{ProcedureId, ProcedureInfo, ProcedureState}; use snafu::ResultExt; +use table::metadata::TableId; use crate::error::{ParseProcedureIdSnafu, Result}; @@ -44,6 +45,30 @@ pub struct AddRegionFollowerRequest { pub peer_id: u64, } +#[derive(Debug, Clone)] +pub struct AddTableFollowerRequest { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub table_id: TableId, +} + +#[derive(Debug, Clone)] +pub struct RemoveTableFollowerRequest { + pub catalog_name: String, + pub schema_name: String, + pub table_name: String, + pub table_id: TableId, +} + +#[derive(Debug, Clone)] +pub enum ManageRegionFollowerRequest { + AddRegionFollower(AddRegionFollowerRequest), + RemoveRegionFollower(RemoveRegionFollowerRequest), + AddTableFollower(AddTableFollowerRequest), + RemoveTableFollower(RemoveTableFollowerRequest), +} + /// A request to remove region follower. #[derive(Debug, Clone)] pub struct RemoveRegionFollowerRequest { diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index f19f00aade..5f887fb0ec 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -44,8 +44,9 @@ use common_meta::range_stream::PaginationStream; use common_meta::rpc::KeyValue; use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse}; use common_meta::rpc::procedure::{ - AddRegionFollowerRequest, MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, - RemoveRegionFollowerRequest, + AddRegionFollowerRequest, AddTableFollowerRequest, ManageRegionFollowerRequest, + MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse, + RemoveRegionFollowerRequest, RemoveTableFollowerRequest, }; use common_meta::rpc::store::{ BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest, @@ -246,6 +247,10 @@ pub trait RegionFollowerClient: Sync + Send + Debug { async fn remove_region_follower(&self, request: RemoveRegionFollowerRequest) -> Result<()>; + async fn add_table_follower(&self, request: AddTableFollowerRequest) -> Result<()>; + + async fn remove_table_follower(&self, request: RemoveTableFollowerRequest) -> Result<()>; + async fn start(&self, urls: &[&str]) -> Result<()>; async fn start_with(&self, leader_provider: LeaderProviderRef) -> Result<()>; @@ -286,39 +291,41 @@ impl ProcedureExecutor for MetaClient { .context(meta_error::ExternalSnafu) } - async fn add_region_follower( + async fn manage_region_follower( &self, _ctx: &ExecutorContext, - request: AddRegionFollowerRequest, + request: ManageRegionFollowerRequest, ) -> MetaResult<()> { if let Some(region_follower) = &self.region_follower { - region_follower - .add_region_follower(request) - .await - .map_err(BoxedError::new) - .context(meta_error::ExternalSnafu) - } else { - UnsupportedSnafu { - operation: "add_region_follower", + match request { + ManageRegionFollowerRequest::AddRegionFollower(add_region_follower_request) => { + region_follower + .add_region_follower(add_region_follower_request) + .await + } + ManageRegionFollowerRequest::RemoveRegionFollower( + remove_region_follower_request, + ) => { + region_follower + .remove_region_follower(remove_region_follower_request) + .await + } + ManageRegionFollowerRequest::AddTableFollower(add_table_follower_request) => { + region_follower + .add_table_follower(add_table_follower_request) + .await + } + ManageRegionFollowerRequest::RemoveTableFollower(remove_table_follower_request) => { + region_follower + .remove_table_follower(remove_table_follower_request) + .await + } } - .fail() - } - } - - async fn remove_region_follower( - &self, - _ctx: &ExecutorContext, - request: RemoveRegionFollowerRequest, - ) -> MetaResult<()> { - if let Some(region_follower) = &self.region_follower { - region_follower - .remove_region_follower(request) - .await - .map_err(BoxedError::new) - .context(meta_error::ExternalSnafu) + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) } else { UnsupportedSnafu { - operation: "remove_region_follower", + operation: "manage_region_follower", } .fail() } diff --git a/src/operator/src/procedure.rs b/src/operator/src/procedure.rs index 6212bdf4c3..d6a2fbe03f 100644 --- a/src/operator/src/procedure.rs +++ b/src/operator/src/procedure.rs @@ -19,8 +19,7 @@ use common_error::ext::BoxedError; use common_function::handlers::ProcedureServiceHandler; use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutorRef}; use common_meta::rpc::procedure::{ - AddRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, - RemoveRegionFollowerRequest, + ManageRegionFollowerRequest, MigrateRegionRequest, ProcedureStateResponse, }; use common_query::error as query_error; use common_query::error::Result as QueryResult; @@ -77,20 +76,12 @@ impl ProcedureServiceHandler for ProcedureServiceOperator { .context(query_error::ProcedureServiceSnafu) } - async fn add_region_follower(&self, request: AddRegionFollowerRequest) -> QueryResult<()> { - self.procedure_executor - .add_region_follower(&ExecutorContext::default(), request) - .await - .map_err(BoxedError::new) - .context(query_error::ProcedureServiceSnafu) - } - - async fn remove_region_follower( + async fn manage_region_follower( &self, - request: RemoveRegionFollowerRequest, + request: ManageRegionFollowerRequest, ) -> QueryResult<()> { self.procedure_executor - .remove_region_follower(&ExecutorContext::default(), request) + .manage_region_follower(&ExecutorContext::default(), request) .await .map_err(BoxedError::new) .context(query_error::ProcedureServiceSnafu)