feat: introduce reconciliation interface (#6614)

* feat: introduce reconcile interface

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions from CR

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: upgrade proto

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
Weny Xu
2025-08-04 17:12:48 +08:00
committed by Zhenchi
parent d9f177ba9f
commit b31d307eb6
27 changed files with 1138 additions and 107 deletions

3
Cargo.lock generated
View File

@@ -2552,6 +2552,7 @@ dependencies = [
"common-procedure-test",
"common-query",
"common-recordbatch",
"common-runtime",
"common-telemetry",
"common-test-util",
"common-time",
@@ -5309,7 +5310,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b515206826fc96ca3310a3f90ad321d614a683bb#b515206826fc96ca3310a3f90ad321d614a683bb"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=91c3d7b97a2850c014aa5ce4ffa4caeb6b918446#91c3d7b97a2850c014aa5ce4ffa4caeb6b918446"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -140,7 +140,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b515206826fc96ca3310a3f90ad321d614a683bb" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "91c3d7b97a2850c014aa5ce4ffa4caeb6b918446" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -16,6 +16,9 @@ mod add_region_follower;
mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;
mod reconcile_catalog;
mod reconcile_database;
mod reconcile_table;
mod remove_region_follower;
use std::sync::Arc;
@@ -24,6 +27,9 @@ use add_region_follower::AddRegionFollowerFunction;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use reconcile_catalog::ReconcileCatalogFunction;
use reconcile_database::ReconcileDatabaseFunction;
use reconcile_table::ReconcileTableFunction;
use remove_region_follower::RemoveRegionFollowerFunction;
use crate::flush_flow::FlushFlowFunction;
@@ -43,5 +49,8 @@ impl AdminFunction {
registry.register_async(Arc::new(FlushTableFunction));
registry.register_async(Arc::new(CompactTableFunction));
registry.register_async(Arc::new(FlushFlowFunction));
registry.register_async(Arc::new(ReconcileCatalogFunction));
registry.register_async(Arc::new(ReconcileDatabaseFunction));
registry.register_async(Arc::new(ReconcileTableFunction));
}
}

View File

@@ -0,0 +1,179 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::reconcile_request::Target;
use api::v1::meta::{ReconcileCatalog, ReconcileRequest};
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::info;
use datatypes::prelude::*;
use session::context::QueryContextRef;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::{
cast_u32, default_parallelism, default_resolve_strategy, get_string_from_params,
parse_resolve_strategy,
};
const FN_NAME: &str = "reconcile_catalog";
/// A function to reconcile a catalog.
/// Returns the procedure id if success.
///
/// - `reconcile_catalog(resolve_strategy)`.
/// - `reconcile_catalog(resolve_strategy, parallelism)`.
///
/// - `reconcile_catalog()`.
#[admin_fn(
name = ReconcileCatalogFunction,
display_name = reconcile_catalog,
sig_fn = signature,
ret = string
)]
pub(crate) async fn reconcile_catalog(
procedure_service_handler: &ProcedureServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (resolve_strategy, parallelism) = match params.len() {
0 => (default_resolve_strategy(), default_parallelism()),
1 => (
parse_resolve_strategy(get_string_from_params(params, 0, FN_NAME)?)?,
default_parallelism(),
),
2 => {
let Some(parallelism) = cast_u32(&params[1])? else {
return UnsupportedInputDataTypeSnafu {
function: FN_NAME,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
(
parse_resolve_strategy(get_string_from_params(params, 0, FN_NAME)?)?,
parallelism,
)
}
size => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 0, 1 or 2, have: {}",
size
),
}
.fail();
}
};
info!(
"Reconciling catalog with resolve_strategy: {:?}, parallelism: {}",
resolve_strategy, parallelism
);
let pid = procedure_service_handler
.reconcile(ReconcileRequest {
target: Some(Target::ReconcileCatalog(ReconcileCatalog {
catalog_name: query_ctx.current_catalog().to_string(),
parallelism,
resolve_strategy: resolve_strategy as i32,
})),
..Default::default()
})
.await?;
match pid {
Some(pid) => Ok(Value::from(pid)),
None => Ok(Value::Null),
}
}
fn signature() -> Signature {
let nums = ConcreteDataType::numerics();
let mut signs = Vec::with_capacity(2 + nums.len());
signs.extend([
// reconcile_catalog()
TypeSignature::NullAry,
// reconcile_catalog(resolve_strategy)
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
]);
for sign in nums {
// reconcile_catalog(resolve_strategy, parallelism)
signs.push(TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
sign,
]));
}
Signature::one_of(signs, Volatility::Immutable)
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_query::error::Error;
use datatypes::vectors::{StringVector, UInt64Vector, VectorRef};
use crate::admin::reconcile_catalog::ReconcileCatalogFunction;
use crate::function::{AsyncFunction, FunctionContext};
#[tokio::test]
async fn test_reconcile_catalog() {
common_telemetry::init_default_ut_logging();
// reconcile_catalog()
let f = ReconcileCatalogFunction;
let args = vec![];
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
// reconcile_catalog(resolve_strategy)
let f = ReconcileCatalogFunction;
let args = vec![Arc::new(StringVector::from(vec!["UseMetasrv"])) as _];
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
// reconcile_catalog(resolve_strategy, parallelism)
let f = ReconcileCatalogFunction;
let args = vec![
Arc::new(StringVector::from(vec!["UseLatest"])) as _,
Arc::new(UInt64Vector::from_slice([10])) as _,
];
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
// unsupported input data type
let f = ReconcileCatalogFunction;
let args = vec![
Arc::new(StringVector::from(vec!["UseLatest"])) as _,
Arc::new(StringVector::from(vec!["test"])) as _,
];
let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err();
assert_matches!(err, Error::UnsupportedInputDataType { .. });
// invalid function args
let f = ReconcileCatalogFunction;
let args = vec![
Arc::new(StringVector::from(vec!["UseLatest"])) as _,
Arc::new(UInt64Vector::from_slice([10])) as _,
Arc::new(StringVector::from(vec!["10"])) as _,
];
let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err();
assert_matches!(err, Error::InvalidFuncArgs { .. });
}
}

View File

@@ -0,0 +1,198 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::reconcile_request::Target;
use api::v1::meta::{ReconcileDatabase, ReconcileRequest};
use common_macro::admin_fn;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::info;
use datatypes::prelude::*;
use session::context::QueryContextRef;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::{
cast_u32, default_parallelism, default_resolve_strategy, get_string_from_params,
parse_resolve_strategy,
};
const FN_NAME: &str = "reconcile_database";
/// A function to reconcile a database.
/// Returns the procedure id if success.
///
/// - `reconcile_database(database_name)`.
/// - `reconcile_database(database_name, resolve_strategy)`.
/// - `reconcile_database(database_name, resolve_strategy, parallelism)`.
///
/// The parameters:
/// - `database_name`: the database name
#[admin_fn(
name = ReconcileDatabaseFunction,
display_name = reconcile_database,
sig_fn = signature,
ret = string
)]
pub(crate) async fn reconcile_database(
procedure_service_handler: &ProcedureServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (database_name, resolve_strategy, parallelism) = match params.len() {
1 => (
get_string_from_params(params, 0, FN_NAME)?,
default_resolve_strategy(),
default_parallelism(),
),
2 => (
get_string_from_params(params, 0, FN_NAME)?,
parse_resolve_strategy(get_string_from_params(params, 1, FN_NAME)?)?,
default_parallelism(),
),
3 => {
let Some(parallelism) = cast_u32(&params[2])? else {
return UnsupportedInputDataTypeSnafu {
function: FN_NAME,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
(
get_string_from_params(params, 0, FN_NAME)?,
parse_resolve_strategy(get_string_from_params(params, 1, FN_NAME)?)?,
parallelism,
)
}
size => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, 2 or 3, have: {}",
size
),
}
.fail();
}
};
info!(
"Reconciling database: {}, resolve_strategy: {:?}, parallelism: {}",
database_name, resolve_strategy, parallelism
);
let pid = procedure_service_handler
.reconcile(ReconcileRequest {
target: Some(Target::ReconcileDatabase(ReconcileDatabase {
catalog_name: query_ctx.current_catalog().to_string(),
database_name: database_name.to_string(),
parallelism,
resolve_strategy: resolve_strategy as i32,
})),
..Default::default()
})
.await?;
match pid {
Some(pid) => Ok(Value::from(pid)),
None => Ok(Value::Null),
}
}
fn signature() -> Signature {
let nums = ConcreteDataType::numerics();
let mut signs = Vec::with_capacity(2 + nums.len());
signs.extend([
// reconcile_database(datanode_name)
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
// reconcile_database(database_name, resolve_strategy)
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
]),
]);
for sign in nums {
// reconcile_database(database_name, resolve_strategy, parallelism)
signs.push(TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
sign,
]));
}
Signature::one_of(signs, Volatility::Immutable)
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_query::error::Error;
use datatypes::vectors::{StringVector, UInt32Vector, VectorRef};
use crate::admin::reconcile_database::ReconcileDatabaseFunction;
use crate::function::{AsyncFunction, FunctionContext};
#[tokio::test]
async fn test_reconcile_catalog() {
common_telemetry::init_default_ut_logging();
// reconcile_database(database_name)
let f = ReconcileDatabaseFunction;
let args = vec![Arc::new(StringVector::from(vec!["test"])) as _];
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
// reconcile_database(database_name, resolve_strategy)
let f = ReconcileDatabaseFunction;
let args = vec![
Arc::new(StringVector::from(vec!["test"])) as _,
Arc::new(StringVector::from(vec!["UseLatest"])) as _,
];
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
// reconcile_database(database_name, resolve_strategy, parallelism)
let f = ReconcileDatabaseFunction;
let args = vec![
Arc::new(StringVector::from(vec!["test"])) as _,
Arc::new(StringVector::from(vec!["UseLatest"])) as _,
Arc::new(UInt32Vector::from_slice([10])) as _,
];
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
// invalid function args
let f = ReconcileDatabaseFunction;
let args = vec![
Arc::new(StringVector::from(vec!["UseLatest"])) as _,
Arc::new(UInt32Vector::from_slice([10])) as _,
Arc::new(StringVector::from(vec!["v1"])) as _,
Arc::new(StringVector::from(vec!["v2"])) as _,
];
let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err();
assert_matches!(err, Error::InvalidFuncArgs { .. });
// unsupported input data type
let f = ReconcileDatabaseFunction;
let args = vec![
Arc::new(StringVector::from(vec!["UseLatest"])) as _,
Arc::new(UInt32Vector::from_slice([10])) as _,
Arc::new(StringVector::from(vec!["v1"])) as _,
];
let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err();
assert_matches!(err, Error::UnsupportedInputDataType { .. });
}
}

View File

@@ -0,0 +1,149 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::reconcile_request::Target;
use api::v1::meta::{ReconcileRequest, ReconcileTable, ResolveStrategy};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::{
MissingProcedureServiceHandlerSnafu, Result, TableMutationSnafu, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::info;
use datatypes::prelude::*;
use session::context::QueryContextRef;
use session::table_name::table_name_to_full_name;
use snafu::ResultExt;
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::parse_resolve_strategy;
const FN_NAME: &str = "reconcile_table";
/// A function to reconcile a table.
/// Returns the procedure id if success.
///
/// - `reconcile_table(table_name)`.
/// - `reconcile_table(table_name, resolve_strategy)`.
///
/// The parameters:
/// - `table_name`: the table name
#[admin_fn(
name = ReconcileTableFunction,
display_name = reconcile_table,
sig_fn = signature,
ret = string
)]
pub(crate) async fn reconcile_table(
procedure_service_handler: &ProcedureServiceHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (table_name, resolve_strategy) = match params {
[ValueRef::String(table_name)] => (table_name, ResolveStrategy::UseLatest),
[ValueRef::String(table_name), ValueRef::String(resolve_strategy)] => {
(table_name, parse_resolve_strategy(resolve_strategy)?)
}
_ => {
return UnsupportedInputDataTypeSnafu {
function: FN_NAME,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail()
}
};
let (catalog_name, schema_name, table_name) = table_name_to_full_name(table_name, query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;
info!(
"Reconciling table: {} with resolve_strategy: {:?}",
format_full_table_name(&catalog_name, &schema_name, &table_name),
resolve_strategy
);
let pid = procedure_service_handler
.reconcile(ReconcileRequest {
target: Some(Target::ReconcileTable(ReconcileTable {
catalog_name,
schema_name,
table_name,
resolve_strategy: resolve_strategy as i32,
})),
..Default::default()
})
.await?;
match pid {
Some(pid) => Ok(Value::from(pid)),
None => Ok(Value::Null),
}
}
fn signature() -> Signature {
Signature::one_of(
vec![
// reconcile_table(table_name)
TypeSignature::Exact(vec![ConcreteDataType::string_datatype()]),
// reconcile_table(table_name, resolve_strategy)
TypeSignature::Exact(vec![
ConcreteDataType::string_datatype(),
ConcreteDataType::string_datatype(),
]),
],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_query::error::Error;
use datatypes::vectors::{StringVector, VectorRef};
use crate::admin::reconcile_table::ReconcileTableFunction;
use crate::function::{AsyncFunction, FunctionContext};
#[tokio::test]
async fn test_reconcile_table() {
common_telemetry::init_default_ut_logging();
// reconcile_table(table_name)
let f = ReconcileTableFunction;
let args = vec![Arc::new(StringVector::from(vec!["test"])) as _];
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
// reconcile_table(table_name, resolve_strategy)
let f = ReconcileTableFunction;
let args = vec![
Arc::new(StringVector::from(vec!["test"])) as _,
Arc::new(StringVector::from(vec!["UseMetasrv"])) as _,
];
let result = f.eval(FunctionContext::mock(), &args).await.unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
// unsupported input data type
let f = ReconcileTableFunction;
let args = vec![
Arc::new(StringVector::from(vec!["test"])) as _,
Arc::new(StringVector::from(vec!["UseMetasrv"])) as _,
Arc::new(StringVector::from(vec!["10"])) as _,
];
let err = f.eval(FunctionContext::mock(), &args).await.unwrap_err();
assert_matches!(err, Error::UnsupportedInputDataType { .. });
}
}

View File

@@ -14,6 +14,7 @@
use std::sync::Arc;
use api::v1::meta::ReconcileRequest;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::AffectedRows;
@@ -65,6 +66,9 @@ pub trait ProcedureServiceHandler: Send + Sync {
/// Migrate a region from source peer to target peer, returns the procedure id if success.
async fn migrate_region(&self, request: MigrateRegionRequest) -> Result<Option<String>>;
/// Reconcile a table, database or catalog, returns the procedure id if success.
async fn reconcile(&self, request: ReconcileRequest) -> Result<Option<String>>;
/// Query the procedure' state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;

View File

@@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::error::{InvalidInputTypeSnafu, Result};
use api::v1::meta::ResolveStrategy;
use common_query::error::{
InvalidFuncArgsSnafu, InvalidInputTypeSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -43,3 +46,64 @@ pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
})
.map(|v| v.as_u64())
}
/// Cast a [`ValueRef`] to u32, returns `None` if fails
pub fn cast_u32(value: &ValueRef) -> Result<Option<u32>> {
cast((*value).into(), &ConcreteDataType::uint32_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint32, actual type: {:#?}",
value.data_type(),
),
})
.map(|v| v.as_u64().map(|v| v as u32))
}
/// Parse a resolve strategy from a string.
pub fn parse_resolve_strategy(strategy: &str) -> Result<ResolveStrategy> {
ResolveStrategy::from_str_name(strategy).context(InvalidFuncArgsSnafu {
err_msg: format!("Invalid resolve strategy: {}", strategy),
})
}
/// Default parallelism for reconcile operations.
pub fn default_parallelism() -> u32 {
64
}
/// Default resolve strategy for reconcile operations.
pub fn default_resolve_strategy() -> ResolveStrategy {
ResolveStrategy::UseLatest
}
/// Get the string value from the params.
///
/// # Errors
/// Returns an error if the input type is not a string.
pub fn get_string_from_params<'a>(
params: &'a [ValueRef<'a>],
index: usize,
fn_name: &'a str,
) -> Result<&'a str> {
let ValueRef::String(s) = &params[index] else {
return UnsupportedInputDataTypeSnafu {
function: fn_name,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
Ok(s)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_resolve_strategy() {
assert_eq!(
parse_resolve_strategy("UseLatest").unwrap(),
ResolveStrategy::UseLatest
);
}
}

View File

@@ -14,6 +14,7 @@
#![feature(let_chains)]
#![feature(try_blocks)]
#![feature(assert_matches)]
mod admin;
mod flush_flow;

View File

@@ -32,7 +32,7 @@ impl FunctionState {
pub fn mock() -> Self {
use std::sync::Arc;
use api::v1::meta::ProcedureStatus;
use api::v1::meta::{ProcedureStatus, ReconcileRequest};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::AffectedRows;
@@ -63,6 +63,10 @@ impl FunctionState {
Ok(Some("test_pid".to_string()))
}
async fn reconcile(&self, _request: ReconcileRequest) -> Result<Option<String>> {
Ok(Some("test_pid".to_string()))
}
async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
Ok(ProcedureStateResponse {
status: ProcedureStatus::Done.into(),

View File

@@ -41,6 +41,7 @@ common-procedure.workspace = true
common-procedure-test.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
common-wal.workspace = true

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::v1::meta::ProcedureDetailResponse;
use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse};
use common_procedure::{ProcedureId, ProcedureManagerRef};
use common_telemetry::tracing_context::W3cTrace;
use snafu::{OptionExt, ResultExt};
@@ -76,6 +76,13 @@ pub trait ProcedureExecutor: Send + Sync {
request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse>;
/// Submit a reconcile task.
async fn reconcile(
&self,
_ctx: &ExecutorContext,
request: ReconcileRequest,
) -> Result<ReconcileResponse>;
/// Query the procedure state by its id
async fn query_procedure_state(
&self,
@@ -124,6 +131,17 @@ impl ProcedureExecutor for LocalProcedureExecutor {
.fail()
}
async fn reconcile(
&self,
_ctx: &ExecutorContext,
_request: ReconcileRequest,
) -> Result<ReconcileResponse> {
UnsupportedSnafu {
operation: "reconcile",
}
.fail()
}
async fn query_procedure_state(
&self,
_ctx: &ExecutorContext,

View File

@@ -12,16 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod reconcile_database;
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod reconcile_table;
// TODO(weny): Remove it
#[allow(dead_code)]
pub(crate) mod reconcile_logical_tables;
// TODO(weny): Remove it
#[allow(dead_code)]
pub mod manager;
pub(crate) mod reconcile_catalog;
pub(crate) mod reconcile_database;
pub(crate) mod reconcile_logical_tables;
pub(crate) mod reconcile_table;
pub(crate) mod utils;

View File

@@ -0,0 +1,246 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use common_procedure::{
watcher, BoxedProcedure, ProcedureId, ProcedureManagerRef, ProcedureWithId,
};
use common_telemetry::{error, info, warn};
use snafu::{OptionExt, ResultExt};
use store_api::storage::TableId;
use table::table_name::TableName;
use table::table_reference::TableReference;
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::error::{self, Result, TableNotFoundSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::TableMetadataManagerRef;
use crate::node_manager::NodeManagerRef;
use crate::reconciliation::reconcile_catalog::ReconcileCatalogProcedure;
use crate::reconciliation::reconcile_database::{ReconcileDatabaseProcedure, DEFAULT_PARALLELISM};
use crate::reconciliation::reconcile_logical_tables::ReconcileLogicalTablesProcedure;
use crate::reconciliation::reconcile_table::resolve_column_metadata::ResolveStrategy;
use crate::reconciliation::reconcile_table::ReconcileTableProcedure;
use crate::reconciliation::utils::Context;
pub type ReconciliationManagerRef = Arc<ReconciliationManager>;
/// The manager for reconciliation procedures.
pub struct ReconciliationManager {
procedure_manager: ProcedureManagerRef,
context: Context,
}
macro_rules! register_reconcile_loader {
($self:ident, $procedure:ty) => {{
let context = $self.context.clone();
$self
.procedure_manager
.register_loader(
<$procedure>::TYPE_NAME,
Box::new(move |json| {
let context = context.clone();
let procedure = <$procedure>::from_json(context, json)?;
Ok(Box::new(procedure))
}),
)
.context(error::RegisterProcedureLoaderSnafu {
type_name: <$procedure>::TYPE_NAME,
})?;
}};
}
impl ReconciliationManager {
pub fn new(
node_manager: NodeManagerRef,
table_metadata_manager: TableMetadataManagerRef,
cache_invalidator: CacheInvalidatorRef,
procedure_manager: ProcedureManagerRef,
) -> Self {
Self {
procedure_manager,
context: Context {
node_manager,
table_metadata_manager,
cache_invalidator,
},
}
}
/// Try to start the reconciliation manager.
///
/// This function will register the procedure loaders for the reconciliation procedures.
/// Returns an error if the procedure loaders are already registered.
pub fn try_start(&self) -> Result<()> {
register_reconcile_loader!(self, ReconcileLogicalTablesProcedure);
register_reconcile_loader!(self, ReconcileTableProcedure);
register_reconcile_loader!(self, ReconcileDatabaseProcedure);
register_reconcile_loader!(self, ReconcileCatalogProcedure);
Ok(())
}
/// Reconcile a table.
///
/// Returns the procedure id of the reconciliation procedure.
pub async fn reconcile_table(
&self,
table_ref: TableReference<'_>,
resolve_strategy: ResolveStrategy,
) -> Result<ProcedureId> {
let table_name_key =
TableNameKey::new(table_ref.catalog, table_ref.schema, table_ref.table);
let table_metadata_manager = &self.context.table_metadata_manager;
let table_id = table_metadata_manager
.table_name_manager()
.get(table_name_key)
.await?
.with_context(|| TableNotFoundSnafu {
table_name: table_ref.to_string(),
})?
.table_id();
let (physical_table_id, _) = table_metadata_manager
.table_route_manager()
.get_physical_table_route(table_id)
.await?;
if physical_table_id == table_id {
Ok(self.reconcile_physical_table(table_id, table_ref.into(), resolve_strategy))
} else {
let physical_table_info = table_metadata_manager
.table_info_manager()
.get(physical_table_id)
.await?
.with_context(|| TableNotFoundSnafu {
table_name: format!("table_id: {}", physical_table_id),
})?;
Ok(self.reconcile_logical_tables(
physical_table_id,
physical_table_info.table_name(),
vec![(table_id, table_ref.into())],
))
}
}
/// Reconcile a database.
///
/// Returns the procedure id of the reconciliation procedure.
pub fn reconcile_database(
&self,
catalog: String,
schema: String,
resolve_strategy: ResolveStrategy,
parallelism: usize,
) -> ProcedureId {
let parallelism = normalize_parallelism(parallelism);
let procedure = ReconcileDatabaseProcedure::new(
self.context.clone(),
catalog,
schema,
false,
parallelism,
resolve_strategy,
false,
);
self.spawn_procedure(Box::new(procedure))
}
fn reconcile_physical_table(
&self,
table_id: TableId,
table_name: TableName,
resolve_strategy: ResolveStrategy,
) -> ProcedureId {
let procedure = ReconcileTableProcedure::new(
self.context.clone(),
table_id,
table_name,
resolve_strategy,
false,
);
self.spawn_procedure(Box::new(procedure))
}
fn reconcile_logical_tables(
&self,
physical_table_id: TableId,
physical_table_name: TableName,
logical_tables: Vec<(TableId, TableName)>,
) -> ProcedureId {
let procedure = ReconcileLogicalTablesProcedure::new(
self.context.clone(),
physical_table_id,
physical_table_name,
logical_tables,
false,
);
self.spawn_procedure(Box::new(procedure))
}
/// Reconcile a catalog.
///
/// Returns the procedure id of the reconciliation procedure.
pub fn reconcile_catalog(
&self,
catalog: String,
resolve_strategy: ResolveStrategy,
parallelism: usize,
) -> ProcedureId {
let parallelism = normalize_parallelism(parallelism);
let procedure = ReconcileCatalogProcedure::new(
self.context.clone(),
catalog,
false,
resolve_strategy,
parallelism,
);
self.spawn_procedure(Box::new(procedure))
}
fn spawn_procedure(&self, procedure: BoxedProcedure) -> ProcedureId {
let procedure_manager = self.procedure_manager.clone();
let procedure_with_id = ProcedureWithId::with_random_id(procedure);
let procedure_id = procedure_with_id.id;
common_runtime::spawn_global(async move {
let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
Ok(watcher) => watcher,
Err(e) => {
error!(e; "Failed to submit reconciliation procedure {procedure_id}");
return;
}
};
if let Err(e) = watcher::wait(watcher).await {
error!(e; "Failed to wait reconciliation procedure {procedure_id}");
return;
}
info!("Reconciliation procedure {procedure_id} is finished successfully!");
});
procedure_id
}
}
fn normalize_parallelism(parallelism: usize) -> usize {
if parallelism == 0 {
warn!(
"Parallelism is 0, using default parallelism: {}",
DEFAULT_PARALLELISM
);
DEFAULT_PARALLELISM
} else {
parallelism
}
}

View File

@@ -78,14 +78,21 @@ pub(crate) struct PersistentContext {
catalog: String,
fast_fail: bool,
resolve_strategy: ResolveStrategy,
parallelism: usize,
}
impl PersistentContext {
pub fn new(catalog: String, fast_fail: bool, resolve_strategy: ResolveStrategy) -> Self {
pub fn new(
catalog: String,
fast_fail: bool,
resolve_strategy: ResolveStrategy,
parallelism: usize,
) -> Self {
Self {
catalog,
fast_fail,
resolve_strategy,
parallelism,
}
}
}
@@ -111,8 +118,10 @@ impl ReconcileCatalogProcedure {
catalog: String,
fast_fail: bool,
resolve_strategy: ResolveStrategy,
parallelism: usize,
) -> Self {
let persistent_ctx = PersistentContext::new(catalog, fast_fail, resolve_strategy);
let persistent_ctx =
PersistentContext::new(catalog, fast_fail, resolve_strategy, parallelism);
let context = ReconcileCatalogContext::new(ctx, persistent_ctx);
let state = Box::new(ReconcileCatalogStart);
Self { context, state }

View File

@@ -15,13 +15,14 @@
use std::any::Any;
use common_procedure::{Context as ProcedureContext, ProcedureWithId, Status};
use common_telemetry::info;
use futures::TryStreamExt;
use serde::{Deserialize, Serialize};
use crate::error::Result;
use crate::reconciliation::reconcile_catalog::end::ReconcileCatalogEnd;
use crate::reconciliation::reconcile_catalog::{ReconcileCatalogContext, State};
use crate::reconciliation::reconcile_database::{ReconcileDatabaseProcedure, DEFAULT_PARALLELISM};
use crate::reconciliation::reconcile_database::ReconcileDatabaseProcedure;
use crate::reconciliation::utils::Context;
#[derive(Debug, Serialize, Deserialize)]
@@ -75,12 +76,16 @@ impl ReconcileDatabases {
table_metadata_manager: ctx.table_metadata_manager.clone(),
cache_invalidator: ctx.cache_invalidator.clone(),
};
info!(
"Scheduling reconcile database: {}, catalog: {}",
schema, ctx.persistent_ctx.catalog
);
let procedure = ReconcileDatabaseProcedure::new(
context,
ctx.persistent_ctx.catalog.clone(),
schema,
ctx.persistent_ctx.fast_fail,
DEFAULT_PARALLELISM,
ctx.persistent_ctx.parallelism,
ctx.persistent_ctx.resolve_strategy,
true,
);

View File

@@ -31,18 +31,29 @@ use crate::reconciliation::utils::{
};
/// Strategy for resolving column metadata inconsistencies.
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
pub(crate) enum ResolveStrategy {
/// Always uses the column metadata from metasrv.
UseMetasrv,
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
pub enum ResolveStrategy {
#[default]
/// Trusts the latest column metadata from datanode.
UseLatest,
/// Always uses the column metadata from metasrv.
UseMetasrv,
/// Aborts the resolution process if inconsistencies are detected.
AbortOnConflict,
}
impl From<api::v1::meta::ResolveStrategy> for ResolveStrategy {
fn from(strategy: api::v1::meta::ResolveStrategy) -> Self {
match strategy {
api::v1::meta::ResolveStrategy::UseMetasrv => Self::UseMetasrv,
api::v1::meta::ResolveStrategy::UseLatest => Self::UseLatest,
api::v1::meta::ResolveStrategy::AbortOnConflict => Self::AbortOnConflict,
}
}
}
/// State responsible for resolving inconsistencies in column metadata across physical regions.
#[derive(Debug, Serialize, Deserialize)]
pub struct ResolveColumnMetadata {

View File

@@ -24,7 +24,7 @@ mod util;
use std::fmt::Debug;
use std::sync::Arc;
use api::v1::meta::{ProcedureDetailResponse, Role};
use api::v1::meta::{ProcedureDetailResponse, ReconcileRequest, ReconcileResponse, Role};
pub use ask_leader::{AskLeader, LeaderProvider, LeaderProviderRef};
use cluster::Client as ClusterClient;
pub use cluster::ClusterKvBackend;
@@ -275,6 +275,17 @@ impl ProcedureExecutor for MetaClient {
.context(meta_error::ExternalSnafu)
}
async fn reconcile(
&self,
_ctx: &ExecutorContext,
request: ReconcileRequest,
) -> MetaResult<ReconcileResponse> {
self.reconcile(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
async fn add_region_follower(
&self,
_ctx: &ExecutorContext,
@@ -611,6 +622,11 @@ impl MetaClient {
.await
}
/// Reconcile the procedure state.
pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
self.procedure_client()?.reconcile(request).await
}
/// Submit a DDL task
pub async fn submit_ddl_task(
&self,

View File

@@ -20,7 +20,7 @@ use api::v1::meta::procedure_service_client::ProcedureServiceClient;
use api::v1::meta::{
DdlTaskRequest, DdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureId, ProcedureStateResponse,
QueryProcedureRequest, ResponseHeader, Role,
QueryProcedureRequest, ReconcileRequest, ReconcileResponse, ResponseHeader, Role,
};
use common_grpc::channel_manager::ChannelManager;
use common_telemetry::tracing_context::TracingContext;
@@ -98,6 +98,12 @@ impl Client {
.await
}
/// Reconcile the procedure state.
pub async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
let inner = self.inner.read().await;
inner.reconcile(request).await
}
pub async fn list_procedures(&self) -> Result<ProcedureDetailResponse> {
let inner = self.inner.read().await;
inner.list_procedures().await
@@ -253,6 +259,26 @@ impl Inner {
.await
}
async fn reconcile(&self, request: ReconcileRequest) -> Result<ReconcileResponse> {
let mut req = request;
req.set_header(
self.id,
self.role,
TracingContext::from_current_span().to_w3c(),
);
self.with_retry(
"reconcile",
move |mut client| {
let req = req.clone();
async move { client.reconcile(req).await.map(|res| res.into_inner()) }
},
|resp: &ReconcileResponse| &resp.header,
)
.await
}
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
let mut req = QueryProcedureRequest {
pid: Some(ProcedureId { key: pid.into() }),

View File

@@ -99,6 +99,13 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to init reconciliation manager"))]
InitReconciliationManager {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
#[snafu(implicit)]
@@ -134,6 +141,13 @@ pub enum Error {
source: common_meta::error::Error,
},
#[snafu(display("Failed to submit reconcile procedure"))]
SubmitReconcileProcedure {
#[snafu(implicit)]
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to invalidate table cache"))]
InvalidateTableCache {
#[snafu(implicit)]
@@ -1022,7 +1036,8 @@ impl ErrorExt for Error {
}
Error::DowngradeLeader { source, .. } => source.status_code(),
Error::RegisterProcedureLoader { source, .. } => source.status_code(),
Error::SubmitDdlTask { source, .. } => source.status_code(),
Error::SubmitDdlTask { source, .. }
| Error::SubmitReconcileProcedure { source, .. } => source.status_code(),
Error::ConvertProtoData { source, .. }
| Error::TableMetadataManager { source, .. }
| Error::RuntimeSwitchManager { source, .. }
@@ -1030,9 +1045,9 @@ impl ErrorExt for Error {
| Error::UnexpectedLogicalRouteTable { source, .. }
| Error::UpdateTopicNameValue { source, .. } => source.status_code(),
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}
Error::InitMetadata { source, .. }
| Error::InitDdlManager { source, .. }
| Error::InitReconciliationManager { source, .. } => source.status_code(),
Error::Other { source, .. } => source.status_code(),
Error::NoEnoughAvailableNode { .. } => StatusCode::RuntimeResourcesExhausted,

View File

@@ -35,6 +35,7 @@ use common_meta::leadership_notifier::{
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::Peer;
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::sequence::SequenceRef;
@@ -455,6 +456,7 @@ pub struct Metasrv {
leader_region_registry: LeaderRegionRegistryRef,
wal_prune_ticker: Option<WalPruneTickerRef>,
table_id_sequence: SequenceRef,
reconciliation_manager: ReconciliationManagerRef,
plugins: Plugins,
}
@@ -730,6 +732,10 @@ impl Metasrv {
&self.table_id_sequence
}
pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
&self.reconciliation_manager
}
pub fn plugins(&self) -> &Plugins {
&self.plugins
}

View File

@@ -34,6 +34,7 @@ use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_meta::node_manager::NodeManagerRef;
use common_meta::reconciliation::manager::ReconciliationManager;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
@@ -360,7 +361,7 @@ impl MetasrvBuilder {
let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
let ddl_context = DdlContext {
node_manager,
node_manager: node_manager.clone(),
cache_invalidator: cache_invalidator.clone(),
memory_region_keeper: memory_region_keeper.clone(),
leader_region_registry: leader_region_registry.clone(),
@@ -446,6 +447,16 @@ impl MetasrvBuilder {
.to_string_lossy()
.to_string();
let reconciliation_manager = Arc::new(ReconciliationManager::new(
node_manager.clone(),
table_metadata_manager.clone(),
cache_invalidator.clone(),
procedure_manager.clone(),
));
reconciliation_manager
.try_start()
.context(error::InitReconciliationManagerSnafu)?;
Ok(Metasrv {
state,
started: Arc::new(AtomicBool::new(false)),
@@ -482,6 +493,7 @@ impl MetasrvBuilder {
leader_region_registry,
wal_prune_ticker,
table_id_sequence,
reconciliation_manager,
})
}
}

View File

@@ -23,6 +23,7 @@ mod heartbeat;
pub mod mailbox;
pub mod procedure;
pub mod store;
pub(crate) mod utils;
pub type GrpcResult<T> = Result<Response<T>, Status>;
pub type GrpcStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send + Sync + 'static>>;

View File

@@ -14,30 +14,21 @@
use api::v1::meta::{
cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
Error, MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse,
RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse, RangeRequest as PbRangeRequest,
RangeResponse as PbRangeResponse,
};
use common_telemetry::warn;
use snafu::ResultExt;
use tonic::{Request, Response};
use tonic::Request;
use crate::metasrv::Metasrv;
use crate::service::GrpcResult;
use crate::{error, metasrv};
use crate::{check_leader, error, metasrv};
#[async_trait::async_trait]
impl cluster_server::Cluster for Metasrv {
async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
if !self.is_leader() {
let is_not_leader = ResponseHeader::failed(Error::is_not_leader());
let resp = PbBatchGetResponse {
header: Some(is_not_leader),
..Default::default()
};
warn!("The current meta is not leader, but a `batch_get` request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}
check_leader!(self, req, PbBatchGetResponse, "`batch_get`");
let req = req.into_inner().into();
let resp = self
@@ -51,16 +42,7 @@ impl cluster_server::Cluster for Metasrv {
}
async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
if !self.is_leader() {
let is_not_leader = ResponseHeader::failed(Error::is_not_leader());
let resp = PbRangeResponse {
header: Some(is_not_leader),
..Default::default()
};
warn!("The current meta is not leader, but a `range` request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}
check_leader!(self, req, PbRangeResponse, "`range`");
let req = req.into_inner().into();
let res = self
@@ -77,16 +59,7 @@ impl cluster_server::Cluster for Metasrv {
&self,
req: Request<MetasrvPeersRequest>,
) -> GrpcResult<MetasrvPeersResponse> {
if !self.is_leader() {
let is_not_leader = ResponseHeader::failed(Error::is_not_leader());
let resp = MetasrvPeersResponse {
header: Some(is_not_leader),
..Default::default()
};
warn!("The current meta is not leader, but a `metasrv_peers` request have reached the meta. Detail: {:?}.", req);
return Ok(Response::new(resp));
}
check_leader!(self, req, MetasrvPeersResponse, "`metasrv_peers`");
let leader_addr = &self.options().grpc.server_addr;
let (leader, followers) = match self.election() {

View File

@@ -15,25 +15,28 @@
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::reconcile_request::Target;
use api::v1::meta::{
procedure_service_server, DdlTaskRequest as PbDdlTaskRequest,
DdlTaskResponse as PbDdlTaskResponse, Error, MigrateRegionRequest, MigrateRegionResponse,
DdlTaskResponse as PbDdlTaskResponse, MigrateRegionRequest, MigrateRegionResponse,
ProcedureDetailRequest, ProcedureDetailResponse, ProcedureStateResponse, QueryProcedureRequest,
ResponseHeader,
ReconcileCatalog, ReconcileDatabase, ReconcileRequest, ReconcileResponse, ReconcileTable,
ResolveStrategy,
};
use common_meta::procedure_executor::ExecutorContext;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest};
use common_meta::rpc::procedure;
use common_telemetry::warn;
use snafu::{OptionExt, ResultExt};
use tonic::{Request, Response};
use table::table_reference::TableReference;
use tonic::Request;
use crate::error;
use crate::metasrv::Metasrv;
use crate::procedure::region_migration::manager::{
RegionMigrationProcedureTask, RegionMigrationTriggerReason,
};
use crate::service::GrpcResult;
use crate::{check_leader, error};
#[async_trait::async_trait]
impl procedure_service_server::ProcedureService for Metasrv {
@@ -41,15 +44,12 @@ impl procedure_service_server::ProcedureService for Metasrv {
&self,
request: Request<QueryProcedureRequest>,
) -> GrpcResult<ProcedureStateResponse> {
if !self.is_leader() {
let resp = ProcedureStateResponse {
header: Some(ResponseHeader::failed(Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `query procedure state` request have reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
check_leader!(
self,
request,
ProcedureStateResponse,
"`query procedure state`"
);
let QueryProcedureRequest { header, pid, .. } = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;
@@ -71,15 +71,7 @@ impl procedure_service_server::ProcedureService for Metasrv {
}
async fn ddl(&self, request: Request<PbDdlTaskRequest>) -> GrpcResult<PbDdlTaskResponse> {
if !self.is_leader() {
let resp = PbDdlTaskResponse {
header: Some(ResponseHeader::failed(Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `ddl` request have reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
check_leader!(self, request, PbDdlTaskResponse, "`ddl`");
let PbDdlTaskRequest {
header,
@@ -121,15 +113,7 @@ impl procedure_service_server::ProcedureService for Metasrv {
&self,
request: Request<MigrateRegionRequest>,
) -> GrpcResult<MigrateRegionResponse> {
if !self.is_leader() {
let resp = MigrateRegionResponse {
header: Some(ResponseHeader::failed(Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `migrate` request have reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
check_leader!(self, request, MigrateRegionResponse, "`migrate`");
let MigrateRegionRequest {
header,
@@ -169,19 +153,79 @@ impl procedure_service_server::ProcedureService for Metasrv {
Ok(Response::new(resp))
}
async fn reconcile(&self, request: Request<ReconcileRequest>) -> GrpcResult<ReconcileResponse> {
check_leader!(self, request, ReconcileResponse, "`reconcile`");
let ReconcileRequest { header, target } = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;
let target = target.context(error::MissingRequiredParameterSnafu { param: "target" })?;
let parse_resolve_strategy = |resolve_strategy: i32| {
ResolveStrategy::try_from(resolve_strategy)
.ok()
.context(error::UnexpectedSnafu {
violated: format!("Invalid resolve strategy: {}", resolve_strategy),
})
};
let procedure_id = match target {
Target::ReconcileTable(table) => {
let ReconcileTable {
catalog_name,
schema_name,
table_name,
resolve_strategy,
} = table;
let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
self.reconciliation_manager()
.reconcile_table(table_ref, resolve_strategy.into())
.await
.context(error::SubmitReconcileProcedureSnafu)?
}
Target::ReconcileDatabase(database) => {
let ReconcileDatabase {
catalog_name,
database_name,
resolve_strategy,
parallelism,
} = database;
let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
self.reconciliation_manager().reconcile_database(
catalog_name,
database_name,
resolve_strategy.into(),
parallelism as usize,
)
}
Target::ReconcileCatalog(catalog) => {
let ReconcileCatalog {
catalog_name,
resolve_strategy,
parallelism,
} = catalog;
let resolve_strategy = parse_resolve_strategy(resolve_strategy)?;
self.reconciliation_manager().reconcile_catalog(
catalog_name,
resolve_strategy.into(),
parallelism as usize,
)
}
};
Ok(Response::new(ReconcileResponse {
pid: Some(procedure::pid_to_pb_pid(procedure_id)),
..Default::default()
}))
}
async fn details(
&self,
request: Request<ProcedureDetailRequest>,
) -> GrpcResult<ProcedureDetailResponse> {
if !self.is_leader() {
let resp = ProcedureDetailResponse {
header: Some(ResponseHeader::failed(Error::is_not_leader())),
..Default::default()
};
warn!("The current meta is not leader, but a `procedure details` request have reached the meta. Detail: {:?}.", request);
return Ok(Response::new(resp));
}
check_leader!(
self,
request,
ProcedureDetailResponse,
"`procedure details`"
);
let ProcedureDetailRequest { header } = request.into_inner();
let _header = header.context(error::MissingRequestHeaderSnafu)?;

View File

@@ -0,0 +1,34 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[macro_export]
macro_rules! check_leader {
($self:expr, $request:expr, $resp_ty:ty, $msg:expr) => {
use common_telemetry::warn;
use api::v1::meta::{ResponseHeader, Error};
use tonic::Response;
if !$self.is_leader() {
warn!(
"The current metasrv is not the leader, but a {} request has reached the meta. Detail: {:?}.",
$msg, $request
);
let mut resp: $resp_ty = Default::default();
resp.header = Some(ResponseHeader::failed(
Error::is_not_leader(),
));
return Ok(Response::new(resp));
}
};
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::meta::ReconcileRequest;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
@@ -57,6 +58,17 @@ impl ProcedureServiceHandler for ProcedureServiceOperator {
.map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
}
async fn reconcile(&self, request: ReconcileRequest) -> QueryResult<Option<String>> {
Ok(self
.procedure_executor
.reconcile(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)?
.pid
.map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
}
async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
self.procedure_executor
.query_procedure_state(&ExecutorContext::default(), pid)