feat: start region alive keepers (#1796)

* feat: start region alive keepers
This commit is contained in:
LFC
2023-06-20 15:45:29 +08:00
committed by GitHub
parent 4fdee5ea3c
commit cbc2620a59
10 changed files with 380 additions and 165 deletions

View File

@@ -85,6 +85,7 @@ impl RemoteCatalogManager {
catalog_name: catalog_name.to_string(),
backend: self.backend.clone(),
engine_manager: self.engine_manager.clone(),
region_alive_keepers: self.region_alive_keepers.clone(),
}) as _
}
@@ -132,10 +133,17 @@ impl RemoteCatalogManager {
increment_gauge!(crate::metrics::METRIC_CATALOG_MANAGER_CATALOG_COUNT, 1.0);
let region_alive_keepers = self.region_alive_keepers.clone();
joins.push(common_runtime::spawn_bg(async move {
let max_table_id =
initiate_schemas(node_id, backend, engine_manager, &catalog_name, catalog)
.await?;
let max_table_id = initiate_schemas(
node_id,
backend,
engine_manager,
&catalog_name,
catalog,
region_alive_keepers,
)
.await?;
info!(
"Catalog name: {}, max table id allocated: {}",
&catalog_name, max_table_id
@@ -164,6 +172,7 @@ impl RemoteCatalogManager {
self.engine_manager.clone(),
catalog_name,
schema_name,
self.region_alive_keepers.clone(),
);
let catalog_provider = self.new_catalog_provider(catalog_name);
@@ -209,6 +218,7 @@ fn new_schema_provider(
engine_manager: TableEngineManagerRef,
catalog_name: &str,
schema_name: &str,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> SchemaProviderRef {
Arc::new(RemoteSchemaProvider {
catalog_name: catalog_name.to_string(),
@@ -216,6 +226,7 @@ fn new_schema_provider(
node_id,
backend,
engine_manager,
region_alive_keepers,
}) as _
}
@@ -249,6 +260,7 @@ async fn initiate_schemas(
engine_manager: TableEngineManagerRef,
catalog_name: &str,
catalog: CatalogProviderRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Result<u32> {
let mut schemas = iter_remote_schemas(&backend, catalog_name).await;
let mut joins = Vec::new();
@@ -268,6 +280,7 @@ async fn initiate_schemas(
engine_manager.clone(),
&catalog_name,
&schema_name,
region_alive_keepers.clone(),
);
catalog
.register_schema(schema_name.clone(), schema.clone())
@@ -611,18 +624,7 @@ impl CatalogManager for RemoteCatalogManager {
&[crate::metrics::db_label(catalog, schema)],
);
schema_provider
.register_table(table_name.to_string(), request.table.clone())
.await?;
let table_ident = TableIdent {
catalog: request.catalog,
schema: request.schema,
table: request.table_name,
table_id: request.table_id,
engine: request.table.table_info().meta.engine.clone(),
};
self.region_alive_keepers
.register_table(table_ident, request.table)
.register_table(table_name.to_string(), request.table)
.await?;
Ok(true)
@@ -678,6 +680,7 @@ impl CatalogManager for RemoteCatalogManager {
self.engine_manager.clone(),
&catalog_name,
&schema_name,
self.region_alive_keepers.clone(),
);
catalog_provider
.register_schema(schema_name, schema_provider)
@@ -813,6 +816,7 @@ pub struct RemoteCatalogProvider {
catalog_name: String,
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
}
impl RemoteCatalogProvider {
@@ -821,12 +825,14 @@ impl RemoteCatalogProvider {
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
node_id: u64,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Self {
Self {
node_id,
catalog_name,
backend,
engine_manager,
region_alive_keepers,
}
}
@@ -844,6 +850,7 @@ impl RemoteCatalogProvider {
node_id: self.node_id,
backend: self.backend.clone(),
engine_manager: self.engine_manager.clone(),
region_alive_keepers: self.region_alive_keepers.clone(),
};
Arc::new(provider) as Arc<_>
}
@@ -906,6 +913,7 @@ pub struct RemoteSchemaProvider {
node_id: u64,
backend: KvBackendRef,
engine_manager: TableEngineManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
}
impl RemoteSchemaProvider {
@@ -915,6 +923,7 @@ impl RemoteSchemaProvider {
node_id: u64,
engine_manager: TableEngineManagerRef,
backend: KvBackendRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Self {
Self {
catalog_name,
@@ -922,6 +931,7 @@ impl RemoteSchemaProvider {
node_id,
backend,
engine_manager,
region_alive_keepers,
}
}
@@ -1004,6 +1014,18 @@ impl SchemaProvider for RemoteSchemaProvider {
&table_value.as_bytes().context(InvalidCatalogValueSnafu)?,
)
.await?;
let table_ident = TableIdent {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table: table_info.name.clone(),
table_id: table_info.ident.table_id,
engine: table_info.meta.engine.clone(),
};
self.region_alive_keepers
.register_table(table_ident, table)
.await?;
debug!(
"Successfully set catalog table entry, key: {}, table value: {:?}",
table_key, table_value

View File

@@ -14,6 +14,7 @@
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
@@ -30,7 +31,7 @@ use table::engine::manager::TableEngineManagerRef;
use table::engine::{CloseTableResult, EngineContext, TableEngineRef};
use table::requests::CloseTableRequest;
use table::TableRef;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
@@ -40,6 +41,8 @@ use crate::error::{Result, TableEngineNotFoundSnafu};
pub struct RegionAliveKeepers {
table_engine_manager: TableEngineManagerRef,
keepers: Arc<Mutex<HashMap<TableIdent, Arc<RegionAliveKeeper>>>>,
heartbeat_interval_millis: u64,
started: AtomicBool,
/// The epoch when [RegionAliveKeepers] is created. It's used to get a monotonically non-decreasing
/// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
@@ -49,23 +52,24 @@ pub struct RegionAliveKeepers {
}
impl RegionAliveKeepers {
pub fn new(table_engine_manager: TableEngineManagerRef) -> Self {
pub fn new(
table_engine_manager: TableEngineManagerRef,
heartbeat_interval_millis: u64,
) -> Self {
Self {
table_engine_manager,
keepers: Arc::new(Mutex::new(HashMap::new())),
heartbeat_interval_millis,
started: AtomicBool::new(false),
epoch: Instant::now(),
}
}
async fn find_keeper(&self, table_ident: &TableIdent) -> Option<Arc<RegionAliveKeeper>> {
pub async fn find_keeper(&self, table_ident: &TableIdent) -> Option<Arc<RegionAliveKeeper>> {
self.keepers.lock().await.get(table_ident).cloned()
}
pub(crate) async fn register_table(
&self,
table_ident: TableIdent,
table: TableRef,
) -> Result<()> {
pub async fn register_table(&self, table_ident: TableIdent, table: TableRef) -> Result<()> {
let keeper = self.find_keeper(&table_ident).await;
if keeper.is_some() {
return Ok(());
@@ -78,17 +82,29 @@ impl RegionAliveKeepers {
engine_name: &table_ident.engine,
})?;
let keeper = Arc::new(RegionAliveKeeper::new(table_engine, table_ident.clone()));
let keeper = Arc::new(RegionAliveKeeper::new(
table_engine,
table_ident.clone(),
self.heartbeat_interval_millis,
));
for r in table.table_info().meta.region_numbers.iter() {
keeper.register_region(*r).await;
}
info!("Register RegionAliveKeeper for table {table_ident}");
self.keepers.lock().await.insert(table_ident, keeper);
let mut keepers = self.keepers.lock().await;
keepers.insert(table_ident.clone(), keeper.clone());
if self.started.load(Ordering::Relaxed) {
keeper.start().await;
info!("RegionAliveKeeper for table {table_ident} is started!");
} else {
info!("RegionAliveKeeper for table {table_ident} is registered but not started yet!");
}
Ok(())
}
pub(crate) async fn deregister_table(&self, table_ident: &TableIdent) {
pub async fn deregister_table(&self, table_ident: &TableIdent) {
if self.keepers.lock().await.remove(table_ident).is_some() {
info!("Deregister RegionAliveKeeper for table {table_ident}");
}
@@ -114,10 +130,17 @@ impl RegionAliveKeepers {
keeper.deregister_region(region_ident.region_number).await
}
pub async fn start(&self, heartbeat_interval_millis: u64) {
for keeper in self.keepers.lock().await.values() {
keeper.start(heartbeat_interval_millis).await;
pub async fn start(&self) {
let keepers = self.keepers.lock().await;
for keeper in keepers.values() {
keeper.start().await;
}
self.started.store(true, Ordering::Relaxed);
info!(
"RegionAliveKeepers for tables {:?} are started!",
keepers.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
);
}
pub fn epoch(&self) -> Instant {
@@ -171,18 +194,26 @@ impl HeartbeatResponseHandler for RegionAliveKeepers {
/// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this
/// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to
/// countdown.
struct RegionAliveKeeper {
pub struct RegionAliveKeeper {
table_engine: TableEngineRef,
table_ident: TableIdent,
countdown_task_handles: Arc<Mutex<HashMap<RegionNumber, Arc<CountdownTaskHandle>>>>,
heartbeat_interval_millis: u64,
started: AtomicBool,
}
impl RegionAliveKeeper {
fn new(table_engine: TableEngineRef, table_ident: TableIdent) -> Self {
fn new(
table_engine: TableEngineRef,
table_ident: TableIdent,
heartbeat_interval_millis: u64,
) -> Self {
Self {
table_engine,
table_ident,
countdown_task_handles: Arc::new(Mutex::new(HashMap::new())),
heartbeat_interval_millis,
started: AtomicBool::new(false),
}
}
@@ -210,14 +241,22 @@ impl RegionAliveKeeper {
|| on_task_finished,
));
self.countdown_task_handles
.lock()
.await
.insert(region, handle);
info!(
"Register alive countdown for new region {region} in table {}",
self.table_ident
)
let mut handles = self.countdown_task_handles.lock().await;
handles.insert(region, handle.clone());
if self.started.load(Ordering::Relaxed) {
handle.start(self.heartbeat_interval_millis).await;
info!(
"Region alive countdown for region {region} in table {} is started!",
self.table_ident
);
} else {
info!(
"Region alive countdown for region {region} in table {} is registered but not started yet!",
self.table_ident
);
}
}
async fn deregister_region(&self, region: RegionNumber) {
@@ -235,14 +274,18 @@ impl RegionAliveKeeper {
}
}
async fn start(&self, heartbeat_interval_millis: u64) {
for handle in self.countdown_task_handles.lock().await.values() {
handle.start(heartbeat_interval_millis).await;
async fn start(&self) {
let handles = self.countdown_task_handles.lock().await;
for handle in handles.values() {
handle.start(self.heartbeat_interval_millis).await;
}
self.started.store(true, Ordering::Relaxed);
info!(
"RegionAliveKeeper for table {} is started!",
"Region alive countdowns for regions {:?} in table {} are started!",
handles.keys().copied().collect::<Vec<_>>(),
self.table_ident
)
);
}
async fn keep_lived(&self, designated_regions: Vec<RegionNumber>, deadline: Instant) {
@@ -253,15 +296,24 @@ impl RegionAliveKeeper {
// Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
}
}
pub async fn deadline(&self, region: RegionNumber) -> Option<Instant> {
let mut deadline = None;
if let Some(handle) = self.find_handle(&region).await {
let (s, r) = oneshot::channel();
if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() {
deadline = r.await.ok()
}
}
deadline
}
}
#[derive(Debug)]
enum CountdownCommand {
Start(u64),
Reset(Instant),
#[cfg(test)]
Deadline(tokio::sync::oneshot::Sender<Instant>),
Deadline(oneshot::Sender<Instant>),
}
struct CountdownTaskHandle {
@@ -362,7 +414,10 @@ impl CountdownTask {
},
Some(CountdownCommand::Reset(deadline)) => {
if countdown.deadline() < deadline {
debug!("Reset deadline to region {region} of table {table_ident} to {deadline:?}");
debug!(
"Reset deadline of region {region} of table {table_ident} to approximately {} seconds later",
(deadline - Instant::now()).as_secs_f32(),
);
countdown.set(tokio::time::sleep_until(deadline));
}
// Else the countdown could be either:
@@ -378,10 +433,8 @@ impl CountdownTask {
);
break;
},
#[cfg(test)]
Some(CountdownCommand::Deadline(tx)) => {
tx.send(countdown.deadline()).unwrap()
let _ = tx.send(countdown.deadline());
}
}
}
@@ -433,7 +486,6 @@ mod test {
use table::engine::{TableEngine, TableReference};
use table::requests::{CreateTableRequest, TableOptions};
use table::test_util::EmptyTable;
use tokio::sync::oneshot;
use super::*;
use crate::remote::mock::MockTableEngine;
@@ -441,7 +493,7 @@ mod test {
async fn prepare_keepers() -> (TableIdent, RegionAliveKeepers) {
let table_engine = Arc::new(MockTableEngine::default());
let table_engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine));
let keepers = RegionAliveKeepers::new(table_engine_manager);
let keepers = RegionAliveKeepers::new(table_engine_manager, 5000);
let catalog = "my_catalog";
let schema = "my_schema";
@@ -483,7 +535,7 @@ mod test {
async fn test_handle_heartbeat_response() {
let (table_ident, keepers) = prepare_keepers().await;
keepers.start(5000).await;
keepers.start().await;
let startup_protection_until = Instant::now() + Duration::from_secs(21);
let duration_since_epoch = (Instant::now() - keepers.epoch).as_millis() as _;
@@ -517,8 +569,7 @@ mod test {
keep_alive_until: Instant,
is_kept_live: bool,
) {
let handles = keeper.countdown_task_handles.lock().await;
let deadline = deadline(&handles.get(&region_number).unwrap().tx).await;
let deadline = keeper.deadline(region_number).await.unwrap();
if is_kept_live {
assert!(deadline > startup_protection_until && deadline == keep_alive_until);
} else {
@@ -555,11 +606,16 @@ mod test {
})
.await;
keepers.start(5000).await;
keepers.start().await;
for keeper in keepers.keepers.lock().await.values() {
for handle in keeper.countdown_task_handles.lock().await.values() {
let regions = {
let handles = keeper.countdown_task_handles.lock().await;
handles.keys().copied().collect::<Vec<_>>()
};
for region in regions {
// assert countdown tasks are started
assert!(deadline(&handle.tx).await <= Instant::now() + Duration::from_secs(20));
let deadline = keeper.deadline(region).await.unwrap();
assert!(deadline <= Instant::now() + Duration::from_secs(20));
}
}
@@ -598,22 +654,13 @@ mod test {
table_id: 1024,
engine: "mito".to_string(),
};
let keeper = RegionAliveKeeper::new(table_engine, table_ident);
let keeper = RegionAliveKeeper::new(table_engine, table_ident, 1000);
let region = 1;
assert!(keeper.find_handle(&region).await.is_none());
keeper.register_region(region).await;
assert!(keeper.find_handle(&region).await.is_some());
let sender = &keeper
.countdown_task_handles
.lock()
.await
.get(&region)
.unwrap()
.tx
.clone();
let ten_seconds_later = || Instant::now() + Duration::from_secs(10);
keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await;
@@ -622,12 +669,12 @@ mod test {
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29);
// assert if keeper is not started, keep_lived is of no use
assert!(deadline(sender).await > far_future);
assert!(keeper.deadline(region).await.unwrap() > far_future);
keeper.start(1000).await;
keeper.start().await;
keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await;
// assert keep_lived works if keeper is started
assert!(deadline(sender).await <= ten_seconds_later());
assert!(keeper.deadline(region).await.unwrap() <= ten_seconds_later());
keeper.deregister_region(region).await;
assert!(keeper.find_handle(&region).await.is_none());
@@ -726,6 +773,12 @@ mod test {
task.run().await;
});
async fn deadline(tx: &mpsc::Sender<CountdownCommand>) -> Instant {
let (s, r) = oneshot::channel();
tx.send(CountdownCommand::Deadline(s)).await.unwrap();
r.await.unwrap()
}
// if countdown task is not started, its deadline is set to far future
assert!(deadline(&tx).await > Instant::now() + Duration::from_secs(86400 * 365 * 29));
@@ -747,10 +800,4 @@ mod test {
tokio::time::sleep(Duration::from_millis(2000)).await;
assert!(!table_engine.table_exists(ctx, &table_ref));
}
async fn deadline(tx: &mpsc::Sender<CountdownCommand>) -> Instant {
let (s, r) = oneshot::channel();
tx.send(CountdownCommand::Deadline(s)).await.unwrap();
r.await.unwrap()
}
}

View File

@@ -19,6 +19,7 @@ mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::mock::{MockKvBackend, MockTableEngine};
@@ -29,11 +30,27 @@ mod tests {
};
use catalog::{CatalogManager, RegisterTableRequest};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MITO_ENGINE};
use common_meta::ident::TableIdent;
use datatypes::schema::RawSchema;
use futures_util::StreamExt;
use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef};
use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;
use table::test_util::EmptyTable;
use tokio::time::Instant;
struct TestingComponents {
kv_backend: KvBackendRef,
catalog_manager: Arc<RemoteCatalogManager>,
table_engine_manager: TableEngineManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
}
impl TestingComponents {
fn table_engine(&self) -> TableEngineRef {
self.table_engine_manager.engine(MITO_ENGINE).unwrap()
}
}
#[tokio::test]
async fn test_backend() {
@@ -121,14 +138,7 @@ mod tests {
assert!(ret.is_none());
}
async fn prepare_components(
node_id: u64,
) -> (
KvBackendRef,
TableEngineRef,
Arc<RemoteCatalogManager>,
TableEngineManagerRef,
) {
async fn prepare_components(node_id: u64) -> TestingComponents {
let cached_backend = Arc::new(CachedMetaKvBackend::wrap(
Arc::new(MockKvBackend::default()),
));
@@ -136,30 +146,34 @@ mod tests {
let table_engine = Arc::new(MockTableEngine::default());
let engine_manager = Arc::new(MemoryTableEngineManager::alias(
MITO_ENGINE.to_string(),
table_engine.clone(),
table_engine,
));
let region_alive_keepers = Arc::new(RegionAliveKeepers::new(engine_manager.clone(), 5000));
let catalog_manager = RemoteCatalogManager::new(
engine_manager.clone(),
node_id,
cached_backend.clone(),
Arc::new(RegionAliveKeepers::new(engine_manager.clone())),
region_alive_keepers.clone(),
);
catalog_manager.start().await.unwrap();
(
cached_backend,
table_engine,
Arc::new(catalog_manager),
engine_manager as Arc<_>,
)
TestingComponents {
kv_backend: cached_backend,
catalog_manager: Arc::new(catalog_manager),
table_engine_manager: engine_manager,
region_alive_keepers,
}
}
#[tokio::test]
async fn test_remote_catalog_default() {
common_telemetry::init_default_ut_logging();
let node_id = 42;
let (_, _, catalog_manager, _) = prepare_components(node_id).await;
let TestingComponents {
catalog_manager, ..
} = prepare_components(node_id).await;
assert_eq!(
vec![DEFAULT_CATALOG_NAME.to_string()],
catalog_manager.catalog_names().await.unwrap()
@@ -180,14 +194,16 @@ mod tests {
async fn test_remote_catalog_register_nonexistent() {
common_telemetry::init_default_ut_logging();
let node_id = 42;
let (_, table_engine, catalog_manager, _) = prepare_components(node_id).await;
let components = prepare_components(node_id).await;
// register a new table with an nonexistent catalog
let catalog_name = "nonexistent_catalog".to_string();
let schema_name = "nonexistent_schema".to_string();
let table_name = "fail_table".to_string();
// this schema has no effect
let table_schema = RawSchema::new(vec![]);
let table = table_engine
let table = components
.table_engine()
.create_table(
&EngineContext {},
CreateTableRequest {
@@ -213,7 +229,7 @@ mod tests {
table_id: 1,
table,
};
let res = catalog_manager.register_table(reg_req).await;
let res = components.catalog_manager.register_table(reg_req).await;
// because nonexistent_catalog does not exist yet.
assert_matches!(
@@ -225,7 +241,8 @@ mod tests {
#[tokio::test]
async fn test_register_table() {
let node_id = 42;
let (_, table_engine, catalog_manager, _) = prepare_components(node_id).await;
let components = prepare_components(node_id).await;
let catalog_manager = &components.catalog_manager;
let default_catalog = catalog_manager
.catalog(DEFAULT_CATALOG_NAME)
.await
@@ -249,7 +266,8 @@ mod tests {
let table_id = 1;
// this schema has no effect
let table_schema = RawSchema::new(vec![]);
let table = table_engine
let table = components
.table_engine()
.create_table(
&EngineContext {},
CreateTableRequest {
@@ -285,8 +303,10 @@ mod tests {
#[tokio::test]
async fn test_register_catalog_schema_table() {
let node_id = 42;
let (backend, table_engine, catalog_manager, engine_manager) =
prepare_components(node_id).await;
let components = prepare_components(node_id).await;
let backend = &components.kv_backend;
let catalog_manager = components.catalog_manager.clone();
let engine_manager = components.table_engine_manager.clone();
let catalog_name = "test_catalog".to_string();
let schema_name = "nonexistent_schema".to_string();
@@ -295,6 +315,7 @@ mod tests {
backend.clone(),
engine_manager.clone(),
node_id,
components.region_alive_keepers.clone(),
));
// register catalog to catalog manager
@@ -308,7 +329,8 @@ mod tests {
HashSet::from_iter(catalog_manager.catalog_names().await.unwrap().into_iter())
);
let table_to_register = table_engine
let table_to_register = components
.table_engine()
.create_table(
&EngineContext {},
CreateTableRequest {
@@ -355,6 +377,7 @@ mod tests {
node_id,
engine_manager,
backend.clone(),
components.region_alive_keepers.clone(),
));
let prev = new_catalog
@@ -374,4 +397,94 @@ mod tests {
.collect()
)
}
#[tokio::test]
async fn test_register_table_before_and_after_region_alive_keeper_started() {
let components = prepare_components(42).await;
let catalog_manager = &components.catalog_manager;
let region_alive_keepers = &components.region_alive_keepers;
let table_before = TableIdent {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: "table_before".to_string(),
table_id: 1,
engine: MITO_ENGINE.to_string(),
};
let request = RegisterTableRequest {
catalog: table_before.catalog.clone(),
schema: table_before.schema.clone(),
table_name: table_before.table.clone(),
table_id: table_before.table_id,
table: Arc::new(EmptyTable::new(CreateTableRequest {
id: table_before.table_id,
catalog_name: table_before.catalog.clone(),
schema_name: table_before.schema.clone(),
table_name: table_before.table.clone(),
desc: None,
schema: RawSchema::new(vec![]),
region_numbers: vec![0],
primary_key_indices: vec![],
create_if_not_exists: false,
table_options: Default::default(),
engine: MITO_ENGINE.to_string(),
})),
};
assert!(catalog_manager.register_table(request).await.unwrap());
let keeper = region_alive_keepers
.find_keeper(&table_before)
.await
.unwrap();
let deadline = keeper.deadline(0).await.unwrap();
let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29);
// assert region alive countdown is not started
assert!(deadline > far_future);
region_alive_keepers.start().await;
let table_after = TableIdent {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table: "table_after".to_string(),
table_id: 2,
engine: MITO_ENGINE.to_string(),
};
let request = RegisterTableRequest {
catalog: table_after.catalog.clone(),
schema: table_after.schema.clone(),
table_name: table_after.table.clone(),
table_id: table_after.table_id,
table: Arc::new(EmptyTable::new(CreateTableRequest {
id: table_after.table_id,
catalog_name: table_after.catalog.clone(),
schema_name: table_after.schema.clone(),
table_name: table_after.table.clone(),
desc: None,
schema: RawSchema::new(vec![]),
region_numbers: vec![0],
primary_key_indices: vec![],
create_if_not_exists: false,
table_options: Default::default(),
engine: MITO_ENGINE.to_string(),
})),
};
assert!(catalog_manager.register_table(request).await.unwrap());
let keeper = region_alive_keepers
.find_keeper(&table_after)
.await
.unwrap();
let deadline = keeper.deadline(0).await.unwrap();
// assert countdown is started for the table registered after [RegionAliveKeepers] started
assert!(deadline <= Instant::now() + Duration::from_secs(20));
let keeper = region_alive_keepers
.find_keeper(&table_before)
.await
.unwrap();
let deadline = keeper.deadline(0).await.unwrap();
// assert countdown is started for the table registered before [RegionAliveKeepers] started, too
assert!(deadline <= Instant::now() + Duration::from_secs(20));
}
}

View File

@@ -30,6 +30,7 @@ use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::time::Instant;
use crate::datanode::DatanodeOptions;
use crate::error::{self, MetaClientInitSnafu, Result};
pub(crate) mod handler;
@@ -57,23 +58,23 @@ impl HeartbeatTask {
/// Create a new heartbeat task instance.
pub fn new(
node_id: u64,
server_addr: String,
server_hostname: Option<String>,
opts: &DatanodeOptions,
meta_client: Arc<MetaClient>,
catalog_manager: CatalogManagerRef,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
heartbeat_interval_millis: u64,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Self {
Self {
node_id,
// We use datanode's start time millis as the node's epoch.
node_epoch: common_time::util::current_time_millis() as u64,
server_addr,
server_hostname,
server_addr: opts.rpc_addr.clone(),
server_hostname: opts.rpc_hostname.clone(),
running: Arc::new(AtomicBool::new(false)),
meta_client,
catalog_manager,
interval: 5_000, // default interval is set to 5 secs
interval: heartbeat_interval_millis,
resp_handler_executor,
region_alive_keepers,
}
@@ -140,7 +141,7 @@ impl HeartbeatTask {
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
self.region_alive_keepers.start(interval).await;
self.region_alive_keepers.start().await;
let meta_client = self.meta_client.clone();
let catalog_manager_clone = self.catalog_manager.clone();

View File

@@ -23,6 +23,7 @@ use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::RegionIdent;
use common_telemetry::{error, info, warn};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
@@ -55,25 +56,8 @@ impl HeartbeatResponseHandler for CloseRegionHandler {
let mailbox = ctx.mailbox.clone();
let self_ref = Arc::new(self.clone());
let region_alive_keepers = self.region_alive_keepers.clone();
common_runtime::spawn_bg(async move {
let table_ident = &region_ident.table_ident;
let table_ref = TableReference::full(
&table_ident.catalog,
&table_ident.schema,
&table_ident.table,
);
let result = self_ref
.close_region_inner(
table_ident.engine.clone(),
&table_ref,
vec![region_ident.region_number],
)
.await;
if matches!(result, Ok(true)) {
region_alive_keepers.deregister_region(&region_ident).await;
}
let result = self_ref.close_region_inner(region_ident).await;
if let Err(e) = mailbox
.send((meta, CloseRegionHandler::map_result(result)))
@@ -152,20 +136,21 @@ impl CloseRegionHandler {
Ok(true)
}
async fn close_region_inner(
&self,
engine: String,
table_ref: &TableReference<'_>,
region_numbers: Vec<RegionNumber>,
) -> Result<bool> {
let engine =
self.table_engine_manager
.engine(&engine)
.context(error::TableEngineNotFoundSnafu {
engine_name: &engine,
})?;
async fn close_region_inner(&self, region_ident: RegionIdent) -> Result<bool> {
let table_ident = &region_ident.table_ident;
let engine_name = &table_ident.engine;
let engine = self
.table_engine_manager
.engine(engine_name)
.context(error::TableEngineNotFoundSnafu { engine_name })?;
let ctx = EngineContext::default();
let table_ref = &TableReference::full(
&table_ident.catalog,
&table_ident.schema,
&table_ident.table,
);
let region_numbers = vec![region_ident.region_number];
if self
.regions_closed(
table_ref.catalog,
@@ -203,7 +188,15 @@ impl CloseRegionHandler {
})? {
CloseTableResult::NotFound | CloseTableResult::Released(_) => {
// Deregister table if The table released.
self.deregister_table(table_ref).await
let deregistered = self.deregister_table(table_ref).await?;
if deregistered {
self.region_alive_keepers
.deregister_table(table_ident)
.await;
}
Ok(deregistered)
}
CloseTableResult::PartialClosed(regions) => {
// Requires caller to update the region_numbers
@@ -211,6 +204,11 @@ impl CloseRegionHandler {
"Close partial regions: {:?} in table: {}",
regions, table_ref
);
self.region_alive_keepers
.deregister_region(&region_ident)
.await;
Ok(true)
}
};

View File

@@ -195,8 +195,12 @@ impl Instance {
let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client.clone()));
let region_alive_keepers =
Arc::new(RegionAliveKeepers::new(engine_manager.clone()));
let heartbeat_interval_millis = 5000;
let region_alive_keepers = Arc::new(RegionAliveKeepers::new(
engine_manager.clone(),
heartbeat_interval_millis,
));
let catalog_manager = Arc::new(RemoteCatalogManager::new(
engine_manager.clone(),
@@ -222,11 +226,11 @@ impl Instance {
let heartbeat_task = Some(HeartbeatTask::new(
opts.node_id.context(MissingNodeIdSnafu)?,
opts.rpc_addr.clone(),
opts.rpc_hostname.clone(),
opts,
meta_client,
catalog_manager.clone(),
Arc::new(handlers_executor),
heartbeat_interval_millis,
region_alive_keepers,
));

View File

@@ -14,6 +14,7 @@
use std::assert_matches::assert_matches;
use std::sync::Arc;
use std::time::Duration;
use api::v1::greptime_request::Request as GrpcRequest;
use api::v1::meta::HeartbeatResponse;
@@ -32,8 +33,10 @@ use datatypes::prelude::ConcreteDataType;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
use table::engine::manager::TableEngineManagerRef;
use table::TableRef;
use test_util::MockInstance;
use tokio::sync::mpsc::{self, Receiver};
use tokio::time::Instant;
use crate::heartbeat::handler::close_region::CloseRegionHandler;
use crate::heartbeat::handler::open_region::OpenRegionHandler;
@@ -64,7 +67,7 @@ async fn test_close_region_handler() {
CloseRegionHandler::new(
catalog_manager_ref.clone(),
engine_manager_ref.clone(),
Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone())),
Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone(), 5000)),
),
)]));
@@ -134,43 +137,57 @@ async fn test_open_region_handler() {
..
} = prepare_handler_test("test_open_region_handler").await;
let region_alive_keeper = Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone()));
let region_alive_keepers = Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone(), 5000));
region_alive_keepers.start().await;
let executor = Arc::new(HandlerGroupExecutor::new(vec![
Arc::new(OpenRegionHandler::new(
catalog_manager_ref.clone(),
engine_manager_ref.clone(),
region_alive_keeper.clone(),
region_alive_keepers.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager_ref.clone(),
engine_manager_ref.clone(),
region_alive_keeper,
region_alive_keepers.clone(),
)),
]));
prepare_table(instance.inner()).await;
let instruction = open_region_instruction();
let Instruction::OpenRegion(region_ident) = instruction.clone() else { unreachable!() };
let table_ident = &region_ident.table_ident;
let table = prepare_table(instance.inner()).await;
region_alive_keepers
.register_table(table_ident.clone(), table)
.await
.unwrap();
// Opens a opened table
handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()).await;
handle_instruction(executor.clone(), mailbox.clone(), instruction.clone()).await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::OpenRegion(SimpleReply { result: true, .. })
);
let keeper = region_alive_keepers.find_keeper(table_ident).await.unwrap();
let deadline = keeper.deadline(0).await.unwrap();
assert!(deadline <= Instant::now() + Duration::from_secs(20));
// Opens a non-exist table
let non_exist_table_ident = TableIdent {
catalog: "greptime".to_string(),
schema: "public".to_string(),
table: "non-exist".to_string(),
table_id: 2024,
engine: "mito".to_string(),
};
handle_instruction(
executor.clone(),
mailbox.clone(),
Instruction::OpenRegion(RegionIdent {
table_ident: TableIdent {
catalog: "greptime".to_string(),
schema: "public".to_string(),
table: "non-exist".to_string(),
table_id: 2024,
engine: "mito".to_string(),
},
table_ident: non_exist_table_ident.clone(),
region_number: 0,
cluster_id: 1,
datanode_id: 2,
@@ -183,6 +200,11 @@ async fn test_open_region_handler() {
InstructionReply::OpenRegion(SimpleReply { result: false, .. })
);
assert!(region_alive_keepers
.find_keeper(&non_exist_table_ident)
.await
.is_none());
// Closes demo table
handle_instruction(
executor.clone(),
@@ -197,8 +219,13 @@ async fn test_open_region_handler() {
);
assert_test_table_not_found(instance.inner()).await;
assert!(region_alive_keepers
.find_keeper(table_ident)
.await
.is_none());
// Opens demo table
handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()).await;
handle_instruction(executor.clone(), mailbox.clone(), instruction).await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
@@ -275,10 +302,10 @@ fn open_region_instruction() -> Instruction {
})
}
async fn prepare_table(instance: &Instance) {
async fn prepare_table(instance: &Instance) -> TableRef {
test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype())
.await
.unwrap();
.unwrap()
}
async fn assert_test_table_not_found(instance: &Instance) {

View File

@@ -22,6 +22,7 @@ use servers::Mode;
use snafu::ResultExt;
use table::engine::{EngineContext, TableEngineRef};
use table::requests::{CreateTableRequest, TableOptions};
use table::TableRef;
use crate::datanode::{
DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, StorageConfig, WalConfig,
@@ -84,7 +85,7 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
pub(crate) async fn create_test_table(
instance: &Instance,
ts_type: ConcreteDataType,
) -> Result<()> {
) -> Result<TableRef> {
let column_schemas = vec![
ColumnSchema::new("host", ConcreteDataType::string_datatype(), true),
ColumnSchema::new("cpu", ConcreteDataType::float64_datatype(), true),
@@ -125,8 +126,8 @@ pub(crate) async fn create_test_table(
.unwrap()
.unwrap();
schema_provider
.register_table(table_name.to_string(), table)
.register_table(table_name.to_string(), table.clone())
.await
.unwrap();
Ok(())
Ok(table)
}

View File

@@ -37,8 +37,10 @@ impl EmptyTable {
.next_column_id(0)
.options(req.table_options)
.region_numbers(req.region_numbers)
.engine(req.engine)
.build();
let table_info = TableInfoBuilder::default()
.table_id(req.id)
.catalog_name(req.catalog_name)
.schema_name(req.schema_name)
.name(req.table_name)

View File

@@ -56,7 +56,7 @@ impl MemTable {
Self::new_with_catalog(
table_name,
recordbatch,
0,
1,
"greptime".to_string(),
"public".to_string(),
regions,