feat: impl migrate_region and procedure_state SQL function (#3325)

* fix: logical region can't find region routes

* feat: fetch partitions info in batch

* refactor: rename batch functions

* refactor: rename DdlTaskExecutor to ProcedureExecutor

* feat: impl migrate_region and query_procedure_state for ProcedureExecutor

* feat: add SQL function procedure_state and finish migrate_region impl

* fix: constant vector

* feat: unit tests for migrate_region and procedure_state

* test: test region migration by SQL

* fix: compile error after rebasing

* fix: clippy warnings

* feat: ensure procedure_state and migrate_region can only be called under the greptime catalog

* fix: license header
dennis zhuang
2024-02-22 10:37:11 +08:00
committed by GitHub
parent e9a2b0a9ee
commit 564fe3beca
48 changed files with 994 additions and 189 deletions
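The new SQL surface is exercised end-to-end in the integration tests near the end of this diff (trigger_migration_by_sql and query_procedure_by_sql). A minimal sketch of that call pattern, assuming the run_sql helper and the cluster fixture from those tests; the region id, peer ids, and procedure id below are placeholders:

// Trigger a region migration; the SQL function returns the procedure id as a string.
let output = run_sql(
    &cluster.frontend,
    "select migrate_region(4398046511104, 1, 2)", // placeholder region and peer ids
    QueryContext::arc(),
)
.await
.unwrap();

// Query the procedure state by the returned id; the result is a JSON string
// such as {"status":"Done"}.
let state = run_sql(
    &cluster.frontend,
    "select procedure_state('<procedure id from the call above>')",
    QueryContext::arc(),
)
.await
.unwrap();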

Cargo.lock generated
View File

@@ -1768,8 +1768,10 @@ dependencies = [
"arc-swap",
"async-trait",
"chrono-tz 0.6.3",
"common-catalog",
"common-error",
"common-macro",
"common-meta",
"common-query",
"common-runtime",
"common-telemetry",
@@ -1784,6 +1786,7 @@ dependencies = [
"paste",
"ron",
"serde",
"serde_json",
"session",
"snafu",
"statrs",

View File

@@ -164,11 +164,8 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to find table partitions: #{table}"))]
FindPartitions {
source: partition::error::Error,
table: String,
},
#[snafu(display("Failed to find table partitions"))]
FindPartitions { source: partition::error::Error },
#[snafu(display("Failed to find region routes"))]
FindRegionRoutes { source: partition::error::Error },

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use core::pin::pin;
use std::sync::{Arc, Weak};
use arrow_schema::SchemaRef as ArrowSchemaRef;
@@ -31,7 +32,7 @@ use datatypes::vectors::{
ConstantVector, DateTimeVector, DateTimeVectorBuilder, Int64Vector, Int64VectorBuilder,
MutableVector, StringVector, StringVectorBuilder, UInt64VectorBuilder,
};
use futures::TryStreamExt;
use futures::{StreamExt, TryStreamExt};
use partition::manager::PartitionInfo;
use partition::partition::PartitionDef;
use snafu::{OptionExt, ResultExt};
@@ -240,40 +241,64 @@ impl InformationSchemaPartitionsBuilder {
let predicates = Predicates::from_scan_request(&request);
for schema_name in catalog_manager.schema_names(&catalog_name).await? {
let mut stream = catalog_manager.tables(&catalog_name, &schema_name).await;
let table_info_stream = catalog_manager
.tables(&catalog_name, &schema_name)
.await
.try_filter_map(|t| async move {
let table_info = t.table_info();
if table_info.table_type == TableType::Temporary {
Ok(None)
} else {
Ok(Some(table_info))
}
});
while let Some(table) = stream.try_next().await? {
let table_info = table.table_info();
const BATCH_SIZE: usize = 128;
if table_info.table_type == TableType::Temporary {
continue;
}
// Split table infos into chunks
let mut table_info_chunks = pin!(table_info_stream.ready_chunks(BATCH_SIZE));
let table_id = table_info.ident.table_id;
let partitions = if let Some(partition_manager) = &partition_manager {
while let Some(table_infos) = table_info_chunks.next().await {
let table_infos = table_infos.into_iter().collect::<Result<Vec<_>>>()?;
let table_ids: Vec<TableId> =
table_infos.iter().map(|info| info.ident.table_id).collect();
let mut table_partitions = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_table_partitions(table_id)
.batch_find_table_partitions(&table_ids)
.await
.context(FindPartitionsSnafu {
table: &table_info.name,
})?
.context(FindPartitionsSnafu)?
} else {
// The current node must be a standalone instance, which contains only one partition by default.
// TODO(dennis): change it when we support multi-regions for standalone.
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}]
table_ids
.into_iter()
.map(|table_id| {
(
table_id,
vec![PartitionInfo {
id: RegionId::new(table_id, 0),
partition: PartitionDef::new(vec![], vec![]),
}],
)
})
.collect()
};
self.add_partitions(
&predicates,
&table_info,
&catalog_name,
&schema_name,
&table_info.name,
&partitions,
);
for table_info in table_infos {
let partitions = table_partitions
.remove(&table_info.ident.table_id)
.unwrap_or(vec![]);
self.add_partitions(
&predicates,
&table_info,
&catalog_name,
&schema_name,
&table_info.name,
&partitions,
);
}
}
}

View File

@@ -199,7 +199,7 @@ impl InformationSchemaRegionPeersBuilder {
let table_routes = if let Some(partition_manager) = &partition_manager {
partition_manager
.find_region_routes_batch(&table_ids)
.batch_find_region_routes(&table_ids)
.await
.context(FindRegionRoutesSnafu)?
} else {

View File

@@ -260,6 +260,7 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
catalog_list,
None,
None,
None,
false,
plugins.clone(),
));

View File

@@ -22,7 +22,7 @@ use common_config::{metadata_store_dir, KvBackendConfig};
use common_meta::cache_invalidator::DummyCacheInvalidator;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::ddl_manager::DdlManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
@@ -459,8 +459,8 @@ impl StartCommand {
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Result<DdlTaskExecutorRef> {
let ddl_task_executor: DdlTaskExecutorRef = Arc::new(
) -> Result<ProcedureExecutorRef> {
let procedure_executor: ProcedureExecutorRef = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
@@ -472,7 +472,7 @@ impl StartCommand {
.context(InitDdlManagerSnafu)?,
);
Ok(ddl_task_executor)
Ok(procedure_executor)
}
pub async fn create_table_metadata_manager(

View File

@@ -12,8 +12,10 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
chrono-tz = "0.6"
common-catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-query.workspace = true
common-runtime.workspace = true
common-telemetry.workspace = true
@@ -26,6 +28,8 @@ num = "0.4"
num-traits = "0.2"
once_cell.workspace = true
paste = "1.0"
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
statrs = "0.16"

View File

@@ -30,6 +30,17 @@ pub struct FunctionContext {
pub state: Arc<FunctionState>,
}
impl FunctionContext {
/// Create a mock [`FunctionContext`] for test.
#[cfg(any(test, feature = "testing"))]
pub fn mock() -> Self {
Self {
query_ctx: QueryContextBuilder::default().build(),
state: Arc::new(FunctionState::mock()),
}
}
}
impl Default for FunctionContext {
fn default() -> Self {
Self {

View File

@@ -13,10 +13,9 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::ProcedureStateResponse;
use async_trait::async_trait;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use table::requests::{DeleteRequest, InsertRequest};
@@ -31,24 +30,18 @@ pub trait TableMutationHandler: Send + Sync {
/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
/// Migrate a region from source peer to target peer, returning the procedure id on success.
async fn migrate_region(
&self,
region_id: u64,
from_peer: u64,
to_peer: u64,
replay_timeout: Duration,
) -> Result<String>;
}
/// A trait for handling meta service requests in `QueryEngine`.
/// A trait for handling procedure service requests in `QueryEngine`.
#[async_trait]
pub trait MetaServiceHandler: Send + Sync {
pub trait ProcedureServiceHandler: Send + Sync {
/// Migrate a region from source peer to target peer, returning the procedure id on success.
async fn migrate_region(&self, request: MigrateRegionRequest) -> Result<Option<String>>;
/// Query the procedure's state by its id
async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse>;
}
pub type TableMutationHandlerRef = Arc<dyn TableMutationHandler>;
pub type MetaServiceHandlerRef = Arc<dyn MetaServiceHandler>;
pub type ProcedureServiceHandlerRef = Arc<dyn ProcedureServiceHandler>;

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod macros;
pub mod scalars;
mod system;
mod table;

View File

@@ -0,0 +1,27 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Ensure the current function is invoked under the `greptime` catalog.
#[macro_export]
macro_rules! ensure_greptime {
($func_ctx: expr) => {{
use common_catalog::consts::DEFAULT_CATALOG_NAME;
snafu::ensure!(
$func_ctx.query_ctx.current_catalog() == DEFAULT_CATALOG_NAME,
common_query::error::PermissionDeniedSnafu {
err_msg: format!("current catalog is not {DEFAULT_CATALOG_NAME}")
}
);
}};
}
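A minimal usage sketch (not part of this diff), showing how the macro guards a Function::eval implementation, as migrate_region and procedure_state do further below:

fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
    // Returns a PermissionDenied error unless the query runs under the `greptime` catalog.
    crate::ensure_greptime!(func_ctx);
    // ... the actual function body follows ...
    Ok(columns[0].clone())
}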

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
use crate::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
/// Shared state for SQL functions.
/// The handlers in the state may be `None` in CLI commands or test cases.
@@ -20,6 +20,45 @@ use crate::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
pub struct FunctionState {
// The table mutation handler
pub table_mutation_handler: Option<TableMutationHandlerRef>,
// The meta service handler
pub meta_service_handler: Option<MetaServiceHandlerRef>,
// The procedure service handler
pub procedure_service_handler: Option<ProcedureServiceHandlerRef>,
}
impl FunctionState {
/// Create a mock [`FunctionState`] for test.
#[cfg(any(test, feature = "testing"))]
pub fn mock() -> Self {
use std::sync::Arc;
use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use crate::handlers::ProcedureServiceHandler;
struct MockProcedureServiceHandler;
#[async_trait]
impl ProcedureServiceHandler for MockProcedureServiceHandler {
async fn migrate_region(
&self,
_request: MigrateRegionRequest,
) -> Result<Option<String>> {
Ok(Some("test_pid".to_string()))
}
async fn query_procedure_state(&self, _pid: &str) -> Result<ProcedureStateResponse> {
Ok(ProcedureStateResponse {
status: ProcedureStatus::Done.into(),
error: "OK".to_string(),
..Default::default()
})
}
}
Self {
table_mutation_handler: None,
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
}
}
}

View File

@@ -14,6 +14,7 @@
mod build;
mod database;
mod procedure_state;
mod timezone;
mod version;
@@ -21,6 +22,7 @@ use std::sync::Arc;
use build::BuildFunction;
use database::DatabaseFunction;
use procedure_state::ProcedureStateFunction;
use timezone::TimezoneFunction;
use version::VersionFunction;
@@ -34,5 +36,6 @@ impl SystemFunction {
registry.register(Arc::new(VersionFunction));
registry.register(Arc::new(DatabaseFunction));
registry.register(Arc::new(TimezoneFunction));
registry.register(Arc::new(ProcedureStateFunction));
}
}

View File

@@ -22,7 +22,7 @@ use datatypes::vectors::{StringVector, VectorRef};
use crate::function::{Function, FunctionContext};
/// Generates build information
#[derive(Clone, Debug, Default)]
pub struct BuildFunction;
@@ -42,11 +42,7 @@ impl Function for BuildFunction {
}
fn signature(&self) -> Signature {
Signature::uniform(
0,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
Signature::uniform(0, vec![], Volatility::Immutable)
}
fn eval(&self, _func_ctx: FunctionContext, _columns: &[VectorRef]) -> Result<VectorRef> {
@@ -75,7 +71,7 @@ mod tests {
Signature {
type_signature: TypeSignature::Uniform(0, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::string_datatype()]
} if valid_types.is_empty()
));
let build_info = common_version::build_info().to_string();
let vector = build.eval(FunctionContext::default(), &[]).unwrap();

View File

@@ -0,0 +1,216 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use api::v1::meta::ProcedureStatus;
use common_meta::rpc::procedure::ProcedureStateResponse;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::{ConstantVector, Helper, StringVector, VectorRef};
use serde::Serialize;
use snafu::{ensure, Location, OptionExt};
use crate::function::{Function, FunctionContext};
const NAME: &str = "procedure_state";
/// A function to query procedure state by its id.
/// Such as `procedure_state(pid)`.
#[derive(Clone, Debug, Default)]
pub struct ProcedureStateFunction;
impl fmt::Display for ProcedureStateFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "PROCEDURE_STATE")
}
}
#[derive(Serialize)]
struct ProcedureStateJson {
status: String,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<String>,
}
impl Function for ProcedureStateFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
crate::ensure_greptime!(func_ctx);
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
columns.len()
),
}
);
let pids = columns[0].clone();
let expect_len = pids.len();
let is_const = pids.is_const();
match pids.data_type() {
ConcreteDataType::String(_) => {
// TODO(dennis): datafusion UDF doesn't support async function currently
std::thread::spawn(move || {
let pids: &StringVector = if is_const {
let pids: &ConstantVector = unsafe { Helper::static_cast(&pids) };
unsafe { Helper::static_cast(pids.inner()) }
} else {
unsafe { Helper::static_cast(&pids) }
};
let procedure_service_handler = func_ctx
.state
.procedure_service_handler
.as_ref()
.context(MissingProcedureServiceHandlerSnafu)?;
let states = pids
.iter_data()
.map(|pid| {
if let Some(pid) = pid {
let ProcedureStateResponse { status, error, .. } =
common_runtime::block_on_read(async move {
procedure_service_handler.query_procedure_state(pid).await
})?;
let status = ProcedureStatus::try_from(status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown");
let state = ProcedureStateJson {
status: status.to_string(),
error: if error.is_empty() { None } else { Some(error) },
};
Ok(Some(serde_json::to_string(&state).unwrap_or_default()))
} else {
Ok(None)
}
})
.collect::<Result<Vec<_>>>()?;
let results: VectorRef = Arc::new(StringVector::from(states));
if is_const {
Ok(Arc::new(ConstantVector::new(results, expect_len)) as _)
} else {
Ok(results)
}
})
.join()
.map_err(|e| {
error!(e; "Join thread error");
ThreadJoin {
location: Location::default(),
}
})?
}
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail(),
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::StringVector;
use super::*;
#[test]
fn test_procedure_state_misc() {
let f = ProcedureStateFunction;
assert_eq!("procedure_state", f.name());
assert_eq!(
ConcreteDataType::string_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::string_datatype()]
));
}
#[test]
fn test_missing_procedure_service() {
let f = ProcedureStateFunction;
let args = vec!["pid"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}
#[test]
fn test_procedure_state() {
let f = ProcedureStateFunction;
let args = vec!["pid"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from_slice(&[arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec![
"{\"status\":\"Done\",\"error\":\"OK\"}",
]));
assert_eq!(expect, result);
}
}

View File

@@ -15,9 +15,10 @@
use std::fmt::{self};
use std::time::Duration;
use common_meta::rpc::procedure::MigrateRegionRequest;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingTableMutationHandlerSnafu, Result,
InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingProcedureServiceHandlerSnafu, Result,
};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::logging::error;
@@ -77,6 +78,8 @@ impl Function for MigrateRegionFunction {
}
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
crate::ensure_greptime!(func_ctx);
let (region_ids, from_peers, to_peers, replay_timeouts) = match columns.len() {
3 => {
let region_ids = cast_u64_vector(&columns[0])?;
@@ -106,9 +109,15 @@ impl Function for MigrateRegionFunction {
}
};
// TODO(dennis): datafusion UDF doesn't support async function currently
std::thread::spawn(move || {
let len = region_ids.len();
let mut results = StringVectorBuilder::with_capacity(len);
let procedure_service_handler = func_ctx
.state
.procedure_service_handler
.as_ref()
.context(MissingProcedureServiceHandlerSnafu)?;
for index in 0..len {
let region_id = region_ids.get(index);
@@ -126,24 +135,18 @@ impl Function for MigrateRegionFunction {
Value::UInt64(to_peer),
Value::UInt64(replay_timeout),
) => {
let func_ctx = func_ctx.clone();
let pid = common_runtime::block_on_read(async move {
func_ctx
.state
.table_mutation_handler
.as_ref()
.context(MissingTableMutationHandlerSnafu)?
.migrate_region(
procedure_service_handler
.migrate_region(MigrateRegionRequest {
region_id,
from_peer,
to_peer,
Duration::from_secs(replay_timeout),
)
replay_timeout: Duration::from_secs(replay_timeout),
})
.await
})?;
results.push(Some(&pid));
results.push(pid.as_deref())
}
_ => {
results.push(None);
@@ -171,5 +174,60 @@ impl fmt::Display for MigrateRegionFunction {
#[cfg(test)]
mod tests {
// FIXME(dennis): test in the following PR.
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{StringVector, UInt64Vector};
use super::*;
#[test]
fn test_migrate_region_misc() {
let f = MigrateRegionFunction;
assert_eq!("migrate_region", f.name());
assert_eq!(
ConcreteDataType::string_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::OneOf(sigs),
volatility: Volatility::Immutable
} if sigs.len() == 2));
}
#[test]
fn test_missing_procedure_service() {
let f = MigrateRegionFunction;
let args = vec![1, 1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing ProcedureServiceHandler, not expected",
result.to_string()
);
}
#[test]
fn test_migrate_region() {
let f = MigrateRegionFunction;
let args = vec![1, 1, 1];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).unwrap();
let expect: VectorRef = Arc::new(StringVector::from(vec!["test_pid"]));
assert_eq!(expect, result);
}
}

View File

@@ -26,6 +26,7 @@ use crate::key::table_route::TableRouteValue;
use crate::key::TableMetadataManagerRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
pub mod alter_table;
pub mod create_logical_tables;
@@ -46,16 +47,32 @@ pub struct ExecutorContext {
pub tracing_context: Option<W3cTrace>,
}
/// The procedure executor that accepts DDL and region migration tasks, etc.
#[async_trait::async_trait]
pub trait DdlTaskExecutor: Send + Sync {
pub trait ProcedureExecutor: Send + Sync {
/// Submit a ddl task
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
request: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse>;
/// Submit a region migration task
async fn migrate_region(
&self,
ctx: &ExecutorContext,
request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse>;
/// Query the procedure state by its id
async fn query_procedure_state(
&self,
ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse>;
}
pub type DdlTaskExecutorRef = Arc<dyn DdlTaskExecutor>;
pub type ProcedureExecutorRef = Arc<dyn ProcedureExecutor>;
pub struct TableMetadataAllocatorContext {
pub cluster_id: u64,

View File

@@ -28,10 +28,10 @@ use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::table_meta::TableMetadataAllocatorRef;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{utils, DdlContext, DdlTaskExecutor, ExecutorContext};
use crate::ddl::{utils, DdlContext, ExecutorContext, ProcedureExecutor};
use crate::error::{
self, EmptyCreateTableTasksSnafu, ProcedureOutputSnafu, RegisterProcedureLoaderSnafu, Result,
SubmitProcedureSnafu, TableNotFoundSnafu, WaitProcedureSnafu,
SubmitProcedureSnafu, TableNotFoundSnafu, UnsupportedSnafu, WaitProcedureSnafu,
};
use crate::key::table_info::TableInfoValue;
use crate::key::table_name::TableNameKey;
@@ -46,6 +46,8 @@ use crate::rpc::ddl::{
AlterTableTask, CreateTableTask, DropTableTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse,
TruncateTableTask,
};
use crate::rpc::procedure;
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::rpc::router::RegionRoute;
use crate::table_name::TableName;
use crate::ClusterId;
@@ -527,8 +529,9 @@ async fn handle_create_logical_table_tasks(
})
}
/// TODO(dennis): letting [`DdlManager`] implement [`ProcedureExecutor`] looks weird; find some way to refactor it.
#[async_trait::async_trait]
impl DdlTaskExecutor for DdlManager {
impl ProcedureExecutor for DdlManager {
async fn submit_ddl_task(
&self,
ctx: &ExecutorContext,
@@ -566,6 +569,37 @@ impl DdlTaskExecutor for DdlManager {
.trace(span)
.await
}
async fn migrate_region(
&self,
_ctx: &ExecutorContext,
_request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse> {
UnsupportedSnafu {
operation: "migrate_region",
}
.fail()
}
async fn query_procedure_state(
&self,
_ctx: &ExecutorContext,
pid: &str,
) -> Result<ProcedureStateResponse> {
let pid = ProcedureId::parse_str(pid)
.with_context(|_| error::ParseProcedureIdSnafu { key: pid })?;
let state = self
.procedure_manager
.procedure_state(pid)
.await
.context(error::QueryProcedureSnafu)?
.context(error::ProcedureNotFoundSnafu {
pid: pid.to_string(),
})?;
Ok(procedure::procedure_state_to_pb_response(&state))
}
}
#[cfg(test)]

View File

@@ -100,6 +100,15 @@ pub enum Error {
source: common_procedure::Error,
},
#[snafu(display("Failed to query procedure"))]
QueryProcedure {
location: Location,
source: common_procedure::Error,
},
#[snafu(display("Procedure not found: {pid}"))]
ProcedureNotFound { location: Location, pid: String },
#[snafu(display("Failed to parse procedure id: {key}"))]
ParseProcedureId {
location: Location,
@@ -431,14 +440,17 @@ impl ErrorExt for Error {
| RenameTable { .. }
| Unsupported { .. } => StatusCode::Internal,
PrimaryKeyNotFound { .. } | EmptyKey { .. } | InvalidEngineType { .. } => {
StatusCode::InvalidArguments
}
ProcedureNotFound { .. }
| PrimaryKeyNotFound { .. }
| EmptyKey { .. }
| InvalidEngineType { .. } => StatusCode::InvalidArguments,
TableNotFound { .. } => StatusCode::TableNotFound,
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
SubmitProcedure { source, .. } | WaitProcedure { source, .. } => source.status_code(),
SubmitProcedure { source, .. }
| QueryProcedure { source, .. }
| WaitProcedure { source, .. } => source.status_code(),
RegisterProcedureLoader { source, .. } => source.status_code(),
External { source, .. } => source.status_code(),
OperateDatanode { source, .. } => source.status_code(),

View File

@@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
pub use api::v1::meta::{MigrateRegionResponse, ProcedureStateResponse};
use api::v1::meta::{
ProcedureId as PbProcedureId, ProcedureStateResponse as PbProcedureStateResponse,
ProcedureStatus as PbProcedureStatus,
@@ -21,6 +24,15 @@ use snafu::ResultExt;
use crate::error::{ParseProcedureIdSnafu, Result};
/// A request to migrate region.
#[derive(Clone)]
pub struct MigrateRegionRequest {
pub region_id: u64,
pub from_peer: u64,
pub to_peer: u64,
pub replay_timeout: Duration,
}
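A minimal construction sketch (not part of this diff) for the new request type, assuming MigrateRegionRequest and Duration are in scope; the migrate_region SQL function above builds the same request from its arguments:

// Placeholder ids for illustration; real values come from the SQL function arguments.
let request = MigrateRegionRequest {
    region_id: 4398046511104,
    from_peer: 1,
    to_peer: 2,
    // The SQL function converts its optional seconds argument with Duration::from_secs.
    replay_timeout: Duration::from_secs(10),
};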
/// Cast the protobuf [`ProcedureId`] to common [`ProcedureId`].
pub fn pb_pid_to_pid(pid: &PbProcedureId) -> Result<ProcedureId> {
ProcedureId::parse_str(&String::from_utf8_lossy(&pid.key)).with_context(|_| {

View File

@@ -178,14 +178,23 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to do procedure task"))]
ProcedureService {
source: BoxedError,
location: Location,
},
#[snafu(display("Missing TableMutationHandler, not expected"))]
MissingTableMutationHandler { location: Location },
#[snafu(display("Missing MetaServiceHandler, not expected"))]
MissingMetaServiceHandler { location: Location },
#[snafu(display("Missing ProcedureServiceHandler, not expected"))]
MissingProcedureServiceHandler { location: Location },
#[snafu(display("Invalid function args: {}", err_msg))]
InvalidFuncArgs { err_msg: String, location: Location },
#[snafu(display("Permission denied: {}", err_msg))]
PermissionDenied { err_msg: String, location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -213,7 +222,7 @@ impl ErrorExt for Error {
| Error::FromArrowArray { source, .. } => source.status_code(),
Error::MissingTableMutationHandler { .. }
| Error::MissingMetaServiceHandler { .. }
| Error::MissingProcedureServiceHandler { .. }
| Error::ExecuteRepeatedly { .. }
| Error::ThreadJoin { .. }
| Error::GeneralDataFusion { .. } => StatusCode::Unexpected,
@@ -225,7 +234,11 @@ impl ErrorExt for Error {
Error::ConvertDfRecordBatchStream { source, .. } => source.status_code(),
Error::ExecutePhysicalPlan { source, .. } => source.status_code(),
Error::Execute { source, .. } => source.status_code(),
Error::TableMutation { source, .. } => source.status_code(),
Error::ProcedureService { source, .. } | Error::TableMutation { source, .. } => {
source.status_code()
}
Error::PermissionDenied { .. } => StatusCode::PermissionDenied,
}
}

View File

@@ -310,6 +310,7 @@ impl DatanodeBuilder {
MemoryCatalogManager::with_default_setup(),
None,
None,
None,
false,
self.plugins.clone(),
);

View File

@@ -147,7 +147,7 @@ impl Instance {
.enable_router()
.enable_store()
.enable_heartbeat()
.enable_ddl()
.enable_procedure()
.channel_manager(channel_manager)
.ddl_channel_manager(ddl_channel_manager)
.build();

View File

@@ -19,11 +19,12 @@ use catalog::kvbackend::KvBackendCatalogManager;
use common_base::Plugins;
use common_meta::cache_invalidator::{CacheInvalidatorRef, DummyCacheInvalidator};
use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
use operator::statement::StatementExecutor;
use operator::table::TableMutationOperator;
use partition::manager::PartitionRuleManager;
@@ -35,12 +36,13 @@ use crate::instance::region_query::FrontendRegionQueryHandler;
use crate::instance::{Instance, StatementExecutorRef};
use crate::script::ScriptExecutor;
/// The frontend [`Instance`] builder.
pub struct FrontendBuilder {
kv_backend: KvBackendRef,
cache_invalidator: Option<CacheInvalidatorRef>,
datanode_manager: DatanodeManagerRef,
plugins: Option<Plugins>,
ddl_task_executor: DdlTaskExecutorRef,
procedure_executor: ProcedureExecutorRef,
heartbeat_task: Option<HeartbeatTask>,
}
@@ -48,14 +50,14 @@ impl FrontendBuilder {
pub fn new(
kv_backend: KvBackendRef,
datanode_manager: DatanodeManagerRef,
ddl_task_executor: DdlTaskExecutorRef,
procedure_executor: ProcedureExecutorRef,
) -> Self {
Self {
kv_backend,
cache_invalidator: None,
datanode_manager,
plugins: None,
ddl_task_executor,
procedure_executor,
heartbeat_task: None,
}
}
@@ -112,10 +114,15 @@ impl FrontendBuilder {
deleter.clone(),
));
let procedure_service_handler = Arc::new(ProcedureServiceOperator::new(
self.procedure_executor.clone(),
));
let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_query_handler.clone()),
Some(table_mutation_handler),
Some(procedure_service_handler),
true,
plugins.clone(),
)
@@ -127,7 +134,7 @@ impl FrontendBuilder {
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),
query_engine.clone(),
self.ddl_task_executor,
self.procedure_executor,
kv_backend.clone(),
catalog_manager.clone(),
inserter.clone(),

View File

@@ -23,10 +23,13 @@ mod store;
use api::v1::meta::Role;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::ddl::{DdlTaskExecutor, ExecutorContext};
use common_meta::ddl::{ExecutorContext, ProcedureExecutor};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use common_meta::rpc::procedure::{
MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse,
};
use common_meta::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
@@ -56,7 +59,7 @@ pub struct MetaClientBuilder {
enable_router: bool,
enable_store: bool,
enable_lock: bool,
enable_ddl: bool,
enable_procedure: bool,
channel_manager: Option<ChannelManager>,
ddl_channel_manager: Option<ChannelManager>,
heartbeat_channel_manager: Option<ChannelManager>,
@@ -99,9 +102,9 @@ impl MetaClientBuilder {
}
}
pub fn enable_ddl(self) -> Self {
pub fn enable_procedure(self) -> Self {
Self {
enable_ddl: true,
enable_procedure: true,
..self
}
}
@@ -155,9 +158,9 @@ impl MetaClientBuilder {
if self.enable_lock {
client.lock = Some(LockClient::new(self.id, self.role, mgr.clone()));
}
if self.enable_ddl {
if self.enable_procedure {
let mgr = self.ddl_channel_manager.unwrap_or(mgr);
client.ddl = Some(ProcedureClient::new(
client.procedure = Some(ProcedureClient::new(
self.id,
self.role,
mgr,
@@ -176,11 +179,11 @@ pub struct MetaClient {
heartbeat: Option<HeartbeatClient>,
store: Option<StoreClient>,
lock: Option<LockClient>,
ddl: Option<ProcedureClient>,
procedure: Option<ProcedureClient>,
}
#[async_trait::async_trait]
impl DdlTaskExecutor for MetaClient {
impl ProcedureExecutor for MetaClient {
async fn submit_ddl_task(
&self,
_ctx: &ExecutorContext,
@@ -191,6 +194,28 @@ impl DdlTaskExecutor for MetaClient {
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
async fn migrate_region(
&self,
_ctx: &ExecutorContext,
request: MigrateRegionRequest,
) -> MetaResult<MigrateRegionResponse> {
self.migrate_region(request)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
async fn query_procedure_state(
&self,
_ctx: &ExecutorContext,
pid: &str,
) -> MetaResult<ProcedureStateResponse> {
self.query_procedure_state(pid)
.await
.map_err(BoxedError::new)
.context(meta_error::ExternalSnafu)
}
}
impl MetaClient {
@@ -228,7 +253,7 @@ impl MetaClient {
client.start(urls.clone()).await?;
info!("Lock client started");
}
if let Some(client) = &mut self.ddl {
if let Some(client) = &mut self.procedure {
client.start(urls).await?;
info!("DDL client started");
}
@@ -328,13 +353,33 @@ impl MetaClient {
Ok(())
}
/// Query the procedure state by its id.
pub async fn query_procedure_state(&self, pid: &str) -> Result<ProcedureStateResponse> {
self.procedure_client()?.query_procedure_state(pid).await
}
/// Submit a region migration task.
pub async fn migrate_region(
&self,
request: MigrateRegionRequest,
) -> Result<MigrateRegionResponse> {
self.procedure_client()?
.migrate_region(
request.region_id,
request.from_peer,
request.to_peer,
request.replay_timeout,
)
.await
}
/// Submit a DDL task
pub async fn submit_ddl_task(
&self,
req: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let res = self
.ddl_client()?
.procedure_client()?
.submit_ddl_task(req.try_into().context(error::ConvertMetaRequestSnafu)?)
.await?
.try_into()
@@ -365,8 +410,8 @@ impl MetaClient {
}
#[inline]
pub fn ddl_client(&self) -> Result<ProcedureClient> {
self.ddl
pub fn procedure_client(&self) -> Result<ProcedureClient> {
self.procedure
.clone()
.context(error::NotStartedSnafu { name: "ddl_client" })
}

View File

@@ -21,7 +21,7 @@ use std::time::Duration;
use common_base::Plugins;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::TableMetadataManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::peer::Peer;
@@ -253,7 +253,7 @@ pub struct MetaSrv {
lock: DistLockRef,
procedure_manager: ProcedureManagerRef,
mailbox: MailboxRef,
ddl_executor: DdlTaskExecutorRef,
procedure_executor: ProcedureExecutorRef,
wal_options_allocator: WalOptionsAllocatorRef,
table_metadata_manager: TableMetadataManagerRef,
memory_region_keeper: MemoryRegionKeeperRef,
@@ -423,8 +423,8 @@ impl MetaSrv {
&self.mailbox
}
pub fn ddl_executor(&self) -> &DdlTaskExecutorRef {
&self.ddl_executor
pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
&self.procedure_executor
}
pub fn procedure_manager(&self) -> &ProcedureManagerRef {

View File

@@ -329,7 +329,7 @@ impl MetaSrvBuilder {
lock,
procedure_manager,
mailbox,
ddl_executor: ddl_manager,
procedure_executor: ddl_manager,
wal_options_allocator,
table_metadata_manager,
greptimedb_telemetry_task: get_greptimedb_telemetry_task(

View File

@@ -66,7 +66,7 @@ impl procedure_service_server::ProcedureService for MetaSrv {
.context(error::ConvertProtoDataSnafu)?;
let resp = self
.ddl_executor()
.procedure_executor()
.submit_ddl_task(
&ExecutorContext {
cluster_id: Some(cluster_id),

View File

@@ -17,6 +17,7 @@ pub mod error;
pub mod expr_factory;
pub mod insert;
pub mod metrics;
pub mod procedure;
pub mod region_req_factory;
pub mod req_convert;
pub mod statement;

View File

@@ -0,0 +1,56 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::handlers::ProcedureServiceHandler;
use common_meta::ddl::{ExecutorContext, ProcedureExecutorRef};
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use snafu::ResultExt;
/// The operator for procedures which implements [`ProcedureServiceHandler`].
#[derive(Clone)]
pub struct ProcedureServiceOperator {
procedure_executor: ProcedureExecutorRef,
}
impl ProcedureServiceOperator {
pub fn new(procedure_executor: ProcedureExecutorRef) -> Self {
Self { procedure_executor }
}
}
#[async_trait]
impl ProcedureServiceHandler for ProcedureServiceOperator {
async fn migrate_region(&self, request: MigrateRegionRequest) -> QueryResult<Option<String>> {
Ok(self
.procedure_executor
.migrate_region(&ExecutorContext::default(), request)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)?
.pid
.map(|pid| String::from_utf8_lossy(&pid.key).to_string()))
}
async fn query_procedure_state(&self, pid: &str) -> QueryResult<ProcedureStateResponse> {
self.procedure_executor
.query_procedure_state(&ExecutorContext::default(), pid)
.await
.map_err(BoxedError::new)
.context(query_error::ProcedureServiceSnafu)
}
}

View File

@@ -26,7 +26,7 @@ use std::sync::Arc;
use catalog::CatalogManagerRef;
use common_error::ext::BoxedError;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::DdlTaskExecutorRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::table_name::TableName;
@@ -61,7 +61,7 @@ use crate::table::table_idents_to_full_name;
pub struct StatementExecutor {
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
ddl_executor: DdlTaskExecutorRef,
procedure_executor: ProcedureExecutorRef,
table_metadata_manager: TableMetadataManagerRef,
partition_manager: PartitionRuleManagerRef,
cache_invalidator: CacheInvalidatorRef,
@@ -72,7 +72,7 @@ impl StatementExecutor {
pub fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
ddl_task_executor: DdlTaskExecutorRef,
procedure_executor: ProcedureExecutorRef,
kv_backend: KvBackendRef,
cache_invalidator: CacheInvalidatorRef,
inserter: InserterRef,
@@ -80,7 +80,7 @@ impl StatementExecutor {
Self {
catalog_manager,
query_engine,
ddl_executor: ddl_task_executor,
procedure_executor,
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend.clone())),
partition_manager: Arc::new(PartitionRuleManager::new(kv_backend)),
cache_invalidator,

View File

@@ -404,7 +404,7 @@ impl StatementExecutor {
task: DdlTask::new_alter_table(expr.clone()),
};
self.ddl_executor
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), req)
.await
.context(error::ExecuteDdlSnafu)?;
@@ -438,7 +438,7 @@ impl StatementExecutor {
task: DdlTask::new_create_table(create_table, partitions, table_info),
};
self.ddl_executor
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
@@ -452,7 +452,7 @@ impl StatementExecutor {
task: DdlTask::new_create_logical_tables(tables_data),
};
self.ddl_executor
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
@@ -474,7 +474,7 @@ impl StatementExecutor {
),
};
self.ddl_executor
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)
@@ -494,7 +494,7 @@ impl StatementExecutor {
),
};
self.ddl_executor
self.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), request)
.await
.context(error::ExecuteDdlSnafu)

View File

@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::time::Duration;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_function::handlers::{AffectedRows, TableMutationHandler};
@@ -95,15 +93,4 @@ impl TableMutationHandler for TableMutationOperator {
.map_err(BoxedError::new)
.context(query_error::TableMutationSnafu)
}
async fn migrate_region(
&self,
_region_id: u64,
_from_peer: u64,
_to_peer: u64,
_replay_timeout: Duration,
) -> QueryResult<String> {
// FIXME(dennis): implemented in the following PR.
todo!();
}
}

View File

@@ -74,23 +74,20 @@ impl PartitionRuleManager {
Ok(route.region_routes)
}
pub async fn find_region_routes_batch(
pub async fn batch_find_region_routes(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, Vec<RegionRoute>>> {
let table_routes = self
.table_route_manager
.batch_get(table_ids)
.batch_get_physical_table_routes(table_ids)
.await
.context(error::TableRouteManagerSnafu)?;
let mut table_region_routes = HashMap::with_capacity(table_routes.len());
for (table_id, table_route) in table_routes {
let region_routes = table_route
.region_routes()
.context(error::TableRouteManagerSnafu)?
.clone();
let region_routes = table_route.region_routes;
table_region_routes.insert(table_id, region_routes);
}
@@ -104,40 +101,25 @@ impl PartitionRuleManager {
error::FindTableRoutesSnafu { table_id }
);
let mut partitions = Vec::with_capacity(region_routes.len());
for r in region_routes {
let partition = r
.region
.partition
.clone()
.context(error::FindRegionRoutesSnafu {
region_id: r.region.id,
table_id,
})?;
let partition_def = PartitionDef::try_from(partition)?;
create_partitions_from_region_routes(table_id, region_routes)
}
partitions.push(PartitionInfo {
id: r.region.id,
partition: partition_def,
});
}
partitions.sort_by(|a, b| {
a.partition
.partition_bounds()
.cmp(b.partition.partition_bounds())
});
pub async fn batch_find_table_partitions(
&self,
table_ids: &[TableId],
) -> Result<HashMap<TableId, Vec<PartitionInfo>>> {
let batch_region_routes = self.batch_find_region_routes(table_ids).await?;
ensure!(
partitions
.windows(2)
.all(|w| w[0].partition.partition_columns() == w[1].partition.partition_columns()),
error::InvalidTableRouteDataSnafu {
let mut results = HashMap::with_capacity(table_ids.len());
for (table_id, region_routes) in batch_region_routes {
results.insert(
table_id,
err_msg: "partition columns of all regions are not the same"
}
);
create_partitions_from_region_routes(table_id, region_routes)?,
);
}
Ok(partitions)
Ok(results)
}
/// Get partition rule of given table.
@@ -237,6 +219,46 @@ impl PartitionRuleManager {
}
}
fn create_partitions_from_region_routes(
table_id: TableId,
region_routes: Vec<RegionRoute>,
) -> Result<Vec<PartitionInfo>> {
let mut partitions = Vec::with_capacity(region_routes.len());
for r in region_routes {
let partition = r
.region
.partition
.clone()
.context(error::FindRegionRoutesSnafu {
region_id: r.region.id,
table_id,
})?;
let partition_def = PartitionDef::try_from(partition)?;
partitions.push(PartitionInfo {
id: r.region.id,
partition: partition_def,
});
}
partitions.sort_by(|a, b| {
a.partition
.partition_bounds()
.cmp(b.partition.partition_bounds())
});
ensure!(
partitions
.windows(2)
.all(|w| w[0].partition.partition_columns() == w[1].partition.partition_columns()),
error::InvalidTableRouteDataSnafu {
table_id,
err_msg: "partition columns of all regions are not the same"
}
);
Ok(partitions)
}
fn find_regions0(partition_rule: PartitionRuleRef, filter: &Expr) -> Result<HashSet<RegionNumber>> {
let expr = filter.df_expr();
match expr {

View File

@@ -564,7 +564,7 @@ mod tests {
};
catalog_manager.register_table_sync(req).unwrap();
QueryEngineFactory::new(catalog_manager, None, None, false).query_engine()
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine()
}
#[tokio::test]

View File

@@ -24,7 +24,7 @@ use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function::FunctionRef;
use common_function::function_registry::FUNCTION_REGISTRY;
use common_function::handlers::TableMutationHandlerRef;
use common_function::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::prelude::ScalarUdf;
use common_query::Output;
@@ -101,12 +101,14 @@ impl QueryEngineFactory {
catalog_manager: CatalogManagerRef,
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
with_dist_planner: bool,
) -> Self {
Self::new_with_plugins(
catalog_manager,
region_query_handler,
table_mutation_handler,
procedure_service_handler,
with_dist_planner,
Default::default(),
)
@@ -116,6 +118,7 @@ impl QueryEngineFactory {
catalog_manager: CatalogManagerRef,
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
with_dist_planner: bool,
plugins: Plugins,
) -> Self {
@@ -123,6 +126,7 @@ impl QueryEngineFactory {
catalog_manager,
region_query_handler,
table_mutation_handler,
procedure_service_handler,
with_dist_planner,
plugins.clone(),
));
@@ -156,7 +160,7 @@ mod tests {
#[test]
fn test_query_engine_factory() {
let catalog_list = catalog::memory::new_memory_catalog_manager().unwrap();
let factory = QueryEngineFactory::new(catalog_list, None, None, false);
let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
let engine = factory.query_engine();

View File

@@ -70,6 +70,7 @@ impl QueryEngineContext {
catalog::memory::new_memory_catalog_manager().unwrap(),
None,
None,
None,
false,
Plugins::default(),
));

View File

@@ -20,7 +20,7 @@ use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_function::function::FunctionRef;
use common_function::handlers::{MetaServiceHandlerRef, TableMutationHandlerRef};
use common_function::handlers::{ProcedureServiceHandlerRef, TableMutationHandlerRef};
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::state::FunctionState;
use common_query::physical_plan::SessionContext;
@@ -80,6 +80,7 @@ impl QueryEngineState {
catalog_list: CatalogManagerRef,
region_query_handler: Option<RegionQueryHandlerRef>,
table_mutation_handler: Option<TableMutationHandlerRef>,
procedure_service_handler: Option<ProcedureServiceHandlerRef>,
with_dist_planner: bool,
plugins: Plugins,
) -> Self {
@@ -120,8 +121,7 @@ impl QueryEngineState {
catalog_manager: catalog_list,
function_state: Arc::new(FunctionState {
table_mutation_handler,
// FIXME(dennis): implemented in the following PR.
meta_service_handler: None,
procedure_service_handler,
}),
aggregate_functions: Arc::new(RwLock::new(HashMap::new())),
extension_rules,
@@ -219,9 +219,9 @@ impl QueryEngineState {
self.function_state.table_mutation_handler.as_ref()
}
/// Returns the [`MetaServiceHandlerRef`] in state.
pub fn meta_service_handler(&self) -> Option<&MetaServiceHandlerRef> {
self.function_state.meta_service_handler.as_ref()
/// Returns the [`ProcedureServiceHandlerRef`] in state.
pub fn procedure_service_handler(&self) -> Option<&ProcedureServiceHandlerRef> {
self.function_state.procedure_service_handler.as_ref()
}
pub(crate) fn disallow_cross_catalog_query(&self) -> bool {

View File

@@ -533,7 +533,7 @@ mod test {
table,
})
.is_ok());
QueryEngineFactory::new(catalog_list, None, None, false).query_engine()
QueryEngineFactory::new(catalog_list, None, None, None, false).query_engine()
}
async fn do_query(sql: &str) -> Result<crate::plan::LogicalPlan> {

View File

@@ -52,5 +52,5 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
let catalog_manager = MemoryCatalogManager::new_with_table(table);
QueryEngineFactory::new(catalog_manager, None, None, false).query_engine()
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine()
}

View File

@@ -47,7 +47,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
let catalog_list = catalog::memory::new_memory_catalog_manager()
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let factory = QueryEngineFactory::new(catalog_list, None, None, false);
let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
let engine = factory.query_engine();
let column_schemas = vec![ColumnSchema::new(
@@ -128,7 +128,8 @@ async fn test_query_validate() -> Result<()> {
disallow_cross_catalog_query: true,
});
let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, None, false, plugins);
let factory =
QueryEngineFactory::new_with_plugins(catalog_list, None, None, None, false, plugins);
let engine = factory.query_engine();
let stmt =
@@ -158,7 +159,7 @@ async fn test_udf() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog_manager()?;
let factory = QueryEngineFactory::new(catalog_list, None, None, false);
let factory = QueryEngineFactory::new(catalog_list, None, None, None, false);
let engine = factory.query_engine();
let pow = make_scalar_function(pow);

View File

@@ -106,7 +106,7 @@ fn create_test_engine() -> TimeRangeTester {
};
let _ = catalog_manager.register_table_sync(req).unwrap();
let engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
let engine = QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
TimeRangeTester { engine, filter }
}

View File

@@ -52,7 +52,8 @@ where
pub(crate) fn sample_script_engine() -> PyEngine {
let catalog_manager =
MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
let query_engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
let query_engine =
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
PyEngine::new(query_engine.clone())
}

View File

@@ -385,7 +385,7 @@ mod tests {
let catalog_manager =
MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
let query_engine =
QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
PyEngine::new(query_engine.clone())
}

View File

@@ -56,7 +56,7 @@ pub async fn setup_scripts_manager(
let catalog_manager = MemoryCatalogManager::new_with_table(table.clone());
let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, false);
let factory = QueryEngineFactory::new(catalog_manager.clone(), None, None, None, false);
let query_engine = factory.query_engine();
let mgr = ScriptManager::new(Arc::new(MockGrpcQueryHandler {}) as _, query_engine)
.await

View File

@@ -214,7 +214,8 @@ impl GrpcQueryHandler for DummyInstance {
fn create_testing_instance(table: TableRef) -> DummyInstance {
let catalog_manager = MemoryCatalogManager::new_with_table(table);
let query_engine = QueryEngineFactory::new(catalog_manager, None, None, false).query_engine();
let query_engine =
QueryEngineFactory::new(catalog_manager, None, None, None, false).query_engine();
DummyInstance::new(query_engine)
}

View File

@@ -345,7 +345,7 @@ impl GreptimeDbClusterBuilder {
.enable_store()
.enable_heartbeat()
.channel_manager(meta_srv.channel_manager)
.enable_ddl()
.enable_procedure()
.build();
meta_client.start(&[&meta_srv.server_addr]).await.unwrap();
let meta_client = Arc::new(meta_client);

View File

@@ -19,11 +19,15 @@ use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::{RegionDistribution, TableMetadataManagerRef};
use common_meta::peer::Peer;
use common_query::Output;
use common_recordbatch::RecordBatches;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::kafka::{DatanodeKafkaConfig, MetaSrvKafkaConfig};
use common_wal::config::{DatanodeWalConfig, MetaSrvWalConfig};
use datatypes::prelude::ScalarVector;
use datatypes::value::Value;
use datatypes::vectors::{Helper, UInt64Vector};
use frontend::error::Result as FrontendResult;
use frontend::instance::Instance;
use futures::future::BoxFuture;
@@ -76,6 +80,7 @@ macro_rules! region_migration_tests {
$service,
test_region_migration,
test_region_migration_by_sql,
test_region_migration_multiple_regions,
test_region_migration_all_regions,
test_region_migration_incorrect_from_peer,
@@ -212,6 +217,125 @@ pub async fn test_region_migration(store_type: StorageType, endpoints: Vec<Strin
assert!(procedure.is_none());
}
/// A naive region migration test driven by SQL functions
pub async fn test_region_migration_by_sql(store_type: StorageType, endpoints: Vec<String>) {
let cluster_name = "test_region_migration";
let peer_factory = |id| Peer {
id,
addr: PEER_PLACEHOLDER_ADDR.to_string(),
};
// Prepares test cluster.
let (store_config, _guard) = get_test_store_config(&store_type);
let home_dir = create_temp_dir("test_migration_data_home");
let datanodes = 5u64;
let builder = GreptimeDbClusterBuilder::new(cluster_name).await;
let const_selector = Arc::new(ConstNodeSelector::new(vec![
peer_factory(1),
peer_factory(2),
peer_factory(3),
]));
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_wal_config(DatanodeWalConfig::Kafka(DatanodeKafkaConfig {
broker_endpoints: endpoints.clone(),
linger: Duration::from_millis(25),
..Default::default()
}))
.with_meta_wal_config(MetaSrvWalConfig::Kafka(MetaSrvKafkaConfig {
broker_endpoints: endpoints,
num_topics: 3,
topic_name_prefix: Uuid::new_v4().to_string(),
..Default::default()
}))
.with_shared_home_dir(Arc::new(home_dir))
.with_meta_selector(const_selector.clone())
.build()
.await;
let mut logical_timer = 1685508715000;
// Prepares test table.
let table_id = prepare_testing_table(&cluster).await;
// Inserts data
let results = insert_values(&cluster.frontend, logical_timer).await;
logical_timer += 1000;
for result in results {
assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
}
// The region distribution
let mut distribution = find_region_distribution_by_sql(&cluster).await;
let old_distribution = distribution.clone();
// Selecting target of region migration.
let region_migration_manager = cluster.meta_srv.region_migration_manager();
let (from_peer_id, from_regions) = distribution.pop_first().unwrap();
info!(
"Selecting from peer: {from_peer_id}, and regions: {:?}",
from_regions
);
let (to_peer_id, to_regions) = distribution.pop_first().unwrap();
info!(
"Selecting to peer: {to_peer_id}, and regions: {:?}",
to_regions
);
let region_id = RegionId::new(table_id, from_regions[0]);
// Trigger region migration.
let procedure_id =
trigger_migration_by_sql(&cluster, region_id.as_u64(), from_peer_id, to_peer_id).await;
info!("Started region procedure: {}!", procedure_id);
// Waits for the migration to finish by checking the procedure state
let frontend = cluster.frontend.clone();
wait_condition(
Duration::from_secs(10),
Box::pin(async move {
loop {
let state = query_procedure_by_sql(&frontend, &procedure_id).await;
if state == "{\"status\":\"Done\"}" {
info!("Migration done: {state}");
break;
} else {
info!("Migration not finished: {state}");
tokio::time::sleep(Duration::from_millis(200)).await;
}
}
}),
)
.await;
// Inserts more data.
let results = insert_values(&cluster.frontend, logical_timer).await;
for result in results {
assert!(matches!(result.unwrap(), Output::AffectedRows(1)));
}
// Asserts the writes.
assert_values(&cluster.frontend).await;
// Triggers again.
let procedure = region_migration_manager
.submit_procedure(RegionMigrationProcedureTask::new(
0,
region_id,
peer_factory(from_peer_id),
peer_factory(to_peer_id),
Duration::from_millis(1000),
))
.await
.unwrap();
assert!(procedure.is_none());
let new_distribution = find_region_distribution_by_sql(&cluster).await;
assert_ne!(old_distribution, new_distribution);
}
/// A region migration test for a region server contains multiple regions of the table.
pub async fn test_region_migration_multiple_regions(
store_type: StorageType,
@@ -724,12 +848,103 @@ async fn find_region_distribution(
.unwrap()
}
/// Find region distribution by SQL query
async fn find_region_distribution_by_sql(cluster: &GreptimeDbCluster) -> RegionDistribution {
let query_ctx = QueryContext::arc();
let Output::Stream(stream, _) = run_sql(
&cluster.frontend,
&format!(r#"select b.peer_id as datanode_id,
a.greptime_partition_id as region_id
from information_schema.partitions a left join information_schema.greptime_region_peers b
on a.greptime_partition_id = b.region_id
where a.table_name='{TEST_TABLE_NAME}' order by datanode_id asc"#
),
query_ctx.clone(),
)
.await.unwrap() else {
unreachable!();
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let mut distribution = RegionDistribution::new();
for batch in recordbatches.take() {
let datanode_ids: &UInt64Vector =
unsafe { Helper::static_cast(batch.column_by_name("datanode_id").unwrap()) };
let region_ids: &UInt64Vector =
unsafe { Helper::static_cast(batch.column_by_name("region_id").unwrap()) };
for (datanode_id, region_id) in datanode_ids.iter_data().zip(region_ids.iter_data()) {
let (Some(datanode_id), Some(region_id)) = (datanode_id, region_id) else {
unreachable!();
};
let region_id = RegionId::from_u64(region_id);
distribution
.entry(datanode_id)
.or_default()
.push(region_id.region_number());
}
}
distribution
}
/// Trigger the region migration by SQL, returning the procedure id on success.
async fn trigger_migration_by_sql(
cluster: &GreptimeDbCluster,
region_id: u64,
from_peer_id: u64,
to_peer_id: u64,
) -> String {
let Output::Stream(stream, _) = run_sql(
&cluster.frontend,
&format!("select migrate_region({region_id}, {from_peer_id}, {to_peer_id})"),
QueryContext::arc(),
)
.await
.unwrap() else {
unreachable!();
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let Value::String(procedure_id) = recordbatches.take()[0].column(0).get(0) else {
unreachable!();
};
procedure_id.as_utf8().to_string()
}
/// Query procedure state by SQL.
async fn query_procedure_by_sql(instance: &Arc<Instance>, pid: &str) -> String {
let Output::Stream(stream, _) = run_sql(
instance,
&format!("select procedure_state('{pid}')"),
QueryContext::arc(),
)
.await
.unwrap() else {
unreachable!();
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let Value::String(state) = recordbatches.take()[0].column(0).get(0) else {
unreachable!();
};
state.as_utf8().to_string()
}
async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<Output>> {
let query_ctx = QueryContext::arc();
let mut results = Vec::new();
for range in [5, 15, 55] {
let result = insert_value(
let result = run_sql(
instance,
&format!("INSERT INTO {TEST_TABLE_NAME} VALUES ({},{})", range, ts),
query_ctx.clone(),
@@ -741,10 +956,11 @@ async fn insert_values(instance: &Arc<Instance>, ts: u64) -> Vec<FrontendResult<
results
}
async fn insert_value(
async fn run_sql(
instance: &Arc<Instance>,
sql: &str,
query_ctx: QueryContextRef,
) -> FrontendResult<Output> {
info!("Run SQL: {sql}");
instance.do_query(sql, query_ctx).await.remove(0)
}