From cbc2620a5945aed9bf2226d94acfbd1091be4ad0 Mon Sep 17 00:00:00 2001 From: LFC Date: Tue, 20 Jun 2023 15:45:29 +0800 Subject: [PATCH] feat: start region alive keepers (#1796) * feat: start region alive keepers --- src/catalog/src/remote/manager.rs | 52 ++++-- src/catalog/src/remote/region_alive_keeper.rs | 175 +++++++++++------- src/catalog/tests/remote_catalog_tests.rs | 163 +++++++++++++--- src/datanode/src/heartbeat.rs | 13 +- .../src/heartbeat/handler/close_region.rs | 60 +++--- src/datanode/src/instance.rs | 12 +- src/datanode/src/tests.rs | 59 ++++-- src/datanode/src/tests/test_util.rs | 7 +- src/table/src/test_util/empty_table.rs | 2 + src/table/src/test_util/memtable.rs | 2 +- 10 files changed, 380 insertions(+), 165 deletions(-) diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 6cc2c78799..dc8b521793 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -85,6 +85,7 @@ impl RemoteCatalogManager { catalog_name: catalog_name.to_string(), backend: self.backend.clone(), engine_manager: self.engine_manager.clone(), + region_alive_keepers: self.region_alive_keepers.clone(), }) as _ } @@ -132,10 +133,17 @@ impl RemoteCatalogManager { increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0); + let region_alive_keepers = self.region_alive_keepers.clone(); joins.push(common_runtime::spawn_bg(async move { - let max_table_id = - initiate_schemas(node_id, backend, engine_manager, &catalog_name, catalog) - .await?; + let max_table_id = initiate_schemas( + node_id, + backend, + engine_manager, + &catalog_name, + catalog, + region_alive_keepers, + ) + .await?; info!( "Catalog name: {}, max table id allocated: {}", &catalog_name, max_table_id @@ -164,6 +172,7 @@ impl RemoteCatalogManager { self.engine_manager.clone(), catalog_name, schema_name, + self.region_alive_keepers.clone(), ); let catalog_provider = self.new_catalog_provider(catalog_name); @@ -209,6 +218,7 @@ fn 
new_schema_provider( engine_manager: TableEngineManagerRef, catalog_name: &str, schema_name: &str, + region_alive_keepers: Arc, ) -> SchemaProviderRef { Arc::new(RemoteSchemaProvider { catalog_name: catalog_name.to_string(), @@ -216,6 +226,7 @@ fn new_schema_provider( node_id, backend, engine_manager, + region_alive_keepers, }) as _ } @@ -249,6 +260,7 @@ async fn initiate_schemas( engine_manager: TableEngineManagerRef, catalog_name: &str, catalog: CatalogProviderRef, + region_alive_keepers: Arc, ) -> Result { let mut schemas = iter_remote_schemas(&backend, catalog_name).await; let mut joins = Vec::new(); @@ -268,6 +280,7 @@ async fn initiate_schemas( engine_manager.clone(), &catalog_name, &schema_name, + region_alive_keepers.clone(), ); catalog .register_schema(schema_name.clone(), schema.clone()) @@ -611,18 +624,7 @@ impl CatalogManager for RemoteCatalogManager { &[crate::metrics::db_label(catalog, schema)], ); schema_provider - .register_table(table_name.to_string(), request.table.clone()) - .await?; - - let table_ident = TableIdent { - catalog: request.catalog, - schema: request.schema, - table: request.table_name, - table_id: request.table_id, - engine: request.table.table_info().meta.engine.clone(), - }; - self.region_alive_keepers - .register_table(table_ident, request.table) + .register_table(table_name.to_string(), request.table) .await?; Ok(true) @@ -678,6 +680,7 @@ impl CatalogManager for RemoteCatalogManager { self.engine_manager.clone(), &catalog_name, &schema_name, + self.region_alive_keepers.clone(), ); catalog_provider .register_schema(schema_name, schema_provider) @@ -813,6 +816,7 @@ pub struct RemoteCatalogProvider { catalog_name: String, backend: KvBackendRef, engine_manager: TableEngineManagerRef, + region_alive_keepers: Arc, } impl RemoteCatalogProvider { @@ -821,12 +825,14 @@ impl RemoteCatalogProvider { backend: KvBackendRef, engine_manager: TableEngineManagerRef, node_id: u64, + region_alive_keepers: Arc, ) -> Self { Self { node_id, 
catalog_name, backend, engine_manager, + region_alive_keepers, } } @@ -844,6 +850,7 @@ impl RemoteCatalogProvider { node_id: self.node_id, backend: self.backend.clone(), engine_manager: self.engine_manager.clone(), + region_alive_keepers: self.region_alive_keepers.clone(), }; Arc::new(provider) as Arc<_> } @@ -906,6 +913,7 @@ pub struct RemoteSchemaProvider { node_id: u64, backend: KvBackendRef, engine_manager: TableEngineManagerRef, + region_alive_keepers: Arc, } impl RemoteSchemaProvider { @@ -915,6 +923,7 @@ impl RemoteSchemaProvider { node_id: u64, engine_manager: TableEngineManagerRef, backend: KvBackendRef, + region_alive_keepers: Arc, ) -> Self { Self { catalog_name, @@ -922,6 +931,7 @@ impl RemoteSchemaProvider { node_id, backend, engine_manager, + region_alive_keepers, } } @@ -1004,6 +1014,18 @@ impl SchemaProvider for RemoteSchemaProvider { &table_value.as_bytes().context(InvalidCatalogValueSnafu)?, ) .await?; + + let table_ident = TableIdent { + catalog: table_info.catalog_name.clone(), + schema: table_info.schema_name.clone(), + table: table_info.name.clone(), + table_id: table_info.ident.table_id, + engine: table_info.meta.engine.clone(), + }; + self.region_alive_keepers + .register_table(table_ident, table) + .await?; + debug!( "Successfully set catalog table entry, key: {}, table value: {:?}", table_key, table_value diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs index 327e846b3b..61daee4cf1 100644 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ b/src/catalog/src/remote/region_alive_keeper.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::future::Future; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use async_trait::async_trait; @@ -30,7 +31,7 @@ use table::engine::manager::TableEngineManagerRef; use table::engine::{CloseTableResult, EngineContext, TableEngineRef}; use table::requests::CloseTableRequest; use table::TableRef; -use tokio::sync::{mpsc, 
Mutex}; +use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::task::JoinHandle; use tokio::time::{Duration, Instant}; @@ -40,6 +41,8 @@ use crate::error::{Result, TableEngineNotFoundSnafu}; pub struct RegionAliveKeepers { table_engine_manager: TableEngineManagerRef, keepers: Arc>>>, + heartbeat_interval_millis: u64, + started: AtomicBool, /// The epoch when [RegionAliveKeepers] is created. It's used to get a monotonically non-decreasing /// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically @@ -49,23 +52,24 @@ pub struct RegionAliveKeepers { } impl RegionAliveKeepers { - pub fn new(table_engine_manager: TableEngineManagerRef) -> Self { + pub fn new( + table_engine_manager: TableEngineManagerRef, + heartbeat_interval_millis: u64, + ) -> Self { Self { table_engine_manager, keepers: Arc::new(Mutex::new(HashMap::new())), + heartbeat_interval_millis, + started: AtomicBool::new(false), epoch: Instant::now(), } } - async fn find_keeper(&self, table_ident: &TableIdent) -> Option> { + pub async fn find_keeper(&self, table_ident: &TableIdent) -> Option> { self.keepers.lock().await.get(table_ident).cloned() } - pub(crate) async fn register_table( - &self, - table_ident: TableIdent, - table: TableRef, - ) -> Result<()> { + pub async fn register_table(&self, table_ident: TableIdent, table: TableRef) -> Result<()> { let keeper = self.find_keeper(&table_ident).await; if keeper.is_some() { return Ok(()); @@ -78,17 +82,29 @@ impl RegionAliveKeepers { engine_name: &table_ident.engine, })?; - let keeper = Arc::new(RegionAliveKeeper::new(table_engine, table_ident.clone())); + let keeper = Arc::new(RegionAliveKeeper::new( + table_engine, + table_ident.clone(), + self.heartbeat_interval_millis, + )); for r in table.table_info().meta.region_numbers.iter() { keeper.register_region(*r).await; } - info!("Register RegionAliveKeeper for table {table_ident}"); - self.keepers.lock().await.insert(table_ident, keeper); + let mut keepers = 
self.keepers.lock().await; + keepers.insert(table_ident.clone(), keeper.clone()); + + if self.started.load(Ordering::Relaxed) { + keeper.start().await; + + info!("RegionAliveKeeper for table {table_ident} is started!"); + } else { + info!("RegionAliveKeeper for table {table_ident} is registered but not started yet!"); + } Ok(()) } - pub(crate) async fn deregister_table(&self, table_ident: &TableIdent) { + pub async fn deregister_table(&self, table_ident: &TableIdent) { if self.keepers.lock().await.remove(table_ident).is_some() { info!("Deregister RegionAliveKeeper for table {table_ident}"); } @@ -114,10 +130,17 @@ impl RegionAliveKeepers { keeper.deregister_region(region_ident.region_number).await } - pub async fn start(&self, heartbeat_interval_millis: u64) { - for keeper in self.keepers.lock().await.values() { - keeper.start(heartbeat_interval_millis).await; + pub async fn start(&self) { + let keepers = self.keepers.lock().await; + for keeper in keepers.values() { + keeper.start().await; } + self.started.store(true, Ordering::Relaxed); + + info!( + "RegionAliveKeepers for tables {:?} are started!", + keepers.keys().map(|x| x.to_string()).collect::>(), + ); } pub fn epoch(&self) -> Instant { @@ -171,18 +194,26 @@ impl HeartbeatResponseHandler for RegionAliveKeepers { /// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this /// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to /// countdown. 
-struct RegionAliveKeeper { +pub struct RegionAliveKeeper { table_engine: TableEngineRef, table_ident: TableIdent, countdown_task_handles: Arc>>>, + heartbeat_interval_millis: u64, + started: AtomicBool, } impl RegionAliveKeeper { - fn new(table_engine: TableEngineRef, table_ident: TableIdent) -> Self { + fn new( + table_engine: TableEngineRef, + table_ident: TableIdent, + heartbeat_interval_millis: u64, + ) -> Self { Self { table_engine, table_ident, countdown_task_handles: Arc::new(Mutex::new(HashMap::new())), + heartbeat_interval_millis, + started: AtomicBool::new(false), } } @@ -210,14 +241,22 @@ impl RegionAliveKeeper { || on_task_finished, )); - self.countdown_task_handles - .lock() - .await - .insert(region, handle); - info!( - "Register alive countdown for new region {region} in table {}", - self.table_ident - ) + let mut handles = self.countdown_task_handles.lock().await; + handles.insert(region, handle.clone()); + + if self.started.load(Ordering::Relaxed) { + handle.start(self.heartbeat_interval_millis).await; + + info!( + "Region alive countdown for region {region} in table {} is started!", + self.table_ident + ); + } else { + info!( + "Region alive countdown for region {region} in table {} is registered but not started yet!", + self.table_ident + ); + } } async fn deregister_region(&self, region: RegionNumber) { @@ -235,14 +274,18 @@ impl RegionAliveKeeper { } } - async fn start(&self, heartbeat_interval_millis: u64) { - for handle in self.countdown_task_handles.lock().await.values() { - handle.start(heartbeat_interval_millis).await; + async fn start(&self) { + let handles = self.countdown_task_handles.lock().await; + for handle in handles.values() { + handle.start(self.heartbeat_interval_millis).await; } + + self.started.store(true, Ordering::Relaxed); info!( - "RegionAliveKeeper for table {} is started!", + "Region alive countdowns for regions {:?} in table {} are started!", + handles.keys().copied().collect::>(), self.table_ident - ) + ); } async fn 
keep_lived(&self, designated_regions: Vec, deadline: Instant) { @@ -253,15 +296,24 @@ impl RegionAliveKeeper { // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it. } } + + pub async fn deadline(&self, region: RegionNumber) -> Option { + let mut deadline = None; + if let Some(handle) = self.find_handle(®ion).await { + let (s, r) = oneshot::channel(); + if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() { + deadline = r.await.ok() + } + } + deadline + } } #[derive(Debug)] enum CountdownCommand { Start(u64), Reset(Instant), - - #[cfg(test)] - Deadline(tokio::sync::oneshot::Sender), + Deadline(oneshot::Sender), } struct CountdownTaskHandle { @@ -362,7 +414,10 @@ impl CountdownTask { }, Some(CountdownCommand::Reset(deadline)) => { if countdown.deadline() < deadline { - debug!("Reset deadline to region {region} of table {table_ident} to {deadline:?}"); + debug!( + "Reset deadline of region {region} of table {table_ident} to approximately {} seconds later", + (deadline - Instant::now()).as_secs_f32(), + ); countdown.set(tokio::time::sleep_until(deadline)); } // Else the countdown could be either: @@ -378,10 +433,8 @@ impl CountdownTask { ); break; }, - - #[cfg(test)] Some(CountdownCommand::Deadline(tx)) => { - tx.send(countdown.deadline()).unwrap() + let _ = tx.send(countdown.deadline()); } } } @@ -433,7 +486,6 @@ mod test { use table::engine::{TableEngine, TableReference}; use table::requests::{CreateTableRequest, TableOptions}; use table::test_util::EmptyTable; - use tokio::sync::oneshot; use super::*; use crate::remote::mock::MockTableEngine; @@ -441,7 +493,7 @@ mod test { async fn prepare_keepers() -> (TableIdent, RegionAliveKeepers) { let table_engine = Arc::new(MockTableEngine::default()); let table_engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine)); - let keepers = RegionAliveKeepers::new(table_engine_manager); + let keepers = RegionAliveKeepers::new(table_engine_manager, 5000); let catalog 
= "my_catalog"; let schema = "my_schema"; @@ -483,7 +535,7 @@ mod test { async fn test_handle_heartbeat_response() { let (table_ident, keepers) = prepare_keepers().await; - keepers.start(5000).await; + keepers.start().await; let startup_protection_until = Instant::now() + Duration::from_secs(21); let duration_since_epoch = (Instant::now() - keepers.epoch).as_millis() as _; @@ -517,8 +569,7 @@ mod test { keep_alive_until: Instant, is_kept_live: bool, ) { - let handles = keeper.countdown_task_handles.lock().await; - let deadline = deadline(&handles.get(®ion_number).unwrap().tx).await; + let deadline = keeper.deadline(region_number).await.unwrap(); if is_kept_live { assert!(deadline > startup_protection_until && deadline == keep_alive_until); } else { @@ -555,11 +606,16 @@ mod test { }) .await; - keepers.start(5000).await; + keepers.start().await; for keeper in keepers.keepers.lock().await.values() { - for handle in keeper.countdown_task_handles.lock().await.values() { + let regions = { + let handles = keeper.countdown_task_handles.lock().await; + handles.keys().copied().collect::>() + }; + for region in regions { // assert countdown tasks are started - assert!(deadline(&handle.tx).await <= Instant::now() + Duration::from_secs(20)); + let deadline = keeper.deadline(region).await.unwrap(); + assert!(deadline <= Instant::now() + Duration::from_secs(20)); } } @@ -598,22 +654,13 @@ mod test { table_id: 1024, engine: "mito".to_string(), }; - let keeper = RegionAliveKeeper::new(table_engine, table_ident); + let keeper = RegionAliveKeeper::new(table_engine, table_ident, 1000); let region = 1; assert!(keeper.find_handle(®ion).await.is_none()); keeper.register_region(region).await; assert!(keeper.find_handle(®ion).await.is_some()); - let sender = &keeper - .countdown_task_handles - .lock() - .await - .get(®ion) - .unwrap() - .tx - .clone(); - let ten_seconds_later = || Instant::now() + Duration::from_secs(10); keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await; @@ 
-622,12 +669,12 @@ mod test { let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29); // assert if keeper is not started, keep_lived is of no use - assert!(deadline(sender).await > far_future); + assert!(keeper.deadline(region).await.unwrap() > far_future); - keeper.start(1000).await; + keeper.start().await; keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await; // assert keep_lived works if keeper is started - assert!(deadline(sender).await <= ten_seconds_later()); + assert!(keeper.deadline(region).await.unwrap() <= ten_seconds_later()); keeper.deregister_region(region).await; assert!(keeper.find_handle(®ion).await.is_none()); @@ -726,6 +773,12 @@ mod test { task.run().await; }); + async fn deadline(tx: &mpsc::Sender) -> Instant { + let (s, r) = oneshot::channel(); + tx.send(CountdownCommand::Deadline(s)).await.unwrap(); + r.await.unwrap() + } + // if countdown task is not started, its deadline is set to far future assert!(deadline(&tx).await > Instant::now() + Duration::from_secs(86400 * 365 * 29)); @@ -747,10 +800,4 @@ mod test { tokio::time::sleep(Duration::from_millis(2000)).await; assert!(!table_engine.table_exists(ctx, &table_ref)); } - - async fn deadline(tx: &mpsc::Sender) -> Instant { - let (s, r) = oneshot::channel(); - tx.send(CountdownCommand::Deadline(s)).await.unwrap(); - r.await.unwrap() - } } diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index 776c6be6c9..9d3f539f83 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -19,6 +19,7 @@ mod tests { use std::assert_matches::assert_matches; use std::collections::HashSet; use std::sync::Arc; + use std::time::Duration; use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; use catalog::remote::mock::{MockKvBackend, MockTableEngine}; @@ -29,11 +30,27 @@ mod tests { }; use catalog::{CatalogManager, RegisterTableRequest}; use 
common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE}; + use common_meta::ident::TableIdent; use datatypes::schema::RawSchema; use futures_util::StreamExt; use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef}; use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; + use table::test_util::EmptyTable; + use tokio::time::Instant; + + struct TestingComponents { + kv_backend: KvBackendRef, + catalog_manager: Arc, + table_engine_manager: TableEngineManagerRef, + region_alive_keepers: Arc, + } + + impl TestingComponents { + fn table_engine(&self) -> TableEngineRef { + self.table_engine_manager.engine(MITO_ENGINE).unwrap() + } + } #[tokio::test] async fn test_backend() { @@ -121,14 +138,7 @@ mod tests { assert!(ret.is_none()); } - async fn prepare_components( - node_id: u64, - ) -> ( - KvBackendRef, - TableEngineRef, - Arc, - TableEngineManagerRef, - ) { + async fn prepare_components(node_id: u64) -> TestingComponents { let cached_backend = Arc::new(CachedMetaKvBackend::wrap( Arc::new(MockKvBackend::default()), )); @@ -136,30 +146,34 @@ mod tests { let table_engine = Arc::new(MockTableEngine::default()); let engine_manager = Arc::new(MemoryTableEngineManager::alias( MITO_ENGINE.to_string(), - table_engine.clone(), + table_engine, )); + let region_alive_keepers = Arc::new(RegionAliveKeepers::new(engine_manager.clone(), 5000)); + let catalog_manager = RemoteCatalogManager::new( engine_manager.clone(), node_id, cached_backend.clone(), - Arc::new(RegionAliveKeepers::new(engine_manager.clone())), + region_alive_keepers.clone(), ); catalog_manager.start().await.unwrap(); - ( - cached_backend, - table_engine, - Arc::new(catalog_manager), - engine_manager as Arc<_>, - ) + TestingComponents { + kv_backend: cached_backend, + catalog_manager: Arc::new(catalog_manager), + table_engine_manager: engine_manager, + region_alive_keepers, + } } #[tokio::test] async fn test_remote_catalog_default() { 
common_telemetry::init_default_ut_logging(); let node_id = 42; - let (_, _, catalog_manager, _) = prepare_components(node_id).await; + let TestingComponents { + catalog_manager, .. + } = prepare_components(node_id).await; assert_eq!( vec![DEFAULT_CATALOG_NAME.to_string()], catalog_manager.catalog_names().await.unwrap() @@ -180,14 +194,16 @@ mod tests { async fn test_remote_catalog_register_nonexistent() { common_telemetry::init_default_ut_logging(); let node_id = 42; - let (_, table_engine, catalog_manager, _) = prepare_components(node_id).await; + let components = prepare_components(node_id).await; + // register a new table with an nonexistent catalog let catalog_name = "nonexistent_catalog".to_string(); let schema_name = "nonexistent_schema".to_string(); let table_name = "fail_table".to_string(); // this schema has no effect let table_schema = RawSchema::new(vec![]); - let table = table_engine + let table = components + .table_engine() .create_table( &EngineContext {}, CreateTableRequest { @@ -213,7 +229,7 @@ mod tests { table_id: 1, table, }; - let res = catalog_manager.register_table(reg_req).await; + let res = components.catalog_manager.register_table(reg_req).await; // because nonexistent_catalog does not exist yet. 
assert_matches!( @@ -225,7 +241,8 @@ mod tests { #[tokio::test] async fn test_register_table() { let node_id = 42; - let (_, table_engine, catalog_manager, _) = prepare_components(node_id).await; + let components = prepare_components(node_id).await; + let catalog_manager = &components.catalog_manager; let default_catalog = catalog_manager .catalog(DEFAULT_CATALOG_NAME) .await @@ -249,7 +266,8 @@ mod tests { let table_id = 1; // this schema has no effect let table_schema = RawSchema::new(vec![]); - let table = table_engine + let table = components + .table_engine() .create_table( &EngineContext {}, CreateTableRequest { @@ -285,8 +303,10 @@ mod tests { #[tokio::test] async fn test_register_catalog_schema_table() { let node_id = 42; - let (backend, table_engine, catalog_manager, engine_manager) = - prepare_components(node_id).await; + let components = prepare_components(node_id).await; + let backend = &components.kv_backend; + let catalog_manager = components.catalog_manager.clone(); + let engine_manager = components.table_engine_manager.clone(); let catalog_name = "test_catalog".to_string(); let schema_name = "nonexistent_schema".to_string(); @@ -295,6 +315,7 @@ mod tests { backend.clone(), engine_manager.clone(), node_id, + components.region_alive_keepers.clone(), )); // register catalog to catalog manager @@ -308,7 +329,8 @@ mod tests { HashSet::from_iter(catalog_manager.catalog_names().await.unwrap().into_iter()) ); - let table_to_register = table_engine + let table_to_register = components + .table_engine() .create_table( &EngineContext {}, CreateTableRequest { @@ -355,6 +377,7 @@ mod tests { node_id, engine_manager, backend.clone(), + components.region_alive_keepers.clone(), )); let prev = new_catalog @@ -374,4 +397,94 @@ mod tests { .collect() ) } + + #[tokio::test] + async fn test_register_table_before_and_after_region_alive_keeper_started() { + let components = prepare_components(42).await; + let catalog_manager = &components.catalog_manager; + let 
region_alive_keepers = &components.region_alive_keepers; + + let table_before = TableIdent { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: "table_before".to_string(), + table_id: 1, + engine: MITO_ENGINE.to_string(), + }; + let request = RegisterTableRequest { + catalog: table_before.catalog.clone(), + schema: table_before.schema.clone(), + table_name: table_before.table.clone(), + table_id: table_before.table_id, + table: Arc::new(EmptyTable::new(CreateTableRequest { + id: table_before.table_id, + catalog_name: table_before.catalog.clone(), + schema_name: table_before.schema.clone(), + table_name: table_before.table.clone(), + desc: None, + schema: RawSchema::new(vec![]), + region_numbers: vec![0], + primary_key_indices: vec![], + create_if_not_exists: false, + table_options: Default::default(), + engine: MITO_ENGINE.to_string(), + })), + }; + assert!(catalog_manager.register_table(request).await.unwrap()); + + let keeper = region_alive_keepers + .find_keeper(&table_before) + .await + .unwrap(); + let deadline = keeper.deadline(0).await.unwrap(); + let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29); + // assert region alive countdown is not started + assert!(deadline > far_future); + + region_alive_keepers.start().await; + + let table_after = TableIdent { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table: "table_after".to_string(), + table_id: 2, + engine: MITO_ENGINE.to_string(), + }; + let request = RegisterTableRequest { + catalog: table_after.catalog.clone(), + schema: table_after.schema.clone(), + table_name: table_after.table.clone(), + table_id: table_after.table_id, + table: Arc::new(EmptyTable::new(CreateTableRequest { + id: table_after.table_id, + catalog_name: table_after.catalog.clone(), + schema_name: table_after.schema.clone(), + table_name: table_after.table.clone(), + desc: None, + schema: RawSchema::new(vec![]), + region_numbers: 
vec![0], + primary_key_indices: vec![], + create_if_not_exists: false, + table_options: Default::default(), + engine: MITO_ENGINE.to_string(), + })), + }; + assert!(catalog_manager.register_table(request).await.unwrap()); + + let keeper = region_alive_keepers + .find_keeper(&table_after) + .await + .unwrap(); + let deadline = keeper.deadline(0).await.unwrap(); + // assert countdown is started for the table registered after [RegionAliveKeepers] started + assert!(deadline <= Instant::now() + Duration::from_secs(20)); + + let keeper = region_alive_keepers + .find_keeper(&table_before) + .await + .unwrap(); + let deadline = keeper.deadline(0).await.unwrap(); + // assert countdown is started for the table registered before [RegionAliveKeepers] started, too + assert!(deadline <= Instant::now() + Duration::from_secs(20)); + } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index efd0c1ec20..6c9e3e0365 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -30,6 +30,7 @@ use snafu::ResultExt; use tokio::sync::mpsc; use tokio::time::Instant; +use crate::datanode::DatanodeOptions; use crate::error::{self, MetaClientInitSnafu, Result}; pub(crate) mod handler; @@ -57,23 +58,23 @@ impl HeartbeatTask { /// Create a new heartbeat task instance. pub fn new( node_id: u64, - server_addr: String, - server_hostname: Option, + opts: &DatanodeOptions, meta_client: Arc, catalog_manager: CatalogManagerRef, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, + heartbeat_interval_millis: u64, region_alive_keepers: Arc, ) -> Self { Self { node_id, // We use datanode's start time millis as the node's epoch. 
node_epoch: common_time::util::current_time_millis() as u64, - server_addr, - server_hostname, + server_addr: opts.rpc_addr.clone(), + server_hostname: opts.rpc_hostname.clone(), running: Arc::new(AtomicBool::new(false)), meta_client, catalog_manager, - interval: 5_000, // default interval is set to 5 secs + interval: heartbeat_interval_millis, resp_handler_executor, region_alive_keepers, } @@ -140,7 +141,7 @@ impl HeartbeatTask { let addr = resolve_addr(&self.server_addr, &self.server_hostname); info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."); - self.region_alive_keepers.start(interval).await; + self.region_alive_keepers.start().await; let meta_client = self.meta_client.clone(); let catalog_manager_clone = self.catalog_manager.clone(); diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index 1dc0157fe7..abc492d40f 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -23,6 +23,7 @@ use common_meta::heartbeat::handler::{ HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, }; use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; +use common_meta::RegionIdent; use common_telemetry::{error, info, warn}; use snafu::ResultExt; use store_api::storage::RegionNumber; @@ -55,25 +56,8 @@ impl HeartbeatResponseHandler for CloseRegionHandler { let mailbox = ctx.mailbox.clone(); let self_ref = Arc::new(self.clone()); - let region_alive_keepers = self.region_alive_keepers.clone(); common_runtime::spawn_bg(async move { - let table_ident = ®ion_ident.table_ident; - let table_ref = TableReference::full( - &table_ident.catalog, - &table_ident.schema, - &table_ident.table, - ); - let result = self_ref - .close_region_inner( - table_ident.engine.clone(), - &table_ref, - vec![region_ident.region_number], - ) - .await; - - if matches!(result, 
Ok(true)) { - region_alive_keepers.deregister_region(®ion_ident).await; - } + let result = self_ref.close_region_inner(region_ident).await; if let Err(e) = mailbox .send((meta, CloseRegionHandler::map_result(result))) @@ -152,20 +136,21 @@ impl CloseRegionHandler { Ok(true) } - async fn close_region_inner( - &self, - engine: String, - table_ref: &TableReference<'_>, - region_numbers: Vec, - ) -> Result { - let engine = - self.table_engine_manager - .engine(&engine) - .context(error::TableEngineNotFoundSnafu { - engine_name: &engine, - })?; + async fn close_region_inner(&self, region_ident: RegionIdent) -> Result { + let table_ident = ®ion_ident.table_ident; + let engine_name = &table_ident.engine; + let engine = self + .table_engine_manager + .engine(engine_name) + .context(error::TableEngineNotFoundSnafu { engine_name })?; let ctx = EngineContext::default(); + let table_ref = &TableReference::full( + &table_ident.catalog, + &table_ident.schema, + &table_ident.table, + ); + let region_numbers = vec![region_ident.region_number]; if self .regions_closed( table_ref.catalog, @@ -203,7 +188,15 @@ impl CloseRegionHandler { })? { CloseTableResult::NotFound | CloseTableResult::Released(_) => { // Deregister table if The table released. 
- self.deregister_table(table_ref).await + let deregistered = self.deregister_table(table_ref).await?; + + if deregistered { + self.region_alive_keepers + .deregister_table(table_ident) + .await; + } + + Ok(deregistered) } CloseTableResult::PartialClosed(regions) => { // Requires caller to update the region_numbers @@ -211,6 +204,11 @@ impl CloseRegionHandler { "Close partial regions: {:?} in table: {}", regions, table_ref ); + + self.region_alive_keepers + .deregister_region(®ion_ident) + .await; + Ok(true) } }; diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 5e5a63006f..3cd1e57e6d 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -195,8 +195,12 @@ impl Instance { let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone())); - let region_alive_keepers = - Arc::new(RegionAliveKeepers::new(engine_manager.clone())); + let heartbeat_interval_millis = 5000; + + let region_alive_keepers = Arc::new(RegionAliveKeepers::new( + engine_manager.clone(), + heartbeat_interval_millis, + )); let catalog_manager = Arc::new(RemoteCatalogManager::new( engine_manager.clone(), @@ -222,11 +226,11 @@ impl Instance { let heartbeat_task = Some(HeartbeatTask::new( opts.node_id.context(MissingNodeIdSnafu)?, - opts.rpc_addr.clone(), - opts.rpc_hostname.clone(), + opts, meta_client, catalog_manager.clone(), Arc::new(handlers_executor), + heartbeat_interval_millis, region_alive_keepers, )); diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 1796a6c875..5b4ba4de3d 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -14,6 +14,7 @@ use std::assert_matches::assert_matches; use std::sync::Arc; +use std::time::Duration; use api::v1::greptime_request::Request as GrpcRequest; use api::v1::meta::HeartbeatResponse; @@ -32,8 +33,10 @@ use datatypes::prelude::ConcreteDataType; use servers::query_handler::grpc::GrpcQueryHandler; use session::context::QueryContext; use 
table::engine::manager::TableEngineManagerRef; +use table::TableRef; use test_util::MockInstance; use tokio::sync::mpsc::{self, Receiver}; +use tokio::time::Instant; use crate::heartbeat::handler::close_region::CloseRegionHandler; use crate::heartbeat::handler::open_region::OpenRegionHandler; @@ -64,7 +67,7 @@ async fn test_close_region_handler() { CloseRegionHandler::new( catalog_manager_ref.clone(), engine_manager_ref.clone(), - Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone())), + Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone(), 5000)), ), )])); @@ -134,43 +137,57 @@ async fn test_open_region_handler() { .. } = prepare_handler_test("test_open_region_handler").await; - let region_alive_keeper = Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone())); + let region_alive_keepers = Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone(), 5000)); + region_alive_keepers.start().await; let executor = Arc::new(HandlerGroupExecutor::new(vec![ Arc::new(OpenRegionHandler::new( catalog_manager_ref.clone(), engine_manager_ref.clone(), - region_alive_keeper.clone(), + region_alive_keepers.clone(), )), Arc::new(CloseRegionHandler::new( catalog_manager_ref.clone(), engine_manager_ref.clone(), - region_alive_keeper, + region_alive_keepers.clone(), )), ])); - prepare_table(instance.inner()).await; + let instruction = open_region_instruction(); + let Instruction::OpenRegion(region_ident) = instruction.clone() else { unreachable!() }; + let table_ident = ®ion_ident.table_ident; + + let table = prepare_table(instance.inner()).await; + region_alive_keepers + .register_table(table_ident.clone(), table) + .await + .unwrap(); // Opens a opened table - handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()).await; + handle_instruction(executor.clone(), mailbox.clone(), instruction.clone()).await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( reply, InstructionReply::OpenRegion(SimpleReply { result: true, .. 
}) ); + let keeper = region_alive_keepers.find_keeper(table_ident).await.unwrap(); + let deadline = keeper.deadline(0).await.unwrap(); + assert!(deadline <= Instant::now() + Duration::from_secs(20)); + // Opens a non-exist table + let non_exist_table_ident = TableIdent { + catalog: "greptime".to_string(), + schema: "public".to_string(), + table: "non-exist".to_string(), + table_id: 2024, + engine: "mito".to_string(), + }; handle_instruction( executor.clone(), mailbox.clone(), Instruction::OpenRegion(RegionIdent { - table_ident: TableIdent { - catalog: "greptime".to_string(), - schema: "public".to_string(), - table: "non-exist".to_string(), - table_id: 2024, - engine: "mito".to_string(), - }, + table_ident: non_exist_table_ident.clone(), region_number: 0, cluster_id: 1, datanode_id: 2, @@ -183,6 +200,11 @@ async fn test_open_region_handler() { InstructionReply::OpenRegion(SimpleReply { result: false, .. }) ); + assert!(region_alive_keepers + .find_keeper(&non_exist_table_ident) + .await + .is_none()); + // Closes demo table handle_instruction( executor.clone(), @@ -197,8 +219,13 @@ async fn test_open_region_handler() { ); assert_test_table_not_found(instance.inner()).await; + assert!(region_alive_keepers + .find_keeper(table_ident) + .await + .is_none()); + // Opens demo table - handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()).await; + handle_instruction(executor.clone(), mailbox.clone(), instruction).await; let (_, reply) = rx.recv().await.unwrap(); assert_matches!( reply, @@ -275,10 +302,10 @@ fn open_region_instruction() -> Instruction { }) } -async fn prepare_table(instance: &Instance) { +async fn prepare_table(instance: &Instance) -> TableRef { test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype()) .await - .unwrap(); + .unwrap() } async fn assert_test_table_not_found(instance: &Instance) { diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 
91e344a00d..d59f9f5670 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -22,6 +22,7 @@ use servers::Mode; use snafu::ResultExt; use table::engine::{EngineContext, TableEngineRef}; use table::requests::{CreateTableRequest, TableOptions}; +use table::TableRef; use crate::datanode::{ DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig, @@ -84,7 +85,7 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) pub(crate) async fn create_test_table( instance: &Instance, ts_type: ConcreteDataType, -) -> Result<()> { +) -> Result<TableRef> { let column_schemas = vec![ ColumnSchema::new("host", ConcreteDataType::string_datatype(), true), ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true), @@ -125,8 +126,8 @@ pub(crate) async fn create_test_table( .unwrap() .unwrap(); schema_provider - .register_table(table_name.to_string(), table) + .register_table(table_name.to_string(), table.clone()) .await .unwrap(); - Ok(()) + Ok(table) } diff --git a/src/table/src/test_util/empty_table.rs b/src/table/src/test_util/empty_table.rs index 679ace6887..0503515642 100644 --- a/src/table/src/test_util/empty_table.rs +++ b/src/table/src/test_util/empty_table.rs @@ -37,8 +37,10 @@ impl EmptyTable { .next_column_id(0) .options(req.table_options) .region_numbers(req.region_numbers) + .engine(req.engine) .build(); let table_info = TableInfoBuilder::default() + .table_id(req.id) .catalog_name(req.catalog_name) .schema_name(req.schema_name) .name(req.table_name) diff --git a/src/table/src/test_util/memtable.rs b/src/table/src/test_util/memtable.rs index f2e942ce8d..3d27650f20 100644 --- a/src/table/src/test_util/memtable.rs +++ b/src/table/src/test_util/memtable.rs @@ -56,7 +56,7 @@ impl MemTable { Self::new_with_catalog( table_name, recordbatch, - 0, + 1, "greptime".to_string(), "public".to_string(), regions,