feat: adapt region keep aliver for region server (#2333)

* basic impl

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor, collapse one layer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove old heartbeat handler impls

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove old region alive keeper

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove remote catalog manager

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* global replace

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test countdown task

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-09-06 04:52:46 -05:00
parent dac6b2e80a
commit f287a5db9f
12 changed files with 549 additions and 2518 deletions

View File

@@ -15,14 +15,11 @@
use std::sync::Arc;
pub use client::{CachedMetaKvBackend, MetaKvBackend};
pub use manager::RemoteCatalogManager;
mod client;
mod manager;
#[cfg(feature = "testing")]
pub mod mock;
pub mod region_alive_keeper;
#[async_trait::async_trait]
pub trait KvCacheInvalidator: Send + Sync {

View File

@@ -1,436 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use common_catalog::consts::MITO_ENGINE;
use common_meta::ident::TableIdent;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::datanode_table::DatanodeTableValue;
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::TableMetadataManagerRef;
use common_telemetry::{error, info, warn};
use futures_util::TryStreamExt;
use metrics::increment_gauge;
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::manager::TableEngineManagerRef;
use table::engine::EngineContext;
use table::requests::OpenTableRequest;
use table::TableRef;
use tokio::sync::Mutex;
use crate::error::{
OpenTableSnafu, ParallelOpenTableSnafu, Result, TableEngineNotFoundSnafu, TableExistsSnafu,
TableMetadataManagerSnafu, TableNotFoundSnafu, UnimplementedSnafu,
};
use crate::local::MemoryCatalogManager;
use crate::remote::region_alive_keeper::RegionAliveKeepers;
use crate::{
handle_system_table_request, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
};
/// Catalog manager based on metasrv.
///
/// Catalogs, schemas and tables are stored in an in-memory catalog manager; the table
/// metadata manager is consulted to list the tables assigned to this node and to check
/// whether a catalog or schema exists remotely.
pub struct RemoteCatalogManager {
    /// Id of this node, used to list the tables assigned to it.
    node_id: u64,
    /// Resolves table engines by name.
    engine_manager: TableEngineManagerRef,
    /// System table requests queued by `register_system_table` and processed in `start`.
    system_table_requests: Mutex<Vec<RegisterSystemTableRequest>>,
    /// Notified whenever a table is registered or deregistered.
    region_alive_keepers: Arc<RegionAliveKeepers>,
    /// Local store for catalogs, schemas and tables.
    memory_catalog_manager: Arc<MemoryCatalogManager>,
    /// Access to datanode tables, table infos and catalog/schema names.
    table_metadata_manager: TableMetadataManagerRef,
}
impl RemoteCatalogManager {
    /// Creates a catalog manager for the node identified by `node_id`.
    ///
    /// The in-memory catalog is created with its default setup and the queue of system
    /// table requests starts out empty.
    pub fn new(
        engine_manager: TableEngineManagerRef,
        node_id: u64,
        region_alive_keepers: Arc<RegionAliveKeepers>,
        table_metadata_manager: TableMetadataManagerRef,
    ) -> Self {
        let memory_catalog_manager = MemoryCatalogManager::with_default_setup();
        Self {
            node_id,
            engine_manager,
            system_table_requests: Default::default(),
            region_alive_keepers,
            memory_catalog_manager,
            table_metadata_manager,
        }
    }

    /// Opens and registers every table that the table metadata lists for this node.
    ///
    /// One background task is spawned per table, and all of them are awaited together.
    /// A table that fails to open is only logged; an error is returned when listing the
    /// tables fails or when one of the spawned tasks cannot be joined.
    async fn initiate_catalogs(&self) -> Result<()> {
        let datanode_tables = self
            .table_metadata_manager
            .datanode_table_manager()
            .tables(self.node_id)
            .try_collect::<Vec<_>>()
            .await
            .context(TableMetadataManagerSnafu)?;

        let mut open_tasks = Vec::with_capacity(datanode_tables.len());
        for datanode_table_value in datanode_tables {
            // Each task owns its own handles to the shared managers.
            let engine_manager = self.engine_manager.clone();
            let memory_catalog_manager = self.memory_catalog_manager.clone();
            let table_metadata_manager = self.table_metadata_manager.clone();
            let region_alive_keepers = self.region_alive_keepers.clone();

            open_tasks.push(common_runtime::spawn_bg(async move {
                let table_id = datanode_table_value.table_id;
                let outcome = open_and_register_table(
                    engine_manager,
                    datanode_table_value,
                    memory_catalog_manager,
                    table_metadata_manager,
                    region_alive_keepers,
                )
                .await;
                if let Err(e) = outcome {
                    // A broken table must not impede the startup of Datanode, so the
                    // failure is logged instead of being returned. This could be changed
                    // in the future.
                    error!(e; "Failed to open or register table, id = {table_id}")
                }
            }));
        }

        futures::future::try_join_all(open_tasks)
            .await
            .context(ParallelOpenTableSnafu)?;
        Ok(())
    }
}
/// Opens the table described by `datanode_table_value` and registers it locally.
///
/// The table info is looked up by table id, the engine named in the table meta is
/// resolved, and the table is opened through that engine. The catalog and schema are
/// created in the in-memory catalog if missing, and finally the table is registered via
/// [`register_table`].
///
/// # Errors
/// Returns an error when the table info or the engine cannot be found, when opening the
/// table fails or yields no table, when any registration step fails, or when a table is
/// already registered under the same name (`TableExists`).
async fn open_and_register_table(
    engine_manager: TableEngineManagerRef,
    datanode_table_value: DatanodeTableValue,
    memory_catalog_manager: Arc<MemoryCatalogManager>,
    table_metadata_manager: TableMetadataManagerRef,
    region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Result<()> {
    let context = EngineContext {};
    let table_id = datanode_table_value.table_id;
    let region_numbers = datanode_table_value.regions;

    let table_info_value = table_metadata_manager
        .table_info_manager()
        .get(table_id)
        .await
        .context(TableMetadataManagerSnafu)?
        // Build the error message lazily: it is only needed when the table info is missing.
        .with_context(|| TableNotFoundSnafu {
            table_info: format!("table id: {table_id}"),
        })?;
    let table_info = &table_info_value.table_info;

    let engine =
        engine_manager
            .engine(&table_info.meta.engine)
            .context(TableEngineNotFoundSnafu {
                engine_name: &table_info.meta.engine,
            })?;

    let table_ident = TableIdent {
        catalog: table_info.catalog_name.clone(),
        schema: table_info.schema_name.clone(),
        table: table_info.name.clone(),
        table_id,
        engine: table_info.meta.engine.clone(),
    };
    let request = OpenTableRequest {
        catalog_name: table_ident.catalog.clone(),
        schema_name: table_ident.schema.clone(),
        table_name: table_ident.table.clone(),
        table_id,
        // The region list is not needed afterwards, so it is moved rather than cloned.
        region_numbers,
    };

    let table = engine
        .open_table(&context, request)
        .await
        .with_context(|_| OpenTableSnafu {
            table_info: table_ident.to_string(),
        })?
        .with_context(|| TableNotFoundSnafu {
            table_info: table_ident.to_string(),
        })?;
    info!("Successfully opened table, {table_ident}");

    // Make sure the parent catalog and schema exist before registering the table.
    if !memory_catalog_manager
        .catalog_exist(&table_ident.catalog)
        .await?
    {
        memory_catalog_manager.register_catalog_sync(table_ident.catalog.clone())?;
    }
    if !memory_catalog_manager
        .schema_exist(&table_ident.catalog, &table_ident.schema)
        .await?
    {
        memory_catalog_manager.register_schema_sync(RegisterSchemaRequest {
            catalog: table_ident.catalog.clone(),
            schema: table_ident.schema.clone(),
        })?;
    }

    let request = RegisterTableRequest {
        catalog: table_ident.catalog.clone(),
        schema: table_ident.schema.clone(),
        table_name: table_ident.table.clone(),
        table_id,
        table,
    };
    let registered =
        register_table(&memory_catalog_manager, &region_alive_keepers, request).await?;
    ensure!(
        registered,
        TableExistsSnafu {
            table: table_ident.to_string(),
        }
    );
    info!("Successfully registered table, {table_ident}");
    Ok(())
}
/// Registers the table in the in-memory catalog and, if it was newly registered, with
/// the region alive keepers as well.
///
/// Returns the value reported by `register_table_sync`; when that is `false` the region
/// alive keepers are not touched.
async fn register_table(
    memory_catalog_manager: &Arc<MemoryCatalogManager>,
    region_alive_keepers: &Arc<RegionAliveKeepers>,
    request: RegisterTableRequest,
) -> Result<bool> {
    // Keep a handle to the table: `request` is consumed by the registration below.
    let table = request.table.clone();
    if !memory_catalog_manager.register_table_sync(request)? {
        return Ok(false);
    }

    let info = table.table_info();
    let ident = TableIdent {
        catalog: info.catalog_name.clone(),
        schema: info.schema_name.clone(),
        table: info.name.clone(),
        table_id: info.table_id(),
        engine: info.meta.engine.clone(),
    };
    region_alive_keepers
        .register_table(ident, table, memory_catalog_manager.clone())
        .await?;
    Ok(true)
}
#[async_trait]
impl CatalogManager for RemoteCatalogManager {
    /// Opens the tables assigned to this node, then hands the queued system table
    /// requests to `handle_system_table_request` together with the mito engine.
    async fn start(&self) -> Result<()> {
        self.initiate_catalogs().await?;

        let mut pending_requests = self.system_table_requests.lock().await;
        let mito = self
            .engine_manager
            .engine(MITO_ENGINE)
            .context(TableEngineNotFoundSnafu {
                engine_name: MITO_ENGINE,
            })?;
        handle_system_table_request(self, mito, &mut pending_requests).await?;

        info!("All system table opened");
        Ok(())
    }

    /// Registers the table locally and with the region alive keepers.
    async fn register_table(&self, request: RegisterTableRequest) -> Result<bool> {
        let catalog = &self.memory_catalog_manager;
        let keepers = &self.region_alive_keepers;
        register_table(catalog, keepers, request).await
    }

    /// Removes the table from the region alive keepers and from the in-memory catalog.
    ///
    /// Does nothing when the table is not present in the in-memory catalog.
    async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> {
        let found = self
            .memory_catalog_manager
            .table(&request.catalog, &request.schema, &request.table_name)
            .await?;
        let table = match found {
            Some(table) => table,
            None => return Ok(()),
        };

        let info = table.table_info();
        let ident = TableIdent {
            catalog: request.catalog.clone(),
            schema: request.schema.clone(),
            table: request.table_name.clone(),
            table_id: info.ident.table_id,
            engine: info.meta.engine.clone(),
        };
        let removed_keeper = self.region_alive_keepers.deregister_table(&ident).await;
        if let Some(keeper) = removed_keeper {
            warn!(
                "Table {} is deregistered from region alive keepers",
                keeper.table_ident(),
            );
        }

        self.memory_catalog_manager.deregister_table(request).await
    }

    /// Registers the schema in the in-memory catalog.
    async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
        self.memory_catalog_manager.register_schema_sync(request)
    }

    /// Not supported: always fails with an `Unimplemented` error.
    async fn deregister_schema(&self, _request: DeregisterSchemaRequest) -> Result<bool> {
        UnimplementedSnafu {
            operation: "deregister schema",
        }
        .fail()
    }

    /// Renames the table in the in-memory catalog.
    async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
        self.memory_catalog_manager.rename_table(request).await
    }

    /// Queues a system table request to be processed by `start`, and bumps the table
    /// count gauge for the request's catalog and schema.
    async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
        let catalog_name = request.create_table_request.catalog_name.clone();
        let schema_name = request.create_table_request.schema_name.clone();

        let mut queue = self.system_table_requests.lock().await;
        queue.push(request);
        increment_gauge!(
            crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT,
            1.0,
            &[crate::metrics::db_label(&catalog_name, &schema_name)],
        );
        Ok(())
    }

    /// Checks whether the schema exists, locally first and then in the remote metadata.
    ///
    /// A schema that is only known remotely is registered locally as a side effect.
    async fn schema_exist(&self, catalog: &str, schema: &str) -> Result<bool> {
        if !self.catalog_exist(catalog).await? {
            return Ok(false);
        }

        let known_locally = self
            .memory_catalog_manager
            .schema_exist(catalog, schema)
            .await?;
        if known_locally {
            return Ok(true);
        }

        let remote_schema_exists = self
            .table_metadata_manager
            .schema_manager()
            .exist(SchemaNameKey::new(catalog, schema))
            .await
            .context(TableMetadataManagerSnafu)?;

        // Create the schema locally if it exists remotely. The local schema is managed by
        // the memory catalog manager, so creating it is relatively cheap (just a HashMap).
        // Besides, if this method ("schema_exist") is called, it is very likely that
        // someone wants to create a table in this schema, so create the schema now.
        if remote_schema_exists {
            let request = RegisterSchemaRequest {
                catalog: catalog.to_string(),
                schema: schema.to_string(),
            };
            if self.memory_catalog_manager.register_schema(request).await? {
                info!("register schema '{catalog}/{schema}' on demand");
            }
        }
        Ok(remote_schema_exists)
    }

    /// Looks the table up in the in-memory catalog.
    async fn table(
        &self,
        catalog_name: &str,
        schema_name: &str,
        table_name: &str,
    ) -> Result<Option<TableRef>> {
        let catalog = &self.memory_catalog_manager;
        catalog.table(catalog_name, schema_name, table_name).await
    }

    /// Checks whether the catalog exists, locally first and then in the remote metadata.
    ///
    /// A catalog that is only known remotely is registered locally as a side effect.
    async fn catalog_exist(&self, catalog: &str) -> Result<bool> {
        let known_locally = self.memory_catalog_manager.catalog_exist(catalog).await?;
        if known_locally {
            return Ok(true);
        }

        let remote_catalog_exists = self
            .table_metadata_manager
            .catalog_manager()
            .exist(CatalogNameKey::new(catalog))
            .await
            .context(TableMetadataManagerSnafu)?;

        // Create the catalog locally if it exists remotely. The local catalog is managed
        // by the memory catalog manager, so creating it is relatively cheap (just a
        // HashMap). Besides, if this method ("catalog_exist") is called, it is very likely
        // that someone wants to create a table in this catalog, so create the catalog now.
        if remote_catalog_exists {
            let registered = self
                .memory_catalog_manager
                .clone()
                .register_catalog(catalog.to_string())
                .await?;
            if registered {
                info!("register catalog '{catalog}' on demand");
            }
        }
        Ok(remote_catalog_exists)
    }

    /// Checks whether the table exists in the in-memory catalog, after confirming that
    /// its catalog and schema exist.
    async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result<bool> {
        if !self.catalog_exist(catalog).await? || !self.schema_exist(catalog, schema).await? {
            return Ok(false);
        }
        self.memory_catalog_manager
            .table_exist(catalog, schema, table)
            .await
    }

    /// Lists the catalog names known to the in-memory catalog.
    async fn catalog_names(&self) -> Result<Vec<String>> {
        self.memory_catalog_manager.catalog_names().await
    }

    /// Lists the schema names of `catalog_name` known to the in-memory catalog.
    async fn schema_names(&self, catalog_name: &str) -> Result<Vec<String>> {
        self.memory_catalog_manager.schema_names(catalog_name).await
    }

    /// Lists the table names of the given schema known to the in-memory catalog.
    async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result<Vec<String>> {
        let catalog = &self.memory_catalog_manager;
        catalog.table_names(catalog_name, schema_name).await
    }

    /// Registers the catalog in the in-memory catalog.
    async fn register_catalog(self: Arc<Self>, name: String) -> Result<bool> {
        self.memory_catalog_manager.register_catalog_sync(name)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

View File

@@ -1,865 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use common_meta::error::InvalidProtoMsgSnafu;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::ident::TableIdent;
use common_meta::RegionIdent;
use common_telemetry::{debug, error, info, warn};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, RegionNumber};
use table::engine::manager::TableEngineManagerRef;
use table::engine::{CloseTableResult, EngineContext, TableEngineRef};
use table::metadata::TableId;
use table::requests::CloseTableRequest;
use table::TableRef;
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
use crate::error::{Result, TableEngineNotFoundSnafu};
use crate::local::MemoryCatalogManager;
use crate::DeregisterTableRequest;
/// [RegionAliveKeepers] manages all [RegionAliveKeeper] in a scope of tables.
pub struct RegionAliveKeepers {
    /// Resolves the table engine handed to each new keeper.
    table_engine_manager: TableEngineManagerRef,
    /// One keeper per registered table, keyed by table id.
    keepers: Arc<Mutex<HashMap<TableId, Arc<RegionAliveKeeper>>>>,
    /// Heartbeat interval in milliseconds, passed on to every keeper.
    heartbeat_interval_millis: u64,
    /// Set once `start` has run; keepers registered afterwards are started immediately.
    started: AtomicBool,
    /// The instant at which this [RegionAliveKeepers] was created.
    ///
    /// Heartbeats submitted to Metasrv carry the duration elapsed since this epoch.
    /// Because [Instant] is monotonically non-decreasing, so is that duration, and it
    /// acts like an "invariant point" for the regions' keep-alive leases.
    epoch: Instant,
}
impl RegionAliveKeepers {
    /// Creates an empty, not yet started set of keepers whose epoch is "now".
    pub fn new(
        table_engine_manager: TableEngineManagerRef,
        heartbeat_interval_millis: u64,
    ) -> Self {
        Self {
            table_engine_manager,
            keepers: Arc::new(Mutex::new(HashMap::new())),
            heartbeat_interval_millis,
            started: AtomicBool::new(false),
            epoch: Instant::now(),
        }
    }

    /// Returns the keeper registered for `table_id`, if any.
    pub async fn find_keeper(&self, table_id: TableId) -> Option<Arc<RegionAliveKeeper>> {
        self.keepers.lock().await.get(&table_id).cloned()
    }

    /// Creates a keeper for the table and registers all of the table's regions with it.
    ///
    /// Does nothing when a keeper for the table id already exists. The new keeper is
    /// started right away if [`Self::start`] has already been called.
    ///
    /// # Errors
    /// Fails when the engine named in `table_ident` is not known to the engine manager.
    pub async fn register_table(
        &self,
        table_ident: TableIdent,
        table: TableRef,
        catalog_manager: Arc<MemoryCatalogManager>,
    ) -> Result<()> {
        let table_id = table_ident.table_id;

        // The lock is held from the existence check until the insertion. Checking under
        // one lock acquisition and inserting under another would let two concurrent
        // registrations of the same table both build a keeper, the later one silently
        // replacing the earlier one.
        let mut keepers = self.keepers.lock().await;
        if keepers.contains_key(&table_id) {
            return Ok(());
        }

        let table_engine = self
            .table_engine_manager
            .engine(&table_ident.engine)
            .context(TableEngineNotFoundSnafu {
                engine_name: &table_ident.engine,
            })?;

        let keeper = Arc::new(RegionAliveKeeper::new(
            table_engine,
            catalog_manager,
            table_ident.clone(),
            self.heartbeat_interval_millis,
        ));
        for r in table.table_info().meta.region_numbers.iter() {
            keeper.register_region(*r).await;
        }

        let _ = keepers.insert(table_id, keeper.clone());

        if self.started.load(Ordering::Relaxed) {
            keeper.start().await;
            info!("RegionAliveKeeper for table {table_ident} is started!");
        } else {
            info!("RegionAliveKeeper for table {table_ident} is registered but not started yet!");
        }
        Ok(())
    }

    /// Removes and returns the keeper of the table, or `None` if there is none.
    pub async fn deregister_table(
        &self,
        table_ident: &TableIdent,
    ) -> Option<Arc<RegionAliveKeeper>> {
        let table_id = table_ident.table_id;
        let removed = self.keepers.lock().await.remove(&table_id);
        if removed.is_some() {
            info!("Deregister RegionAliveKeeper for table {table_ident}");
        }
        removed
    }

    /// Registers the region with the keeper of its table.
    pub async fn register_region(&self, region_ident: &RegionIdent) {
        let table_id = region_ident.table_ident.table_id;
        match self.find_keeper(table_id).await {
            Some(keeper) => {
                keeper.register_region(region_ident.region_number).await;
            }
            None => {
                // Alive keeper could be affected by lagging msg, just warn and ignore.
                warn!("Alive keeper for region {region_ident} is not found!");
            }
        }
    }

    /// Deregisters the region from the keeper of its table.
    pub async fn deregister_region(&self, region_ident: &RegionIdent) {
        let table_id = region_ident.table_ident.table_id;
        match self.find_keeper(table_id).await {
            Some(keeper) => {
                let _ = keeper.deregister_region(region_ident.region_number).await;
            }
            None => {
                // Alive keeper could be affected by lagging msg, just warn and ignore.
                warn!("Alive keeper for region {region_ident} is not found!");
            }
        }
    }

    /// Starts every registered keeper and marks this set as started.
    pub async fn start(&self) {
        let keepers = self.keepers.lock().await;
        for keeper in keepers.values() {
            keeper.start().await;
        }
        // Set while the map is still locked, so that `register_table` (which reads the
        // flag under the same lock) either sees the flag or has its keeper started above.
        self.started.store(true, Ordering::Relaxed);

        info!(
            "RegionAliveKeepers for tables {:?} are started!",
            keepers.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
        );
    }

    /// Returns the instant at which this set of keepers was created.
    pub fn epoch(&self) -> Instant {
        self.epoch
    }
}
#[async_trait]
impl HeartbeatResponseHandler for RegionAliveKeepers {
    /// Only heartbeat responses that carry a region lease are handled.
    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
        ctx.response.region_lease.is_some()
    }

    /// Extends the countdown deadline of every region listed in the lease.
    ///
    /// The deadline is `epoch + duration_since_epoch + lease_seconds`. Regions whose
    /// table has no keeper are skipped with a warning.
    ///
    /// # Errors
    /// Returns `InvalidProtoMsg` when the response carries no region lease, or when the
    /// lease's durations are so large that the deadline cannot be represented.
    async fn handle(
        &self,
        ctx: &mut HeartbeatResponseHandlerContext,
    ) -> common_meta::error::Result<HandleControl> {
        let region_lease = ctx
            .response
            .region_lease
            .as_ref()
            .context(InvalidProtoMsgSnafu {
                err_msg: "'region_lease' is missing in heartbeat response",
            })?;

        // `Instant + Duration` panics on overflow. Both durations come straight from the
        // message, so use checked arithmetic and reject an unrepresentable deadline.
        let deadline = self
            .epoch
            .checked_add(Duration::from_millis(region_lease.duration_since_epoch))
            .and_then(|start_instant| {
                start_instant.checked_add(Duration::from_secs(region_lease.lease_seconds))
            })
            .context(InvalidProtoMsgSnafu {
                err_msg: "'region_lease' deadline is out of the representable time range",
            })?;

        for raw_region_id in &region_lease.region_ids {
            let region_id = RegionId::from_u64(*raw_region_id);
            let table_id = region_id.table_id();
            let Some(keeper) = self.find_keeper(table_id).await else {
                // Alive keeper could be affected by lagging msg, just warn and ignore.
                warn!("Alive keeper for table {table_id} is not found!");
                continue;
            };

            // TODO(jeremy): refactor this, use region_id
            keeper
                .keep_lived(vec![region_id.region_number()], deadline)
                .await;
        }
        Ok(HandleControl::Continue)
    }
}
/// [RegionAliveKeeper] runs one countdown per region of a table and closes a region
/// once its deadline is reached.
///
/// The deadline is controlled by Metasrv and works like a "lease" for regions: a
/// Datanode submits its opened regions to Metasrv in heartbeats. If Metasrv decides a
/// region may reside in this Datanode, it "extends" the region's "lease" by giving the
/// [RegionAliveKeeper] a new deadline to count down to.
pub struct RegionAliveKeeper {
    /// Catalog from which the table is deregistered after a countdown released it.
    catalog_manager: Arc<MemoryCatalogManager>,
    /// Engine handed to each countdown task.
    table_engine: TableEngineRef,
    /// Identity of the table whose regions are kept alive.
    table_ident: TableIdent,
    /// Handle of the countdown task of each registered region.
    countdown_task_handles: Arc<Mutex<HashMap<RegionNumber, Arc<CountdownTaskHandle>>>>,
    /// Heartbeat interval in milliseconds, sent to countdown tasks when they are started.
    heartbeat_interval_millis: u64,
    /// Set once `start` has run; regions registered afterwards are started immediately.
    started: AtomicBool,
}
impl RegionAliveKeeper {
    /// Creates a keeper for the table with no regions registered and not yet started.
    fn new(
        table_engine: TableEngineRef,
        catalog_manager: Arc<MemoryCatalogManager>,
        table_ident: TableIdent,
        heartbeat_interval_millis: u64,
    ) -> Self {
        Self {
            catalog_manager,
            table_engine,
            table_ident,
            countdown_task_handles: Arc::new(Mutex::new(HashMap::new())),
            heartbeat_interval_millis,
            started: AtomicBool::new(false),
        }
    }

    /// Returns the countdown task handle of `region`, if the region is registered.
    async fn find_handle(&self, region: &RegionNumber) -> Option<Arc<CountdownTaskHandle>> {
        self.countdown_task_handles
            .lock()
            .await
            .get(region)
            .cloned()
    }

    /// Creates the countdown task of `region`, unless the region is already registered.
    ///
    /// The countdown is started right away if this keeper has already been started.
    /// When the task finishes with a `Released` close result, the table is deregistered
    /// from the catalog; in every case the region's handle is then removed from the map.
    async fn register_region(&self, region: RegionNumber) {
        // The lock is held from the existence check until the insertion. Otherwise a
        // concurrent registration of the same region could replace a live handle, which
        // aborts its task and discards the deadline it had been given.
        // `find_handle` must not be called here: it locks the same mutex.
        let mut handles = self.countdown_task_handles.lock().await;
        if handles.contains_key(&region) {
            return;
        }

        // A weak reference, so that a pending countdown task does not keep the map alive.
        let countdown_task_handles = Arc::downgrade(&self.countdown_task_handles);
        let on_task_finished = async move {
            if let Some(x) = countdown_task_handles.upgrade() {
                let _ = x.lock().await.remove(&region);
            } // Else the countdown task handles map could be dropped because the keeper is dropped.
        };

        let catalog_manager = self.catalog_manager.clone();
        let ident = self.table_ident.clone();
        let handle = Arc::new(CountdownTaskHandle::new(
            self.table_engine.clone(),
            self.table_ident.clone(),
            region,
            move |result: Option<CloseTableResult>| {
                if matches!(result, Some(CloseTableResult::Released(_))) {
                    let result = catalog_manager.deregister_table_sync(DeregisterTableRequest {
                        catalog: ident.catalog.to_string(),
                        schema: ident.schema.to_string(),
                        table_name: ident.table.to_string(),
                    });
                    info!(
                        "Deregister table: {} after countdown task finished, result: {result:?}",
                        ident.table_id
                    );
                } else {
                    debug!("Countdown task returns: {result:?}");
                }
                on_task_finished
            },
        ));

        let _ = handles.insert(region, handle.clone());

        if self.started.load(Ordering::Relaxed) {
            handle.start(self.heartbeat_interval_millis).await;
            info!(
                "Region alive countdown for region {region} in table {} is started!",
                self.table_ident
            );
        } else {
            info!(
                "Region alive countdown for region {region} in table {} is registered but not started yet!",
                self.table_ident
            );
        }
    }

    /// Removes and returns the countdown task handle of `region`, if it is registered.
    async fn deregister_region(&self, region: RegionNumber) -> Option<Arc<CountdownTaskHandle>> {
        let removed = self.countdown_task_handles.lock().await.remove(&region);
        if removed.is_some() {
            info!(
                "Deregister alive countdown for region {region} in table {}",
                self.table_ident
            );
        }
        removed
    }

    /// Starts the countdown of every registered region and marks this keeper as started.
    async fn start(&self) {
        let handles = self.countdown_task_handles.lock().await;
        for handle in handles.values() {
            handle.start(self.heartbeat_interval_millis).await;
        }
        self.started.store(true, Ordering::Relaxed);

        info!(
            "Region alive countdowns for regions {:?} in table {} are started!",
            handles.keys().copied().collect::<Vec<_>>(),
            self.table_ident
        );
    }

    /// Asks the countdown of each designated region to move its deadline to `deadline`.
    async fn keep_lived(&self, designated_regions: Vec<RegionNumber>, deadline: Instant) {
        for region in designated_regions {
            if let Some(handle) = self.find_handle(&region).await {
                handle.reset_deadline(deadline).await;
            }
            // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
        }
    }

    /// Queries the current countdown deadline of `region`.
    ///
    /// Returns `None` when the region is not registered or its countdown task does not
    /// answer (the command could not be sent, or the reply channel was closed).
    pub async fn deadline(&self, region: RegionNumber) -> Option<Instant> {
        let handle = self.find_handle(&region).await?;
        let (s, r) = oneshot::channel();
        handle.tx.send(CountdownCommand::Deadline(s)).await.ok()?;
        r.await.ok()
    }

    /// Returns the identity of the table this keeper is responsible for.
    pub fn table_ident(&self) -> &TableIdent {
        &self.table_ident
    }
}
/// Commands sent from a [CountdownTaskHandle] to its countdown task.
#[derive(Debug)]
enum CountdownCommand {
    /// Begin counting down; the payload is the heartbeat interval in milliseconds.
    Start(u64),
    /// Move the deadline to the given instant, if that is later than the current one.
    Reset(Instant),
    /// Report the current deadline through the given channel.
    Deadline(oneshot::Sender<Instant>),
}
/// Owner-side handle of a region's countdown task; dropping it aborts the task.
struct CountdownTaskHandle {
    /// Sends commands to the countdown task.
    tx: mpsc::Sender<CountdownCommand>,
    /// Join handle of the spawned countdown task.
    handler: JoinHandle<()>,
    /// Table the region belongs to; used for logging.
    table_ident: TableIdent,
    /// Number of the region being counted down; used for logging.
    region: RegionNumber,
}
impl CountdownTaskHandle {
    /// Creates a new [CountdownTaskHandle] and spawns its countdown task.
    ///
    /// # Params
    /// - `on_task_finished`: callback invoked with the task's result once the task has
    ///   finished. It is not invoked when the task is cancelled (by dropping the handle):
    ///   the callback is meant for meaningful follow-up work such as deregistering the
    ///   handle from the map, and dropping the handle does not necessarily mean the task
    ///   has finished.
    fn new<Fut>(
        table_engine: TableEngineRef,
        table_ident: TableIdent,
        region: RegionNumber,
        on_task_finished: impl FnOnce(Option<CloseTableResult>) -> Fut + Send + 'static,
    ) -> Self
    where
        Fut: Future<Output = ()> + Send,
    {
        let (tx, rx) = mpsc::channel(1024);

        let mut task = CountdownTask {
            table_engine,
            table_ident: table_ident.clone(),
            region,
            rx,
        };
        let handler = common_runtime::spawn_bg(async move {
            let outcome = task.run().await;
            on_task_finished(outcome).await;
        });

        Self {
            tx,
            handler,
            table_ident,
            region,
        }
    }

    /// Tells the task to begin counting down; a failed send is only logged.
    async fn start(&self, heartbeat_interval_millis: u64) {
        let command = CountdownCommand::Start(heartbeat_interval_millis);
        let sent = self.tx.send(command).await;
        if let Err(e) = sent {
            warn!(
                "Failed to start region alive keeper countdown: {e}. \
                Maybe the task is stopped due to region been closed."
            );
        }
    }

    /// Asks the task to move its deadline to `deadline`; a failed send is only logged.
    async fn reset_deadline(&self, deadline: Instant) {
        let command = CountdownCommand::Reset(deadline);
        let sent = self.tx.send(command).await;
        if let Err(e) = sent {
            warn!(
                "Failed to reset region alive keeper deadline: {e}. \
                Maybe the task is stopped due to region been closed."
            );
        }
    }
}
impl Drop for CountdownTaskHandle {
    /// Aborts the countdown task when its handle goes away.
    fn drop(&mut self) {
        let Self {
            handler,
            table_ident,
            region,
            ..
        } = self;
        debug!(
            "Aborting region alive countdown task for region {} in table {}",
            region, table_ident,
        );
        handler.abort();
    }
}
/// The background side of a region's countdown: receives commands and closes the region
/// when the deadline is reached.
struct CountdownTask {
    /// Engine used to close the region.
    table_engine: TableEngineRef,
    /// Table the region belongs to.
    table_ident: TableIdent,
    /// Number of the region being counted down.
    region: RegionNumber,
    /// Receives commands from the [CountdownTaskHandle].
    rx: mpsc::Receiver<CountdownCommand>,
}
impl CountdownTask {
    /// Runs the countdown until the deadline is reached or the command channel closes.
    ///
    /// Returns `Some(result)` with the outcome of closing the region once the countdown
    /// has elapsed, or `None` if the command channel was closed (the handle was dropped)
    /// before that.
    async fn run(&mut self) -> Option<CloseTableResult> {
        // 30 years. See `Instant::far_future`.
        let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);

        // Make sure the alive countdown is not gonna happen before heartbeat task is started (the
        // "start countdown" command will be sent from heartbeat task).
        let countdown = tokio::time::sleep_until(far_future);
        tokio::pin!(countdown);

        let region = &self.region;
        let table_ident = &self.table_ident;
        loop {
            tokio::select! {
                command = self.rx.recv() => {
                    match command {
                        Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
                            // Set first deadline in 4 heartbeats (roughly after 20 seconds from now if heartbeat
                            // interval is set to default 5 seconds), to make Datanode and Metasrv more tolerable to
                            // network or other jitters during startup.
                            let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
                            countdown.set(tokio::time::sleep_until(first_deadline));
                        },
                        Some(CountdownCommand::Reset(deadline)) => {
                            if countdown.deadline() < deadline {
                                // The new deadline may already lie in the past, so the
                                // remaining time is computed with saturation to zero.
                                debug!(
                                    "Reset deadline of region {region} of table {table_ident} to approximately {} seconds later",
                                    deadline.saturating_duration_since(Instant::now()).as_secs_f32(),
                                );
                                countdown.set(tokio::time::sleep_until(deadline));
                            }
                            // Else the countdown could be either:
                            // - not started yet;
                            // - during startup protection;
                            // - received a lagging heartbeat message.
                            // All can be safely ignored.
                        },
                        None => {
                            info!(
                                "The handle of countdown task for region {region} of table {table_ident} \
                                is dropped, RegionAliveKeeper out."
                            );
                            break;
                        },
                        Some(CountdownCommand::Deadline(tx)) => {
                            let _ = tx.send(countdown.deadline());
                        }
                    }
                }
                () = &mut countdown => {
                    let result = self.close_region().await;
                    warn!(
                        "Region {region} of table {table_ident} is closed, result: {result:?}. \
                        RegionAliveKeeper out.",
                    );
                    return Some(result);
                }
            }
        }
        None
    }

    /// Closes the region through the table engine (with `flush` enabled), retrying
    /// immediately and indefinitely until the engine reports a result.
    async fn close_region(&self) -> CloseTableResult {
        let ctx = EngineContext::default();
        let region = self.region;
        let table_ident = &self.table_ident;
        loop {
            let request = CloseTableRequest {
                catalog_name: table_ident.catalog.clone(),
                schema_name: table_ident.schema.clone(),
                table_name: table_ident.table.clone(),
                table_id: table_ident.table_id,
                region_numbers: vec![region],
                flush: true,
            };

            match self.table_engine.close_table(&ctx, request).await {
                Ok(result) => return result,
                // If region is failed to close, immediately retry. Maybe we should panic instead?
                Err(e) => error!(e;
                    "Failed to close region {region} of table {table_ident}. \
                    For the integrity of data, retry closing and retry without wait.",
                ),
            }
        }
    }
}
#[cfg(test)]
mod test {
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use api::v1::meta::{HeartbeatResponse, RegionLease};
use common_meta::heartbeat::mailbox::HeartbeatMailbox;
use datatypes::schema::RawSchema;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::TableEngine;
use table::requests::{CreateTableRequest, TableOptions};
use table::test_util::EmptyTable;
use super::*;
use crate::remote::mock::MockTableEngine;
/// Builds a [RegionAliveKeepers] with a 5000 ms heartbeat interval and registers one
/// table (id 1, regions 1, 2 and 3) backed by the mock table engine.
async fn prepare_keepers() -> (TableIdent, RegionAliveKeepers) {
    let catalog_name = "my_catalog";
    let schema_name = "my_schema";
    let table_name = "my_table";
    let engine_name = "MockTableEngine";

    let engine_manager = Arc::new(MemoryTableEngineManager::new(Arc::new(
        MockTableEngine::default(),
    )));
    let keepers = RegionAliveKeepers::new(engine_manager, 5000);

    let table_ident = TableIdent {
        catalog: catalog_name.to_string(),
        schema: schema_name.to_string(),
        table: table_name.to_string(),
        table_id: 1,
        engine: engine_name.to_string(),
    };
    let create_request = CreateTableRequest {
        id: 1,
        catalog_name: catalog_name.to_string(),
        schema_name: schema_name.to_string(),
        table_name: table_name.to_string(),
        desc: None,
        schema: RawSchema {
            column_schemas: vec![],
            timestamp_index: None,
            version: 0,
        },
        region_numbers: vec![1, 2, 3],
        primary_key_indices: vec![],
        create_if_not_exists: false,
        table_options: TableOptions::default(),
        engine: engine_name.to_string(),
    };
    let table = EmptyTable::table(create_request);
    let catalog_manager = MemoryCatalogManager::new_with_table(table.clone());

    keepers
        .register_table(table_ident.clone(), table, catalog_manager)
        .await
        .unwrap();

    // The table must now have a keeper.
    let registered = keepers
        .keepers
        .lock()
        .await
        .contains_key(&table_ident.table_id);
    assert!(registered);

    (table_ident, keepers)
}
#[tokio::test(flavor = "multi_thread")]
async fn test_handle_heartbeat_response() {
    /// Checks the deadline of one region: equal to `keep_alive_until` (and past the
    /// startup protection) if the lease was extended, otherwise still within the
    /// startup protection period.
    async fn test(
        keeper: &Arc<RegionAliveKeeper>,
        region_number: RegionNumber,
        startup_protection_until: Instant,
        keep_alive_until: Instant,
        is_kept_live: bool,
    ) {
        let deadline = keeper.deadline(region_number).await.unwrap();
        if !is_kept_live {
            assert!(deadline <= startup_protection_until);
            return;
        }
        assert!(deadline > startup_protection_until && deadline == keep_alive_until);
    }

    let (table_ident, keepers) = prepare_keepers().await;
    keepers.start().await;
    let startup_protection_until = Instant::now() + Duration::from_secs(21);

    let duration_since_epoch = (Instant::now() - keepers.epoch).as_millis() as _;
    let lease_seconds = 100;
    // Region 2 is deliberately left out: its lease time is not extended.
    let leased_region_ids = vec![
        RegionId::new(table_ident.table_id, 1).as_u64(),
        RegionId::new(table_ident.table_id, 3).as_u64(),
    ];
    let response = HeartbeatResponse {
        region_lease: Some(RegionLease {
            region_ids: leased_region_ids,
            duration_since_epoch,
            lease_seconds,
        }),
        ..Default::default()
    };
    let keep_alive_until = keepers.epoch
        + Duration::from_millis(duration_since_epoch)
        + Duration::from_secs(lease_seconds);

    let (tx, _) = mpsc::channel(8);
    let mailbox = Arc::new(HeartbeatMailbox::new(tx));
    let mut ctx = HeartbeatResponseHandlerContext::new(mailbox, response);

    assert!(keepers.handle(&mut ctx).await.unwrap() == HandleControl::Continue);

    // Sleep to give the countdown tasks time to process the commands sent by `handle`.
    tokio::time::sleep(Duration::from_secs(1)).await;

    let keeper = keepers.find_keeper(table_ident.table_id).await.unwrap();

    // Regions 1 and 3 are kept alive: their deadlines are updated to the desired instant.
    test(&keeper, 1, startup_protection_until, keep_alive_until, true).await;
    test(&keeper, 3, startup_protection_until, keep_alive_until, true).await;

    // Region 2 is not kept alive: its deadline is not updated and is still within the
    // startup protection period.
    test(&keeper, 2, startup_protection_until, keep_alive_until, false).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_region_alive_keepers() {
    let (table_ident, keepers) = prepare_keepers().await;

    // Register a fourth region on top of the three created by `prepare_keepers`.
    let fourth_region = RegionIdent {
        cluster_id: 1,
        datanode_id: 1,
        table_ident: table_ident.clone(),
        region_number: 4,
    };
    keepers.register_region(&fourth_region).await;

    keepers.start().await;
    for keeper in keepers.keepers.lock().await.values() {
        let regions: Vec<_> = {
            let handles = keeper.countdown_task_handles.lock().await;
            handles.keys().copied().collect()
        };

        for region in regions {
            // assert countdown tasks are started
            let deadline = keeper.deadline(region).await.unwrap();
            assert!(deadline <= Instant::now() + Duration::from_secs(20));
        }
    }

    // Deregistering region 1 leaves regions 2, 3 and 4.
    let first_region = RegionIdent {
        cluster_id: 1,
        datanode_id: 1,
        table_ident: table_ident.clone(),
        region_number: 1,
    };
    keepers.deregister_region(&first_region).await;

    let keeper = keepers.find_keeper(table_ident.table_id).await.unwrap();
    let mut regions = keeper
        .countdown_task_handles
        .lock()
        .await
        .keys()
        .copied()
        .collect::<Vec<_>>();
    // Release this reference: the uniqueness check below needs the keeper returned by
    // `deregister_table` to be the only one left.
    drop(keeper);
    regions.sort();
    assert_eq!(regions, vec![2, 3, 4]);

    let keeper = keepers.deregister_table(&table_ident).await.unwrap();
    assert!(Arc::try_unwrap(keeper).is_ok(), "keeper is not dropped");
    assert!(keepers.keepers.lock().await.is_empty());
}
// Exercises a single table's keeper: registering a region, `keep_lived` before and after
// `start`, and deregistering the region again.
#[tokio::test(flavor = "multi_thread")]
async fn test_region_alive_keeper() {
    let table_engine = Arc::new(MockTableEngine::default());
    let table_ident = TableIdent {
        catalog: "my_catalog".to_string(),
        schema: "my_schema".to_string(),
        table: "my_table".to_string(),
        table_id: 1024,
        engine: "mito".to_string(),
    };
    let catalog_manager = MemoryCatalogManager::with_default_setup();
    let keeper = RegionAliveKeeper::new(table_engine, catalog_manager, table_ident, 1000);
    let region = 1;
    // No handle exists until the region is registered.
    assert!(keeper.find_handle(&region).await.is_none());
    keeper.register_region(region).await;
    let _ = keeper.find_handle(&region).await.unwrap();
    // A closure, so every call yields a fresh "now + 10s" instant.
    let ten_seconds_later = || Instant::now() + Duration::from_secs(10);
    keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await;
    // `keep_lived` must not create handles for regions that were never registered.
    assert!(keeper.find_handle(&2).await.is_none());
    assert!(keeper.find_handle(&3).await.is_none());
    let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29);
    // assert if keeper is not started, keep_lived is of no use
    assert!(keeper.deadline(region).await.unwrap() > far_future);
    keeper.start().await;
    keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await;
    // assert keep_lived works if keeper is started
    assert!(keeper.deadline(region).await.unwrap() <= ten_seconds_later());
    let handle = keeper.deregister_region(region).await.unwrap();
    // `Arc::try_unwrap` only succeeds when the keeper no longer holds a reference to the handle.
    assert!(Arc::try_unwrap(handle).is_ok(), "handle is not dropped");
    assert!(keeper.find_handle(&region).await.is_none());
}
// Checks the two ways a countdown task can end:
// 1. the handle is dropped -> the task stops and `on_task_finished` is NOT invoked;
// 2. the countdown elapses -> `on_task_finished` IS invoked.
#[tokio::test(flavor = "multi_thread")]
async fn test_countdown_task_handle() {
    let table_engine = Arc::new(MockTableEngine::default());
    let table_ident = TableIdent {
        catalog: "my_catalog".to_string(),
        schema: "my_schema".to_string(),
        table: "my_table".to_string(),
        table_id: 1024,
        engine: "mito".to_string(),
    };
    // Flag flipped by the `on_task_finished` callback.
    let finished = Arc::new(AtomicBool::new(false));
    let finished_clone = finished.clone();
    let handle = CountdownTaskHandle::new(
        table_engine.clone(),
        table_ident.clone(),
        1,
        |_| async move { finished_clone.store(true, Ordering::Relaxed) },
    );
    // Keep an extra sender so the channel can still be probed after the handle is dropped.
    let tx = handle.tx.clone();
    // assert countdown task is running
    tx.send(CountdownCommand::Start(5000)).await.unwrap();
    assert!(!finished.load(Ordering::Relaxed));
    drop(handle);
    // Give the background task time to observe the drop.
    tokio::time::sleep(Duration::from_secs(1)).await;
    // assert countdown task is stopped
    assert!(tx
        .try_send(CountdownCommand::Reset(
            Instant::now() + Duration::from_secs(10)
        ))
        .is_err());
    // assert `on_task_finished` is not called (because the task is aborted by the handle's drop)
    assert!(!finished.load(Ordering::Relaxed));
    // Second scenario: a fresh flag and a fresh handle that is kept alive until the end.
    let finished = Arc::new(AtomicBool::new(false));
    let finished_clone = finished.clone();
    let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, |_| async move {
        finished_clone.store(true, Ordering::Relaxed)
    });
    handle.tx.send(CountdownCommand::Start(100)).await.unwrap();
    tokio::time::sleep(Duration::from_secs(1)).await;
    // assert `on_task_finished` is called when task is finished normally
    assert!(finished.load(Ordering::Relaxed));
}
// Drives a `CountdownTask` directly through its command channel: query the deadline before
// start, start the countdown, reset the deadline, and finally check that the table is gone from
// the mock engine once the deadline has passed.
#[tokio::test(flavor = "multi_thread")]
async fn test_countdown_task_run() {
    let ctx = &EngineContext::default();
    let catalog = "my_catalog";
    let schema = "my_schema";
    let table = "my_table";
    let table_id = 1;
    let request = CreateTableRequest {
        id: table_id,
        catalog_name: catalog.to_string(),
        schema_name: schema.to_string(),
        table_name: table.to_string(),
        desc: None,
        schema: RawSchema {
            column_schemas: vec![],
            timestamp_index: None,
            version: 0,
        },
        region_numbers: vec![],
        primary_key_indices: vec![],
        create_if_not_exists: false,
        table_options: TableOptions::default(),
        engine: "mito".to_string(),
    };
    let table_engine = Arc::new(MockTableEngine::default());
    // Create the table first so that `table_exists` is true before the countdown elapses.
    let _ = table_engine.create_table(ctx, request).await.unwrap();
    let table_ident = TableIdent {
        catalog: catalog.to_string(),
        schema: schema.to_string(),
        table: table.to_string(),
        table_id,
        engine: "mito".to_string(),
    };
    let (tx, rx) = mpsc::channel(10);
    let mut task = CountdownTask {
        table_engine: table_engine.clone(),
        table_ident,
        region: 1,
        rx,
    };
    let _handle = common_runtime::spawn_bg(async move {
        task.run().await;
    });
    // Asks the running task for its current deadline over a oneshot reply channel.
    async fn deadline(tx: &mpsc::Sender<CountdownCommand>) -> Instant {
        let (s, r) = oneshot::channel();
        tx.send(CountdownCommand::Deadline(s)).await.unwrap();
        r.await.unwrap()
    }
    // if countdown task is not started, its deadline is set to far future
    assert!(deadline(&tx).await > Instant::now() + Duration::from_secs(86400 * 365 * 29));
    // start countdown in 250ms * 4 = 1s
    tx.send(CountdownCommand::Start(250)).await.unwrap();
    // assert deadline is correctly set
    assert!(deadline(&tx).await <= Instant::now() + Duration::from_secs(1));
    // reset countdown in 1.5s
    tx.send(CountdownCommand::Reset(
        Instant::now() + Duration::from_millis(1500),
    ))
    .await
    .unwrap();
    // the table must still exist right after the reset, i.e. before the deadline is reached ...
    assert!(table_engine.table_exists(ctx, table_id));
    // spare 500ms for the task to close the table
    tokio::time::sleep(Duration::from_millis(2000)).await;
    // ... and must be closed once the deadline has passed
    assert!(!table_engine.table_exists(ctx, table_id));
}
}

View File

@@ -1,453 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#![feature(assert_matches)]
#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use catalog::remote::mock::MockTableEngine;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest};
use common_catalog::consts::{
DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE,
};
use common_meta::helper::CatalogValue;
use common_meta::ident::TableIdent;
use common_meta::key::catalog_name::CatalogNameKey;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::KvBackend;
use common_meta::rpc::store::{CompareAndPutRequest, PutRequest};
use datatypes::schema::RawSchema;
use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef};
use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;
use table::test_util::EmptyTable;
use tokio::time::Instant;
/// Bundle of the objects that `prepare_components` wires together for the tests in this module.
struct TestingComponents {
    /// The catalog manager under test.
    catalog_manager: Arc<RemoteCatalogManager>,
    /// Engine manager that was handed to the catalog manager and the region alive keepers.
    table_engine_manager: TableEngineManagerRef,
    /// The keepers instance that was handed to the catalog manager.
    region_alive_keepers: Arc<RegionAliveKeepers>,
}
impl TestingComponents {
    /// Looks up the engine registered under [MITO_ENGINE]; panics if the lookup fails.
    fn table_engine(&self) -> TableEngineRef {
        let mito = self.table_engine_manager.engine(MITO_ENGINE);
        mito.unwrap()
    }
}
/// Runs put / get / compare-and-put / delete against a [CachedMetaKvBackend] wrapped around an
/// in-memory backend and checks that every read observes the preceding write.
#[tokio::test]
async fn test_cached_backend() {
    let catalog_key = b"__catalog_name/greptime";
    let kv = CachedMetaKvBackend::wrap(Arc::new(MemoryKvBackend::default()));

    // Seed the default catalog entry using the key rendered by `CatalogNameKey`.
    let rendered_key = CatalogNameKey::new(DEFAULT_CATALOG_NAME).to_string();
    let seed = PutRequest::new()
        .with_key(rendered_key.as_bytes())
        .with_value(CatalogValue.as_bytes().unwrap());
    kv.put(seed).await.unwrap();
    let ret = kv.get(catalog_key).await.unwrap();
    let _ = ret.unwrap();

    // Swap the value, expecting the seeded one to still be in place.
    let swap = CompareAndPutRequest::new()
        .with_key(catalog_key.to_vec())
        .with_expect(CatalogValue.as_bytes().unwrap())
        .with_value(b"123".to_vec());
    let _ = kv.compare_and_put(swap).await.unwrap();
    let ret = kv.get(catalog_key).await.unwrap();
    assert_eq!(b"123", ret.as_ref().unwrap().value.as_slice());

    // Plain overwrite; the outcome of the put itself is deliberately ignored.
    let overwrite = PutRequest::new()
        .with_key(catalog_key.to_vec())
        .with_value(b"1234".to_vec());
    let _ = kv.put(overwrite).await;
    let ret = kv.get(catalog_key).await.unwrap();
    assert_eq!(b"1234", ret.unwrap().value.as_slice());

    // After deletion the key must no longer be readable.
    kv.delete(catalog_key, false).await.unwrap();
    let ret = kv.get(catalog_key).await.unwrap();
    assert!(ret.is_none());
}
/// Builds a started [RemoteCatalogManager] for `node_id` on top of an in-memory kv backend that
/// is pre-seeded with one catalog key and one schema key, and returns it together with the
/// engine manager and region alive keepers it was constructed with.
async fn prepare_components(node_id: u64) -> TestingComponents {
    let memory_kv = Arc::new(MemoryKvBackend::default());
    // Both seed entries carry an empty value.
    let seed_keys: [&[u8]; 2] = [b"__catalog_name/greptime", b"__schema_name/greptime-public"];
    for key in seed_keys {
        let seed = PutRequest::new()
            .with_key(key.to_vec())
            .with_value(b"".to_vec());
        memory_kv.put(seed).await.unwrap();
    }
    let cached_kv = Arc::new(CachedMetaKvBackend::wrap(memory_kv));

    let mock_engine = Arc::new(MockTableEngine::default());
    let table_engine_manager = Arc::new(MemoryTableEngineManager::alias(
        MITO_ENGINE.to_string(),
        mock_engine,
    ));
    let region_alive_keepers = Arc::new(RegionAliveKeepers::new(
        table_engine_manager.clone(),
        5000,
    ));

    let manager = RemoteCatalogManager::new(
        table_engine_manager.clone(),
        node_id,
        region_alive_keepers.clone(),
        Arc::new(TableMetadataManager::new(cached_kv)),
    );
    manager.start().await.unwrap();

    TestingComponents {
        catalog_manager: Arc::new(manager),
        table_engine_manager,
        region_alive_keepers,
    }
}
#[tokio::test]
async fn test_remote_catalog_default() {
common_telemetry::init_default_ut_logging();
let node_id = 42;
let TestingComponents {
catalog_manager, ..
} = prepare_components(node_id).await;
assert_eq!(
vec![DEFAULT_CATALOG_NAME.to_string()],
catalog_manager.catalog_names().await.unwrap()
);
let mut schema_names = catalog_manager
.schema_names(DEFAULT_CATALOG_NAME)
.await
.unwrap();
schema_names.sort_unstable();
assert_eq!(
vec![
INFORMATION_SCHEMA_NAME.to_string(),
DEFAULT_SCHEMA_NAME.to_string()
],
schema_names
);
}
/// Registering a table under a catalog that was never registered must fail with
/// `CatalogNotFound`.
#[tokio::test]
async fn test_remote_catalog_register_nonexistent() {
    common_telemetry::init_default_ut_logging();
    let components = prepare_components(42).await;

    // Names of a catalog/schema that the catalog manager does not know about.
    let catalog_name = "nonexistent_catalog".to_string();
    let schema_name = "nonexistent_schema".to_string();
    let table_name = "fail_table".to_string();

    let create_request = CreateTableRequest {
        id: 1,
        catalog_name: catalog_name.clone(),
        schema_name: schema_name.clone(),
        table_name: table_name.clone(),
        desc: None,
        // this schema has no effect
        schema: RawSchema::new(vec![]),
        region_numbers: vec![0],
        primary_key_indices: vec![],
        create_if_not_exists: false,
        table_options: Default::default(),
        engine: MITO_ENGINE.to_string(),
    };
    let table = components
        .table_engine()
        .create_table(&EngineContext {}, create_request)
        .await
        .unwrap();

    let res = components
        .catalog_manager
        .register_table(RegisterTableRequest {
            catalog: catalog_name,
            schema: schema_name,
            table_name,
            table_id: 1,
            table,
        })
        .await;
    // because nonexistent_catalog does not exist yet.
    assert_matches!(
        res.err().unwrap(),
        catalog::error::Error::CatalogNotFound { .. }
    );
}
/// Registers a table under the default catalog and schema and checks that it then shows up in
/// the schema's table listing.
#[tokio::test]
async fn test_register_table() {
    let components = prepare_components(42).await;

    // Sanity check: the default catalog starts with the information schema and default schema.
    let mut schema_names = components
        .catalog_manager
        .schema_names(DEFAULT_CATALOG_NAME)
        .await
        .unwrap();
    schema_names.sort_unstable();
    let expected_schemas = vec![
        INFORMATION_SCHEMA_NAME.to_string(),
        DEFAULT_SCHEMA_NAME.to_string(),
    ];
    assert_eq!(expected_schemas, schema_names);

    // register a new table under the existing default catalog and schema
    let catalog_name = DEFAULT_CATALOG_NAME.to_string();
    let schema_name = DEFAULT_SCHEMA_NAME.to_string();
    let table_name = "test_table".to_string();
    let table_id = 1;
    let create_request = CreateTableRequest {
        id: table_id,
        catalog_name: catalog_name.clone(),
        schema_name: schema_name.clone(),
        table_name: table_name.clone(),
        desc: None,
        // this schema has no effect
        schema: RawSchema::new(vec![]),
        region_numbers: vec![0],
        primary_key_indices: vec![],
        create_if_not_exists: false,
        table_options: Default::default(),
        engine: MITO_ENGINE.to_string(),
    };
    let table = components
        .table_engine()
        .create_table(&EngineContext {}, create_request)
        .await
        .unwrap();

    let reg_req = RegisterTableRequest {
        catalog: catalog_name,
        schema: schema_name,
        table_name: table_name.clone(),
        table_id,
        table,
    };
    assert!(components
        .catalog_manager
        .register_table(reg_req)
        .await
        .unwrap());

    let listed_tables = components
        .catalog_manager
        .table_names(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
        .await
        .unwrap();
    assert_eq!(vec![table_name], listed_tables);
}
/// Registers a new catalog, shows that registering a table fails with `SchemaNotFound` while the
/// schema is missing, then registers the schema and the table and checks the schema listing.
#[tokio::test]
async fn test_register_catalog_schema_table() {
    let components = prepare_components(42).await;
    let catalog_name = "test_catalog".to_string();
    let schema_name = "nonexistent_schema".to_string();

    // register catalog to catalog manager
    assert!(components
        .catalog_manager
        .clone()
        .register_catalog(catalog_name.clone())
        .await
        .is_ok());
    let expected_catalogs = HashSet::<String>::from_iter(vec![
        DEFAULT_CATALOG_NAME.to_string(),
        catalog_name.clone(),
    ]);
    let known_catalogs: HashSet<String> =
        HashSet::from_iter(components.catalog_manager.catalog_names().await.unwrap());
    assert_eq!(expected_catalogs, known_catalogs);

    let create_request = CreateTableRequest {
        id: 2,
        catalog_name: catalog_name.clone(),
        schema_name: schema_name.clone(),
        table_name: "".to_string(),
        desc: None,
        schema: RawSchema::new(vec![]),
        region_numbers: vec![0],
        primary_key_indices: vec![],
        create_if_not_exists: false,
        table_options: Default::default(),
        engine: MITO_ENGINE.to_string(),
    };
    let table_to_register = components
        .table_engine()
        .create_table(&EngineContext {}, create_request)
        .await
        .unwrap();

    let reg_req = RegisterTableRequest {
        catalog: catalog_name.clone(),
        schema: schema_name.clone(),
        table_name: " fail_table".to_string(),
        table_id: 2,
        table: table_to_register,
    };
    // this register will fail since schema does not exist yet
    assert_matches!(
        components
            .catalog_manager
            .register_table(reg_req.clone())
            .await
            .unwrap_err(),
        catalog::error::Error::SchemaNotFound { .. }
    );

    // Once the schema is registered, the very same request must go through.
    let register_schema_request = RegisterSchemaRequest {
        catalog: catalog_name.clone(),
        schema: schema_name.clone(),
    };
    assert!(components
        .catalog_manager
        .register_schema(register_schema_request)
        .await
        .expect("Register schema should not fail"));
    assert!(components
        .catalog_manager
        .register_table(reg_req)
        .await
        .unwrap());

    let listed_schemas: HashSet<String> = components
        .catalog_manager
        .schema_names(&catalog_name)
        .await
        .unwrap()
        .into_iter()
        .collect();
    assert_eq!(
        HashSet::from([schema_name.clone(), INFORMATION_SCHEMA_NAME.to_string()]),
        listed_schemas
    );
}
// Registers one table before and one table after `RegionAliveKeepers::start` and checks, via the
// reported deadlines, that the countdown of region 0 is running for both tables afterwards.
#[tokio::test]
async fn test_register_table_before_and_after_region_alive_keeper_started() {
    let components = prepare_components(42).await;
    let catalog_manager = &components.catalog_manager;
    let region_alive_keepers = &components.region_alive_keepers;
    // First table: registered while the keepers are not started yet.
    let table_before = TableIdent {
        catalog: DEFAULT_CATALOG_NAME.to_string(),
        schema: DEFAULT_SCHEMA_NAME.to_string(),
        table: "table_before".to_string(),
        table_id: 1,
        engine: MITO_ENGINE.to_string(),
    };
    let request = RegisterTableRequest {
        catalog: table_before.catalog.clone(),
        schema: table_before.schema.clone(),
        table_name: table_before.table.clone(),
        table_id: table_before.table_id,
        table: EmptyTable::table(CreateTableRequest {
            id: table_before.table_id,
            catalog_name: table_before.catalog.clone(),
            schema_name: table_before.schema.clone(),
            table_name: table_before.table.clone(),
            desc: None,
            schema: RawSchema::new(vec![]),
            region_numbers: vec![0],
            primary_key_indices: vec![],
            create_if_not_exists: false,
            table_options: Default::default(),
            engine: MITO_ENGINE.to_string(),
        }),
    };
    assert!(catalog_manager.register_table(request).await.unwrap());
    // Registering the table is expected to have created a keeper for it.
    let keeper = region_alive_keepers
        .find_keeper(table_before.table_id)
        .await
        .unwrap();
    let deadline = keeper.deadline(0).await.unwrap();
    let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29);
    // assert region alive countdown is not started
    assert!(deadline > far_future);
    region_alive_keepers.start().await;
    // Second table: registered after the keepers have been started.
    let table_after = TableIdent {
        catalog: DEFAULT_CATALOG_NAME.to_string(),
        schema: DEFAULT_SCHEMA_NAME.to_string(),
        table: "table_after".to_string(),
        table_id: 2,
        engine: MITO_ENGINE.to_string(),
    };
    let request = RegisterTableRequest {
        catalog: table_after.catalog.clone(),
        schema: table_after.schema.clone(),
        table_name: table_after.table.clone(),
        table_id: table_after.table_id,
        table: EmptyTable::table(CreateTableRequest {
            id: table_after.table_id,
            catalog_name: table_after.catalog.clone(),
            schema_name: table_after.schema.clone(),
            table_name: table_after.table.clone(),
            desc: None,
            schema: RawSchema::new(vec![]),
            region_numbers: vec![0],
            primary_key_indices: vec![],
            create_if_not_exists: false,
            table_options: Default::default(),
            engine: MITO_ENGINE.to_string(),
        }),
    };
    assert!(catalog_manager.register_table(request).await.unwrap());
    let keeper = region_alive_keepers
        .find_keeper(table_after.table_id)
        .await
        .unwrap();
    let deadline = keeper.deadline(0).await.unwrap();
    // assert countdown is started for the table registered after [RegionAliveKeepers] started
    assert!(deadline <= Instant::now() + Duration::from_secs(20));
    let keeper = region_alive_keepers
        .find_keeper(table_before.table_id)
        .await
        .unwrap();
    let deadline = keeper.deadline(0).await.unwrap();
    // assert countdown is started for the table registered before [RegionAliveKeepers] started, too
    assert!(deadline <= Instant::now() + Duration::from_secs(20));
}
}

View File

@@ -0,0 +1,467 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_meta::error::InvalidProtoMsgSnafu;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_telemetry::{debug, error, info, trace, warn};
use snafu::OptionExt;
use store_api::region_request::{RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;
#[cfg(test)]
use tokio::sync::oneshot;
use tokio::sync::{mpsc, Mutex};
use tokio::task::JoinHandle;
use tokio::time::{Duration, Instant};
use crate::region_server::RegionServer;
/// Maximum number of close attempts `CountdownTask::close_region` makes before giving up.
const MAX_CLOSE_RETRY_TIMES: usize = 10;
/// [RegionAliveKeeper] manages all [CountdownTaskHandle]s.
///
/// [RegionAliveKeeper] starts a [CountdownTask] for each region. When deadline is reached,
/// the region will be closed.
/// The deadline is controlled by Metasrv. It works like "lease" for regions: a Datanode submits its
/// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this
/// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to
/// countdown.
pub struct RegionAliveKeeper {
    /// Region server handed to every countdown task; the task sends its close request to it.
    region_server: RegionServer,
    /// Countdown task handle of every registered region, keyed by region id.
    tasks: Arc<Mutex<HashMap<RegionId, Arc<CountdownTaskHandle>>>>,
    /// Heartbeat interval in milliseconds, passed to each countdown task when it is started.
    heartbeat_interval_millis: u64,
    /// Set once [RegionAliveKeeper::start] has run; regions registered afterwards are started
    /// right away.
    started: AtomicBool,
    /// The epoch when [RegionAliveKeeper] is created. It's used to get a monotonically non-decreasing
    /// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically
    /// non-decreasing). The heartbeat request will carry the duration since this epoch, and the
    /// duration acts like an "invariant point" for region's keep alive lease.
    epoch: Instant,
}
impl RegionAliveKeeper {
    /// Creates a keeper with no registered regions. Countdowns do not run until
    /// [RegionAliveKeeper::start] is called.
    ///
    /// `heartbeat_interval_millis` is forwarded to every countdown task when it is started.
    pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self {
        Self {
            region_server,
            tasks: Arc::new(Mutex::new(HashMap::new())),
            heartbeat_interval_millis,
            started: AtomicBool::new(false),
            epoch: Instant::now(),
        }
    }

    /// Returns the countdown task handle of `region_id`, if the region is registered.
    async fn find_handle(&self, region_id: RegionId) -> Option<Arc<CountdownTaskHandle>> {
        self.tasks.lock().await.get(&region_id).cloned()
    }

    /// Registers `region_id` and spawns its countdown task. Does nothing if the region is
    /// already registered.
    ///
    /// If the keeper has been started, the countdown of the new region is started immediately;
    /// otherwise it stays idle until [RegionAliveKeeper::start].
    pub async fn register_region(&self, region_id: RegionId) {
        // Hold one guard across the existence check and the insert. With two separate lock
        // acquisitions, concurrent registrations of the same region could both pass the check,
        // and the later insert would drop (and thereby abort) the task created by the earlier one.
        let mut handles = self.tasks.lock().await;
        if handles.contains_key(&region_id) {
            return;
        }

        // The callback only holds a weak reference, so a still running task does not keep the
        // map alive after the keeper is gone.
        let tasks = Arc::downgrade(&self.tasks);
        let on_task_finished = async move {
            if let Some(x) = tasks.upgrade() {
                let _ = x.lock().await.remove(&region_id);
            } // Else the countdown task handles map could be dropped because the keeper is dropped.
        };
        let handle = Arc::new(CountdownTaskHandle::new(
            self.region_server.clone(),
            region_id,
            move |result: Option<bool>| {
                info!(
                    "Deregister region: {region_id} after countdown task finished, result: {result:?}",
                );
                on_task_finished
            },
        ));

        let _ = handles.insert(region_id, handle.clone());

        // `started` is read while the map lock is held, and `start` writes it while holding the
        // same lock, so a region is either started here or picked up by `start`.
        if self.started.load(Ordering::Relaxed) {
            handle.start(self.heartbeat_interval_millis).await;
            info!("Region alive countdown for region {region_id} is started!",);
        } else {
            info!(
                "Region alive countdown for region {region_id} is registered but not started yet!",
            );
        }
    }

    /// Removes the handle of `region_id` from the keeper. Dropping the last reference to the
    /// handle aborts the countdown task.
    pub async fn deregister_region(&self, region_id: RegionId) {
        if self.tasks.lock().await.remove(&region_id).is_some() {
            info!("Deregister alive countdown for region {region_id}")
        }
    }

    /// Asks the countdown task of every region in `designated_regions` to move its deadline to
    /// `deadline`. Regions that are not registered are skipped.
    async fn keep_lived(&self, designated_regions: Vec<RegionId>, deadline: Instant) {
        for region_id in designated_regions {
            if let Some(handle) = self.find_handle(region_id).await {
                handle.reset_deadline(deadline).await;
            }
            // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it.
        }
    }

    /// Returns the current countdown deadline of `region_id`, or `None` if the region is not
    /// registered or its task no longer answers.
    #[cfg(test)]
    async fn deadline(&self, region_id: RegionId) -> Option<Instant> {
        // Reuse the handle's own query instead of duplicating the oneshot round trip here.
        match self.find_handle(region_id).await {
            Some(handle) => handle.deadline().await,
            None => None,
        }
    }

    /// Starts the countdown of every registered region and marks the keeper as started, so that
    /// regions registered later are started on registration.
    pub async fn start(&self) {
        let tasks = self.tasks.lock().await;
        for task in tasks.values() {
            task.start(self.heartbeat_interval_millis).await;
        }
        self.started.store(true, Ordering::Relaxed);
        info!(
            "RegionAliveKeeper is started with region {:?}",
            tasks.keys().map(|x| x.to_string()).collect::<Vec<_>>(),
        );
    }

    /// The instant at which this keeper was created.
    pub fn epoch(&self) -> Instant {
        self.epoch
    }
}
#[async_trait]
impl HeartbeatResponseHandler for RegionAliveKeeper {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
ctx.response.region_lease.is_some()
}
async fn handle(
&self,
ctx: &mut HeartbeatResponseHandlerContext,
) -> common_meta::error::Result<HandleControl> {
let region_lease = ctx
.response
.region_lease
.as_ref()
.context(InvalidProtoMsgSnafu {
err_msg: "'region_lease' is missing in heartbeat response",
})?;
let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch);
let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds);
let region_ids = region_lease
.region_ids
.iter()
.map(|id| RegionId::from_u64(*id))
.collect();
self.keep_lived(region_ids, deadline).await;
Ok(HandleControl::Continue)
}
}
/// Commands sent from a [CountdownTaskHandle] to its [CountdownTask].
#[derive(Debug)]
enum CountdownCommand {
    /// Start this countdown task. The first deadline will be set to
    /// 4 * `heartbeat_interval_millis`
    Start(u64),
    /// Reset countdown deadline to the given instant.
    Reset(Instant),
    /// Returns the current deadline of the countdown task.
    #[cfg(test)]
    Deadline(oneshot::Sender<Instant>),
}
/// Owner side of a spawned [CountdownTask]. Dropping the handle aborts the task.
struct CountdownTaskHandle {
    /// Command channel to the countdown task.
    tx: mpsc::Sender<CountdownCommand>,
    /// Join handle of the spawned task; aborted when this handle is dropped.
    handler: JoinHandle<()>,
    /// The region this countdown belongs to.
    region_id: RegionId,
}
impl CountdownTaskHandle {
    /// Spawns a countdown task for `region_id` in the background and returns the handle that
    /// controls it.
    ///
    /// # Params
    /// - `on_task_finished`: callback awaited with the task's result once `run` returns. It is
    ///   not invoked when the task is cancelled by dropping the handle: the callback is meant
    ///   for follow-up work on a finished task (e.g. deregistering the handle from the map),
    ///   and a dropped handle does not imply that the task finished.
    fn new<Fut>(
        region_server: RegionServer,
        region_id: RegionId,
        on_task_finished: impl FnOnce(Option<bool>) -> Fut + Send + 'static,
    ) -> Self
    where
        Fut: Future<Output = ()> + Send,
    {
        let (tx, rx) = mpsc::channel(1024);
        let handler = common_runtime::spawn_bg(async move {
            let mut task = CountdownTask {
                region_server,
                region_id,
                rx,
            };
            let outcome = task.run().await;
            on_task_finished(outcome).await;
        });

        Self {
            tx,
            handler,
            region_id,
        }
    }

    /// Tells the task to begin counting down. A send failure is only logged.
    async fn start(&self, heartbeat_interval_millis: u64) {
        let command = CountdownCommand::Start(heartbeat_interval_millis);
        let sent = self.tx.send(command).await;
        if let Err(e) = sent {
            warn!(
                "Failed to start region alive keeper countdown: {e}. \
                Maybe the task is stopped due to region been closed."
            );
        }
    }

    /// Queries the task for its current deadline; `None` if the task cannot be reached or does
    /// not reply.
    #[cfg(test)]
    async fn deadline(&self) -> Option<Instant> {
        let (reply_tx, reply_rx) = oneshot::channel();
        self.tx
            .send(CountdownCommand::Deadline(reply_tx))
            .await
            .ok()?;
        reply_rx.await.ok()
    }

    /// Asks the task to move its deadline to `deadline`. A send failure is only logged.
    async fn reset_deadline(&self, deadline: Instant) {
        let command = CountdownCommand::Reset(deadline);
        let sent = self.tx.send(command).await;
        if let Err(e) = sent {
            warn!(
                "Failed to reset region alive keeper deadline: {e}. \
                Maybe the task is stopped due to region been closed."
            );
        }
    }
}
impl Drop for CountdownTaskHandle {
    /// Aborts the background countdown task when the handle goes away.
    fn drop(&mut self) {
        let region_id = self.region_id;
        debug!("Aborting region alive countdown task for region {region_id}");
        self.handler.abort();
    }
}
/// Background task that closes a region once its countdown deadline is reached.
struct CountdownTask {
    /// Region server the close request is sent to.
    region_server: RegionServer,
    /// The region to close when the countdown elapses.
    region_id: RegionId,
    /// Receives [CountdownCommand]s from the owning handle.
    rx: mpsc::Receiver<CountdownCommand>,
}
impl CountdownTask {
    /// Runs the countdown loop until either the deadline is reached or the command channel is
    /// closed.
    ///
    /// Returns `Some(closed)` when the deadline was reached, where `closed` tells whether the
    /// region was closed successfully, and `None` when every sender of the command channel has
    /// been dropped before the deadline.
    async fn run(&mut self) -> Option<bool> {
        // 30 years. See `Instant::far_future`.
        let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);

        // Make sure the alive countdown is not gonna happen before heartbeat task is started (the
        // "start countdown" command will be sent from heartbeat task).
        let countdown = tokio::time::sleep_until(far_future);
        tokio::pin!(countdown);

        let region_id = self.region_id;
        loop {
            tokio::select! {
                command = self.rx.recv() => {
                    match command {
                        Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
                            // Set first deadline in 4 heartbeats (roughly after 20 seconds from now if heartbeat
                            // interval is set to default 5 seconds), to make Datanode and Metasrv more tolerable to
                            // network or other jitters during startup.
                            let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
                            countdown.set(tokio::time::sleep_until(first_deadline));
                        },
                        Some(CountdownCommand::Reset(deadline)) => {
                            // The deadline only ever moves later.
                            if countdown.deadline() < deadline {
                                trace!(
                                    "Reset deadline of region {region_id} to approximately {} seconds later",
                                    (deadline - Instant::now()).as_secs_f32(),
                                );
                                countdown.set(tokio::time::sleep_until(deadline));
                            }
                            // Else the countdown could be either:
                            // - not started yet;
                            // - during startup protection;
                            // - received a lagging heartbeat message.
                            // All can be safely ignored.
                        },
                        None => {
                            // The line continuation drops the newline and the next line's leading
                            // whitespace, so the separating space must come before the backslash.
                            info!(
                                "The handle of countdown task for region {region_id} \
                                is dropped, RegionAliveKeeper out."
                            );
                            break;
                        },
                        #[cfg(test)]
                        Some(CountdownCommand::Deadline(tx)) => {
                            let _ = tx.send(countdown.deadline());
                        }
                    }
                }
                () = &mut countdown => {
                    let result = self.close_region().await;
                    info!(
                        "Region {region_id} is closed, result: {result:?}. \
                        RegionAliveKeeper out.",
                    );
                    return Some(result);
                }
            }
        }
        None
    }

    /// Returns if the region is closed successfully.
    ///
    /// Sends a close request to the region server, retrying up to [MAX_CLOSE_RETRY_TIMES] times
    /// without waiting in between. A `RegionNotFound` error counts as success.
    async fn close_region(&self) -> bool {
        for retry in 0..MAX_CLOSE_RETRY_TIMES {
            let request = RegionRequest::Close(RegionCloseRequest {});
            match self
                .region_server
                .handle_request(self.region_id, request)
                .await
            {
                Ok(_) => return true,
                Err(e) if e.status_code() == StatusCode::RegionNotFound => return true,
                // If region is failed to close, immediately retry. Maybe we should panic instead?
                Err(e) => error!(e;
                    "Retry {retry}, failed to close region {}. \
                    For the integrity of data, retry closing and retry without wait.",
                    self.region_id,
                ),
            }
        }
        false
    }
}
#[cfg(test)]
mod test {
use super::*;
use crate::tests::mock_region_server;
// End-to-end check of the keeper with a mock region server: a region registered before `start`
// gets a deadline once the keeper is started, stays registered while the deadline has not
// passed, and is removed from the keeper after the countdown has finished.
#[tokio::test(flavor = "multi_thread")]
async fn region_alive_keeper() {
    let region_server = mock_region_server();
    // Heartbeat interval of 300ms, so the startup deadline is 4 * 300ms after `start`.
    let alive_keeper = RegionAliveKeeper::new(region_server, 300);
    let region_id = RegionId::new(1, 2);
    // register a region before starting
    alive_keeper.register_region(region_id).await;
    assert!(alive_keeper.find_handle(region_id).await.is_some());
    alive_keeper.start().await;
    // started alive keeper should assign deadline to this region
    let deadline = alive_keeper.deadline(region_id).await.unwrap();
    assert!(deadline >= Instant::now());
    // extend lease then sleep
    // NOTE(review): the requested deadline (now + 500ms) is earlier than the startup deadline
    // (4 * 300ms); the countdown task only accepts later deadlines, so this call presumably
    // leaves the deadline unchanged -- confirm that this is intended.
    alive_keeper
        .keep_lived(vec![region_id], Instant::now() + Duration::from_millis(500))
        .await;
    tokio::time::sleep(Duration::from_millis(500)).await;
    assert!(alive_keeper.find_handle(region_id).await.is_some());
    let deadline = alive_keeper.deadline(region_id).await.unwrap();
    assert!(deadline >= Instant::now());
    // sleep to wait lease expired
    tokio::time::sleep(Duration::from_millis(1000)).await;
    // The finished countdown task deregisters the region from the keeper.
    assert!(alive_keeper.find_handle(region_id).await.is_none());
}
// Drives one countdown task through its handle: deadline before start, startup deadline,
// rejected and accepted deadline resets, and the result reported when the countdown elapses.
#[tokio::test(flavor = "multi_thread")]
async fn countdown_task() {
    let region_server = mock_region_server();
    // The finish callback reports the finishing instant and the task result through this channel.
    let (tx, rx) = oneshot::channel();
    let countdown_handle = CountdownTaskHandle::new(
        region_server,
        RegionId::new(9999, 2),
        |result: Option<bool>| async move {
            tx.send((Instant::now(), result)).unwrap();
        },
    );
    // if countdown task is not started, its deadline is set to far future
    assert!(
        countdown_handle.deadline().await.unwrap()
            > Instant::now() + Duration::from_secs(86400 * 365 * 29)
    );
    // the first deadline should be set to 4 * heartbeat_interval_millis
    // we assert it to be greater than 3 * heartbeat_interval_millis to avoid flaky test
    let heartbeat_interval_millis = 100;
    countdown_handle.start(heartbeat_interval_millis).await;
    assert!(
        countdown_handle.deadline().await.unwrap()
            > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
    );
    // reset deadline
    // a nearer deadline will be ignored
    countdown_handle
        .reset_deadline(Instant::now() + Duration::from_millis(heartbeat_interval_millis))
        .await;
    assert!(
        countdown_handle.deadline().await.unwrap()
            > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
    );
    // only a farther deadline will be accepted
    countdown_handle
        .reset_deadline(Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5))
        .await;
    assert!(
        countdown_handle.deadline().await.unwrap()
            > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
    );
    // wait for countdown task to finish
    let before_await = Instant::now();
    let (finish_instant, result) = rx.await.unwrap();
    // the mock region server cannot close the region
    assert_eq!(result, Some(false));
    // this task should be finished after 5 * heartbeat_interval_millis
    // we assert 4 times here
    assert!(
        finish_instant > before_await + Duration::from_millis(heartbeat_interval_millis * 4)
    );
}
}

View File

@@ -17,7 +17,6 @@ use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, Peer, RegionStat};
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
@@ -27,11 +26,11 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::{debug, error, info, trace, warn};
use meta_client::client::{HeartbeatSender, MetaClient};
use snafu::{OptionExt, ResultExt};
use table::engine::manager::MemoryTableEngineManager;
use tokio::sync::mpsc;
use tokio::time::Instant;
use self::handler::RegionHeartbeatResponseHandler;
use crate::alive_keeper::RegionAliveKeeper;
use crate::datanode::DatanodeOptions;
use crate::error::{
self, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result,
@@ -51,7 +50,7 @@ pub struct HeartbeatTask {
region_server: RegionServer,
interval: u64,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
region_alive_keeper: Arc<RegionAliveKeeper>,
}
impl Drop for HeartbeatTask {
@@ -77,14 +76,14 @@ impl HeartbeatTask {
let region_server = region_server.unwrap();
let region_alive_keepers = Arc::new(RegionAliveKeepers::new(
Arc::new(MemoryTableEngineManager::new_empty()),
let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
region_server.clone(),
opts.heartbeat.interval_millis,
));
let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
region_alive_keepers.clone(),
region_alive_keeper.clone(),
]));
Ok(Self {
@@ -98,7 +97,7 @@ impl HeartbeatTask {
region_server,
interval: opts.heartbeat.interval_millis,
resp_handler_executor,
region_alive_keepers,
region_alive_keeper,
})
}
@@ -163,7 +162,7 @@ impl HeartbeatTask {
let addr = resolve_addr(&self.server_addr, &self.server_hostname);
info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
self.region_alive_keepers.start().await;
self.region_alive_keeper.start().await;
let meta_client = self.meta_client.clone();
let region_server_clone = self.region_server.clone();
@@ -181,7 +180,7 @@ impl HeartbeatTask {
)
.await?;
let epoch = self.region_alive_keepers.epoch();
let epoch = self.region_alive_keeper.epoch();
common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);

View File

@@ -30,9 +30,6 @@ use store_api::storage::RegionId;
use crate::error::Result;
use crate::region_server::RegionServer;
pub mod close_region;
pub mod open_region;
/// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion].
#[derive(Clone)]
pub struct RegionHeartbeatResponseHandler {

View File

@@ -1,235 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::{CatalogManagerRef, DeregisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_meta::RegionIdent;
use common_telemetry::{error, info, warn};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
use table::engine::{CloseTableResult, EngineContext, TableReference};
use table::requests::CloseTableRequest;
use crate::error::{self, Result};
/// Heartbeat response handler for [Instruction::CloseRegion].
///
/// It closes the requested region through the table engine, keeps the catalog and the
/// region alive keepers in sync, and reports the outcome through the heartbeat mailbox.
#[derive(Clone)]
pub struct CloseRegionHandler {
/// Used to check whether the target regions are still present and to deregister a
/// table once the engine has released it.
catalog_manager: CatalogManagerRef,
/// Used to resolve the table engine named in the incoming region identifier.
table_engine_manager: TableEngineManagerRef,
/// Keepers from which the closed table or region is deregistered.
region_alive_keepers: Arc<RegionAliveKeepers>,
}
#[async_trait]
impl HeartbeatResponseHandler for CloseRegionHandler {
    /// Accepts a heartbeat response only when its incoming message carries an
    /// [Instruction::CloseRegion].
    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
        match ctx.incoming_message.as_ref() {
            Some((_, instruction)) => matches!(instruction, Instruction::CloseRegion { .. }),
            None => false,
        }
    }

    /// Takes the pending [Instruction::CloseRegion] out of the context and closes the
    /// region on a background task.
    ///
    /// The reply is delivered through the mailbox once the close attempt finishes; this
    /// method itself returns [HandleControl::Done] right after spawning the task.
    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
        let (meta, region_ident) = match ctx.incoming_message.take() {
            Some((meta, Instruction::CloseRegion(region_ident))) => (meta, region_ident),
            _ => unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'"),
        };

        // The spawned task must own everything it touches, so it gets its own copy of
        // the handler and of the mailbox handle.
        let handler = self.clone();
        let mailbox = ctx.mailbox.clone();
        let _handle = common_runtime::spawn_bg(async move {
            let outcome = handler.close_region_inner(region_ident).await;
            let reply = CloseRegionHandler::map_result(outcome);
            if let Err(e) = mailbox.send((meta, reply)).await {
                error!(e; "Failed to send reply to mailbox");
            }
        });

        Ok(HandleControl::Done)
    }
}
impl CloseRegionHandler {
pub fn new(
catalog_manager: CatalogManagerRef,
table_engine_manager: TableEngineManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Self {
Self {
catalog_manager,
table_engine_manager,
region_alive_keepers,
}
}
fn map_result(result: Result<bool>) -> InstructionReply {
result.map_or_else(
|error| {
InstructionReply::CloseRegion(SimpleReply {
result: false,
error: Some(error.to_string()),
})
},
|result| {
InstructionReply::CloseRegion(SimpleReply {
result,
error: None,
})
},
)
}
/// Returns true if a table or target regions have been closed.
async fn regions_closed(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region_numbers: &[RegionNumber],
) -> Result<bool> {
if let Some(table) = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(error::AccessCatalogSnafu)?
{
for r in region_numbers {
let region_exist =
table
.contains_region(*r)
.with_context(|_| error::CheckRegionSnafu {
table_name: format_full_table_name(
catalog_name,
schema_name,
table_name,
),
region_number: *r,
})?;
if region_exist {
return Ok(false);
}
}
}
// Returns true if table not exist
Ok(true)
}
async fn close_region_inner(&self, region_ident: RegionIdent) -> Result<bool> {
let table_ident = &region_ident.table_ident;
let engine_name = &table_ident.engine;
let engine = self
.table_engine_manager
.engine(engine_name)
.context(error::TableEngineNotFoundSnafu { engine_name })?;
let ctx = EngineContext::default();
let table_ref = &TableReference::full(
&table_ident.catalog,
&table_ident.schema,
&table_ident.table,
);
let region_numbers = vec![region_ident.region_number];
if self
.regions_closed(
table_ref.catalog,
table_ref.schema,
table_ref.table,
&region_numbers,
)
.await?
{
return Ok(true);
}
if engine
.get_table(&ctx, region_ident.table_ident.table_id)
.with_context(|_| error::GetTableSnafu {
table_name: table_ref.to_string(),
})?
.is_some()
{
return match engine
.close_table(
&ctx,
CloseTableRequest {
catalog_name: table_ref.catalog.to_string(),
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
region_numbers: region_numbers.clone(),
table_id: region_ident.table_ident.table_id,
flush: true,
},
)
.await
.with_context(|_| error::CloseTableSnafu {
table_name: table_ref.to_string(),
region_numbers: region_numbers.clone(),
})? {
CloseTableResult::NotFound | CloseTableResult::Released(_) => {
// Deregister table if The table released.
self.deregister_table(table_ref).await?;
let _ = self
.region_alive_keepers
.deregister_table(table_ident)
.await;
Ok(true)
}
CloseTableResult::PartialClosed(regions) => {
// Requires caller to update the region_numbers
info!(
"Close partial regions: {:?} in table: {}",
regions, table_ref
);
self.region_alive_keepers
.deregister_region(&region_ident)
.await;
Ok(true)
}
};
}
warn!("Trying to close a non-existing table: {}", table_ref);
// Table doesn't exist
Ok(true)
}
async fn deregister_table(&self, table_ref: &TableReference<'_>) -> Result<()> {
self.catalog_manager
.deregister_table(DeregisterTableRequest {
catalog: table_ref.catalog.to_string(),
schema: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
})
.await
.with_context(|_| error::DeregisterTableSnafu {
table_name: table_ref.to_string(),
})
}
}

View File

@@ -1,250 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use async_trait::async_trait;
use catalog::error::{Error as CatalogError, Result as CatalogResult};
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::{CatalogManagerRef, RegisterSchemaRequest, RegisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
use common_telemetry::{error, warn};
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
use table::engine::EngineContext;
use table::requests::OpenTableRequest;
use table::Table;
use crate::error::{self, Result};
/// Heartbeat response handler for [Instruction::OpenRegion].
///
/// It opens the requested region through the table engine, registers the resulting
/// table in the catalog, and reports the outcome through the heartbeat mailbox.
#[derive(Clone)]
pub struct OpenRegionHandler {
/// Used to check whether the target regions are already opened and to register the
/// catalog, schema and table after a successful open.
catalog_manager: CatalogManagerRef,
/// Used to resolve the table engine named in the incoming region identifier.
table_engine_manager: TableEngineManagerRef,
/// Keepers with which a successfully opened region is registered.
region_alive_keepers: Arc<RegionAliveKeepers>,
}
#[async_trait]
impl HeartbeatResponseHandler for OpenRegionHandler {
    /// Accepts a heartbeat response only when its incoming message carries an
    /// [Instruction::OpenRegion].
    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
        match &ctx.incoming_message {
            Some((_, instruction)) => matches!(instruction, Instruction::OpenRegion { .. }),
            None => false,
        }
    }

    /// Takes the pending [Instruction::OpenRegion] out of the context and opens the
    /// region on a background task.
    ///
    /// When the open succeeds the region is registered with the region alive keepers
    /// before the reply is sent to the mailbox. This method itself returns
    /// [HandleControl::Done] right after spawning the task.
    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
        let (meta, region_ident) = match ctx.incoming_message.take() {
            Some((meta, Instruction::OpenRegion(region_ident))) => (meta, region_ident),
            _ => unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'"),
        };

        // The spawned task must own everything it touches.
        let handler = self.clone();
        let keepers = self.region_alive_keepers.clone();
        let mailbox = ctx.mailbox.clone();
        let _handle = common_runtime::spawn_bg(async move {
            let table_ident = &region_ident.table_ident;
            let engine_name = table_ident.engine.clone();
            let request = OpenTableRequest {
                catalog_name: table_ident.catalog.clone(),
                schema_name: table_ident.schema.clone(),
                table_name: table_ident.table.clone(),
                table_id: table_ident.table_id,
                region_numbers: vec![region_ident.region_number],
            };

            let outcome = handler.open_region_inner(engine_name, request).await;
            // Only a region that was actually opened is tracked by the keepers.
            if let Ok(true) = &outcome {
                keepers.register_region(&region_ident).await;
            }

            let reply = OpenRegionHandler::map_result(outcome);
            if let Err(e) = mailbox.send((meta, reply)).await {
                error!(e; "Failed to send reply to mailbox");
            }
        });

        Ok(HandleControl::Done)
    }
}
impl OpenRegionHandler {
pub fn new(
catalog_manager: CatalogManagerRef,
table_engine_manager: TableEngineManagerRef,
region_alive_keepers: Arc<RegionAliveKeepers>,
) -> Self {
Self {
catalog_manager,
table_engine_manager,
region_alive_keepers,
}
}
fn map_result(result: Result<bool>) -> InstructionReply {
result.map_or_else(
|error| {
InstructionReply::OpenRegion(SimpleReply {
result: false,
error: Some(error.to_string()),
})
},
|result| {
InstructionReply::OpenRegion(SimpleReply {
result,
error: None,
})
},
)
}
/// Returns true if a table or target regions have been opened.
async fn regions_opened(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region_numbers: &[RegionNumber],
) -> Result<bool> {
if let Some(table) = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(error::AccessCatalogSnafu)?
{
for r in region_numbers {
let region_exist =
table
.contains_region(*r)
.with_context(|_| error::CheckRegionSnafu {
table_name: format_full_table_name(
catalog_name,
schema_name,
table_name,
),
region_number: *r,
})?;
if !region_exist {
warn!(
"Failed to check table: {}, region: {} does not exist",
format_full_table_name(catalog_name, schema_name, table_name,),
r
);
return Ok(false);
}
}
return Ok(true);
}
Ok(false)
}
async fn register_table(
&self,
request: &OpenTableRequest,
table: Arc<dyn Table>,
) -> CatalogResult<bool> {
if !self
.catalog_manager
.catalog_exist(&request.catalog_name)
.await?
{
self.catalog_manager
.clone()
.register_catalog(request.catalog_name.to_string())
.await?;
}
if !self
.catalog_manager
.schema_exist(&request.catalog_name, &request.schema_name)
.await?
{
self.catalog_manager
.register_schema(RegisterSchemaRequest {
catalog: request.catalog_name.to_string(),
schema: request.schema_name.to_string(),
})
.await?;
}
let request = RegisterTableRequest {
catalog: request.catalog_name.to_string(),
schema: request.schema_name.to_string(),
table_name: request.table_name.to_string(),
table_id: request.table_id,
table,
};
self.catalog_manager.register_table(request).await
}
async fn open_region_inner(&self, engine: String, request: OpenTableRequest) -> Result<bool> {
let OpenTableRequest {
catalog_name,
schema_name,
table_name,
region_numbers,
..
} = &request;
let engine =
self.table_engine_manager
.engine(&engine)
.context(error::TableEngineNotFoundSnafu {
engine_name: &engine,
})?;
let ctx = EngineContext::default();
if self
.regions_opened(catalog_name, schema_name, table_name, region_numbers)
.await?
{
return Ok(true);
}
if let Some(table) = engine
.open_table(&ctx, request.clone())
.await
.with_context(|_| error::OpenTableSnafu {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
})?
{
let result = self.register_table(&request, table).await;
match result {
Ok(_) | Err(CatalogError::TableExists { .. }) => Ok(true),
e => e.with_context(|_| error::RegisterTableSnafu {
table_name: format_full_table_name(catalog_name, schema_name, table_name),
}),
}
} else {
// Case 1:
// TODO(weny): Fix/Cleanup the broken table manifest
// The manifest writing operation should be atomic.
// Therefore, we won't meet this case, in theory.
// Case 2: The target region was not found in table meta
// Case 3: The table not exist
Ok(false)
}
}
}

View File

@@ -18,8 +18,7 @@ use std::time::Duration;
use std::{fs, path};
use api::v1::meta::Role;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
use catalog::local::MemoryCatalogManager;
use catalog::CatalogManagerRef;
use common_base::Plugins;
use common_catalog::consts::DEFAULT_CATALOG_NAME;
@@ -29,7 +28,6 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_meta::key::TableMetadataManager;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::store::state_store::ObjectStateStore;
use common_procedure::ProcedureManagerRef;
@@ -51,7 +49,7 @@ use storage::scheduler::{LocalScheduler, SchedulerConfig};
use storage::EngineImpl;
use store_api::logstore::LogStore;
use store_api::path_utils::{CLUSTER_DIR, WAL_DIR};
use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef};
use table::engine::manager::MemoryTableEngineManager;
use table::engine::{TableEngine, TableEngineProcedureRef};
use table::requests::FlushTableRequest;
use table::table::TableIdProviderRef;
@@ -63,8 +61,6 @@ use crate::error::{
ShutdownInstanceSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::handler::close_region::CloseRegionHandler;
use crate::heartbeat::handler::open_region::OpenRegionHandler;
use crate::heartbeat::HeartbeatTask;
use crate::row_inserter::RowInserter;
use crate::sql::{SqlHandler, SqlRequest};
@@ -115,9 +111,6 @@ impl Instance {
fn build_heartbeat_task(
opts: &DatanodeOptions,
meta_client: Option<Arc<MetaClient>>,
catalog_manager: CatalogManagerRef,
engine_manager: TableEngineManagerRef,
region_alive_keepers: Option<Arc<RegionAliveKeepers>>,
) -> Result<Option<HeartbeatTask>> {
Ok(match opts.mode {
Mode::Standalone => None,
@@ -126,24 +119,8 @@ impl Instance {
let _meta_client = meta_client.context(IncorrectInternalStateSnafu {
state: "meta client is not provided when building heartbeat task",
})?;
let region_alive_keepers =
region_alive_keepers.context(IncorrectInternalStateSnafu {
state: "region_alive_keepers is not provided when building heartbeat task",
})?;
let _handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler),
Arc::new(OpenRegionHandler::new(
catalog_manager.clone(),
engine_manager.clone(),
region_alive_keepers.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager.clone(),
engine_manager,
region_alive_keepers.clone(),
)),
region_alive_keepers.clone(),
]);
let _handlers_executor =
HandlerGroupExecutor::new(vec![Arc::new(ParseMailboxMessageHandler)]);
todo!("remove this method")
}
@@ -199,7 +176,7 @@ impl Instance {
);
// create remote catalog manager
let (catalog_manager, table_id_provider, region_alive_keepers) = match opts.mode {
let (catalog_manager, table_id_provider) = match opts.mode {
Mode::Standalone => {
let catalog = Arc::new(
catalog::local::LocalCatalogManager::try_new(engine_manager.clone())
@@ -210,35 +187,13 @@ impl Instance {
(
catalog.clone() as CatalogManagerRef,
Some(catalog as TableIdProviderRef),
None,
)
}
Mode::Distributed => {
let meta_client = meta_client.clone().context(IncorrectInternalStateSnafu {
state: "meta client is not provided when creating distributed Datanode",
})?;
let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client));
let region_alive_keepers = Arc::new(RegionAliveKeepers::new(
engine_manager.clone(),
opts.heartbeat.interval_millis,
));
let catalog_manager = Arc::new(RemoteCatalogManager::new(
engine_manager.clone(),
opts.node_id.context(MissingNodeIdSnafu)?,
region_alive_keepers.clone(),
Arc::new(TableMetadataManager::new(kv_backend)),
));
(
catalog_manager as CatalogManagerRef,
None,
Some(region_alive_keepers),
)
}
Mode::Distributed => (
MemoryCatalogManager::with_default_setup() as CatalogManagerRef,
None,
),
};
let factory =
@@ -285,13 +240,7 @@ impl Instance {
greptimedb_telemetry_task,
});
let heartbeat_task = Instance::build_heartbeat_task(
opts,
meta_client,
catalog_manager,
engine_manager,
region_alive_keepers,
)?;
let heartbeat_task = Instance::build_heartbeat_task(opts, meta_client)?;
Ok((instance, heartbeat_task))
}

View File

@@ -15,6 +15,7 @@
#![feature(assert_matches)]
#![feature(trait_upcasting)]
pub mod alive_keeper;
pub mod datanode;
pub mod error;
mod greptimedb_telemetry;
@@ -29,4 +30,5 @@ pub mod server;
pub mod sql;
mod store;
#[cfg(test)]
#[allow(dead_code)]
mod tests;

View File

@@ -12,36 +12,40 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use api::v1::greptime_request::Request as GrpcRequest;
use api::v1::meta::HeartbeatResponse;
use api::v1::query_request::Query;
use api::v1::QueryRequest;
use catalog::local::MemoryCatalogManager;
use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::CatalogManagerRef;
use async_trait::async_trait;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::FunctionRef;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::ident::TableIdent;
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply};
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent};
use common_query::prelude::ScalarUdf;
use common_query::Output;
use common_runtime::Runtime;
use datatypes::prelude::ConcreteDataType;
use query::dataframe::DataFrame;
use query::plan::LogicalPlan;
use query::planner::LogicalPlanner;
use query::query_engine::DescribeResult;
use query::QueryEngine;
use servers::query_handler::grpc::GrpcQueryHandler;
use session::context::QueryContext;
use session::context::{QueryContext, QueryContextRef};
use table::engine::manager::TableEngineManagerRef;
use table::TableRef;
use test_util::MockInstance;
use tokio::sync::mpsc::{self, Receiver};
use tokio::time::Instant;
use crate::heartbeat::handler::close_region::CloseRegionHandler;
use crate::heartbeat::handler::open_region::OpenRegionHandler;
use crate::instance::Instance;
use crate::region_server::RegionServer;
pub(crate) mod test_util;
@@ -50,203 +54,12 @@ struct HandlerTestGuard {
mailbox: Arc<HeartbeatMailbox>,
rx: Receiver<(MessageMeta, InstructionReply)>,
engine_manager_ref: TableEngineManagerRef,
catalog_manager_ref: CatalogManagerRef,
}
/// Drives a [CloseRegionHandler] through a [HandlerGroupExecutor] and checks that every
/// close instruction is answered with a successful `CloseRegion` reply: the first close
/// of the prepared table, a repeated close of the same table, and a close of a table
/// that does not exist.
#[tokio::test]
async fn test_close_region_handler() {
let HandlerTestGuard {
instance,
mailbox,
mut rx,
engine_manager_ref,
catalog_manager_ref,
..
} = prepare_handler_test("test_close_region_handler").await;
// The executor contains only the close handler. The second argument of
// `RegionAliveKeepers::new` is the heartbeat interval in milliseconds.
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
CloseRegionHandler::new(
catalog_manager_ref.clone(),
engine_manager_ref.clone(),
Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone(), 5000)),
),
)]));
// NOTE(review): `prepare_table` is defined outside this view; it presumably creates
// the demo table that the instructions below target — confirm.
let _ = prepare_table(instance.inner()).await;
// Closes the demo table
handle_instruction(
executor.clone(),
mailbox.clone(),
close_region_instruction(),
)
.await;
// The handler replies asynchronously through the mailbox channel.
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::CloseRegion(SimpleReply { result: true, .. })
);
assert_test_table_not_found(instance.inner()).await;
// Closes the demo table again; closing an already closed table still succeeds
handle_instruction(
executor.clone(),
mailbox.clone(),
close_region_instruction(),
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::CloseRegion(SimpleReply { result: true, .. })
);
// Closes a non-existent table; this is also reported as success
handle_instruction(
executor.clone(),
mailbox.clone(),
Instruction::CloseRegion(RegionIdent {
table_ident: TableIdent {
catalog: "greptime".to_string(),
schema: "public".to_string(),
table: "non-exist".to_string(),
table_id: 1025,
engine: "mito".to_string(),
},
region_number: 0,
cluster_id: 1,
datanode_id: 2,
}),
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::CloseRegion(SimpleReply { result: true, .. })
);
}
/// Drives an [OpenRegionHandler] and a [CloseRegionHandler] that share one set of region
/// alive keepers through four steps: opening an already opened table, opening a table
/// that does not exist, closing the demo table, and opening it again.
#[tokio::test]
async fn test_open_region_handler() {
let HandlerTestGuard {
instance,
mailbox,
mut rx,
engine_manager_ref,
catalog_manager_ref,
..
} = prepare_handler_test("test_open_region_handler").await;
// The second argument of `RegionAliveKeepers::new` is the heartbeat interval in
// milliseconds.
let region_alive_keepers = Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone(), 5000));
region_alive_keepers.start().await;
// Both handlers receive the same keepers so that registrations made by an open are
// visible to a later close.
let executor = Arc::new(HandlerGroupExecutor::new(vec![
Arc::new(OpenRegionHandler::new(
catalog_manager_ref.clone(),
engine_manager_ref.clone(),
region_alive_keepers.clone(),
)),
Arc::new(CloseRegionHandler::new(
catalog_manager_ref.clone(),
engine_manager_ref.clone(),
region_alive_keepers.clone(),
)),
]));
let instruction = open_region_instruction();
let Instruction::OpenRegion(region_ident) = instruction.clone() else {
unreachable!()
};
let table_ident = &region_ident.table_ident;
// NOTE(review): `prepare_table` is defined outside this view; it presumably creates
// the demo table that `open_region_instruction` refers to — confirm.
let table = prepare_table(instance.inner()).await;
let dummy_catalog_manager = MemoryCatalogManager::with_default_setup();
region_alive_keepers
.register_table(table_ident.clone(), table, dummy_catalog_manager)
.await
.unwrap();
// Opens an already opened table
handle_instruction(executor.clone(), mailbox.clone(), instruction.clone()).await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::OpenRegion(SimpleReply { result: true, .. })
);
let keeper = region_alive_keepers
.find_keeper(table_ident.table_id)
.await
.unwrap();
// NOTE(review): the 20 s bound is presumably 4 x the 5000 ms heartbeat interval
// configured above — confirm against the keeper implementation.
let deadline = keeper.deadline(0).await.unwrap();
assert!(deadline <= Instant::now() + Duration::from_secs(20));
// Opens a non-existent table
let non_exist_table_ident = TableIdent {
catalog: "foo".to_string(),
schema: "non-exist".to_string(),
table: "non-exist".to_string(),
table_id: 2024,
engine: "mito".to_string(),
};
handle_instruction(
executor.clone(),
mailbox.clone(),
Instruction::OpenRegion(RegionIdent {
table_ident: non_exist_table_ident.clone(),
region_number: 0,
cluster_id: 1,
datanode_id: 2,
}),
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::OpenRegion(SimpleReply { result: false, .. })
);
// A failed open must not leave a keeper behind.
assert!(region_alive_keepers
.find_keeper(non_exist_table_ident.table_id)
.await
.is_none());
// Closes the demo table
handle_instruction(
executor.clone(),
mailbox.clone(),
close_region_instruction(),
)
.await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::CloseRegion(SimpleReply { result: true, .. })
);
assert_test_table_not_found(instance.inner()).await;
// Closing the table also removes its keeper.
assert!(region_alive_keepers
.find_keeper(table_ident.table_id)
.await
.is_none());
// Opens the demo table again
handle_instruction(executor.clone(), mailbox.clone(), instruction).await;
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::OpenRegion(SimpleReply { result: true, .. })
);
assert_test_table_found(instance.inner()).await;
}
async fn prepare_handler_test(name: &str) -> HandlerTestGuard {
let mock_instance = MockInstance::new(name).await;
let instance = mock_instance.inner();
let engine_manager = instance.sql_handler().table_engine_manager().clone();
let catalog_manager = instance.sql_handler().catalog_manager().clone();
let (tx, rx) = mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));
@@ -255,7 +68,6 @@ async fn prepare_handler_test(name: &str) -> HandlerTestGuard {
mailbox,
rx,
engine_manager_ref: engine_manager,
catalog_manager_ref: catalog_manager,
}
}
@@ -346,3 +158,50 @@ async fn assert_test_table_found(instance: &Instance) {
assert!(matches!(output, Output::AffectedRows(2)));
}
/// A [QueryEngine] stand-in for tests that need an engine value but never run a query.
///
/// Planning, describing, executing and reading a table are unimplemented and panic when
/// called; the registration methods silently ignore their argument.
pub struct MockQueryEngine;

#[async_trait]
impl QueryEngine for MockQueryEngine {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Not supported by the mock.
    fn planner(&self) -> Arc<dyn LogicalPlanner> {
        unimplemented!()
    }

    fn name(&self) -> &str {
        "MockQueryEngine"
    }

    /// Not supported by the mock.
    async fn describe(&self, _plan: LogicalPlan) -> query::error::Result<DescribeResult> {
        unimplemented!()
    }

    /// Not supported by the mock.
    async fn execute(
        &self,
        _plan: LogicalPlan,
        _query_ctx: QueryContextRef,
    ) -> query::error::Result<Output> {
        unimplemented!()
    }

    // Registrations are accepted and dropped.
    fn register_udf(&self, _udf: ScalarUdf) {}

    fn register_aggregate_function(&self, _func: AggregateFunctionMetaRef) {}

    fn register_function(&self, _func: FunctionRef) {}

    /// Not supported by the mock.
    fn read_table(&self, _table: TableRef) -> query::error::Result<DataFrame> {
        unimplemented!()
    }
}
/// Builds a [RegionServer] that has no engine registered.
///
/// The server is backed by a [MockQueryEngine] and a freshly built runtime. Panics if
/// the runtime cannot be built.
pub fn mock_region_server() -> RegionServer {
    let query_engine = Arc::new(MockQueryEngine);
    let runtime = Arc::new(Runtime::builder().build().unwrap());
    RegionServer::new(query_engine, runtime)
}