Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-07 13:52:59 +00:00
feat: extend region leases in Metasrv (#1784)
* feat: extend region leases in Metasrv
* fix: resolve PR comments
Cargo.lock (4 changed lines, generated)
@@ -1801,6 +1801,7 @@ name = "common-meta"
version = "0.4.0"
dependencies = [
"api",
"async-trait",
"chrono",
"common-catalog",
"common-error",
@@ -4096,7 +4097,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=aee86f4a68c59873961c9b99ee7ed6a4341bf773#aee86f4a68c59873961c9b99ee7ed6a4341bf773"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=5d5eb65bb985ff47b3a417fb2505e315e2f5c319#5d5eb65bb985ff47b3a417fb2505e315e2f5c319"
dependencies = [
"prost",
"serde",
@@ -5185,6 +5186,7 @@ dependencies = [
"serde_json",
"servers",
"snafu",
"store-api",
"table",
"tokio",
"tokio-stream",

@@ -72,7 +72,7 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "aee86f4a68c59873961c9b99ee7ed6a4341bf773" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "5d5eb65bb985ff47b3a417fb2505e315e2f5c319" }
itertools = "0.10"
parquet = "40.0"
paste = "1.0"

@@ -91,7 +91,7 @@ pub fn build_table_regional_prefix(
}

/// Table global info has only one key across all datanodes so it does not have `node_id` field.
#[derive(Clone)]
#[derive(Clone, Hash, Eq, PartialEq)]
pub struct TableGlobalKey {
pub catalog_name: String,
pub schema_name: String,
@@ -124,6 +124,14 @@ impl TableGlobalKey {
table_name: captures[3].to_string(),
})
}

pub fn to_raw_key(&self) -> Vec<u8> {
self.to_string().into_bytes()
}

pub fn try_from_raw_key(key: &[u8]) -> Result<Self, Error> {
Self::parse(String::from_utf8_lossy(key))
}
}

/// Table global info contains necessary info for a datanode to create table regions, including
@@ -141,6 +149,10 @@ impl TableGlobalValue {
pub fn table_id(&self) -> TableId {
self.table_info.ident.table_id
}

pub fn engine(&self) -> &str {
&self.table_info.meta.engine
}
}

/// Table regional info that varies between datanode, so it contains a `node_id` field.

@@ -29,9 +29,6 @@ mod manager;

#[cfg(feature = "testing")]
pub mod mock;

// FIXME(LFC): Used in next PR.
#[allow(dead_code)]
pub mod region_alive_keeper;

#[derive(Debug, Clone)]

@@ -16,10 +16,15 @@ use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use async_trait::async_trait;
use common_meta::error::InvalidProtoMsgSnafu;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::ident::TableIdent;
use common_meta::RegionIdent;
use common_telemetry::{debug, error, info, warn};
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
use table::engine::{CloseTableResult, EngineContext, TableEngineRef};
@@ -35,6 +40,12 @@ use crate::error::{Result, TableEngineNotFoundSnafu};
pub struct RegionAliveKeepers {
table_engine_manager: TableEngineManagerRef,
keepers: Arc<Mutex<HashMap<TableIdent, Arc<RegionAliveKeeper>>>>,

/// The epoch when [RegionAliveKeepers] is created. It's used to get a monotonically non-decreasing
/// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
/// non-decreasing). The heartbeat request will carry the duration since this epoch, and the
/// duration acts like an "invariant point" for region's keep alive lease.
epoch: Instant,
}

impl RegionAliveKeepers {
@@ -42,6 +53,7 @@ impl RegionAliveKeepers {
Self {
table_engine_manager,
keepers: Arc::new(Mutex::new(HashMap::new())),
epoch: Instant::now(),
}
}

@@ -107,6 +119,50 @@ impl RegionAliveKeepers {
keeper.start(heartbeat_interval_millis).await;
}
}

pub fn epoch(&self) -> Instant {
self.epoch
}
}

#[async_trait]
impl HeartbeatResponseHandler for RegionAliveKeepers {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
!ctx.response.region_leases.is_empty()
}

async fn handle(
&self,
ctx: &mut HeartbeatResponseHandlerContext,
) -> common_meta::error::Result<HandleControl> {
let leases = ctx.response.region_leases.drain(..).collect::<Vec<_>>();
for lease in leases {
let table_ident: TableIdent = match lease
.table_ident
.context(InvalidProtoMsgSnafu {
err_msg: "'table_ident' is missing in RegionLease",
})
.and_then(|x| x.try_into())
{
Ok(x) => x,
Err(e) => {
error!(e; "");
continue;
}
};

let Some(keeper) = self.keepers.lock().await.get(&table_ident).cloned() else {
// Alive keeper could be affected by lagging msg, just warn and ignore.
warn!("Alive keeper for table {table_ident} is not found!");
continue;
};

let start_instant = self.epoch + Duration::from_millis(lease.duration_since_epoch);
let deadline = start_instant + Duration::from_secs(lease.lease_seconds);
keeper.keep_lived(lease.regions, deadline).await;
}
Ok(HandleControl::Continue)
}
}

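The deadline arithmetic in the handler above is the heart of the lease extension: the Datanode never compares wall clocks with Metasrv, it only adds the echoed duration_since_epoch and the granted lease_seconds to its own local epoch. A minimal sketch of that computation, assuming tokio's Instant as used by the countdown tasks (the function name is illustrative, not part of this change):

    use std::time::Duration;
    use tokio::time::Instant;

    // Sketch: compute a region's keep-alive deadline exactly as `handle` does above.
    // `epoch` is the Datanode-local creation instant of RegionAliveKeepers;
    // `duration_since_epoch` (millis) and `lease_seconds` come back in the RegionLease.
    fn lease_deadline(epoch: Instant, duration_since_epoch: u64, lease_seconds: u64) -> Instant {
        let start_instant = epoch + Duration::from_millis(duration_since_epoch);
        start_instant + Duration::from_secs(lease_seconds)
    }
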
/// [RegionAliveKeeper] starts a countdown for each region in a table. When deadline is reached,
@@ -309,8 +365,11 @@ impl CountdownTask {
debug!("Reset deadline to region {region} of table {table_ident} to {deadline:?}");
countdown.set(tokio::time::sleep_until(deadline));
}
// Else the countdown could be not started yet, or during startup protection.
// Can be safely ignored.
// Else the countdown could be either:
// - not started yet;
// - during startup protection;
// - received a lagging heartbeat message.
// All can be safely ignored.
},
None => {
info!(
@@ -367,6 +426,8 @@ mod test {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use api::v1::meta::{HeartbeatResponse, RegionLease};
use common_meta::heartbeat::mailbox::HeartbeatMailbox;
use datatypes::schema::RawSchema;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::{TableEngine, TableReference};
@@ -377,8 +438,7 @@ mod test {
use super::*;
use crate::remote::mock::MockTableEngine;

#[tokio::test(flavor = "multi_thread")]
async fn test_region_alive_keepers() {
async fn prepare_keepers() -> (TableIdent, RegionAliveKeepers) {
let table_engine = Arc::new(MockTableEngine::default());
let table_engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine));
let keepers = RegionAliveKeepers::new(table_engine_manager);
@@ -410,13 +470,82 @@ mod test {
table_options: TableOptions::default(),
engine: "MockTableEngine".to_string(),
}));

keepers
.register_table(table_ident.clone(), table)
.await
.unwrap();
assert!(keepers.keepers.lock().await.contains_key(&table_ident));

(table_ident, keepers)
}

#[tokio::test(flavor = "multi_thread")]
async fn test_handle_heartbeat_response() {
let (table_ident, keepers) = prepare_keepers().await;

keepers.start(5000).await;
let startup_protection_until = Instant::now() + Duration::from_secs(21);

let duration_since_epoch = (Instant::now() - keepers.epoch).as_millis() as _;
let lease_seconds = 100;
let response = HeartbeatResponse {
region_leases: vec![RegionLease {
table_ident: Some(table_ident.clone().into()),
regions: vec![1, 3], // Not extending region 2's lease time.
duration_since_epoch,
lease_seconds,
}],
..Default::default()
};
let keep_alive_until = keepers.epoch
+ Duration::from_millis(duration_since_epoch)
+ Duration::from_secs(lease_seconds);

let (tx, _) = mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));
let mut ctx = HeartbeatResponseHandlerContext::new(mailbox, response);

assert!(keepers.handle(&mut ctx).await.unwrap() == HandleControl::Continue);

// sleep to wait for background task spawned in `handle`
tokio::time::sleep(Duration::from_secs(1)).await;

async fn test(
keeper: &Arc<RegionAliveKeeper>,
region_number: RegionNumber,
startup_protection_until: Instant,
keep_alive_until: Instant,
is_kept_live: bool,
) {
let handles = keeper.countdown_task_handles.lock().await;
let deadline = deadline(&handles.get(&region_number).unwrap().tx).await;
if is_kept_live {
assert!(deadline > startup_protection_until && deadline == keep_alive_until);
} else {
assert!(deadline <= startup_protection_until);
}
}

let keeper = &keepers
.keepers
.lock()
.await
.get(&table_ident)
.cloned()
.unwrap();

// Test region 1 and 3 is kept lived. Their deadlines are updated to desired instant.
test(keeper, 1, startup_protection_until, keep_alive_until, true).await;
test(keeper, 3, startup_protection_until, keep_alive_until, true).await;

// Test region 2 is not kept lived. It's deadline is not updated: still during startup protection period.
test(keeper, 2, startup_protection_until, keep_alive_until, false).await;
}

#[tokio::test(flavor = "multi_thread")]
async fn test_region_alive_keepers() {
let (table_ident, keepers) = prepare_keepers().await;

keepers
.register_region(&RegionIdent {
cluster_id: 1,

@@ -6,6 +6,7 @@ license.workspace = true

[dependencies]
api = { path = "../../api" }
async-trait.workspace = true
common-catalog = { path = "../catalog" }
common-error = { path = "../error" }
common-runtime = { path = "../runtime" }

@@ -15,6 +15,7 @@
use std::sync::Arc;

use api::v1::meta::HeartbeatResponse;
use async_trait::async_trait;
use common_telemetry::error;

use crate::error::Result;
@@ -57,14 +58,16 @@ impl HeartbeatResponseHandlerContext {
/// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`].
///
/// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`].
#[async_trait]
pub trait HeartbeatResponseHandler: Send + Sync {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;

fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
}

#[async_trait]
pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
async fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
}

pub struct HandlerGroupExecutor {
@@ -77,14 +80,15 @@ impl HandlerGroupExecutor {
}
}

#[async_trait]
impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
async fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
for handler in &self.handlers {
if !handler.is_acceptable(&ctx) {
continue;
}

match handler.handle(&mut ctx) {
match handler.handle(&mut ctx).await {
Ok(HandleControl::Done) => break,
Ok(HandleControl::Continue) => {}
Err(e) => {

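The hunk above turns HeartbeatResponseHandler and HeartbeatResponseHandlerExecutor into async traits via async_trait, so every implementor now annotates its impl with #[async_trait] and may await inside handle. A minimal hypothetical implementor, only to illustrate the new trait shape (LoggingHandler is not part of this change):

    use async_trait::async_trait;
    use common_meta::error::Result;
    use common_meta::heartbeat::handler::{
        HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
    };

    struct LoggingHandler;

    #[async_trait]
    impl HeartbeatResponseHandler for LoggingHandler {
        fn is_acceptable(&self, _ctx: &HeartbeatResponseHandlerContext) -> bool {
            true
        }

        async fn handle(&self, _ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
            // The async body may now await (e.g. take a tokio lock) before yielding
            // control to the next handler in the group.
            Ok(HandleControl::Continue)
        }
    }
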
@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;

use crate::error::Result;
use crate::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
@@ -21,12 +23,13 @@ use crate::heartbeat::utils::mailbox_message_to_incoming_message;
#[derive(Default)]
pub struct ParseMailboxMessageHandler;

#[async_trait]
impl HeartbeatResponseHandler for ParseMailboxMessageHandler {
fn is_acceptable(&self, _ctx: &HeartbeatResponseHandlerContext) -> bool {
true
}

fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
if let Some(message) = &ctx.response.mailbox_message {
if message.payload.is_some() {
// mailbox_message_to_incoming_message will raise an error if payload is none

@@ -14,7 +14,7 @@

use std::fmt::{Display, Formatter};

use api::v1::meta::TableIdent as RawTableIdent;
use api::v1::meta::{TableIdent as RawTableIdent, TableName};
use serde::{Deserialize, Serialize};
use snafu::OptionExt;

@@ -55,3 +55,17 @@ impl TryFrom<RawTableIdent> for TableIdent {
})
}
}

impl From<TableIdent> for RawTableIdent {
fn from(table_ident: TableIdent) -> Self {
Self {
table_id: table_ident.table_id,
engine: table_ident.engine,
table_name: Some(TableName {
catalog_name: table_ident.catalog,
schema_name: table_ident.schema,
table_name: table_ident.table,
}),
}
}
}

@@ -17,6 +17,7 @@ use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::{HeartbeatRequest, NodeStat, Peer};
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::{datanode_stat, CatalogManagerRef};
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
@@ -42,6 +43,7 @@ pub struct HeartbeatTask {
catalog_manager: CatalogManagerRef,
interval: u64,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
}

impl Drop for HeartbeatTask {
@@ -59,6 +61,7 @@ impl HeartbeatTask {
meta_client: Arc<MetaClient>,
catalog_manager: CatalogManagerRef,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Self {
Self {
node_id,
@@ -69,6 +72,7 @@ impl HeartbeatTask {
catalog_manager,
interval: 5_000, // default interval is set to 5 secs
resp_handler_executor,
region_alive_keepers,
}
}

@@ -94,7 +98,7 @@ impl HeartbeatTask {
}

let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
if let Err(e) = Self::handle_response(ctx, handler_executor.clone()) {
if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
error!(e; "Error while handling heartbeat response");
}
if !running.load(Ordering::Acquire) {
@@ -106,13 +110,14 @@ impl HeartbeatTask {
Ok(tx)
}

fn handle_response(
async fn handle_response(
ctx: HeartbeatResponseHandlerContext,
handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Result<()> {
trace!("heartbeat response: {:?}", ctx.response);
handler_executor
.handle(ctx)
.await
.context(error::HandleHeartbeatResponseSnafu)
}

@@ -131,8 +136,7 @@ impl HeartbeatTask {
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");

// TODO(LFC): Continued in next PR.
// self.region_alive_keepers.start(interval).await;
self.region_alive_keepers.start(interval).await;

let meta_client = self.meta_client.clone();
let catalog_manager_clone = self.catalog_manager.clone();
@@ -150,6 +154,7 @@ impl HeartbeatTask {
)
.await?;

let epoch = self.region_alive_keepers.epoch();
common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
@@ -195,6 +200,7 @@ impl HeartbeatTask {
..Default::default()
}),
region_stats,
duration_since_epoch: (Instant::now() - epoch).as_millis() as u64,
..Default::default()
};
sleep.as_mut().reset(Instant::now() + Duration::from_millis(interval));

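Reading the Datanode loop above together with the Metasrv side of this diff: each heartbeat stamps duration_since_epoch = (Instant::now() - epoch).as_millis(), and the RegionLeaseHandler added later in this commit copies that value back untouched into every RegionLease. As a worked example, a heartbeat sent 120_000 ms after the keeper epoch comes back, however much later, with duration_since_epoch = 120_000 and lease_seconds = 20, so the keeper sets the deadline to epoch + 140 s. Any network or queueing delay only shortens the remaining lease; it can never push the deadline forward, because both ends of the calculation are anchored to the Datanode's own monotonic clock.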
@@ -14,6 +14,7 @@

use std::sync::Arc;

use async_trait::async_trait;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::{CatalogManagerRef, DeregisterTableRequest};
use common_catalog::format_full_table_name;
@@ -38,6 +39,7 @@ pub struct CloseRegionHandler {
region_alive_keepers: Arc<RegionAliveKeepers>,
}

#[async_trait]
impl HeartbeatResponseHandler for CloseRegionHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
@@ -46,7 +48,7 @@ impl HeartbeatResponseHandler for CloseRegionHandler {
)
}

fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() else {
unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'");
};

@@ -14,6 +14,7 @@

use std::sync::Arc;

use async_trait::async_trait;
use catalog::error::Error as CatalogError;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::{CatalogManagerRef, RegisterTableRequest};
@@ -39,6 +40,7 @@ pub struct OpenRegionHandler {
region_alive_keepers: Arc<RegionAliveKeepers>,
}

#[async_trait]
impl HeartbeatResponseHandler for OpenRegionHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
@@ -47,7 +49,7 @@ impl HeartbeatResponseHandler for OpenRegionHandler {
)
}

fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() else {
unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'");
};

@@ -215,8 +215,9 @@ impl Instance {
Arc::new(CloseRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
region_alive_keepers,
region_alive_keepers.clone(),
)),
region_alive_keepers.clone(),
]);

let heartbeat_task = Some(HeartbeatTask::new(
@@ -226,6 +227,7 @@ impl Instance {
meta_client,
catalog_manager.clone(),
Arc::new(handlers_executor),
region_alive_keepers,
));

(catalog_manager as CatalogManagerRef, None, heartbeat_task)

@@ -75,7 +75,8 @@ async fn test_close_region_handler() {
executor.clone(),
mailbox.clone(),
close_region_instruction(),
);
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
@@ -89,7 +90,8 @@ async fn test_close_region_handler() {
executor.clone(),
mailbox.clone(),
close_region_instruction(),
);
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
@@ -112,7 +114,8 @@ async fn test_close_region_handler() {
cluster_id: 1,
datanode_id: 2,
}),
);
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
@@ -149,7 +152,7 @@ async fn test_open_region_handler() {
prepare_table(instance.inner()).await;

// Opens a opened table
handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction());
handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()).await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
@@ -172,7 +175,8 @@ async fn test_open_region_handler() {
cluster_id: 1,
datanode_id: 2,
}),
);
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
@@ -184,7 +188,8 @@ async fn test_open_region_handler() {
executor.clone(),
mailbox.clone(),
close_region_instruction(),
);
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
@@ -193,7 +198,7 @@ async fn test_open_region_handler() {
assert_test_table_not_found(instance.inner()).await;

// Opens demo table
handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction());
handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()).await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
@@ -228,7 +233,7 @@ pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> Messag
}
}

fn handle_instruction(
async fn handle_instruction(
executor: Arc<dyn HeartbeatResponseHandlerExecutor>,
mailbox: Arc<HeartbeatMailbox>,
instruction: Instruction,
@@ -237,7 +242,7 @@ fn handle_instruction(
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction));
executor.handle(ctx).unwrap();
executor.handle(ctx).await.unwrap();
}

fn close_region_instruction() -> Instruction {

@@ -84,7 +84,7 @@ impl HeartbeatTask {
Ok(Some(resp)) => {
debug!("Receiving heartbeat response: {:?}", resp);
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
if let Err(e) = capture_self.handle_response(ctx) {
if let Err(e) = capture_self.handle_response(ctx).await {
error!(e; "Error while handling heartbeat response");
}
}
@@ -153,9 +153,10 @@ impl HeartbeatTask {
});
}

fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
self.resp_handler_executor
.handle(ctx)
.await
.context(error::HandleHeartbeatResponseSnafu)
}

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use catalog::remote::KvCacheInvalidatorRef;
use common_meta::error::Result as MetaResult;
@@ -30,6 +31,7 @@ pub struct InvalidateTableCacheHandler {
table_route_cache_invalidator: TableRouteCacheInvalidatorRef,
}

#[async_trait]
impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
@@ -38,7 +40,7 @@ impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
)
}

fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
// TODO(weny): considers introducing a macro
let Some((meta, Instruction::InvalidateTableCache(table_ident))) = ctx.incoming_message.take() else {
unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'");

@@ -90,7 +90,8 @@ async fn test_invalidate_table_cache_handler() {
table_id: 0,
engine: "mito".to_string(),
}),
);
)
.await;

let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
@@ -126,7 +127,8 @@ async fn test_invalidate_table_cache_handler() {
table_id: 0,
engine: "mito".to_string(),
}),
);
)
.await;

let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
@@ -144,7 +146,7 @@ pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> Messag
}
}

fn handle_instruction(
async fn handle_instruction(
executor: Arc<dyn HeartbeatResponseHandlerExecutor>,
mailbox: Arc<HeartbeatMailbox>,
instruction: Instruction,
@@ -153,5 +155,5 @@ fn handle_instruction(
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction));
executor.handle(ctx).unwrap();
executor.handle(ctx).await.unwrap();
}

@@ -38,6 +38,7 @@ regex = "1.6"
serde = "1.0"
serde_json = "1.0"
snafu.workspace = true
store-api = { path = "../store-api" }
table = { path = "../table" }
tokio.workspace = true
tokio-stream = { version = "0.1", features = ["net"] }

@@ -354,6 +354,12 @@ pub enum Error {
source: common_meta::error::Error,
},

#[snafu(display("Failed to convert proto data, source: {}", source))]
ConvertProtoData {
location: Location,
source: common_meta::error::Error,
},

// this error is used for custom error mapping
// please do not delete it
#[snafu(display("Other error, source: {}", source))]
@@ -442,7 +448,9 @@ impl ErrorExt for Error {
Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted,

Error::RegisterProcedureLoader { source, .. } => source.status_code(),
Error::TableRouteConversion { source, .. } => source.status_code(),
Error::TableRouteConversion { source, .. } | Error::ConvertProtoData { source, .. } => {
source.status_code()
}
Error::Other { source, .. } => source.status_code(),
}
}

@@ -19,8 +19,8 @@ use std::time::Duration;

use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{
HeartbeatRequest, HeartbeatResponse, MailboxMessage, RequestHeader, ResponseHeader, Role,
PROTOCOL_VERSION,
HeartbeatRequest, HeartbeatResponse, MailboxMessage, RegionLease, RequestHeader,
ResponseHeader, Role, PROTOCOL_VERSION,
};
pub use check_leader_handler::CheckLeaderHandler;
pub use collect_stats_handler::CollectStatsHandler;
@@ -54,6 +54,7 @@ pub mod mailbox_handler;
pub mod node_stat;
mod on_leader_start;
mod persist_stats_handler;
pub(crate) mod region_lease_handler;
mod response_header_handler;

#[async_trait::async_trait]
@@ -73,6 +74,7 @@ pub struct HeartbeatAccumulator {
pub header: Option<ResponseHeader>,
pub instructions: Vec<Instruction>,
pub stat: Option<Stat>,
pub region_leases: Vec<RegionLease>,
}

impl HeartbeatAccumulator {
@@ -233,7 +235,7 @@ impl HeartbeatHandlerGroup {
let header = std::mem::take(&mut acc.header);
let res = HeartbeatResponse {
header,
mailbox_message: acc.into_mailbox_message(),
region_leases: acc.region_leases,
..Default::default()
};
Ok(res)

@@ -36,6 +36,7 @@ pub(crate) struct DatanodeHeartbeat {

pub struct RegionFailureHandler {
failure_detect_runner: FailureDetectRunner,
region_failover_manager: Arc<RegionFailoverManager>,
}

impl RegionFailureHandler {
@@ -45,13 +46,19 @@ impl RegionFailureHandler {
) -> Result<Self> {
region_failover_manager.try_start()?;

let mut failure_detect_runner = FailureDetectRunner::new(election, region_failover_manager);
let mut failure_detect_runner =
FailureDetectRunner::new(election, region_failover_manager.clone());
failure_detect_runner.start().await;

Ok(Self {
failure_detect_runner,
region_failover_manager,
})
}

pub(crate) fn region_failover_manager(&self) -> &Arc<RegionFailoverManager> {
&self.region_failover_manager
}
}

#[async_trait]

src/meta-srv/src/handler/region_lease_handler.rs (new normal file, 226 lines)
@@ -0,0 +1,226 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::meta::{HeartbeatRequest, RegionLease, Role};
use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use common_meta::ident::TableIdent;
use common_meta::ClusterId;
use store_api::storage::RegionNumber;

use crate::error::Result;
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::Context;
use crate::procedure::region_failover::{RegionFailoverKey, RegionFailoverManager};
use crate::service::store::kv::KvStoreRef;
use crate::table_routes;

/// The lease seconds of a region. It's set by two default heartbeat intervals (5 second × 2) plus
/// two roundtrip time (2 second × 2 × 2), plus some extra buffer (2 second).
// TODO(LFC): Make region lease seconds calculated from Datanode heartbeat configuration.
pub(crate) const REGION_LEASE_SECONDS: u64 = 20;

pub(crate) struct RegionLeaseHandler {
kv_store: KvStoreRef,
region_failover_manager: Option<Arc<RegionFailoverManager>>,
}

impl RegionLeaseHandler {
pub(crate) fn new(
kv_store: KvStoreRef,
region_failover_manager: Option<Arc<RegionFailoverManager>>,
) -> Self {
Self {
kv_store,
region_failover_manager,
}
}

/// Filter out the regions that are currently in failover.
/// It's meaningless to extend the lease of a region if it is in failover.
fn filter_failover_regions(
&self,
cluster_id: ClusterId,
table_ident: &TableIdent,
regions: Vec<RegionNumber>,
) -> Vec<RegionNumber> {
if let Some(region_failover_manager) = &self.region_failover_manager {
let mut region_failover_key = RegionFailoverKey {
cluster_id,
table_ident: table_ident.clone(),
region_number: 0,
};

regions
.into_iter()
.filter(|region| {
region_failover_key.region_number = *region;
!region_failover_manager.is_region_failover_running(&region_failover_key)
})
.collect()
} else {
regions
}
}
}

#[async_trait]
impl HeartbeatHandler for RegionLeaseHandler {
fn is_acceptable(&self, role: Role) -> bool {
role == Role::Datanode
}

async fn handle(
&self,
req: &HeartbeatRequest,
_: &mut Context,
acc: &mut HeartbeatAccumulator,
) -> Result<()> {
let Some(stat) = acc.stat.as_ref() else { return Ok(()) };

let mut datanode_regions = HashMap::new();
stat.region_stats.iter().for_each(|x| {
let key = TableGlobalKey {
catalog_name: x.catalog.to_string(),
schema_name: x.schema.to_string(),
table_name: x.table.to_string(),
};
datanode_regions
.entry(key)
.or_insert_with(Vec::new)
.push(table::engine::region_number(x.id));
});

// TODO(LFC): Retrieve table global values from some cache here.
let table_global_values = table_routes::batch_get_table_global_value(
&self.kv_store,
datanode_regions.keys().collect::<Vec<_>>(),
)
.await?;

let mut region_leases = Vec::with_capacity(datanode_regions.len());
for (table_global_key, local_regions) in datanode_regions {
let Some(Some(table_global_value)) = table_global_values.get(&table_global_key) else { continue };

let Some(global_regions) = table_global_value.regions_id_map.get(&stat.id) else { continue };

// Filter out the designated regions from table global metadata for the given table on the given Datanode.
let designated_regions = local_regions
.into_iter()
.filter(|x| global_regions.contains(x))
.collect::<Vec<_>>();

let table_ident = TableIdent {
catalog: table_global_key.catalog_name.to_string(),
schema: table_global_key.schema_name.to_string(),
table: table_global_key.table_name.to_string(),
table_id: table_global_value.table_id(),
engine: table_global_value.engine().to_string(),
};
let designated_regions =
self.filter_failover_regions(stat.cluster_id, &table_ident, designated_regions);

region_leases.push(RegionLease {
table_ident: Some(table_ident.into()),
regions: designated_regions,
duration_since_epoch: req.duration_since_epoch,
lease_seconds: REGION_LEASE_SECONDS,
});
}
acc.region_leases = region_leases;
Ok(())
}
}

#[cfg(test)]
mod test {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};

use super::*;
use crate::handler::node_stat::{RegionStat, Stat};
use crate::metasrv::builder::MetaSrvBuilder;
use crate::test_util;

#[tokio::test]
async fn test_handle_region_lease() {
let region_failover_manager = test_util::create_region_failover_manager();
let kv_store = region_failover_manager
.create_context()
.selector_ctx
.kv_store
.clone();

let table_name = "my_table";
let _ = table_routes::tests::prepare_table_global_value(&kv_store, table_name).await;

let table_ident = TableIdent {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table_name.to_string(),
table_id: 1,
engine: "mito".to_string(),
};
region_failover_manager
.running_procedures()
.write()
.unwrap()
.insert(RegionFailoverKey {
cluster_id: 1,
table_ident: table_ident.clone(),
region_number: 1,
});

let handler = RegionLeaseHandler::new(kv_store, Some(region_failover_manager));

let req = HeartbeatRequest {
duration_since_epoch: 1234,
..Default::default()
};

let builder = MetaSrvBuilder::new();
let metasrv = builder.build().await.unwrap();
let ctx = &mut metasrv.new_ctx();

let acc = &mut HeartbeatAccumulator::default();
let new_region_stat = |region_id: u64| -> RegionStat {
RegionStat {
id: region_id,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: table_name.to_string(),
..Default::default()
}
};
acc.stat = Some(Stat {
cluster_id: 1,
id: 1,
region_stats: vec![new_region_stat(1), new_region_stat(2), new_region_stat(3)],
..Default::default()
});

handler.handle(&req, ctx, acc).await.unwrap();

// region 1 is during failover and region 3 is not in table global value,
// so only region 2's lease is extended.
assert_eq!(acc.region_leases.len(), 1);
let lease = acc.region_leases.remove(0);
assert_eq!(lease.table_ident.unwrap(), table_ident.into());
assert_eq!(lease.regions, vec![2]);
assert_eq!(lease.duration_since_epoch, 1234);
assert_eq!(lease.lease_seconds, REGION_LEASE_SECONDS);
}
}

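The REGION_LEASE_SECONDS comment in the new file above fixes the value at 20 by simple arithmetic: two default heartbeat intervals (2 × 5 s), plus two round trips (2 × 2 × 2 s), plus a 2 s buffer. Spelled out with illustrative names (the constants below are a sketch of that derivation, not part of the handler):

    const HEARTBEAT_INTERVAL_SECS: u64 = 5; // default Datanode heartbeat interval
    const ROUNDTRIP_SECS: u64 = 2;          // assumed one-way-and-back time to Metasrv
    const BUFFER_SECS: u64 = 2;             // extra slack
    const DERIVED_LEASE_SECS: u64 =
        2 * HEARTBEAT_INTERVAL_SECS + 2 * 2 * ROUNDTRIP_SECS + BUFFER_SECS; // 10 + 8 + 2 = 20
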
@@ -20,6 +20,7 @@ use common_procedure::local::{LocalManager, ManagerConfig};
use crate::cluster::MetaPeerClient;
use crate::error::Result;
use crate::handler::mailbox_handler::MailboxHandler;
use crate::handler::region_lease_handler::RegionLeaseHandler;
use crate::handler::{
CheckLeaderHandler, CollectStatsHandler, HeartbeatHandlerGroup, HeartbeatMailbox,
KeepLeaseHandler, OnLeaderStartHandler, PersistStatsHandler, Pushers, RegionFailureHandler,
@@ -170,6 +171,13 @@ impl MetaSrvBuilder {
)
};

let region_lease_handler = RegionLeaseHandler::new(
kv_store.clone(),
region_failover_handler
.as_ref()
.map(|x| x.region_failover_manager().clone()),
);

let group = HeartbeatHandlerGroup::new(pushers);
let keep_lease_handler = KeepLeaseHandler::new(kv_store.clone());
group.add_handler(ResponseHeaderHandler::default()).await;
@@ -184,6 +192,7 @@ impl MetaSrvBuilder {
if let Some(region_failover_handler) = region_failover_handler {
group.add_handler(region_failover_handler).await;
}
group.add_handler(region_lease_handler).await;
group.add_handler(PersistStatsHandler::default()).await;
group
}

@@ -21,12 +21,13 @@ mod update_metadata;

use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, RwLock};
use std::time::Duration;

use async_trait::async_trait;
use catalog::helper::TableGlobalKey;
use common_meta::RegionIdent;
use common_meta::ident::TableIdent;
use common_meta::{ClusterId, RegionIdent};
use common_procedure::error::{
Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
@@ -38,6 +39,7 @@ use common_telemetry::{error, info, warn};
use failover_start::RegionFailoverStart;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::storage::RegionNumber;

use crate::error::{Error, RegisterProcedureLoaderSnafu, Result};
use crate::lock::DistLockRef;
@@ -48,26 +50,41 @@ use crate::service::store::ext::KvStoreExt;
const OPEN_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(30);
const CLOSE_REGION_MESSAGE_TIMEOUT: Duration = Duration::from_secs(2);

/// A key for the preventing running multiple failover procedures for the same region.
#[derive(PartialEq, Eq, Hash, Clone)]
pub(crate) struct RegionFailoverKey {
pub(crate) cluster_id: ClusterId,
pub(crate) table_ident: TableIdent,
pub(crate) region_number: RegionNumber,
}

impl From<RegionIdent> for RegionFailoverKey {
fn from(region_ident: RegionIdent) -> Self {
Self {
cluster_id: region_ident.cluster_id,
table_ident: region_ident.table_ident,
region_number: region_ident.region_number,
}
}
}

pub(crate) struct RegionFailoverManager {
mailbox: MailboxRef,
procedure_manager: ProcedureManagerRef,
selector: SelectorRef,
selector_ctx: SelectorContext,
dist_lock: DistLockRef,
running_procedures: Arc<Mutex<HashSet<RegionIdent>>>,
running_procedures: Arc<RwLock<HashSet<RegionFailoverKey>>>,
}

struct FailoverProcedureGuard<'a> {
running_procedures: Arc<Mutex<HashSet<RegionIdent>>>,
failed_region: &'a RegionIdent,
struct FailoverProcedureGuard {
running_procedures: Arc<RwLock<HashSet<RegionFailoverKey>>>,
key: RegionFailoverKey,
}

impl Drop for FailoverProcedureGuard<'_> {
impl Drop for FailoverProcedureGuard {
fn drop(&mut self) {
self.running_procedures
.lock()
.unwrap()
.remove(self.failed_region);
self.running_procedures.write().unwrap().remove(&self.key);
}
}

@@ -85,11 +102,11 @@ impl RegionFailoverManager {
selector,
selector_ctx,
dist_lock,
running_procedures: Arc::new(Mutex::new(HashSet::new())),
running_procedures: Arc::new(RwLock::new(HashSet::new())),
}
}

fn create_context(&self) -> RegionFailoverContext {
pub(crate) fn create_context(&self) -> RegionFailoverContext {
RegionFailoverContext {
mailbox: self.mailbox.clone(),
selector: self.selector.clone(),
@@ -113,19 +130,36 @@ impl RegionFailoverManager {
})
}

fn insert_running_procedures(&self, failed_region: &RegionIdent) -> bool {
let mut procedures = self.running_procedures.lock().unwrap();
if procedures.contains(failed_region) {
return false;
pub(crate) fn is_region_failover_running(&self, key: &RegionFailoverKey) -> bool {
self.running_procedures.read().unwrap().contains(key)
}

fn insert_running_procedures(
&self,
failed_region: &RegionIdent,
) -> Option<FailoverProcedureGuard> {
let key = RegionFailoverKey::from(failed_region.clone());
let mut procedures = self.running_procedures.write().unwrap();
if procedures.insert(key.clone()) {
Some(FailoverProcedureGuard {
running_procedures: self.running_procedures.clone(),
key,
})
} else {
None
}
procedures.insert(failed_region.clone())
}

#[cfg(test)]
pub(crate) fn running_procedures(&self) -> Arc<RwLock<HashSet<RegionFailoverKey>>> {
self.running_procedures.clone()
}

pub(crate) async fn do_region_failover(&self, failed_region: &RegionIdent) -> Result<()> {
if !self.insert_running_procedures(failed_region) {
let Some(guard) = self.insert_running_procedures(failed_region) else {
warn!("Region failover procedure for region {failed_region} is already running!");
return Ok(());
}
};

if !self.table_exists(failed_region).await? {
// The table could be dropped before the failure detector knows it. Then the region
@@ -142,13 +176,9 @@ impl RegionFailoverManager {
info!("Starting region failover procedure {procedure_id} for region {failed_region:?}");

let procedure_manager = self.procedure_manager.clone();
let running_procedures = self.running_procedures.clone();
let failed_region = failed_region.clone();
common_runtime::spawn_bg(async move {
let _guard = FailoverProcedureGuard {
running_procedures,
failed_region: &failed_region,
};
let _ = guard;

let watcher = &mut match procedure_manager.submit(procedure_with_id).await {
Ok(watcher) => watcher,
@@ -178,7 +208,7 @@ impl RegionFailoverManager {
let table_global_value = self
.selector_ctx
.kv_store
.get(table_global_key.to_string().into_bytes())
.get(table_global_key.to_raw_key())
.await?;
Ok(table_global_value.is_some())
}
@@ -232,7 +262,8 @@ trait State: Sync + Send + Debug {
/// │ │ │
/// └─────────┘ │ Sends "Close Region" request
/// │ to the failed Datanode, and
/// ┌─────────┐ │ wait for 2 seconds
/// | wait for the Region lease expiry
/// ┌─────────┐ │ seconds
/// │ │ │
/// │ ┌──▼────▼──────┐
/// Wait candidate │ │ActivateRegion◄───────────────────────┐
@@ -260,7 +291,6 @@ trait State: Sync + Send + Debug {
/// │ Broadcast Invalidate Table
/// │ Cache
/// │
/// │
/// ┌────────▼────────┐
/// │RegionFailoverEnd│
/// └─────────────────┘

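The FailoverProcedureGuard rework above replaces the boolean insert_running_procedures with an RAII guard: inserting the RegionFailoverKey into the shared RwLock<HashSet> either yields a guard whose Drop removes the key again, or yields None when a failover for that region is already in flight. A self-contained toy model of the idiom, using hypothetical names rather than GreptimeDB's types:

    use std::collections::HashSet;
    use std::sync::{Arc, RwLock};

    struct Guard {
        set: Arc<RwLock<HashSet<u32>>>,
        key: u32,
    }

    impl Drop for Guard {
        fn drop(&mut self) {
            // Releasing the guard makes the key available for the next attempt.
            self.set.write().unwrap().remove(&self.key);
        }
    }

    /// Returns a guard only for the first caller per key; later callers get None
    /// until that first guard is dropped.
    fn try_acquire(set: &Arc<RwLock<HashSet<u32>>>, key: u32) -> Option<Guard> {
        if set.write().unwrap().insert(key) {
            Some(Guard { set: set.clone(), key })
        } else {
            None
        }
    }

    fn main() {
        let running = Arc::new(RwLock::new(HashSet::new()));
        let first = try_acquire(&running, 1);
        assert!(first.is_some());
        assert!(try_acquire(&running, 1).is_none()); // deduplicated while running
        drop(first);
        assert!(try_acquire(&running, 1).is_some()); // available again
    }
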
@@ -28,6 +28,7 @@ use super::{RegionFailoverContext, State};
use crate::error::{
Error, Result, RetryLaterSnafu, SerializeToJsonSnafu, UnexpectedInstructionReplySnafu,
};
use crate::handler::region_lease_handler::REGION_LEASE_SECONDS;
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_failover::CLOSE_REGION_MESSAGE_TIMEOUT;
use crate::service::mailbox::{Channel, MailboxReceiver};
@@ -35,11 +36,15 @@ use crate::service::mailbox::{Channel, MailboxReceiver};
#[derive(Serialize, Deserialize, Debug)]
pub(super) struct DeactivateRegion {
candidate: Peer,
region_lease_expiry_seconds: u64,
}

impl DeactivateRegion {
pub(super) fn new(candidate: Peer) -> Self {
Self { candidate }
Self {
candidate,
region_lease_expiry_seconds: REGION_LEASE_SECONDS * 2,
}
}

async fn send_close_region_message(
@@ -95,15 +100,21 @@ impl DeactivateRegion {
}
Err(e) if matches!(e, Error::MailboxTimeout { .. }) => {
// Since we are in a region failover situation, the Datanode that the failed region
// resides might be unreachable. So region deactivation is happened in a "try our
// best" effort, do not retry if mailbox received timeout.
// However, if the region failover procedure is also used in a planned maintenance
// situation in the future, a proper retry is a must.
// resides might be unreachable. So we wait for the region lease to expire. The
// region would be closed by its own [RegionAliveKeeper].
self.wait_for_region_lease_expiry().await;
Ok(Box::new(ActivateRegion::new(self.candidate)))
}
Err(e) => Err(e),
}
}

/// Sleep for `region_lease_expiry_seconds`, to make sure the region is closed (by its
/// region alive keeper). This is critical for region not being opened in multiple Datanodes
/// simultaneously.
async fn wait_for_region_lease_expiry(&self) {
tokio::time::sleep(Duration::from_secs(self.region_lease_expiry_seconds)).await;
}
}

#[async_trait]
@@ -120,8 +131,8 @@ impl State for DeactivateRegion {
let mailbox_receiver = match result {
Ok(mailbox_receiver) => mailbox_receiver,
Err(e) if matches!(e, Error::PusherNotFound { .. }) => {
// The Datanode could be unreachable and deregistered from pushers,
// so simply advancing to the next state here.
// See the mailbox received timeout situation comments above.
self.wait_for_region_lease_expiry().await;
return Ok(Box::new(ActivateRegion::new(self.candidate)));
}
Err(e) => return Err(e),
@@ -212,7 +223,10 @@ mod tests {
let mut env = TestingEnvBuilder::new().build().await;
let failed_region = env.failed_region(1).await;

let state = DeactivateRegion::new(Peer::new(2, ""));
let state = DeactivateRegion {
candidate: Peer::new(2, ""),
region_lease_expiry_seconds: 2,
};
let mailbox_receiver = state
.send_close_region_message(&env.context, &failed_region, Duration::from_millis(100))
.await

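A note on the doubled wait introduced above: with REGION_LEASE_SECONDS = 20, region_lease_expiry_seconds defaults to 40. The extra lease period is headroom for the worst case in which the unreachable Datanode was granted a fresh 20-second lease at roughly the moment the close message timed out; by the time ActivateRegion opens the region on the candidate, that lease has lapsed and the region's own RegionAliveKeeper has closed it, which is what keeps the region from being open on two Datanodes at once.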
@@ -12,13 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::meta::{PutRequest, TableRouteValue};
use catalog::helper::{TableGlobalKey, TableGlobalValue};
use common_meta::key::TableRouteKey;
use common_meta::rpc::store::{BatchGetRequest, BatchGetResponse};
use snafu::{OptionExt, ResultExt};

use crate::error::{
DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result, TableRouteNotFoundSnafu,
ConvertProtoDataSnafu, DecodeTableRouteSnafu, InvalidCatalogValueSnafu, Result,
TableRouteNotFoundSnafu,
};
use crate::service::store::ext::KvStoreExt;
use crate::service::store::kv::KvStoreRef;
@@ -27,12 +31,40 @@ pub async fn get_table_global_value(
kv_store: &KvStoreRef,
key: &TableGlobalKey,
) -> Result<Option<TableGlobalValue>> {
let key = key.to_string().into_bytes();
let kv = kv_store.get(key).await?;
let kv = kv_store.get(key.to_raw_key()).await?;
kv.map(|kv| TableGlobalValue::from_bytes(kv.value).context(InvalidCatalogValueSnafu))
.transpose()
}

pub(crate) async fn batch_get_table_global_value(
kv_store: &KvStoreRef,
keys: Vec<&TableGlobalKey>,
) -> Result<HashMap<TableGlobalKey, Option<TableGlobalValue>>> {
let req = BatchGetRequest {
keys: keys.iter().map(|x| x.to_raw_key()).collect::<Vec<_>>(),
};
let mut resp: BatchGetResponse = kv_store
.batch_get(req.into())
.await?
.try_into()
.context(ConvertProtoDataSnafu)?;

let kvs = resp.take_kvs();
let mut result = HashMap::with_capacity(kvs.len());
for kv in kvs {
let key = TableGlobalKey::try_from_raw_key(kv.key()).context(InvalidCatalogValueSnafu)?;
let value = TableGlobalValue::from_bytes(kv.value()).context(InvalidCatalogValueSnafu)?;
result.insert(key, Some(value));
}

for key in keys {
if !result.contains_key(key) {
result.insert(key.clone(), None);
}
}
Ok(result)
}

pub(crate) async fn put_table_global_value(
kv_store: &KvStoreRef,
key: &TableGlobalKey,
@@ -40,7 +72,7 @@ pub(crate) async fn put_table_global_value(
) -> Result<()> {
let req = PutRequest {
header: None,
key: key.to_string().into_bytes(),
key: key.to_raw_key(),
value: value.as_bytes().context(InvalidCatalogValueSnafu)?,
prev_kv: false,
};
@@ -228,12 +260,12 @@ pub(crate) mod tests {
async fn test_put_and_get_table_global_value() {
let kv_store = Arc::new(MemStore::new()) as _;

let key = TableGlobalKey {
let not_exist_key = TableGlobalKey {
catalog_name: "not_exist_catalog".to_string(),
schema_name: "not_exist_schema".to_string(),
table_name: "not_exist_table".to_string(),
};
assert!(get_table_global_value(&kv_store, &key)
assert!(get_table_global_value(&kv_store, &not_exist_key)
.await
.unwrap()
.is_none());
@@ -244,6 +276,12 @@ pub(crate) mod tests {
.unwrap()
.unwrap();
assert_eq!(actual, value);

let keys = vec![&not_exist_key, &key];
let result = batch_get_table_global_value(&kv_store, keys).await.unwrap();
assert_eq!(result.len(), 2);
assert!(result.get(&not_exist_key).unwrap().is_none());
assert_eq!(result.get(&key).unwrap().as_ref().unwrap(), &value);
}

#[tokio::test]

@@ -28,7 +28,7 @@ use crate::TableRef;
pub mod manager;

/// Represents a resolved path to a table of the form “catalog.schema.table”
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct TableReference<'a> {
pub catalog: &'a str,
pub schema: &'a str,