feat: register & deregister region failure detectors actively (#4223)

* feat: Use DATANODE_LEASE_SECS from distributed_time_constants for heartbeat pause duration

* feat: introduce `RegionFailureDetectorController` to manage region failure detectors

* feat: add `RegionFailureDetectorController` to `DdlContext`

* feat: add `region_failure_detector_controller` to `Context` in region migration

* feat: register region failure detectors during rollback region migration procedure

* feat: deregister region failure detectors during drop table procedure

* feat: register region failure detectors during create table procedure

* fix: update meta config

* chore: apply suggestions from CR

* chore: avoid cloning

* chore: rename

* chore: reduce the size of the test

* chore: apply suggestions from CR

* chore: move channel initialization into `RegionSupervisor::channel`

* chore: minor refactor

* chore: rename ident
Weny Xu
2024-07-01 13:58:27 +08:00
committed by GitHub
parent 214fd38f69
commit 6a634f8e5d
26 changed files with 447 additions and 137 deletions
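At a glance, the core metadata-side API this change introduces (signatures as they appear in the diff below):

pub type DetectingRegion = (ClusterId, DatanodeId, RegionId);

/// Used for actively registering region failure detectors, so the Region Supervisor
/// can detect failures without waiting for the first heartbeat from the datanode.
#[async_trait::async_trait]
pub trait RegionFailureDetectorController: Send + Sync {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
}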

View File

@@ -259,7 +259,7 @@
| `failure_detector` | -- | -- | -- |
| `failure_detector.threshold` | Float | `8.0` | -- |
| `failure_detector.min_std_deviation` | String | `100ms` | -- |
| `failure_detector.acceptable_heartbeat_pause` | String | `3000ms` | -- |
| `failure_detector.acceptable_heartbeat_pause` | String | `10000ms` | -- |
| `failure_detector.first_heartbeat_estimate` | String | `1000ms` | -- |
| `datanode` | -- | -- | Datanode options. |
| `datanode.client` | -- | -- | Datanode client options. |

View File

@@ -54,7 +54,7 @@ max_metadata_value_size = "1500KiB"
[failure_detector]
threshold = 8.0
min_std_deviation = "100ms"
acceptable_heartbeat_pause = "3000ms"
acceptable_heartbeat_pause = "10000ms"
first_heartbeat_estimate = "1000ms"
## Datanode options.

View File

@@ -25,7 +25,7 @@ use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, ProcedureExecutorRef};
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
@@ -559,6 +559,7 @@ impl StartCommand {
flow_metadata_manager,
flow_metadata_allocator,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager,
true,

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use common_telemetry::tracing_context::W3cTrace;
use store_api::storage::{RegionNumber, TableId};
use store_api::storage::{RegionId, RegionNumber, TableId};
use crate::cache_invalidator::CacheInvalidatorRef;
use crate::ddl::flow_meta::FlowMetadataAllocatorRef;
@@ -30,7 +30,7 @@ use crate::peer::PeerLookupServiceRef;
use crate::region_keeper::MemoryRegionKeeperRef;
use crate::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse};
use crate::ClusterId;
use crate::{ClusterId, DatanodeId};
pub mod alter_logical_tables;
pub mod alter_table;
@@ -102,6 +102,33 @@ pub struct TableMetadata {
pub region_wal_options: HashMap<RegionNumber, String>,
}
pub type RegionFailureDetectorControllerRef = Arc<dyn RegionFailureDetectorController>;
pub type DetectingRegion = (ClusterId, DatanodeId, RegionId);
/// Used for actively registering Region failure detectors.
///
/// This ensures the Region Supervisor can detect region failures without relying on the first heartbeat from the datanode.
#[async_trait::async_trait]
pub trait RegionFailureDetectorController: Send + Sync {
/// Registers failure detectors for the given identifiers.
async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
/// Deregisters failure detectors for the given identifiers.
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>);
}
/// A noop implementation of [`RegionFailureDetectorController`].
#[derive(Debug, Clone)]
pub struct NoopRegionFailureDetectorControl;
#[async_trait::async_trait]
impl RegionFailureDetectorController for NoopRegionFailureDetectorControl {
async fn register_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
async fn deregister_failure_detectors(&self, _detecting_regions: Vec<DetectingRegion>) {}
}
/// The context of ddl.
#[derive(Clone)]
pub struct DdlContext {
@@ -121,4 +148,28 @@ pub struct DdlContext {
pub flow_metadata_allocator: FlowMetadataAllocatorRef,
/// look up peer by id.
pub peer_lookup_service: PeerLookupServiceRef,
/// controller of region failure detector.
pub region_failure_detector_controller: RegionFailureDetectorControllerRef,
}
impl DdlContext {
/// Notifies the RegionSupervisor to register failure detectors of newly created regions.
///
/// The datanode may crash without sending a heartbeat that contains information about newly created regions,
/// which may prevent the RegionSupervisor from detecting failures in these newly created regions.
pub async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
self.region_failure_detector_controller
.register_failure_detectors(detecting_regions)
.await;
}
/// Notifies the RegionSupervisor to remove failure detectors.
///
/// Once the regions are dropped, subsequent heartbeats no longer include these regions.
/// Therefore, we should remove the failure detectors for these dropped regions.
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
self.region_failure_detector_controller
.deregister_failure_detectors(detecting_regions)
.await;
}
}
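As a side note (not part of this change): any type implementing `RegionFailureDetectorController` can be supplied through `DdlContext::region_failure_detector_controller`. A minimal sketch of a recording implementation, e.g. for tests, assuming the trait and the `DetectingRegion` alias above are in scope; `RecordingController` is a hypothetical name:

use std::sync::Mutex;

/// Hypothetical test helper: records every region it is asked to (de)register.
#[derive(Default)]
struct RecordingController {
    registered: Mutex<Vec<DetectingRegion>>,
    deregistered: Mutex<Vec<DetectingRegion>>,
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RecordingController {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        self.registered.lock().unwrap().extend(detecting_regions);
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        self.deregistered.lock().unwrap().extend(detecting_regions);
    }
}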

View File

@@ -33,7 +33,10 @@ use table::metadata::{RawTableInfo, TableId};
use table::table_reference::TableReference;
use crate::ddl::create_table_template::{build_template, CreateRequestBuilder};
use crate::ddl::utils::{add_peer_context_if_needed, handle_retry_error, region_storage_path};
use crate::ddl::utils::{
add_peer_context_if_needed, convert_region_routes_to_detecting_regions, handle_retry_error,
region_storage_path,
};
use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext};
use crate::error::{self, Result};
use crate::key::table_name::TableNameKey;
@@ -265,16 +268,25 @@ impl CreateTableProcedure {
/// - Failed to create table metadata.
async fn on_create_metadata(&mut self) -> Result<Status> {
let table_id = self.table_id();
let cluster_id = self.creator.data.cluster_id;
let manager = &self.context.table_metadata_manager;
let raw_table_info = self.table_info().clone();
// Safety: the region_wal_options must be allocated.
let region_wal_options = self.region_wal_options()?.clone();
// Safety: the table_route must be allocated.
let table_route = TableRouteValue::Physical(self.table_route()?.clone());
let physical_table_route = self.table_route()?.clone();
let detecting_regions = convert_region_routes_to_detecting_regions(
cluster_id,
&physical_table_route.region_routes,
);
let table_route = TableRouteValue::Physical(physical_table_route);
manager
.create_table_metadata(raw_table_info, table_route, region_wal_options)
.await?;
self.context
.register_failure_detectors(detecting_regions)
.await;
info!("Created table metadata for table {table_id}");
self.creator.opening_regions.clear();

View File

@@ -35,6 +35,7 @@ use crate::ddl::DdlContext;
use crate::error::Result;
use crate::key::table_name::TableNameValue;
use crate::lock_key::{CatalogLock, SchemaLock};
use crate::ClusterId;
pub struct DropDatabaseProcedure {
/// The context of procedure runtime.
@@ -53,6 +54,7 @@ pub(crate) enum DropTableTarget {
/// Context of [DropDatabaseProcedure] execution.
pub(crate) struct DropDatabaseContext {
cluster_id: ClusterId,
catalog: String,
schema: String,
drop_if_exists: bool,
@@ -85,6 +87,7 @@ impl DropDatabaseProcedure {
Self {
runtime_context: context,
context: DropDatabaseContext {
cluster_id: 0,
catalog,
schema,
drop_if_exists,
@@ -105,6 +108,7 @@ impl DropDatabaseProcedure {
Ok(Self {
runtime_context,
context: DropDatabaseContext {
cluster_id: 0,
catalog,
schema,
drop_if_exists,

View File

@@ -221,6 +221,7 @@ mod tests {
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
@@ -256,6 +257,7 @@ mod tests {
// It always starts from Logical
let mut state = DropDatabaseCursor::new(DropTableTarget::Logical);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
@@ -284,6 +286,7 @@ mod tests {
let ddl_context = new_ddl_context(node_manager);
let mut state = DropDatabaseCursor::new(DropTableTarget::Physical);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,

View File

@@ -96,10 +96,11 @@ impl State for DropDatabaseExecutor {
async fn next(
&mut self,
ddl_ctx: &DdlContext,
_ctx: &mut DropDatabaseContext,
ctx: &mut DropDatabaseContext,
) -> Result<(Box<dyn State>, Status)> {
self.register_dropping_regions(ddl_ctx)?;
let executor = DropTableExecutor::new(self.table_name.clone(), self.table_id, true);
let executor =
DropTableExecutor::new(ctx.cluster_id, self.table_name.clone(), self.table_id, true);
// Deletes metadata for table permanently.
let table_route_value = TableRouteValue::new(
self.table_id,
@@ -186,6 +187,7 @@ mod tests {
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
@@ -198,6 +200,7 @@ mod tests {
}
// Execute again
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
@@ -238,6 +241,7 @@ mod tests {
DropTableTarget::Logical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
@@ -250,6 +254,7 @@ mod tests {
}
// Execute again
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
@@ -339,6 +344,7 @@ mod tests {
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,
@@ -368,6 +374,7 @@ mod tests {
DropTableTarget::Physical,
);
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
drop_if_exists: false,

View File

@@ -118,6 +118,7 @@ mod tests {
.unwrap();
let mut state = DropDatabaseRemoveMetadata;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
@@ -144,6 +145,7 @@ mod tests {
// Schema not exists
let mut state = DropDatabaseRemoveMetadata;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,

View File

@@ -89,6 +89,7 @@ mod tests {
let ddl_context = new_ddl_context(node_manager);
let mut step = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: false,
@@ -104,6 +105,7 @@ mod tests {
let ddl_context = new_ddl_context(node_manager);
let mut state = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: true,
@@ -126,6 +128,7 @@ mod tests {
.unwrap();
let mut state = DropDatabaseStart;
let mut ctx = DropDatabaseContext {
cluster_id: 0,
catalog: "foo".to_string(),
schema: "bar".to_string(),
drop_if_exists: false,

View File

@@ -279,6 +279,7 @@ impl DropTableData {
fn build_executor(&self) -> DropTableExecutor {
DropTableExecutor::new(
self.cluster_id,
self.task.table_name(),
self.task.table_id,
self.task.drop_if_exists,

View File

@@ -26,13 +26,14 @@ use table::metadata::TableId;
use table::table_name::TableName;
use crate::cache_invalidator::Context;
use crate::ddl::utils::add_peer_context_if_needed;
use crate::ddl::utils::{add_peer_context_if_needed, convert_region_routes_to_detecting_regions};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::instruction::CacheIdent;
use crate::key::table_name::TableNameKey;
use crate::key::table_route::TableRouteValue;
use crate::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
use crate::ClusterId;
/// [Control] indicated to the caller whether to go to the next step.
#[derive(Debug)]
@@ -50,8 +51,14 @@ impl<T> Control<T> {
impl DropTableExecutor {
/// Returns the [DropTableExecutor].
pub fn new(table: TableName, table_id: TableId, drop_if_exists: bool) -> Self {
pub fn new(
cluster_id: ClusterId,
table: TableName,
table_id: TableId,
drop_if_exists: bool,
) -> Self {
Self {
cluster_id,
table,
table_id,
drop_if_exists,
@@ -64,6 +71,7 @@ impl DropTableExecutor {
/// - Invalidates the cache on the Frontend nodes.
/// - Drops the regions on the Datanode nodes.
pub struct DropTableExecutor {
cluster_id: ClusterId,
table: TableName,
table_id: TableId,
drop_if_exists: bool,
@@ -130,7 +138,17 @@ impl DropTableExecutor {
) -> Result<()> {
ctx.table_metadata_manager
.destroy_table_metadata(self.table_id, &self.table, table_route_value)
.await
.await?;
let detecting_regions = if table_route_value.is_physical() {
// Safety: checked.
let regions = table_route_value.region_routes().unwrap();
convert_region_routes_to_detecting_regions(self.cluster_id, regions)
} else {
vec![]
};
ctx.deregister_failure_detectors(detecting_regions).await;
Ok(())
}
/// Restores the table metadata.
@@ -274,6 +292,7 @@ mod tests {
let node_manager = Arc::new(MockDatanodeManager::new(()));
let ctx = new_ddl_context(node_manager);
let executor = DropTableExecutor::new(
0,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
1024,
true,
@@ -283,6 +302,7 @@ mod tests {
// Drops a non-exists table
let executor = DropTableExecutor::new(
0,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
1024,
false,
@@ -292,6 +312,7 @@ mod tests {
// Drops a exists table
let executor = DropTableExecutor::new(
0,
TableName::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, "my_table"),
1024,
false,

View File

@@ -19,11 +19,14 @@ use snafu::{ensure, location, Location, OptionExt};
use store_api::metric_engine_consts::LOGICAL_TABLE_METADATA_KEY;
use table::metadata::TableId;
use crate::ddl::DetectingRegion;
use crate::error::{Error, Result, TableNotFoundSnafu, UnsupportedSnafu};
use crate::key::table_name::TableNameKey;
use crate::key::TableMetadataManagerRef;
use crate::peer::Peer;
use crate::rpc::ddl::CreateTableTask;
use crate::rpc::router::RegionRoute;
use crate::ClusterId;
/// Adds [Peer] context if the error is unretryable.
pub fn add_peer_context_if_needed(datanode: Peer) -> impl FnOnce(Error) -> Error {
@@ -126,3 +129,19 @@ pub async fn get_physical_table_id(
.get_physical_table_id(logical_table_id)
.await
}
/// Converts a list of [`RegionRoute`] to a list of [`DetectingRegion`].
pub fn convert_region_routes_to_detecting_regions(
cluster_id: ClusterId,
region_routes: &[RegionRoute],
) -> Vec<DetectingRegion> {
region_routes
.iter()
.flat_map(|route| {
route
.leader_peer
.as_ref()
.map(|peer| (cluster_id, peer.id, route.region.id))
})
.collect::<Vec<_>>()
}

View File

@@ -805,7 +805,7 @@ mod tests {
use crate::ddl::flow_meta::FlowMetadataAllocator;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::DdlContext;
use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use crate::key::flow::FlowMetadataManager;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
@@ -856,6 +856,7 @@ mod tests {
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),
true,

View File

@@ -24,7 +24,7 @@ use common_recordbatch::SendableRecordBatchStream;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::ddl::flow_meta::FlowMetadataAllocator;
use crate::ddl::table_meta::TableMetadataAllocator;
use crate::ddl::DdlContext;
use crate::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use crate::error::Result;
use crate::key::flow::FlowMetadataManager;
use crate::key::TableMetadataManager;
@@ -182,6 +182,7 @@ pub fn new_ddl_context_with_kv_backend(
flow_metadata_allocator,
flow_metadata_manager,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
}
}

View File

@@ -15,6 +15,7 @@
use std::collections::VecDeque;
use std::time::Duration;
use common_meta::distributed_time_constants;
use serde::{Deserialize, Serialize};
/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
@@ -37,7 +38,7 @@ use serde::{Deserialize, Serialize};
///
/// where F is the cumulative distribution function of a normal distribution with mean
/// and standard deviation estimated from historical heartbeat inter-arrival times.
#[cfg_attr(test, derive(Clone))]
#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
pub(crate) struct PhiAccrualFailureDetector {
/// A low threshold is prone to generate many wrong suspicions but ensures a quick detection
/// in the event of a real crash. Conversely, a high threshold generates fewer mistakes but
@@ -82,7 +83,9 @@ impl Default for PhiAccrualFailureDetectorOptions {
Self {
threshold: 8_f32,
min_std_deviation: Duration::from_millis(100),
acceptable_heartbeat_pause: Duration::from_millis(3000),
acceptable_heartbeat_pause: Duration::from_secs(
distributed_time_constants::DATANODE_LEASE_SECS,
),
first_heartbeat_estimate: Duration::from_millis(1000),
}
}
@@ -195,7 +198,7 @@ fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
/// It is capped by the number of samples specified in `max_sample_size`.
///
/// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory.
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
struct HeartbeatHistory {
/// Number of samples to use for calculation of mean and standard deviation of inter-arrival
/// times.
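To make the phi formula above concrete, here is a small self-contained sketch of the computation; it is not this crate's implementation, and it assumes the logistic approximation of the normal CDF commonly used by ports of the Akka detector:

// Sketch only: phi = -log10(1 - F(time_since_last_heartbeat)), where F is the CDF of a
// normal distribution whose mean/std deviation come from the heartbeat history.
fn phi_sketch(time_since_last_heartbeat_ms: f64, mean_ms: f64, std_deviation_ms: f64) -> f64 {
    // Standardize the elapsed time against the observed inter-arrival distribution.
    let y = (time_since_last_heartbeat_ms - mean_ms) / std_deviation_ms;
    // Logistic approximation of the standard normal CDF.
    let e = (-y * (1.5976 + 0.070566 * y * y)).exp();
    let cdf = 1.0 / (1.0 + e);
    // The longer the silence, the closer the CDF gets to 1 and the larger phi grows.
    -(1.0 - cdf).log10()
}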

View File

@@ -26,8 +26,10 @@ pub struct RegionFailureHandler {
}
impl RegionFailureHandler {
pub(crate) fn new(mut region_supervisor: RegionSupervisor) -> Self {
let heartbeat_acceptor = region_supervisor.heartbeat_acceptor();
pub(crate) fn new(
mut region_supervisor: RegionSupervisor,
heartbeat_acceptor: HeartbeatAcceptor,
) -> Self {
info!("Starting region supervisor");
common_runtime::spawn_bg(async move { region_supervisor.run().await });
Self { heartbeat_acceptor }
@@ -71,13 +73,13 @@ mod tests {
use crate::handler::{HeartbeatAccumulator, HeartbeatHandler};
use crate::metasrv::builder::MetasrvBuilder;
use crate::region::supervisor::tests::new_test_supervisor;
use crate::region::supervisor::Event;
use crate::region::supervisor::{Event, HeartbeatAcceptor};
#[tokio::test]
async fn test_handle_heartbeat() {
let supervisor = new_test_supervisor();
let sender = supervisor.sender();
let handler = RegionFailureHandler::new(supervisor);
let (supervisor, sender) = new_test_supervisor();
let heartbeat_acceptor = HeartbeatAcceptor::new(sender.clone());
let handler = RegionFailureHandler::new(supervisor, heartbeat_acceptor);
let req = &HeartbeatRequest::default();
let builder = MetasrvBuilder::new();
let metasrv = builder.build().await.unwrap();

View File

@@ -22,7 +22,9 @@ use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_grpc::channel_manager::ChannelConfig;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::DdlContext;
use common_meta::ddl::{
DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
};
use common_meta::ddl_manager::DdlManager;
use common_meta::distributed_time_constants;
use common_meta::key::flow::FlowMetadataManager;
@@ -68,7 +70,10 @@ use crate::metasrv::{
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::DefaultContextFactory;
use crate::pubsub::PublisherRef;
use crate::region::supervisor::{RegionSupervisor, DEFAULT_TICK_INTERVAL};
use crate::region::supervisor::{
HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorTicker,
DEFAULT_TICK_INTERVAL,
};
use crate::selector::lease_based::LeaseBasedSelector;
use crate::selector::round_robin::RoundRobinSelector;
use crate::service::mailbox::MailboxRef;
@@ -282,6 +287,60 @@ impl MetasrvBuilder {
},
));
let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
if !is_remote_wal && options.enable_region_failover {
return error::UnexpectedSnafu {
violated: "Region failover is not supported in the local WAL implementation!",
}
.fail();
}
let (tx, rx) = RegionSupervisor::channel();
let (region_failure_detector_controller, region_supervisor_ticker): (
RegionFailureDetectorControllerRef,
Option<std::sync::Arc<RegionSupervisorTicker>>,
) = if options.enable_region_failover && is_remote_wal {
(
Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
Some(Arc::new(RegionSupervisorTicker::new(
DEFAULT_TICK_INTERVAL,
tx.clone(),
))),
)
} else {
(Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
};
let region_migration_manager = Arc::new(RegionMigrationManager::new(
procedure_manager.clone(),
DefaultContextFactory::new(
table_metadata_manager.clone(),
memory_region_keeper.clone(),
region_failure_detector_controller.clone(),
mailbox.clone(),
options.server_addr.clone(),
),
));
region_migration_manager.try_start()?;
let region_failover_handler = if options.enable_region_failover && is_remote_wal {
let region_supervisor = RegionSupervisor::new(
rx,
options.failure_detector,
selector_ctx.clone(),
selector.clone(),
region_migration_manager.clone(),
leader_cached_kv_backend.clone() as _,
peer_lookup_service.clone(),
);
Some(RegionFailureHandler::new(
region_supervisor,
HeartbeatAcceptor::new(tx),
))
} else {
None
};
let ddl_manager = Arc::new(
DdlManager::try_new(
DdlContext {
@@ -292,7 +351,8 @@ impl MetasrvBuilder {
table_metadata_allocator: table_metadata_allocator.clone(),
flow_metadata_manager: flow_metadata_manager.clone(),
flow_metadata_allocator: flow_metadata_allocator.clone(),
peer_lookup_service: peer_lookup_service.clone(),
peer_lookup_service,
region_failure_detector_controller,
},
procedure_manager.clone(),
true,
@@ -300,44 +360,6 @@ impl MetasrvBuilder {
.context(error::InitDdlManagerSnafu)?,
);
let region_migration_manager = Arc::new(RegionMigrationManager::new(
procedure_manager.clone(),
DefaultContextFactory::new(
table_metadata_manager.clone(),
memory_region_keeper.clone(),
mailbox.clone(),
options.server_addr.clone(),
),
));
region_migration_manager.try_start()?;
if !is_remote_wal && options.enable_region_failover {
return error::UnexpectedSnafu {
violated: "Region failover is not supported in the local WAL implementation!",
}
.fail();
}
let (region_failover_handler, region_supervisor_ticker) =
if options.enable_region_failover && is_remote_wal {
let region_supervisor = RegionSupervisor::new(
options.failure_detector,
DEFAULT_TICK_INTERVAL,
selector_ctx.clone(),
selector.clone(),
region_migration_manager.clone(),
leader_cached_kv_backend.clone() as _,
peer_lookup_service,
);
let region_supervisor_ticker = region_supervisor.ticker();
(
Some(RegionFailureHandler::new(region_supervisor)),
Some(region_supervisor_ticker),
)
} else {
(None, None)
};
let handler_group = match handler_group {
Some(handler_group) => handler_group,
None => {

View File

@@ -29,6 +29,7 @@ use std::time::Duration;
use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::ddl::RegionFailureDetectorControllerRef;
use common_meta::instruction::{CacheIdent, Instruction};
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_info::TableInfoValue;
@@ -154,15 +155,17 @@ pub struct DefaultContextFactory {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,
mailbox: MailboxRef,
server_addr: String,
}
impl DefaultContextFactory {
/// Returns an [ContextFactoryImpl].
/// Returns a [`DefaultContextFactory`].
pub fn new(
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,
mailbox: MailboxRef,
server_addr: String,
) -> Self {
@@ -170,6 +173,7 @@ impl DefaultContextFactory {
volatile_ctx: VolatileContext::default(),
table_metadata_manager,
opening_region_keeper,
region_failure_detector_controller,
mailbox,
server_addr,
}
@@ -183,6 +187,7 @@ impl ContextFactory for DefaultContextFactory {
volatile_ctx: self.volatile_ctx,
table_metadata_manager: self.table_metadata_manager,
opening_region_keeper: self.opening_region_keeper,
region_failure_detector_controller: self.region_failure_detector_controller,
mailbox: self.mailbox,
server_addr: self.server_addr,
}
@@ -195,6 +200,7 @@ pub struct Context {
volatile_ctx: VolatileContext,
table_metadata_manager: TableMetadataManagerRef,
opening_region_keeper: MemoryRegionKeeperRef,
region_failure_detector_controller: RegionFailureDetectorControllerRef,
mailbox: MailboxRef,
server_addr: String,
}
@@ -236,6 +242,20 @@ impl Context {
Ok(table_route_value.as_ref().unwrap())
}
/// Notifies the RegionSupervisor to register the failure detector of the failed region.
///
/// The original failure detector was removed once the procedure was triggered.
/// Now, we need to register the failure detector for the failed region.
pub async fn register_failure_detectors(&self) {
let cluster_id = self.persistent_ctx.cluster_id;
let datanode_id = self.persistent_ctx.from_peer.id;
let region_id = self.persistent_ctx.region_id;
self.region_failure_detector_controller
.register_failure_detectors(vec![(cluster_id, datanode_id, region_id)])
.await;
}
/// Removes the `table_route` of [VolatileContext], returns true if any.
pub fn remove_table_route_value(&mut self) -> bool {
let value = self.volatile_ctx.table_route.take();

View File

@@ -20,6 +20,7 @@ use std::time::Duration;
use api::v1::meta::mailbox_message::Payload;
use api::v1::meta::{HeartbeatResponse, MailboxMessage, RequestHeader};
use common_meta::ddl::NoopRegionFailureDetectorControl;
use common_meta::instruction::{
DowngradeRegionReply, InstructionReply, SimpleReply, UpgradeRegionReply,
};
@@ -150,6 +151,7 @@ impl TestingEnv {
volatile_ctx: Default::default(),
mailbox: self.mailbox_ctx.mailbox().clone(),
server_addr: self.server_addr.to_string(),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
}
}

View File

@@ -51,6 +51,7 @@ impl UpdateMetadata {
});
}
ctx.register_failure_detectors().await;
ctx.remove_table_route_value();
Ok(())
@@ -60,6 +61,7 @@ impl UpdateMetadata {
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::sync::Arc;
use common_meta::key::test_utils::new_test_table_info;
use common_meta::peer::Peer;
@@ -71,6 +73,7 @@ mod tests {
use crate::procedure::region_migration::test_util::{self, TestingEnv};
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::{ContextFactory, PersistentContext, State};
use crate::region::supervisor::RegionFailureDetectorControl;
fn new_persistent_context() -> PersistentContext {
test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
@@ -98,6 +101,8 @@ mod tests {
let env = TestingEnv::new();
let mut ctx = env.context_factory().new_context(persistent_context);
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
ctx.region_failure_detector_controller = Arc::new(RegionFailureDetectorControl::new(tx));
let table_id = ctx.region_id().table_id();
let table_info = new_test_table_info(1024, vec![1, 2, 3]).into();
@@ -161,8 +166,18 @@ mod tests {
assert!(ctx.volatile_ctx.table_route.is_none());
assert!(err.is_retryable());
assert!(format!("{err:?}").contains("Failed to update the table route"));
assert_eq!(rx.len(), 0);
state.rollback_downgraded_region(&mut ctx).await.unwrap();
let event = rx.try_recv().unwrap();
let detecting_regions = event.into_region_failure_detectors();
assert_eq!(
detecting_regions,
vec![(
ctx.persistent_ctx.cluster_id,
from_peer.id,
ctx.persistent_ctx.region_id
)]
);
let table_route = table_metadata_manager
.table_route_manager()

View File

@@ -114,7 +114,7 @@ pub mod test_data {
use common_catalog::consts::MITO2_ENGINE;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::DdlContext;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
@@ -226,6 +226,7 @@ pub mod test_data {
flow_metadata_allocator,
memory_region_keeper: Arc::new(MemoryRegionKeeper::new()),
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
}
}
}

View File

@@ -14,27 +14,24 @@
use std::ops::DerefMut;
use common_meta::{ClusterId, DatanodeId};
use common_meta::ddl::DetectingRegion;
use dashmap::mapref::multiple::RefMulti;
use dashmap::DashMap;
use store_api::storage::RegionId;
use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};
pub(crate) type Ident = (ClusterId, DatanodeId, RegionId);
/// Detects the region failures.
pub(crate) struct RegionFailureDetector {
options: PhiAccrualFailureDetectorOptions,
detectors: DashMap<Ident, PhiAccrualFailureDetector>,
detectors: DashMap<DetectingRegion, PhiAccrualFailureDetector>,
}
pub(crate) struct FailureDetectorEntry<'a> {
e: RefMulti<'a, Ident, PhiAccrualFailureDetector>,
e: RefMulti<'a, DetectingRegion, PhiAccrualFailureDetector>,
}
impl FailureDetectorEntry<'_> {
pub(crate) fn region_ident(&self) -> &Ident {
pub(crate) fn region_ident(&self) -> &DetectingRegion {
self.e.key()
}
@@ -51,16 +48,31 @@ impl RegionFailureDetector {
}
}
/// Returns [PhiAccrualFailureDetector] of the specific ([DatanodeId],[RegionId]).
/// Returns [`PhiAccrualFailureDetector`] of the specific [`DetectingRegion`].
pub(crate) fn region_failure_detector(
&self,
ident: Ident,
detecting_region: DetectingRegion,
) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
self.detectors
.entry(ident)
.entry(detecting_region)
.or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options))
}
/// Returns a mutable reference to the [`PhiAccrualFailureDetector`] for the specified [`DetectingRegion`].
/// If a detector already exists for the region, it is returned. Otherwise, a new
/// detector is created and initialized with the provided timestamp.
pub(crate) fn maybe_init_region_failure_detector(
&self,
detecting_region: DetectingRegion,
ts_millis: i64,
) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
self.detectors.entry(detecting_region).or_insert_with(|| {
let mut detector = PhiAccrualFailureDetector::from_options(self.options);
detector.heartbeat(ts_millis);
detector
})
}
/// Returns a [FailureDetectorEntry] iterator.
pub(crate) fn iter(&self) -> impl Iterator<Item = FailureDetectorEntry> + '_ {
self.detectors
@@ -69,8 +81,8 @@ impl RegionFailureDetector {
}
/// Removes the specific [PhiAccrualFailureDetector] if exists.
pub(crate) fn remove(&self, ident: &Ident) {
self.detectors.remove(ident);
pub(crate) fn remove(&self, region: &DetectingRegion) {
self.detectors.remove(region);
}
/// Removes all [PhiAccrualFailureDetector]s.
@@ -78,10 +90,10 @@ impl RegionFailureDetector {
self.detectors.clear()
}
/// Returns true if the specific `ident` exists.
/// Returns true if the specific [`DetectingRegion`] exists.
#[cfg(test)]
pub(crate) fn contains(&self, ident: &Ident) -> bool {
self.detectors.contains_key(ident)
pub(crate) fn contains(&self, region: &DetectingRegion) -> bool {
self.detectors.contains_key(region)
}
/// Returns the length
@@ -110,14 +122,16 @@ impl RegionFailureDetector {
#[cfg(test)]
mod tests {
use store_api::storage::RegionId;
use super::*;
#[test]
fn test_default_failure_detector_container() {
let container = RegionFailureDetector::new(Default::default());
let ident = (0, 2, RegionId::new(1, 1));
let _ = container.region_failure_detector(ident);
assert!(container.contains(&ident));
let detecting_region = (0, 2, RegionId::new(1, 1));
let _ = container.region_failure_detector(detecting_region);
assert!(container.contains(&detecting_region));
{
let mut iter = container.iter();

View File

@@ -16,6 +16,7 @@ use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::MAINTENANCE_KEY;
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::PeerLookupServiceRef;
@@ -29,13 +30,13 @@ use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{interval, MissedTickBehavior};
use super::failure_detector::RegionFailureDetector;
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::node_stat::Stat;
use crate::metasrv::{SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::region_migration::RegionMigrationProcedureTask;
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;
/// `DatanodeHeartbeat` represents the heartbeat signal sent from a datanode.
@@ -75,18 +76,38 @@ impl From<&Stat> for DatanodeHeartbeat {
/// of the supervisor during tests.
pub(crate) enum Event {
Tick,
RegisterFailureDetectors(Vec<DetectingRegion>),
DeregisterFailureDetectors(Vec<DetectingRegion>),
HeartbeatArrived(DatanodeHeartbeat),
Clear,
#[cfg(test)]
Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}
#[cfg(test)]
impl Event {
pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
match self {
Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
_ => unreachable!(),
}
}
}
impl Debug for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Tick => write!(f, "Tick"),
Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
Self::Clear => write!(f, "Clear"),
Self::RegisterFailureDetectors(arg0) => f
.debug_tuple("RegisterFailureDetectors")
.field(arg0)
.finish(),
Self::DeregisterFailureDetectors(arg0) => f
.debug_tuple("DeregisterFailureDetectors")
.field(arg0)
.finish(),
#[cfg(test)]
Self::Dump(_) => f.debug_struct("Dump").finish(),
}
@@ -109,6 +130,14 @@ pub struct RegionSupervisorTicker {
}
impl RegionSupervisorTicker {
pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
Self {
tick_handle: Mutex::new(None),
tick_interval,
sender,
}
}
/// Starts the ticker.
pub fn start(&self) {
let mut handle = self.tick_handle.lock().unwrap();
@@ -160,12 +189,8 @@ pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
pub struct RegionSupervisor {
/// Used to detect the failure of regions.
failure_detector: RegionFailureDetector,
/// The interval of tick
tick_interval: Duration,
/// Receives [Event]s.
receiver: Receiver<Event>,
/// [Event] Sender.
sender: Sender<Event>,
/// The context of [`SelectorRef`]
selector_context: SelectorContext,
/// Candidate node selector.
@@ -178,44 +203,77 @@ pub struct RegionSupervisor {
peer_lookup: PeerLookupServiceRef,
}
/// Controller for managing failure detectors for regions.
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
sender: Sender<Event>,
}
impl RegionFailureDetectorControl {
pub(crate) fn new(sender: Sender<Event>) -> Self {
Self { sender }
}
}
#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
if let Err(err) = self
.sender
.send(Event::RegisterFailureDetectors(detecting_regions))
.await
{
error!(err; "RegionSupervisor has stopped receiving events");
}
}
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
if let Err(err) = self
.sender
.send(Event::DeregisterFailureDetectors(detecting_regions))
.await
{
error!(err; "RegionSupervisor has stopped receiving events");
}
}
}
/// [`HeartbeatAcceptor`] forwards heartbeats to [`RegionSupervisor`].
pub(crate) struct HeartbeatAcceptor {
sender: Sender<Event>,
}
impl HeartbeatAcceptor {
pub(crate) fn new(sender: Sender<Event>) -> Self {
Self { sender }
}
/// Accepts heartbeats from datanodes.
pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
if let Err(e) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
error!(e; "RegionSupervisor is stop receiving heartbeat");
if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
error!(err; "RegionSupervisor has stopped receiving heartbeats");
}
}
}
#[cfg(test)]
impl RegionSupervisor {
/// Returns the [Event] sender.
pub(crate) fn sender(&self) -> Sender<Event> {
self.sender.clone()
/// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving `Event` messages.
pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
tokio::sync::mpsc::channel(1024)
}
}
impl RegionSupervisor {
pub(crate) fn new(
event_receiver: Receiver<Event>,
options: PhiAccrualFailureDetectorOptions,
tick_interval: Duration,
selector_context: SelectorContext,
selector: SelectorRef,
region_migration_manager: RegionMigrationManagerRef,
kv_backend: KvBackendRef,
peer_lookup: PeerLookupServiceRef,
) -> Self {
let (tx, rx) = tokio::sync::mpsc::channel(1024);
Self {
failure_detector: RegionFailureDetector::new(options),
tick_interval,
receiver: rx,
sender: tx,
receiver: event_receiver,
selector_context,
selector,
region_migration_manager,
@@ -224,22 +282,6 @@ impl RegionSupervisor {
}
}
/// Returns the [`HeartbeatAcceptor`].
pub(crate) fn heartbeat_acceptor(&self) -> HeartbeatAcceptor {
HeartbeatAcceptor {
sender: self.sender.clone(),
}
}
/// Returns the [`RegionSupervisorTicker`].
pub(crate) fn ticker(&self) -> RegionSupervisorTickerRef {
Arc::new(RegionSupervisorTicker {
tick_interval: self.tick_interval,
sender: self.sender.clone(),
tick_handle: Mutex::new(None),
})
}
/// Runs the main loop.
pub(crate) async fn run(&mut self) {
while let Some(event) = self.receiver.recv().await {
@@ -248,6 +290,12 @@ impl RegionSupervisor {
let regions = self.detect_region_failure();
self.handle_region_failures(regions).await;
}
Event::RegisterFailureDetectors(detecting_regions) => {
self.register_failure_detectors(detecting_regions).await
}
Event::DeregisterFailureDetectors(detecting_regions) => {
self.deregister_failure_detectors(detecting_regions).await
}
Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
Event::Clear => self.clear(),
#[cfg(test)]
@@ -259,6 +307,21 @@ impl RegionSupervisor {
info!("RegionSupervisor is stopped!");
}
async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
let ts_millis = current_time_millis();
for region in detecting_regions {
// The datanode has `acceptable_heartbeat_pause_millis` to report a heartbeat for the corresponding region before it is suspected as failed.
self.failure_detector
.maybe_init_region_failure_detector(region, ts_millis);
}
}
async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
for region in detecting_regions {
self.failure_detector.remove(&region)
}
}
async fn handle_region_failures(&self, mut regions: Vec<(ClusterId, DatanodeId, RegionId)>) {
if regions.is_empty() {
return;
@@ -376,8 +439,10 @@ impl RegionSupervisor {
/// Updates the state of corresponding failure detectors.
fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
for region_id in heartbeat.regions {
let ident = (heartbeat.cluster_id, heartbeat.datanode_id, region_id);
let mut detector = self.failure_detector.region_failure_detector(ident);
let detecting_region = (heartbeat.cluster_id, heartbeat.datanode_id, region_id);
let mut detector = self
.failure_detector
.region_failure_detector(detecting_region);
detector.heartbeat(heartbeat.timestamp);
}
}
@@ -393,22 +458,25 @@ pub(crate) mod tests {
use std::sync::{Arc, Mutex};
use std::time::Duration;
use common_meta::ddl::RegionFailureDetectorController;
use common_meta::peer::Peer;
use common_meta::test_util::NoopPeerLookupService;
use common_time::util::current_time_millis;
use rand::Rng;
use store_api::storage::RegionId;
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tokio::time::sleep;
use crate::procedure::region_migration::manager::RegionMigrationManager;
use crate::procedure::region_migration::test_util::TestingEnv;
use crate::region::supervisor::{
DatanodeHeartbeat, Event, RegionSupervisor, RegionSupervisorTicker,
DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
RegionSupervisorTicker,
};
use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector};
pub(crate) fn new_test_supervisor() -> RegionSupervisor {
pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
let env = TestingEnv::new();
let selector_context = new_test_selector_context();
let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
@@ -419,22 +487,25 @@ pub(crate) mod tests {
));
let kv_backend = env.kv_backend();
let peer_lookup = Arc::new(NoopPeerLookupService);
let (tx, rx) = RegionSupervisor::channel();
RegionSupervisor::new(
Default::default(),
Duration::from_secs(1),
selector_context,
selector,
region_migration_manager,
kv_backend,
peer_lookup,
(
RegionSupervisor::new(
rx,
Default::default(),
selector_context,
selector,
region_migration_manager,
kv_backend,
peer_lookup,
),
tx,
)
}
#[tokio::test]
async fn test_heartbeat() {
let mut supervisor = new_test_supervisor();
let sender = supervisor.sender();
let (mut supervisor, sender) = new_test_supervisor();
tokio::spawn(async move { supervisor.run().await });
sender
@@ -526,4 +597,37 @@ pub(crate) mod tests {
}
}
}
#[tokio::test]
async fn test_region_failure_detector_controller() {
let (mut supervisor, sender) = new_test_supervisor();
let controller = RegionFailureDetectorControl::new(sender.clone());
tokio::spawn(async move { supervisor.run().await });
let detecting_region = (0, 1, RegionId::new(1, 1));
controller
.register_failure_detectors(vec![detecting_region])
.await;
let (tx, rx) = oneshot::channel();
sender.send(Event::Dump(tx)).await.unwrap();
let detector = rx.await.unwrap();
let region_detector = detector.region_failure_detector(detecting_region).clone();
// Registers failure detector again
controller
.register_failure_detectors(vec![detecting_region])
.await;
let (tx, rx) = oneshot::channel();
sender.send(Event::Dump(tx)).await.unwrap();
let detector = rx.await.unwrap();
let got = detector.region_failure_detector(detecting_region).clone();
assert_eq!(region_detector, got);
controller
.deregister_failure_detectors(vec![detecting_region])
.await;
let (tx, rx) = oneshot::channel();
sender.send(Event::Dump(tx)).await.unwrap();
assert!(rx.await.unwrap().is_empty());
}
}

View File

@@ -84,9 +84,9 @@ impl Arbitrary<'_> for FuzzInput {
let seed = u.int_in_range(u64::MIN..=u64::MAX)?;
let mut rng = ChaChaRng::seed_from_u64(seed);
let columns = rng.gen_range(2..64);
let rows = rng.gen_range(2..4096);
let rows = rng.gen_range(2..2048);
let tables = rng.gen_range(1..64);
let inserts = rng.gen_range(2..16);
let inserts = rng.gen_range(2..8);
Ok(FuzzInput {
columns,
rows,
@@ -264,7 +264,7 @@ async fn execute_failover(ctx: FuzzContext, input: FuzzInput) -> Result<()> {
let mut rng = ChaCha20Rng::seed_from_u64(input.seed);
info!("Generates {} tables", input.tables);
let exprs = generate_create_exprs(input.tables, input.columns, &mut rng)?;
let parallelism = 8;
let parallelism = 4;
let table_ctxs = exprs
.iter()
.map(|expr| Arc::new(TableContext::from(expr)))

View File

@@ -23,7 +23,7 @@ use common_config::KvBackendConfig;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::DdlContext;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::TableMetadataManager;
@@ -199,6 +199,7 @@ impl GreptimeDbStandaloneBuilder {
flow_metadata_manager,
flow_metadata_allocator,
peer_lookup_service: Arc::new(StandalonePeerLookupService::new()),
region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
},
procedure_manager.clone(),
register_procedure_loaders,