feat: flush or compact table and region functions (#3363)

* feat: adds Requester to process table flush and compaction request

* feat: admin_fn macros for administration functions

* test: add query result

* feat: impl flush_region, flush_table, compact_region, and flush_region functions

* docs: add Arguments to admin_fn macro

* chore: apply suggestion

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: apply suggestion

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: group_requests_by_peer and adds log

* Update src/common/macro/src/admin_fn.rs

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* feat: adds todo for spawan thread

* feat: rebase with main

---------

Co-authored-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
dennis zhuang
2024-02-27 16:57:38 +08:00
committed by GitHub
parent dbb1ce1a9b
commit 4b36c285f1
44 changed files with 1590 additions and 389 deletions

View File

@@ -12,6 +12,7 @@ api.workspace = true
arc-swap = "1.0"
async-trait.workspace = true
chrono-tz = "0.6"
common-base.workspace = true
common-catalog.workspace = true
common-error.workspace = true
common-macro.workspace = true
@@ -33,6 +34,7 @@ serde_json.workspace = true
session.workspace = true
snafu.workspace = true
statrs = "0.16"
store-api.workspace = true
table.workspace = true
[dev-dependencies]

View File

@@ -15,12 +15,12 @@
use std::sync::Arc;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use table::requests::{DeleteRequest, InsertRequest};
pub type AffectedRows = usize;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
/// A trait for handling table mutations in `QueryEngine`.
#[async_trait]
@@ -30,6 +30,28 @@ pub trait TableMutationHandler: Send + Sync {
/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
/// Trigger a flush task for table.
async fn flush(&self, request: FlushTableRequest, ctx: QueryContextRef)
-> Result<AffectedRows>;
/// Trigger a compaction task for table.
async fn compact(
&self,
request: CompactTableRequest,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
/// Trigger a flush task for a table region.
async fn flush_region(&self, region_id: RegionId, ctx: QueryContextRef)
-> Result<AffectedRows>;
/// Trigger a compaction task for a table region.
async fn compact_region(
&self,
region_id: RegionId,
ctx: QueryContextRef,
) -> Result<AffectedRows>;
}
/// A trait for handling procedure service requests in `QueryEngine`.

View File

@@ -12,8 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use common_query::error::{InvalidInputTypeSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use datatypes::prelude::ConcreteDataType;
use datatypes::types::cast::cast;
use datatypes::value::ValueRef;
use snafu::ResultExt;
/// Create a function signature with oneof signatures of interleaving two arguments.
pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>) -> Signature {
@@ -27,3 +31,15 @@ pub fn one_of_sigs2(args1: Vec<ConcreteDataType>, args2: Vec<ConcreteDataType>)
Signature::one_of(sigs, Volatility::Immutable)
}
/// Cast a [`ValueRef`] to u64, returns `None` if fails
pub fn cast_u64(value: &ValueRef) -> Result<Option<u64>> {
cast((*value).into(), &ConcreteDataType::uint64_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint64, actual type: {:#?}",
value.data_type(),
),
})
.map(|v| v.as_u64())
}

View File

@@ -32,11 +32,19 @@ impl FunctionState {
use api::v1::meta::ProcedureStatus;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{
CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest,
};
use crate::handlers::ProcedureServiceHandler;
use crate::handlers::{ProcedureServiceHandler, TableMutationHandler};
struct MockProcedureServiceHandler;
struct MockTableMutationHandler;
const ROWS: usize = 42;
#[async_trait]
impl ProcedureServiceHandler for MockProcedureServiceHandler {
@@ -56,8 +64,59 @@ impl FunctionState {
}
}
#[async_trait]
impl TableMutationHandler for MockTableMutationHandler {
async fn insert(
&self,
_request: InsertRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn delete(
&self,
_request: DeleteRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn flush(
&self,
_request: FlushTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn compact(
&self,
_request: CompactTableRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn flush_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
async fn compact_region(
&self,
_region_id: RegionId,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
}
}
Self {
table_mutation_handler: None,
table_mutation_handler: Some(Arc::new(MockTableMutationHandler)),
procedure_service_handler: Some(Arc::new(MockProcedureServiceHandler)),
}
}

View File

@@ -13,9 +13,9 @@
// limitations under the License.
use std::fmt;
use std::sync::Arc;
use api::v1::meta::ProcedureStatus;
use common_macro::admin_fn;
use common_meta::rpc::procedure::ProcedureStateResponse;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
@@ -25,24 +25,14 @@ use common_query::error::{
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::{ConstantVector, Helper, StringVector, VectorRef};
use datatypes::vectors::VectorRef;
use serde::Serialize;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
const NAME: &str = "procedure_state";
/// A function to query procedure state by its id.
/// Such as `procedure_state(pid)`.
#[derive(Clone, Debug, Default)]
pub struct ProcedureStateFunction;
impl fmt::Display for ProcedureStateFunction {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "PROCEDURE_STATE")
}
}
use crate::handlers::ProcedureServiceHandlerRef;
#[derive(Serialize)]
struct ProcedureStateJson {
@@ -51,105 +41,58 @@ struct ProcedureStateJson {
error: Option<String>,
}
impl Function for ProcedureStateFunction {
fn name(&self) -> &str {
NAME
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
crate::ensure_greptime!(func_ctx);
ensure!(
columns.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
columns.len()
),
}
);
let pids = columns[0].clone();
let expect_len = pids.len();
let is_const = pids.is_const();
match pids.data_type() {
ConcreteDataType::String(_) => {
// TODO(dennis): datafusion UDF doesn't support async function currently
std::thread::spawn(move || {
let pids: &StringVector = if is_const {
let pids: &ConstantVector = unsafe { Helper::static_cast(&pids) };
unsafe { Helper::static_cast(pids.inner()) }
} else {
unsafe { Helper::static_cast(&pids) }
};
let procedure_service_handler = func_ctx
.state
.procedure_service_handler
.as_ref()
.context(MissingProcedureServiceHandlerSnafu)?;
let states = pids
.iter_data()
.map(|pid| {
if let Some(pid) = pid {
let ProcedureStateResponse { status, error, .. } =
common_runtime::block_on_read(async move {
procedure_service_handler.query_procedure_state(pid).await
})?;
let status = ProcedureStatus::try_from(status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown");
let state = ProcedureStateJson {
status: status.to_string(),
error: if error.is_empty() { None } else { Some(error) },
};
Ok(Some(serde_json::to_string(&state).unwrap_or_default()))
} else {
Ok(None)
}
})
.collect::<Result<Vec<_>>>()?;
let results: VectorRef = Arc::new(StringVector::from(states));
if is_const {
Ok(Arc::new(ConstantVector::new(results, expect_len)) as _)
} else {
Ok(results)
}
})
.join()
.map_err(|e| {
error!(e; "Join thread error");
ThreadJoin {
location: Location::default(),
}
})?
}
_ => UnsupportedInputDataTypeSnafu {
function: NAME,
datatypes: columns.iter().map(|c| c.data_type()).collect::<Vec<_>>(),
}
.fail(),
/// A function to query procedure state by its id.
/// Such as `procedure_state(pid)`.
#[admin_fn(
name = "ProcedureStateFunction",
display_name = "procedure_state",
sig_fn = "signature",
ret = "string"
)]
pub(crate) async fn procedure_state(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
}
);
let ValueRef::String(pid) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: "procedure_state",
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let ProcedureStateResponse { status, error, .. } =
procedure_service_handler.query_procedure_state(pid).await?;
let status = ProcedureStatus::try_from(status)
.map(|v| v.as_str_name())
.unwrap_or("Unknown");
let state = ProcedureStateJson {
status: status.to_string(),
error: if error.is_empty() { None } else { Some(error) },
};
let json = serde_json::to_string(&state).unwrap_or_default();
Ok(Value::from(json))
}
fn signature() -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
#[cfg(test)]

View File

@@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod flush_compact_region;
mod flush_compact_table;
mod migrate_region;
use std::sync::Arc;
use flush_compact_region::{CompactRegionFunction, FlushRegionFunction};
use flush_compact_table::{CompactTableFunction, FlushTableFunction};
use migrate_region::MigrateRegionFunction;
use crate::function_registry::FunctionRegistry;
@@ -27,5 +31,9 @@ impl TableFunction {
/// Register all table functions to [`FunctionRegistry`].
pub fn register(registry: &FunctionRegistry) {
registry.register(Arc::new(MigrateRegionFunction));
registry.register(Arc::new(FlushRegionFunction));
registry.register(Arc::new(CompactRegionFunction));
registry.register(Arc::new(FlushTableFunction));
registry.register(Arc::new(CompactTableFunction));
}
}

View File

@@ -0,0 +1,148 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{ensure, Location, OptionExt};
use store_api::storage::RegionId;
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
use crate::helper::cast_u64;
macro_rules! define_region_function {
($name: expr, $display_name_str: expr, $display_name: ident) => {
/// A function to $display_name
#[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
pub(crate) async fn $display_name(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);
let Some(region_id) = cast_u64(&params[0])? else {
return UnsupportedInputDataTypeSnafu {
function: $display_name_str,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let affected_rows = table_mutation_handler
.$display_name(RegionId::from_u64(region_id), query_ctx.clone())
.await?;
Ok(Value::from(affected_rows as u64))
}
};
}
define_region_function!("FlushRegionFunction", "flush_region", flush_region);
define_region_function!("CompactRegionFunction", "compact_region", compact_region);
fn signature() -> Signature {
Signature::uniform(1, ConcreteDataType::numerics(), Volatility::Immutable)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::UInt64Vector;
use super::*;
macro_rules! define_region_function_test {
($name: ident, $func: ident) => {
paste::paste! {
#[test]
fn [<test_ $name _misc>]() {
let f = $func;
assert_eq!(stringify!($name), f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == ConcreteDataType::numerics()));
}
#[test]
fn [<test_ $name _missing_table_mutation>]() {
let f = $func;
let args = vec![99];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing TableMutationHandler, not expected",
result.to_string()
);
}
#[test]
fn [<test_ $name>]() {
let f = $func;
let args = vec![99];
let args = args
.into_iter()
.map(|arg| Arc::new(UInt64Vector::from_slice([arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
assert_eq!(expect, result);
}
}
};
}
define_region_function_test!(flush_region, FlushRegionFunction);
define_region_function_test!(compact_region, CompactRegionFunction);
}

View File

@@ -0,0 +1,178 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::fmt;
use common_error::ext::BoxedError;
use common_macro::admin_fn;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, MissingTableMutationHandlerSnafu, Result, TableMutationSnafu,
UnsupportedInputDataTypeSnafu,
};
use common_query::prelude::{Signature, Volatility};
use common_telemetry::error;
use datatypes::prelude::*;
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use session::table_name::table_name_to_full_name;
use snafu::{ensure, Location, OptionExt, ResultExt};
use table::requests::{CompactTableRequest, FlushTableRequest};
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::TableMutationHandlerRef;
macro_rules! define_table_function {
($name: expr, $display_name_str: expr, $display_name: ident, $func: ident, $request: ident) => {
/// A function to $func table, such as `$display_name(table_name)`.
#[admin_fn(name = $name, display_name = $display_name_str, sig_fn = "signature", ret = "uint64")]
pub(crate) async fn $display_name(
table_mutation_handler: &TableMutationHandlerRef,
query_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
ensure!(
params.len() == 1,
InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect 1, have: {}",
params.len()
),
}
);
let ValueRef::String(table_name) = params[0] else {
return UnsupportedInputDataTypeSnafu {
function: $display_name_str,
datatypes: params.iter().map(|v| v.data_type()).collect::<Vec<_>>(),
}
.fail();
};
let (catalog_name, schema_name, table_name) =
table_name_to_full_name(table_name, &query_ctx)
.map_err(BoxedError::new)
.context(TableMutationSnafu)?;
let affected_rows = table_mutation_handler
.$func(
$request {
catalog_name,
schema_name,
table_name,
},
query_ctx.clone(),
)
.await?;
Ok(Value::from(affected_rows as u64))
}
};
}
define_table_function!(
"FlushTableFunction",
"flush_table",
flush_table,
flush,
FlushTableRequest
);
define_table_function!(
"CompactTableFunction",
"compact_table",
compact_table,
compact,
CompactTableRequest
);
fn signature() -> Signature {
Signature::uniform(
1,
vec![ConcreteDataType::string_datatype()],
Volatility::Immutable,
)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_query::prelude::TypeSignature;
use datatypes::vectors::{StringVector, UInt64Vector};
use super::*;
macro_rules! define_table_function_test {
($name: ident, $func: ident) => {
paste::paste!{
#[test]
fn [<test_ $name _misc>]() {
let f = $func;
assert_eq!(stringify!($name), f.name());
assert_eq!(
ConcreteDataType::uint64_datatype(),
f.return_type(&[]).unwrap()
);
assert!(matches!(f.signature(),
Signature {
type_signature: TypeSignature::Uniform(1, valid_types),
volatility: Volatility::Immutable
} if valid_types == vec![ConcreteDataType::string_datatype()]));
}
#[test]
fn [<test_ $name _missing_table_mutation>]() {
let f = $func;
let args = vec!["test"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::default(), &args).unwrap_err();
assert_eq!(
"Missing TableMutationHandler, not expected",
result.to_string()
);
}
#[test]
fn [<test_ $name>]() {
let f = $func;
let args = vec!["test"];
let args = args
.into_iter()
.map(|arg| Arc::new(StringVector::from(vec![arg])) as _)
.collect::<Vec<_>>();
let result = f.eval(FunctionContext::mock(), &args).unwrap();
let expect: VectorRef = Arc::new(UInt64Vector::from_slice([42]));
assert_eq!(expect, result);
}
}
}
}
define_table_function_test!(flush_table, FlushTableFunction);
define_table_function_test!(compact_table, CompactTableFunction);
}

View File

@@ -15,19 +15,25 @@
use std::fmt::{self};
use std::time::Duration;
use common_macro::admin_fn;
use common_meta::rpc::procedure::MigrateRegionRequest;
use common_query::error::Error::ThreadJoin;
use common_query::error::{
InvalidFuncArgsSnafu, InvalidInputTypeSnafu, MissingProcedureServiceHandlerSnafu, Result,
};
use common_query::error::{InvalidFuncArgsSnafu, MissingProcedureServiceHandlerSnafu, Result};
use common_query::prelude::{Signature, TypeSignature, Volatility};
use common_telemetry::logging::error;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder};
use datatypes::value::Value;
use datatypes::vectors::{StringVectorBuilder, VectorRef};
use snafu::{Location, OptionExt, ResultExt};
use datatypes::data_type::DataType;
use datatypes::prelude::ConcreteDataType;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::VectorRef;
use session::context::QueryContextRef;
use snafu::{Location, OptionExt};
use crate::ensure_greptime;
use crate::function::{Function, FunctionContext};
use crate::handlers::ProcedureServiceHandlerRef;
use crate::helper::cast_u64;
const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10;
/// A function to migrate a region from source peer to target peer.
/// Returns the submitted procedure id if success. Only available in cluster mode.
@@ -39,137 +45,82 @@ use crate::function::{Function, FunctionContext};
/// - `region_id`: the region id
/// - `from_peer`: the source peer id
/// - `to_peer`: the target peer id
#[derive(Clone, Debug, Default)]
pub struct MigrateRegionFunction;
#[admin_fn(
name = "MigrateRegionFunction",
display_name = "migrate_region",
sig_fn = "signature",
ret = "string"
)]
pub(crate) async fn migrate_region(
procedure_service_handler: &ProcedureServiceHandlerRef,
_ctx: &QueryContextRef,
params: &[ValueRef<'_>],
) -> Result<Value> {
let (region_id, from_peer, to_peer, replay_timeout) = match params.len() {
3 => {
let region_id = cast_u64(&params[0])?;
let from_peer = cast_u64(&params[1])?;
let to_peer = cast_u64(&params[2])?;
const NAME: &str = "migrate_region";
const DEFAULT_REPLAY_TIMEOUT_SECS: u64 = 10;
(
region_id,
from_peer,
to_peer,
Some(DEFAULT_REPLAY_TIMEOUT_SECS),
)
}
fn cast_u64_vector(vector: &VectorRef) -> Result<VectorRef> {
vector
.cast(&ConcreteDataType::uint64_datatype())
.context(InvalidInputTypeSnafu {
err_msg: format!(
"Failed to cast input into uint64, actual type: {:#?}",
vector.data_type(),
),
})
}
4 => {
let region_id = cast_u64(&params[0])?;
let from_peer = cast_u64(&params[1])?;
let to_peer = cast_u64(&params[2])?;
let replay_timeout = cast_u64(&params[3])?;
impl Function for MigrateRegionFunction {
fn name(&self) -> &str {
NAME
}
(region_id, from_peer, to_peer, replay_timeout)
}
fn return_type(&self, _input_types: &[ConcreteDataType]) -> Result<ConcreteDataType> {
Ok(ConcreteDataType::string_datatype())
}
fn signature(&self) -> Signature {
Signature::one_of(
vec![
// migrate_region(region_id, from_peer, to_peer)
TypeSignature::Uniform(3, ConcreteDataType::numerics()),
// migrate_region(region_id, from_peer, to_peer, timeout(secs))
TypeSignature::Uniform(4, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
fn eval(&self, func_ctx: FunctionContext, columns: &[VectorRef]) -> Result<VectorRef> {
crate::ensure_greptime!(func_ctx);
let (region_ids, from_peers, to_peers, replay_timeouts) = match columns.len() {
3 => {
let region_ids = cast_u64_vector(&columns[0])?;
let from_peers = cast_u64_vector(&columns[1])?;
let to_peers = cast_u64_vector(&columns[2])?;
(region_ids, from_peers, to_peers, None)
size => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 3 or 4, have: {}",
size
),
}
.fail();
}
};
4 => {
let region_ids = cast_u64_vector(&columns[0])?;
let from_peers = cast_u64_vector(&columns[1])?;
let to_peers = cast_u64_vector(&columns[2])?;
let replay_timeouts = cast_u64_vector(&columns[3])?;
match (region_id, from_peer, to_peer, replay_timeout) {
(Some(region_id), Some(from_peer), Some(to_peer), Some(replay_timeout)) => {
let pid = procedure_service_handler
.migrate_region(MigrateRegionRequest {
region_id,
from_peer,
to_peer,
replay_timeout: Duration::from_secs(replay_timeout),
})
.await?;
(region_ids, from_peers, to_peers, Some(replay_timeouts))
match pid {
Some(pid) => Ok(Value::from(pid)),
None => Ok(Value::Null),
}
}
size => {
return InvalidFuncArgsSnafu {
err_msg: format!(
"The length of the args is not correct, expect exactly 3 or 4, have: {}",
size
),
}
.fail();
}
};
// TODO(dennis): datafusion UDF doesn't support async function currently
std::thread::spawn(move || {
let len = region_ids.len();
let mut results = StringVectorBuilder::with_capacity(len);
let procedure_service_handler = func_ctx
.state
.procedure_service_handler
.as_ref()
.context(MissingProcedureServiceHandlerSnafu)?;
for index in 0..len {
let region_id = region_ids.get(index);
let from_peer = from_peers.get(index);
let to_peer = to_peers.get(index);
let replay_timeout = match &replay_timeouts {
Some(replay_timeouts) => replay_timeouts.get(index),
None => Value::UInt64(DEFAULT_REPLAY_TIMEOUT_SECS),
};
match (region_id, from_peer, to_peer, replay_timeout) {
(
Value::UInt64(region_id),
Value::UInt64(from_peer),
Value::UInt64(to_peer),
Value::UInt64(replay_timeout),
) => {
let pid = common_runtime::block_on_read(async move {
procedure_service_handler
.migrate_region(MigrateRegionRequest {
region_id,
from_peer,
to_peer,
replay_timeout: Duration::from_secs(replay_timeout),
})
.await
})?;
results.push(pid.as_deref())
}
_ => {
results.push(None);
}
}
}
Ok(results.to_vector())
})
.join()
.map_err(|e| {
error!(e; "Join thread error");
ThreadJoin {
location: Location::default(),
}
})?
_ => Ok(Value::Null),
}
}
impl fmt::Display for MigrateRegionFunction {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "MIGRATE_REGION")
}
fn signature() -> Signature {
Signature::one_of(
vec![
// migrate_region(region_id, from_peer, to_peer)
TypeSignature::Uniform(3, ConcreteDataType::numerics()),
// migrate_region(region_id, from_peer, to_peer, timeout(secs))
TypeSignature::Uniform(4, ConcreteDataType::numerics()),
],
Volatility::Immutable,
)
}
#[cfg(test)]