From 414101fafa7cb43c64252f593183ffff8682d24d Mon Sep 17 00:00:00 2001 From: Weny Xu Date: Mon, 4 Aug 2025 17:12:48 +0800 Subject: [PATCH] feat: introduce reconciliation interface (#6614) * feat: introduce reconcile interface Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * chore: apply suggestions from CR Signed-off-by: WenyXu * chore: upgrade proto Signed-off-by: WenyXu --------- Signed-off-by: WenyXu --- Cargo.lock | 3 +- Cargo.toml | 2 +- src/common/function/src/admin.rs | 9 + .../function/src/admin/reconcile_catalog.rs | 179 +++++++++++++ .../function/src/admin/reconcile_database.rs | 198 ++++++++++++++ .../function/src/admin/reconcile_table.rs | 149 +++++++++++ src/common/function/src/handlers.rs | 4 + src/common/function/src/helper.rs | 68 ++++- src/common/function/src/lib.rs | 1 + src/common/function/src/state.rs | 6 +- src/common/meta/Cargo.toml | 1 + src/common/meta/src/procedure_executor.rs | 20 +- src/common/meta/src/reconciliation.rs | 15 +- src/common/meta/src/reconciliation/manager.rs | 246 ++++++++++++++++++ .../src/reconciliation/reconcile_catalog.rs | 13 +- .../reconcile_catalog/reconcile_databases.rs | 9 +- .../resolve_column_metadata.rs | 21 +- src/meta-client/src/client.rs | 18 +- src/meta-client/src/client/procedure.rs | 28 +- src/meta-srv/src/error.rs | 23 +- src/meta-srv/src/metasrv.rs | 6 + src/meta-srv/src/metasrv/builder.rs | 14 +- src/meta-srv/src/service.rs | 1 + src/meta-srv/src/service/cluster.rs | 41 +-- src/meta-srv/src/service/procedure.rs | 124 ++++++--- src/meta-srv/src/service/utils.rs | 34 +++ src/operator/src/procedure.rs | 12 + 27 files changed, 1138 insertions(+), 107 deletions(-) create mode 100644 src/common/function/src/admin/reconcile_catalog.rs create mode 100644 src/common/function/src/admin/reconcile_database.rs create mode 100644 src/common/function/src/admin/reconcile_table.rs create mode 100644 src/common/meta/src/reconciliation/manager.rs create mode 100644 
src/meta-srv/src/service/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 2ad57076ac..bad087953f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2552,6 +2552,7 @@ dependencies = [ "common-procedure-test", "common-query", "common-recordbatch", + "common-runtime", "common-telemetry", "common-test-util", "common-time", @@ -5309,7 +5310,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b515206826fc96ca3310a3f90ad321d614a683bb#b515206826fc96ca3310a3f90ad321d614a683bb" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=91c3d7b97a2850c014aa5ce4ffa4caeb6b918446#91c3d7b97a2850c014aa5ce4ffa4caeb6b918446" dependencies = [ "prost 0.13.5", "serde", diff --git a/Cargo.toml b/Cargo.toml index 93bf97d5a5..ade070d082 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,7 +140,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b515206826fc96ca3310a3f90ad321d614a683bb" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "91c3d7b97a2850c014aa5ce4ffa4caeb6b918446" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/common/function/src/admin.rs b/src/common/function/src/admin.rs index c06b28e7d5..1a02caa088 100644 --- a/src/common/function/src/admin.rs +++ b/src/common/function/src/admin.rs @@ -16,6 +16,9 @@ mod add_region_follower; mod flush_compact_region; mod flush_compact_table; mod migrate_region; +mod reconcile_catalog; +mod reconcile_database; +mod reconcile_table; mod remove_region_follower; use std::sync::Arc; @@ -24,6 +27,9 @@ use add_region_follower::AddRegionFollowerFunction; use flush_compact_region::{CompactRegionFunction, FlushRegionFunction}; use flush_compact_table::{CompactTableFunction, FlushTableFunction}; use migrate_region::MigrateRegionFunction; +use reconcile_catalog::ReconcileCatalogFunction; +use 
reconcile_database::ReconcileDatabaseFunction; +use reconcile_table::ReconcileTableFunction; use remove_region_follower::RemoveRegionFollowerFunction; use crate::flush_flow::FlushFlowFunction; @@ -43,5 +49,8 @@ impl AdminFunction { registry.register_async(Arc::new(FlushTableFunction)); registry.register_async(Arc::new(CompactTableFunction)); registry.register_async(Arc::new(FlushFlowFunction)); + registry.register_async(Arc::new(ReconcileCatalogFunction)); + registry.register_async(Arc::new(ReconcileDatabaseFunction)); + registry.register_async(Arc::new(ReconcileTableFunction)); } } diff --git a/src/common/function/src/admin/reconcile_catalog.rs b/src/common/function/src/admin/reconcile_catalog.rs new file mode 100644 index 0000000000..fc2fec3273 --- /dev/null +++ b/src/common/function/src/admin/reconcile_catalog.rs @@ -0,0 +1,179 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use api::v1::meta::reconcile_request::Target; +use api::v1::meta::{ReconcileCatalog, ReconcileRequest}; +use common_macro::admin_fn; +use common_query::error::{ + InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, + UnsupportedInputDataTypeSnafu, +}; +use common_query::prelude::{Signature, TypeSignature, Volatility}; +use common_telemetry::info; +use datatypes::prelude::*; +use session::context::QueryContextRef; + +use crate::handlers::ProcedureServiceHandlerRef; +use crate::helper::{ + cast_u32, default_parallelism, default_resolve_strategy, get_string_from_params, + parse_resolve_strategy, +}; + +const FN_NAME: &str = "reconcile_catalog"; + +/// A function to reconcile a catalog. +/// Returns the procedure id if success. +/// +/// - `reconcile_catalog(resolve_strategy)`. +/// - `reconcile_catalog(resolve_strategy, parallelism)`. +/// +/// - `reconcile_catalog()`. +#[admin_fn( + name = ReconcileCatalogFunction, + display_name = reconcile_catalog, + sig_fn = signature, + ret = string +)] +pub(crate) async fn reconcile_catalog( + procedure_service_handler: &ProcedureServiceHandlerRef, + query_ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result<Value> { + let (resolve_strategy, parallelism) = match params.len() { + 0 => (default_resolve_strategy(), default_parallelism()), + 1 => ( + parse_resolve_strategy(get_string_from_params(params, 0, FN_NAME)?)?, + default_parallelism(), + ), + 2 => { + let Some(parallelism) = cast_u32(&params[1])?
else { + return UnsupportedInputDataTypeSnafu { + function: FN_NAME, + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail(); + }; + ( + parse_resolve_strategy(get_string_from_params(params, 0, FN_NAME)?)?, + parallelism, + ) + } + size => { + return InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect 0, 1 or 2, have: {}", + size + ), + } + .fail(); + } + }; + info!( + "Reconciling catalog with resolve_strategy: {:?}, parallelism: {}", + resolve_strategy, parallelism + ); + let pid = procedure_service_handler + .reconcile(ReconcileRequest { + target: Some(Target::ReconcileCatalog(ReconcileCatalog { + catalog_name: query_ctx.current_catalog().to_string(), + parallelism, + resolve_strategy: resolve_strategy as i32, + })), + ..Default::default() + }) + .await?; + match pid { + Some(pid) => Ok(Value::from(pid)), + None => Ok(Value::Null), + } +} + +fn signature() -> Signature { + let nums = ConcreteDataType::numerics(); + let mut signs = Vec::with_capacity(2 + nums.len()); + signs.extend([ + // reconcile_catalog() + TypeSignature::NullAry, + // reconcile_catalog(resolve_strategy) + TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]), + ]); + for sign in nums { + // reconcile_catalog(resolve_strategy, parallelism) + signs.push(TypeSignature::Exact(vec![ + ConcreteDataType::string_datatype(), + sign, + ])); + } + Signature::one_of(signs, Volatility::Immutable) +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use common_query::error::Error; + use datatypes::vectors::{StringVector, UInt64Vector, VectorRef}; + + use crate::admin::reconcile_catalog::ReconcileCatalogFunction; + use crate::function::{AsyncFunction, FunctionContext}; + + #[tokio::test] + async fn test_reconcile_catalog() { + common_telemetry::init_default_ut_logging(); + + // reconcile_catalog() + let f = ReconcileCatalogFunction; + let args = vec![]; + let result = 
f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); + assert_eq!(expect, result); + + // reconcile_catalog(resolve_strategy) + let f = ReconcileCatalogFunction; + let args = vec![Arc::new(StringVector::from(vec!["UseMetasrv"])) as _]; + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); + assert_eq!(expect, result); + + // reconcile_catalog(resolve_strategy, parallelism) + let f = ReconcileCatalogFunction; + let args = vec![ + Arc::new(StringVector::from(vec!["UseLatest"])) as _, + Arc::new(UInt64Vector::from_slice([10])) as _, + ]; + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); + assert_eq!(expect, result); + + // unsupported input data type + let f = ReconcileCatalogFunction; + let args = vec![ + Arc::new(StringVector::from(vec!["UseLatest"])) as _, + Arc::new(StringVector::from(vec!["test"])) as _, + ]; + let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); + assert_matches!(err, Error::UnsupportedInputDataType { .. }); + + // invalid function args + let f = ReconcileCatalogFunction; + let args = vec![ + Arc::new(StringVector::from(vec!["UseLatest"])) as _, + Arc::new(UInt64Vector::from_slice([10])) as _, + Arc::new(StringVector::from(vec!["10"])) as _, + ]; + let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); + assert_matches!(err, Error::InvalidFuncArgs { .. 
}); + } +} diff --git a/src/common/function/src/admin/reconcile_database.rs b/src/common/function/src/admin/reconcile_database.rs new file mode 100644 index 0000000000..622d2bb069 --- /dev/null +++ b/src/common/function/src/admin/reconcile_database.rs @@ -0,0 +1,198 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::reconcile_request::Target; +use api::v1::meta::{ReconcileDatabase, ReconcileRequest}; +use common_macro::admin_fn; +use common_query::error::{ + InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result, + UnsupportedInputDataTypeSnafu, +}; +use common_query::prelude::{Signature, TypeSignature, Volatility}; +use common_telemetry::info; +use datatypes::prelude::*; +use session::context::QueryContextRef; + +use crate::handlers::ProcedureServiceHandlerRef; +use crate::helper::{ + cast_u32, default_parallelism, default_resolve_strategy, get_string_from_params, + parse_resolve_strategy, +}; + +const FN_NAME: &str = "reconcile_database"; + +/// A function to reconcile a database. +/// Returns the procedure id if success. +/// +/// - `reconcile_database(database_name)`. +/// - `reconcile_database(database_name, resolve_strategy)`. +/// - `reconcile_database(database_name, resolve_strategy, parallelism)`. 
+/// +/// The parameters: +/// - `database_name`: the database name +#[admin_fn( + name = ReconcileDatabaseFunction, + display_name = reconcile_database, + sig_fn = signature, + ret = string +)] +pub(crate) async fn reconcile_database( + procedure_service_handler: &ProcedureServiceHandlerRef, + query_ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result<Value> { + let (database_name, resolve_strategy, parallelism) = match params.len() { + 1 => ( + get_string_from_params(params, 0, FN_NAME)?, + default_resolve_strategy(), + default_parallelism(), + ), + 2 => ( + get_string_from_params(params, 0, FN_NAME)?, + parse_resolve_strategy(get_string_from_params(params, 1, FN_NAME)?)?, + default_parallelism(), + ), + 3 => { + let Some(parallelism) = cast_u32(&params[2])? else { + return UnsupportedInputDataTypeSnafu { + function: FN_NAME, + datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(), + } + .fail(); + }; + ( + get_string_from_params(params, 0, FN_NAME)?, + parse_resolve_strategy(get_string_from_params(params, 1, FN_NAME)?)?, + parallelism, + ) + } + size => { + return InvalidFuncArgsSnafu { + err_msg: format!( + "The length of the args is not correct, expect 1, 2 or 3, have: {}", + size + ), + } + .fail(); + } + }; + info!( + "Reconciling database: {}, resolve_strategy: {:?}, parallelism: {}", + database_name, resolve_strategy, parallelism + ); + let pid = procedure_service_handler + .reconcile(ReconcileRequest { + target: Some(Target::ReconcileDatabase(ReconcileDatabase { + catalog_name: query_ctx.current_catalog().to_string(), + database_name: database_name.to_string(), + parallelism, + resolve_strategy: resolve_strategy as i32, + })), + ..Default::default() + }) + .await?; + match pid { + Some(pid) => Ok(Value::from(pid)), + None => Ok(Value::Null), + } +} + +fn signature() -> Signature { + let nums = ConcreteDataType::numerics(); + let mut signs = Vec::with_capacity(2 + nums.len()); + signs.extend([ + // reconcile_database(database_name) +
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]), + // reconcile_database(database_name, resolve_strategy) + TypeSignature::Exact(vec![ + ConcreteDataType::string_datatype(), + ConcreteDataType::string_datatype(), + ]), + ]); + for sign in nums { + // reconcile_database(database_name, resolve_strategy, parallelism) + signs.push(TypeSignature::Exact(vec![ + ConcreteDataType::string_datatype(), + ConcreteDataType::string_datatype(), + sign, + ])); + } + Signature::one_of(signs, Volatility::Immutable) +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use common_query::error::Error; + use datatypes::vectors::{StringVector, UInt32Vector, VectorRef}; + + use crate::admin::reconcile_database::ReconcileDatabaseFunction; + use crate::function::{AsyncFunction, FunctionContext}; + + #[tokio::test] + async fn test_reconcile_catalog() { + common_telemetry::init_default_ut_logging(); + + // reconcile_database(database_name) + let f = ReconcileDatabaseFunction; + let args = vec![Arc::new(StringVector::from(vec!["test"])) as _]; + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); + assert_eq!(expect, result); + + // reconcile_database(database_name, resolve_strategy) + let f = ReconcileDatabaseFunction; + let args = vec![ + Arc::new(StringVector::from(vec!["test"])) as _, + Arc::new(StringVector::from(vec!["UseLatest"])) as _, + ]; + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); + assert_eq!(expect, result); + + // reconcile_database(database_name, resolve_strategy, parallelism) + let f = ReconcileDatabaseFunction; + let args = vec![ + Arc::new(StringVector::from(vec!["test"])) as _, + Arc::new(StringVector::from(vec!["UseLatest"])) as _, + Arc::new(UInt32Vector::from_slice([10])) as _, + ]; + let result = 
f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); + assert_eq!(expect, result); + + // invalid function args + let f = ReconcileDatabaseFunction; + let args = vec![ + Arc::new(StringVector::from(vec!["UseLatest"])) as _, + Arc::new(UInt32Vector::from_slice([10])) as _, + Arc::new(StringVector::from(vec!["v1"])) as _, + Arc::new(StringVector::from(vec!["v2"])) as _, + ]; + let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); + assert_matches!(err, Error::InvalidFuncArgs { .. }); + + // unsupported input data type + let f = ReconcileDatabaseFunction; + let args = vec![ + Arc::new(StringVector::from(vec!["UseLatest"])) as _, + Arc::new(UInt32Vector::from_slice([10])) as _, + Arc::new(StringVector::from(vec!["v1"])) as _, + ]; + let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); + assert_matches!(err, Error::UnsupportedInputDataType { .. }); + } +} diff --git a/src/common/function/src/admin/reconcile_table.rs b/src/common/function/src/admin/reconcile_table.rs new file mode 100644 index 0000000000..61e54e47bc --- /dev/null +++ b/src/common/function/src/admin/reconcile_table.rs @@ -0,0 +1,149 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use api::v1::meta::reconcile_request::Target; +use api::v1::meta::{ReconcileRequest, ReconcileTable, ResolveStrategy}; +use common_catalog::format_full_table_name; +use common_error::ext::BoxedError; +use common_macro::admin_fn; +use common_query::error::{ + MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu, UnsupportedInputDataTypeSnafu, +}; +use common_query::prelude::{Signature, TypeSignature, Volatility}; +use common_telemetry::info; +use datatypes::prelude::*; +use session::context::QueryContextRef; +use session::table_name::table_name_to_full_name; +use snafu::ResultExt; + +use crate::handlers::ProcedureServiceHandlerRef; +use crate::helper::parse_resolve_strategy; + +const FN_NAME: &str = "reconcile_table"; + +/// A function to reconcile a table. +/// Returns the procedure id if success. +/// +/// - `reconcile_table(table_name)`. +/// - `reconcile_table(table_name, resolve_strategy)`. +/// +/// The parameters: +/// - `table_name`: the table name +#[admin_fn( + name = ReconcileTableFunction, + display_name = reconcile_table, + sig_fn = signature, + ret = string +)] +pub(crate) async fn reconcile_table( + procedure_service_handler: &ProcedureServiceHandlerRef, + query_ctx: &QueryContextRef, + params: &[ValueRef<'_>], +) -> Result { + let (table_name, resolve_strategy) = match params { + [ValueRef::String(table_name)] => (table_name, ResolveStrategy::UseLatest), + [ValueRef::String(table_name), ValueRef::String(resolve_strategy)] => { + (table_name, parse_resolve_strategy(resolve_strategy)?) 
+ } + _ => { + return UnsupportedInputDataTypeSnafu { + function: FN_NAME, + datatypes: params.iter().map(|v| v.data_type()).collect::>(), + } + .fail() + } + }; + let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx) + .map_err(BoxedError::new) + .context(TableMutationSnafu)?; + info!( + "Reconciling table: {} with resolve_strategy: {:?}", + format_full_table_name(&catalog_name, &schema_name, &table_name), + resolve_strategy + ); + let pid = procedure_service_handler + .reconcile(ReconcileRequest { + target: Some(Target::ReconcileTable(ReconcileTable { + catalog_name, + schema_name, + table_name, + resolve_strategy: resolve_strategy as i32, + })), + ..Default::default() + }) + .await?; + match pid { + Some(pid) => Ok(Value::from(pid)), + None => Ok(Value::Null), + } +} + +fn signature() -> Signature { + Signature::one_of( + vec![ + // reconcile_table(table_name) + TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]), + // reconcile_table(table_name, resolve_strategy) + TypeSignature::Exact(vec![ + ConcreteDataType::string_datatype(), + ConcreteDataType::string_datatype(), + ]), + ], + Volatility::Immutable, + ) +} + +#[cfg(test)] +mod tests { + use std::assert_matches::assert_matches; + use std::sync::Arc; + + use common_query::error::Error; + use datatypes::vectors::{StringVector, VectorRef}; + + use crate::admin::reconcile_table::ReconcileTableFunction; + use crate::function::{AsyncFunction, FunctionContext}; + + #[tokio::test] + async fn test_reconcile_table() { + common_telemetry::init_default_ut_logging(); + + // reconcile_table(table_name) + let f = ReconcileTableFunction; + let args = vec![Arc::new(StringVector::from(vec!["test"])) as _]; + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); + assert_eq!(expect, result); + + // reconcile_table(table_name, resolve_strategy) + let f = ReconcileTableFunction; + let args = 
vec![ + Arc::new(StringVector::from(vec!["test"])) as _, + Arc::new(StringVector::from(vec!["UseMetasrv"])) as _, + ]; + let result = f.eval(FunctionContext::mock(), &args).await.unwrap(); + let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"])); + assert_eq!(expect, result); + + // unsupported input data type + let f = ReconcileTableFunction; + let args = vec![ + Arc::new(StringVector::from(vec!["test"])) as _, + Arc::new(StringVector::from(vec!["UseMetasrv"])) as _, + Arc::new(StringVector::from(vec!["10"])) as _, + ]; + let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err(); + assert_matches!(err, Error::UnsupportedInputDataType { .. }); + } +} diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs index bcb6ce5460..7289de6763 100644 --- a/src/common/function/src/handlers.rs +++ b/src/common/function/src/handlers.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use api::v1::meta::ReconcileRequest; use async_trait::async_trait; use catalog::CatalogManagerRef; use common_base::AffectedRows; @@ -65,6 +66,9 @@ pub trait ProcedureServiceHandler: Send + Sync { /// Migrate a region from source peer to target peer, returns the procedure id if success. async fn migrate_region(&self, request: MigrateRegionRequest) -> Result>; + /// Reconcile a table, database or catalog, returns the procedure id if success. + async fn reconcile(&self, request: ReconcileRequest) -> Result>; + /// Query the procedure' state by its id async fn query_procedure_state(&self, pid: &str) -> Result; diff --git a/src/common/function/src/helper.rs b/src/common/function/src/helper.rs index e4a1cd1af8..e572c2df1c 100644 --- a/src/common/function/src/helper.rs +++ b/src/common/function/src/helper.rs @@ -12,12 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use common_query::error::{InvalidInputTypeSnafu, Result}; +use api::v1::meta::ResolveStrategy; +use common_query::error::{ + InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result, UnsupportedInputDataTypeSnafu, +}; use common_query::prelude::{Signature, TypeSignature, Volatility}; use datatypes::prelude::ConcreteDataType; use datatypes::types::cast::cast; use datatypes::value::ValueRef; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; /// Create a function signature with oneof signatures of interleaving two arguments. pub fn one_of_sigs2(args1: Vec, args2: Vec) -> Signature { @@ -43,3 +46,64 @@ pub fn cast_u64(value: &ValueRef) -> Result> { }) .map(|v| v.as_u64()) } + +/// Cast a [`ValueRef`] to u32, returns `None` if fails +pub fn cast_u32(value: &ValueRef) -> Result> { + cast((*value).into(), &ConcreteDataType::uint32_datatype()) + .context(InvalidInputTypeSnafu { + err_msg: format!( + "Failed to cast input into uint32, actual type: {:#?}", + value.data_type(), + ), + }) + .map(|v| v.as_u64().map(|v| v as u32)) +} + +/// Parse a resolve strategy from a string. +pub fn parse_resolve_strategy(strategy: &str) -> Result { + ResolveStrategy::from_str_name(strategy).context(InvalidFuncArgsSnafu { + err_msg: format!("Invalid resolve strategy: {}", strategy), + }) +} + +/// Default parallelism for reconcile operations. +pub fn default_parallelism() -> u32 { + 64 +} + +/// Default resolve strategy for reconcile operations. +pub fn default_resolve_strategy() -> ResolveStrategy { + ResolveStrategy::UseLatest +} + +/// Get the string value from the params. +/// +/// # Errors +/// Returns an error if the input type is not a string. 
+pub fn get_string_from_params<'a>( + params: &'a [ValueRef<'a>], + index: usize, + fn_name: &'a str, +) -> Result<&'a str> { + let ValueRef::String(s) = &params[index] else { + return UnsupportedInputDataTypeSnafu { + function: fn_name, + datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(), + } + .fail(); + }; + Ok(s) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_resolve_strategy() { + assert_eq!( + parse_resolve_strategy("UseLatest").unwrap(), + ResolveStrategy::UseLatest + ); + } +} diff --git a/src/common/function/src/lib.rs b/src/common/function/src/lib.rs index 95b8b6a3b1..429d99892d 100644 --- a/src/common/function/src/lib.rs +++ b/src/common/function/src/lib.rs @@ -14,6 +14,7 @@ #![feature(let_chains)] #![feature(try_blocks)] +#![feature(assert_matches)] mod admin; mod flush_flow; diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index 211f7e1438..510bb613a6 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -32,7 +32,7 @@ impl FunctionState { pub fn mock() -> Self { use std::sync::Arc; - use api::v1::meta::ProcedureStatus; + use api::v1::meta::{ProcedureStatus, ReconcileRequest}; use async_trait::async_trait; use catalog::CatalogManagerRef; use common_base::AffectedRows; @@ -63,6 +63,10 @@ impl FunctionState { Ok(Some("test_pid".to_string())) } + async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> { + Ok(Some("test_pid".to_string())) + } + async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> { Ok(ProcedureStateResponse { status: ProcedureStatus::Done.into(), diff --git a/src/common/meta/Cargo.toml b/src/common/meta/Cargo.toml index 788029e570..216ca4f4d6 100644 --- a/src/common/meta/Cargo.toml +++ b/src/common/meta/Cargo.toml @@ -41,6 +41,7 @@ common-procedure.workspace = true common-procedure-test.workspace = true common-query.workspace = true common-recordbatch.workspace = true +common-runtime.workspace = true
common-telemetry.workspace = true common-time.workspace = true common-wal.workspace = true diff --git a/src/common/meta/src/procedure_executor.rs b/src/common/meta/src/procedure_executor.rs index 992f0ae8da..41567a02ca 100644 --- a/src/common/meta/src/procedure_executor.rs +++ b/src/common/meta/src/procedure_executor.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use api::v1::meta::ProcedureDetailResponse; +use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse}; use common_procedure::{ProcedureId, ProcedureManagerRef}; use common_telemetry::tracing_context::W3cTrace; use snafu::{OptionExt, ResultExt}; @@ -76,6 +76,13 @@ pub trait ProcedureExecutor: Send + Sync { request: MigrateRegionRequest, ) -> Result; + /// Submit a reconcile task. + async fn reconcile( + &self, + _ctx: &ExecutorContext, + request: ReconcileRequest, + ) -> Result; + /// Query the procedure state by its id async fn query_procedure_state( &self, @@ -124,6 +131,17 @@ impl ProcedureExecutor for LocalProcedureExecutor { .fail() } + async fn reconcile( + &self, + _ctx: &ExecutorContext, + _request: ReconcileRequest, + ) -> Result { + UnsupportedSnafu { + operation: "reconcile", + } + .fail() + } + async fn query_procedure_state( &self, _ctx: &ExecutorContext, diff --git a/src/common/meta/src/reconciliation.rs b/src/common/meta/src/reconciliation.rs index 568e477a80..3f851d0163 100644 --- a/src/common/meta/src/reconciliation.rs +++ b/src/common/meta/src/reconciliation.rs @@ -12,16 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-// TODO(weny): Remove it -#[allow(dead_code)] -pub(crate) mod reconcile_database; -// TODO(weny): Remove it -#[allow(dead_code)] -pub(crate) mod reconcile_table; -// TODO(weny): Remove it -#[allow(dead_code)] -pub(crate) mod reconcile_logical_tables; -// TODO(weny): Remove it -#[allow(dead_code)] +pub mod manager; pub(crate) mod reconcile_catalog; +pub(crate) mod reconcile_database; +pub(crate) mod reconcile_logical_tables; +pub(crate) mod reconcile_table; pub(crate) mod utils; diff --git a/src/common/meta/src/reconciliation/manager.rs b/src/common/meta/src/reconciliation/manager.rs new file mode 100644 index 0000000000..29e15b4692 --- /dev/null +++ b/src/common/meta/src/reconciliation/manager.rs @@ -0,0 +1,246 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use common_procedure::{ + watcher, BoxedProcedure, ProcedureId, ProcedureManagerRef, ProcedureWithId, +}; +use common_telemetry::{error, info, warn}; +use snafu::{OptionExt, ResultExt}; +use store_api::storage::TableId; +use table::table_name::TableName; +use table::table_reference::TableReference; + +use crate::cache_invalidator::CacheInvalidatorRef; +use crate::error::{self, Result, TableNotFoundSnafu}; +use crate::key::table_name::TableNameKey; +use crate::key::TableMetadataManagerRef; +use crate::node_manager::NodeManagerRef; +use crate::reconciliation::reconcile_catalog::ReconcileCatalogProcedure; +use crate::reconciliation::reconcile_database::{ReconcileDatabaseProcedure, DEFAULT_PARALLELISM}; +use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure; +use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy; +use crate::reconciliation::reconcile_table::ReconcileTableProcedure; +use crate::reconciliation::utils::Context; + +pub type ReconciliationManagerRef = Arc; + +/// The manager for reconciliation procedures. +pub struct ReconciliationManager { + procedure_manager: ProcedureManagerRef, + context: Context, +} + +macro_rules! 
register_reconcile_loader { + ($self:ident, $procedure:ty) => {{ + let context = $self.context.clone(); + $self + .procedure_manager + .register_loader( + <$procedure>::TYPE_NAME, + Box::new(move |json| { + let context = context.clone(); + let procedure = <$procedure>::from_json(context, json)?; + Ok(Box::new(procedure)) + }), + ) + .context(error::RegisterProcedureLoaderSnafu { + type_name: <$procedure>::TYPE_NAME, + })?; + }}; +} + +impl ReconciliationManager { + pub fn new( + node_manager: NodeManagerRef, + table_metadata_manager: TableMetadataManagerRef, + cache_invalidator: CacheInvalidatorRef, + procedure_manager: ProcedureManagerRef, + ) -> Self { + Self { + procedure_manager, + context: Context { + node_manager, + table_metadata_manager, + cache_invalidator, + }, + } + } + + /// Try to start the reconciliation manager. + /// + /// This function will register the procedure loaders for the reconciliation procedures. + /// Returns an error if the procedure loaders are already registered. + pub fn try_start(&self) -> Result<()> { + register_reconcile_loader!(self, ReconcileLogicalTablesProcedure); + register_reconcile_loader!(self, ReconcileTableProcedure); + register_reconcile_loader!(self, ReconcileDatabaseProcedure); + register_reconcile_loader!(self, ReconcileCatalogProcedure); + + Ok(()) + } + + /// Reconcile a table. + /// + /// Returns the procedure id of the reconciliation procedure. + pub async fn reconcile_table( + &self, + table_ref: TableReference<'_>, + resolve_strategy: ResolveStrategy, + ) -> Result { + let table_name_key = + TableNameKey::new(table_ref.catalog, table_ref.schema, table_ref.table); + let table_metadata_manager = &self.context.table_metadata_manager; + let table_id = table_metadata_manager + .table_name_manager() + .get(table_name_key) + .await? + .with_context(|| TableNotFoundSnafu { + table_name: table_ref.to_string(), + })? 
+ .table_id(); + let (physical_table_id, _) = table_metadata_manager + .table_route_manager() + .get_physical_table_route(table_id) + .await?; + + if physical_table_id == table_id { + Ok(self.reconcile_physical_table(table_id, table_ref.into(), resolve_strategy)) + } else { + let physical_table_info = table_metadata_manager + .table_info_manager() + .get(physical_table_id) + .await? + .with_context(|| TableNotFoundSnafu { + table_name: format!("table_id: {}", physical_table_id), + })?; + + Ok(self.reconcile_logical_tables( + physical_table_id, + physical_table_info.table_name(), + vec![(table_id, table_ref.into())], + )) + } + } + + /// Reconcile a database. + /// + /// Returns the procedure id of the reconciliation procedure. + pub fn reconcile_database( + &self, + catalog: String, + schema: String, + resolve_strategy: ResolveStrategy, + parallelism: usize, + ) -> ProcedureId { + let parallelism = normalize_parallelism(parallelism); + let procedure = ReconcileDatabaseProcedure::new( + self.context.clone(), + catalog, + schema, + false, + parallelism, + resolve_strategy, + false, + ); + self.spawn_procedure(Box::new(procedure)) + } + + fn reconcile_physical_table( + &self, + table_id: TableId, + table_name: TableName, + resolve_strategy: ResolveStrategy, + ) -> ProcedureId { + let procedure = ReconcileTableProcedure::new( + self.context.clone(), + table_id, + table_name, + resolve_strategy, + false, + ); + self.spawn_procedure(Box::new(procedure)) + } + + fn reconcile_logical_tables( + &self, + physical_table_id: TableId, + physical_table_name: TableName, + logical_tables: Vec<(TableId, TableName)>, + ) -> ProcedureId { + let procedure = ReconcileLogicalTablesProcedure::new( + self.context.clone(), + physical_table_id, + physical_table_name, + logical_tables, + false, + ); + self.spawn_procedure(Box::new(procedure)) + } + + /// Reconcile a catalog. + /// + /// Returns the procedure id of the reconciliation procedure. 
+ pub fn reconcile_catalog( + &self, + catalog: String, + resolve_strategy: ResolveStrategy, + parallelism: usize, + ) -> ProcedureId { + let parallelism = normalize_parallelism(parallelism); + let procedure = ReconcileCatalogProcedure::new( + self.context.clone(), + catalog, + false, + resolve_strategy, + parallelism, + ); + self.spawn_procedure(Box::new(procedure)) + } + + fn spawn_procedure(&self, procedure: BoxedProcedure) -> ProcedureId { + let procedure_manager = self.procedure_manager.clone(); + let procedure_with_id = ProcedureWithId::with_random_id(procedure); + let procedure_id = procedure_with_id.id; + common_runtime::spawn_global(async move { + let watcher = &mut match procedure_manager.submit(procedure_with_id).await { + Ok(watcher) => watcher, + Err(e) => { + error!(e; "Failed to submit reconciliation procedure {procedure_id}"); + return; + } + }; + if let Err(e) = watcher::wait(watcher).await { + error!(e; "Failed to wait reconciliation procedure {procedure_id}"); + return; + } + + info!("Reconciliation procedure {procedure_id} is finished successfully!"); + }); + procedure_id + } +} + +fn normalize_parallelism(parallelism: usize) -> usize { + if parallelism == 0 { + warn!( + "Parallelism is 0, using default parallelism: {}", + DEFAULT_PARALLELISM + ); + DEFAULT_PARALLELISM + } else { + parallelism + } +} diff --git a/src/common/meta/src/reconciliation/reconcile_catalog.rs b/src/common/meta/src/reconciliation/reconcile_catalog.rs index 758c44c6a7..8a9fa8031a 100644 --- a/src/common/meta/src/reconciliation/reconcile_catalog.rs +++ b/src/common/meta/src/reconciliation/reconcile_catalog.rs @@ -78,14 +78,21 @@ pub(crate) struct PersistentContext { catalog: String, fast_fail: bool, resolve_strategy: ResolveStrategy, + parallelism: usize, } impl PersistentContext { - pub fn new(catalog: String, fast_fail: bool, resolve_strategy: ResolveStrategy) -> Self { + pub fn new( + catalog: String, + fast_fail: bool, + resolve_strategy: ResolveStrategy, + 
parallelism: usize, + ) -> Self { Self { catalog, fast_fail, resolve_strategy, + parallelism, } } } @@ -111,8 +118,10 @@ impl ReconcileCatalogProcedure { catalog: String, fast_fail: bool, resolve_strategy: ResolveStrategy, + parallelism: usize, ) -> Self { - let persistent_ctx = PersistentContext::new(catalog, fast_fail, resolve_strategy); + let persistent_ctx = + PersistentContext::new(catalog, fast_fail, resolve_strategy, parallelism); let context = ReconcileCatalogContext::new(ctx, persistent_ctx); let state = Box::new(ReconcileCatalogStart); Self { context, state } diff --git a/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs b/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs index cccdbc15de..b51d45e2e6 100644 --- a/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs +++ b/src/common/meta/src/reconciliation/reconcile_catalog/reconcile_databases.rs @@ -15,13 +15,14 @@ use std::any::Any; use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status}; +use common_telemetry::info; use futures::TryStreamExt; use serde::{Deserialize, Serialize}; use crate::error::Result; use crate::reconciliation::reconcile_catalog::end::ReconcileCatalogEnd; use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State}; -use crate::reconciliation::reconcile_database::{ReconcileDatabaseProcedure, DEFAULT_PARALLELISM}; +use crate::reconciliation::reconcile_database::ReconcileDatabaseProcedure; use crate::reconciliation::utils::Context; #[derive(Debug, Serialize, Deserialize)] @@ -75,12 +76,16 @@ impl ReconcileDatabases { table_metadata_manager: ctx.table_metadata_manager.clone(), cache_invalidator: ctx.cache_invalidator.clone(), }; + info!( + "Scheduling reconcile database: {}, catalog: {}", + schema, ctx.persistent_ctx.catalog + ); let procedure = ReconcileDatabaseProcedure::new( context, ctx.persistent_ctx.catalog.clone(), schema, ctx.persistent_ctx.fast_fail, - 
DEFAULT_PARALLELISM, + ctx.persistent_ctx.parallelism, ctx.persistent_ctx.resolve_strategy, true, ); diff --git a/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs b/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs index ee563cbe8d..b2dfa620f5 100644 --- a/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs +++ b/src/common/meta/src/reconciliation/reconcile_table/resolve_column_metadata.rs @@ -31,18 +31,29 @@ use crate::reconciliation::utils::{ }; /// Strategy for resolving column metadata inconsistencies. -#[derive(Debug, Serialize, Deserialize, Clone, Copy)] -pub(crate) enum ResolveStrategy { - /// Always uses the column metadata from metasrv. - UseMetasrv, - +#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)] +pub enum ResolveStrategy { + #[default] /// Trusts the latest column metadata from datanode. UseLatest, + /// Always uses the column metadata from metasrv. + UseMetasrv, + /// Aborts the resolution process if inconsistencies are detected. AbortOnConflict, } +impl From for ResolveStrategy { + fn from(strategy: api::v1::meta::ResolveStrategy) -> Self { + match strategy { + api::v1::meta::ResolveStrategy::UseMetasrv => Self::UseMetasrv, + api::v1::meta::ResolveStrategy::UseLatest => Self::UseLatest, + api::v1::meta::ResolveStrategy::AbortOnConflict => Self::AbortOnConflict, + } + } +} + /// State responsible for resolving inconsistencies in column metadata across physical regions. 
#[derive(Debug, Serialize, Deserialize)] pub struct ResolveColumnMetadata { diff --git a/src/meta-client/src/client.rs b/src/meta-client/src/client.rs index 257d5a2d9d..ebf2a6a167 100644 --- a/src/meta-client/src/client.rs +++ b/src/meta-client/src/client.rs @@ -24,7 +24,7 @@ mod util; use std::fmt::Debug; use std::sync::Arc; -use api::v1::meta::{ProcedureDetailResponse, Role}; +use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role}; pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef}; use cluster::Client as ClusterClient; pub use cluster::ClusterKvBackend; @@ -275,6 +275,17 @@ impl ProcedureExecutor for MetaClient { .context(meta_error::ExternalSnafu) } + async fn reconcile( + &self, + _ctx: &ExecutorContext, + request: ReconcileRequest, + ) -> MetaResult { + self.reconcile(request) + .await + .map_err(BoxedError::new) + .context(meta_error::ExternalSnafu) + } + async fn add_region_follower( &self, _ctx: &ExecutorContext, @@ -611,6 +622,11 @@ impl MetaClient { .await } + /// Reconcile the procedure state. 
+ pub async fn reconcile(&self, request: ReconcileRequest) -> Result { + self.procedure_client()?.reconcile(request).await + } + /// Submit a DDL task pub async fn submit_ddl_task( &self, diff --git a/src/meta-client/src/client/procedure.rs b/src/meta-client/src/client/procedure.rs index f5408c0216..f63abe1b42 100644 --- a/src/meta-client/src/client/procedure.rs +++ b/src/meta-client/src/client/procedure.rs @@ -20,7 +20,7 @@ use api::v1::meta::procedure_service_client::ProcedureServiceClient; use api::v1::meta::{ DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse, - QueryProcedureRequest, ResponseHeader, Role, + QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role, }; use common_grpc::channel_manager::ChannelManager; use common_telemetry::tracing_context::TracingContext; @@ -98,6 +98,12 @@ impl Client { .await } + /// Reconcile the procedure state. + pub async fn reconcile(&self, request: ReconcileRequest) -> Result { + let inner = self.inner.read().await; + inner.reconcile(request).await + } + pub async fn list_procedures(&self) -> Result { let inner = self.inner.read().await; inner.list_procedures().await @@ -253,6 +259,26 @@ impl Inner { .await } + async fn reconcile(&self, request: ReconcileRequest) -> Result { + let mut req = request; + req.set_header( + self.id, + self.role, + TracingContext::from_current_span().to_w3c(), + ); + + self.with_retry( + "reconcile", + move |mut client| { + let req = req.clone(); + + async move { client.reconcile(req).await.map(|res| res.into_inner()) } + }, + |resp: &ReconcileResponse| &resp.header, + ) + .await + } + async fn query_procedure_state(&self, pid: &str) -> Result { let mut req = QueryProcedureRequest { pid: Some(ProcedureId { key: pid.into() }), diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index 5c53e505b1..9a136100e3 100644 --- a/src/meta-srv/src/error.rs 
+++ b/src/meta-srv/src/error.rs @@ -99,6 +99,13 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to init reconciliation manager"))] + InitReconciliationManager { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to create default catalog and schema"))] InitMetadata { #[snafu(implicit)] @@ -134,6 +141,13 @@ pub enum Error { source: common_meta::error::Error, }, + #[snafu(display("Failed to submit reconcile procedure"))] + SubmitReconcileProcedure { + #[snafu(implicit)] + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("Failed to invalidate table cache"))] InvalidateTableCache { #[snafu(implicit)] @@ -1022,7 +1036,8 @@ impl ErrorExt for Error { } Error::DowngradeLeader { source, .. } => source.status_code(), Error::RegisterProcedureLoader { source, .. } => source.status_code(), - Error::SubmitDdlTask { source, .. } => source.status_code(), + Error::SubmitDdlTask { source, .. } + | Error::SubmitReconcileProcedure { source, .. } => source.status_code(), Error::ConvertProtoData { source, .. } | Error::TableMetadataManager { source, .. } | Error::RuntimeSwitchManager { source, .. } @@ -1030,9 +1045,9 @@ impl ErrorExt for Error { | Error::UnexpectedLogicalRouteTable { source, .. } | Error::UpdateTopicNameValue { source, .. } => source.status_code(), - Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => { - source.status_code() - } + Error::InitMetadata { source, .. } + | Error::InitDdlManager { source, .. } + | Error::InitReconciliationManager { source, .. } => source.status_code(), Error::Other { source, .. } => source.status_code(), Error::NoEnoughAvailableNode { .. 
} => StatusCode::RuntimeResourcesExhausted, diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index 038cb8dd57..88d5af5c36 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -35,6 +35,7 @@ use common_meta::leadership_notifier::{ }; use common_meta::node_expiry_listener::NodeExpiryListener; use common_meta::peer::Peer; +use common_meta::reconciliation::manager::ReconciliationManagerRef; use common_meta::region_keeper::MemoryRegionKeeperRef; use common_meta::region_registry::LeaderRegionRegistryRef; use common_meta::sequence::SequenceRef; @@ -455,6 +456,7 @@ pub struct Metasrv { leader_region_registry: LeaderRegionRegistryRef, wal_prune_ticker: Option, table_id_sequence: SequenceRef, + reconciliation_manager: ReconciliationManagerRef, plugins: Plugins, } @@ -730,6 +732,10 @@ impl Metasrv { &self.table_id_sequence } + pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef { + &self.reconciliation_manager + } + pub fn plugins(&self) -> &Plugins { &self.plugins } diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index 8ae6fc80cd..c014dec6ed 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -34,6 +34,7 @@ use common_meta::key::TableMetadataManager; use common_meta::kv_backend::memory::MemoryKvBackend; use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef}; use common_meta::node_manager::NodeManagerRef; +use common_meta::reconciliation::manager::ReconciliationManager; use common_meta::region_keeper::MemoryRegionKeeper; use common_meta::region_registry::LeaderRegionRegistry; use common_meta::sequence::SequenceBuilder; @@ -360,7 +361,7 @@ impl MetasrvBuilder { let leader_region_registry = Arc::new(LeaderRegionRegistry::default()); let ddl_context = DdlContext { - node_manager, + node_manager: node_manager.clone(), cache_invalidator: cache_invalidator.clone(), memory_region_keeper: memory_region_keeper.clone(), 
leader_region_registry: leader_region_registry.clone(), @@ -446,6 +447,16 @@ impl MetasrvBuilder { .to_string_lossy() .to_string(); + let reconciliation_manager = Arc::new(ReconciliationManager::new( + node_manager.clone(), + table_metadata_manager.clone(), + cache_invalidator.clone(), + procedure_manager.clone(), + )); + reconciliation_manager + .try_start() + .context(error::InitReconciliationManagerSnafu)?; + Ok(Metasrv { state, started: Arc::new(AtomicBool::new(false)), @@ -482,6 +493,7 @@ impl MetasrvBuilder { leader_region_registry, wal_prune_ticker, table_id_sequence, + reconciliation_manager, }) } } diff --git a/src/meta-srv/src/service.rs b/src/meta-srv/src/service.rs index e260b8b980..c2eab57ce3 100644 --- a/src/meta-srv/src/service.rs +++ b/src/meta-srv/src/service.rs @@ -23,6 +23,7 @@ mod heartbeat; pub mod mailbox; pub mod procedure; pub mod store; +pub(crate) mod utils; pub type GrpcResult = Result, Status>; pub type GrpcStream = Pin> + Send + Sync + 'static>>; diff --git a/src/meta-srv/src/service/cluster.rs b/src/meta-srv/src/service/cluster.rs index a4a26edcac..d80481b462 100644 --- a/src/meta-srv/src/service/cluster.rs +++ b/src/meta-srv/src/service/cluster.rs @@ -14,30 +14,21 @@ use api::v1::meta::{ cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse, - Error, MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse, - RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader, + MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse, RangeRequest as PbRangeRequest, + RangeResponse as PbRangeResponse, }; use common_telemetry::warn; use snafu::ResultExt; -use tonic::{Request, Response}; +use tonic::Request; use crate::metasrv::Metasrv; use crate::service::GrpcResult; -use crate::{error, metasrv}; +use crate::{check_leader, error, metasrv}; #[async_trait::async_trait] impl cluster_server::Cluster for Metasrv { async fn batch_get(&self, req: Request) -> GrpcResult { - if 
!self.is_leader() { - let is_not_leader = ResponseHeader::failed(Error::is_not_leader()); - let resp = PbBatchGetResponse { - header: Some(is_not_leader), - ..Default::default() - }; - - warn!("The current meta is not leader, but a `batch_get` request have reached the meta. Detail: {:?}.", req); - return Ok(Response::new(resp)); - } + check_leader!(self, req, PbBatchGetResponse, "`batch_get`"); let req = req.into_inner().into(); let resp = self @@ -51,16 +42,7 @@ impl cluster_server::Cluster for Metasrv { } async fn range(&self, req: Request) -> GrpcResult { - if !self.is_leader() { - let is_not_leader = ResponseHeader::failed(Error::is_not_leader()); - let resp = PbRangeResponse { - header: Some(is_not_leader), - ..Default::default() - }; - - warn!("The current meta is not leader, but a `range` request have reached the meta. Detail: {:?}.", req); - return Ok(Response::new(resp)); - } + check_leader!(self, req, PbRangeResponse, "`range`"); let req = req.into_inner().into(); let res = self @@ -77,16 +59,7 @@ impl cluster_server::Cluster for Metasrv { &self, req: Request, ) -> GrpcResult { - if !self.is_leader() { - let is_not_leader = ResponseHeader::failed(Error::is_not_leader()); - let resp = MetasrvPeersResponse { - header: Some(is_not_leader), - ..Default::default() - }; - - warn!("The current meta is not leader, but a `metasrv_peers` request have reached the meta. 
Detail: {:?}.", req); - return Ok(Response::new(resp)); - } + check_leader!(self, req, MetasrvPeersResponse, "`metasrv_peers`"); let leader_addr = &self.options().grpc.server_addr; let (leader, followers) = match self.election() { diff --git a/src/meta-srv/src/service/procedure.rs b/src/meta-srv/src/service/procedure.rs index 05c1ed7d66..5b318c51b9 100644 --- a/src/meta-srv/src/service/procedure.rs +++ b/src/meta-srv/src/service/procedure.rs @@ -15,25 +15,28 @@ use std::sync::Arc; use std::time::Duration; +use api::v1::meta::reconcile_request::Target; use api::v1::meta::{ procedure_service_server, DdlTaskRequest as PbDdlTaskRequest, - DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse, + DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse, ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest, - ResponseHeader, + ReconcileCatalog, ReconcileDatabase, ReconcileRequest, ReconcileResponse, ReconcileTable, + ResolveStrategy, }; use common_meta::procedure_executor::ExecutorContext; use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest}; use common_meta::rpc::procedure; use common_telemetry::warn; use snafu::{OptionExt, ResultExt}; -use tonic::{Request, Response}; +use table::table_reference::TableReference; +use tonic::Request; -use crate::error; use crate::metasrv::Metasrv; use crate::procedure::region_migration::manager::{ RegionMigrationProcedureTask, RegionMigrationTriggerReason, }; use crate::service::GrpcResult; +use crate::{check_leader, error}; #[async_trait::async_trait] impl procedure_service_server::ProcedureService for Metasrv { @@ -41,15 +44,12 @@ impl procedure_service_server::ProcedureService for Metasrv { &self, request: Request, ) -> GrpcResult { - if !self.is_leader() { - let resp = ProcedureStateResponse { - header: Some(ResponseHeader::failed(Error::is_not_leader())), - ..Default::default() - }; - - warn!("The current meta is not leader, 
but a `query procedure state` request have reached the meta. Detail: {:?}.", request); - return Ok(Response::new(resp)); - } + check_leader!( + self, + request, + ProcedureStateResponse, + "`query procedure state`" + ); let QueryProcedureRequest { header, pid, .. } = request.into_inner(); let _header = header.context(error::MissingRequestHeaderSnafu)?; @@ -71,15 +71,7 @@ impl procedure_service_server::ProcedureService for Metasrv { } async fn ddl(&self, request: Request) -> GrpcResult { - if !self.is_leader() { - let resp = PbDdlTaskResponse { - header: Some(ResponseHeader::failed(Error::is_not_leader())), - ..Default::default() - }; - - warn!("The current meta is not leader, but a `ddl` request have reached the meta. Detail: {:?}.", request); - return Ok(Response::new(resp)); - } + check_leader!(self, request, PbDdlTaskResponse, "`ddl`"); let PbDdlTaskRequest { header, @@ -121,15 +113,7 @@ impl procedure_service_server::ProcedureService for Metasrv { &self, request: Request, ) -> GrpcResult { - if !self.is_leader() { - let resp = MigrateRegionResponse { - header: Some(ResponseHeader::failed(Error::is_not_leader())), - ..Default::default() - }; - - warn!("The current meta is not leader, but a `migrate` request have reached the meta. 
Detail: {:?}.", request); - return Ok(Response::new(resp)); - } + check_leader!(self, request, MigrateRegionResponse, "`migrate`"); let MigrateRegionRequest { header, @@ -169,19 +153,79 @@ impl procedure_service_server::ProcedureService for Metasrv { Ok(Response::new(resp)) } + async fn reconcile(&self, request: Request) -> GrpcResult { + check_leader!(self, request, ReconcileResponse, "`reconcile`"); + + let ReconcileRequest { header, target } = request.into_inner(); + let _header = header.context(error::MissingRequestHeaderSnafu)?; + let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?; + let parse_resolve_strategy = |resolve_strategy: i32| { + ResolveStrategy::try_from(resolve_strategy) + .ok() + .context(error::UnexpectedSnafu { + violated: format!("Invalid resolve strategy: {}", resolve_strategy), + }) + }; + let procedure_id = match target { + Target::ReconcileTable(table) => { + let ReconcileTable { + catalog_name, + schema_name, + table_name, + resolve_strategy, + } = table; + let resolve_strategy = parse_resolve_strategy(resolve_strategy)?; + let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name); + self.reconciliation_manager() + .reconcile_table(table_ref, resolve_strategy.into()) + .await + .context(error::SubmitReconcileProcedureSnafu)? 
+ } + Target::ReconcileDatabase(database) => { + let ReconcileDatabase { + catalog_name, + database_name, + resolve_strategy, + parallelism, + } = database; + let resolve_strategy = parse_resolve_strategy(resolve_strategy)?; + self.reconciliation_manager().reconcile_database( + catalog_name, + database_name, + resolve_strategy.into(), + parallelism as usize, + ) + } + Target::ReconcileCatalog(catalog) => { + let ReconcileCatalog { + catalog_name, + resolve_strategy, + parallelism, + } = catalog; + let resolve_strategy = parse_resolve_strategy(resolve_strategy)?; + self.reconciliation_manager().reconcile_catalog( + catalog_name, + resolve_strategy.into(), + parallelism as usize, + ) + } + }; + Ok(Response::new(ReconcileResponse { + pid: Some(procedure::pid_to_pb_pid(procedure_id)), + ..Default::default() + })) + } + async fn details( &self, request: Request, ) -> GrpcResult { - if !self.is_leader() { - let resp = ProcedureDetailResponse { - header: Some(ResponseHeader::failed(Error::is_not_leader())), - ..Default::default() - }; - - warn!("The current meta is not leader, but a `procedure details` request have reached the meta. Detail: {:?}.", request); - return Ok(Response::new(resp)); - } + check_leader!( + self, + request, + ProcedureDetailResponse, + "`procedure details`" + ); let ProcedureDetailRequest { header } = request.into_inner(); let _header = header.context(error::MissingRequestHeaderSnafu)?; diff --git a/src/meta-srv/src/service/utils.rs b/src/meta-srv/src/service/utils.rs new file mode 100644 index 0000000000..309af50822 --- /dev/null +++ b/src/meta-srv/src/service/utils.rs @@ -0,0 +1,34 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#[macro_export] +macro_rules! check_leader { + ($self:expr, $request:expr, $resp_ty:ty, $msg:expr) => { + use common_telemetry::warn; + use api::v1::meta::{ResponseHeader, Error}; + use tonic::Response; + + if !$self.is_leader() { + warn!( + "The current metasrv is not the leader, but a {} request has reached the meta. Detail: {:?}.", + $msg, $request + ); + let mut resp: $resp_ty = Default::default(); + resp.header = Some(ResponseHeader::failed( + Error::is_not_leader(), + )); + return Ok(Response::new(resp)); + } + }; +} diff --git a/src/operator/src/procedure.rs b/src/operator/src/procedure.rs index 00616cb72a..6212bdf4c3 100644 --- a/src/operator/src/procedure.rs +++ b/src/operator/src/procedure.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::meta::ReconcileRequest; use async_trait::async_trait; use catalog::CatalogManagerRef; use common_error::ext::BoxedError; @@ -57,6 +58,17 @@ impl ProcedureServiceHandler for ProcedureServiceOperator { .map(|pid| String::from_utf8_lossy(&pid.key).to_string())) } + async fn reconcile(&self, request: ReconcileRequest) -> QueryResult> { + Ok(self + .procedure_executor + .reconcile(&ExecutorContext::default(), request) + .await + .map_err(BoxedError::new) + .context(query_error::ProcedureServiceSnafu)? + .pid + .map(|pid| String::from_utf8_lossy(&pid.key).to_string())) + } + async fn query_procedure_state(&self, pid: &str) -> QueryResult { self.procedure_executor .query_procedure_state(&ExecutorContext::default(), pid)