diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs index f60e574a1a..9cdd84e490 100644 --- a/src/catalog/src/remote.rs +++ b/src/catalog/src/remote.rs @@ -15,14 +15,11 @@ use std::sync::Arc; pub use client::{CachedMetaKvBackend, MetaKvBackend}; -pub use manager::RemoteCatalogManager; mod client; -mod manager; #[cfg(feature = "testing")] pub mod mock; -pub mod region_alive_keeper; #[async_trait::async_trait] pub trait KvCacheInvalidator: Send + Sync { diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs deleted file mode 100644 index ddbd9f71ec..0000000000 --- a/src/catalog/src/remote/manager.rs +++ /dev/null @@ -1,436 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::sync::Arc; - -use async_trait::async_trait; -use common_catalog::consts::MITO_ENGINE; -use common_meta::ident::TableIdent; -use common_meta::key::catalog_name::CatalogNameKey; -use common_meta::key::datanode_table::DatanodeTableValue; -use common_meta::key::schema_name::SchemaNameKey; -use common_meta::key::TableMetadataManagerRef; -use common_telemetry::{error, info, warn}; -use futures_util::TryStreamExt; -use metrics::increment_gauge; -use snafu::{ensure, OptionExt, ResultExt}; -use table::engine::manager::TableEngineManagerRef; -use table::engine::EngineContext; -use table::requests::OpenTableRequest; -use table::TableRef; -use tokio::sync::Mutex; - -use crate::error::{ - OpenTableSnafu, ParallelOpenTableSnafu, Result, TableEngineNotFoundSnafu, TableExistsSnafu, - TableMetadataManagerSnafu, TableNotFoundSnafu, UnimplementedSnafu, -}; -use crate::local::MemoryCatalogManager; -use crate::remote::region_alive_keeper::RegionAliveKeepers; -use crate::{ - handle_system_table_request, CatalogManager, DeregisterSchemaRequest, DeregisterTableRequest, - RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, -}; - -/// Catalog manager based on metasrv. 
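// Editor's sketch (generic names, not the project's API; the real `open` is async and the
// project uses `common_runtime::spawn_bg`) of the startup pattern used by the deleted
// `initiate_catalogs` below: open every table assigned to this datanode in a background
// task, and log failures instead of letting one broken table abort Datanode startup.
// Assumes the `futures` crate, which the original file already depends on.
async fn open_all_tables<T: Send + 'static>(
    tables: Vec<T>,
    open: impl Fn(T) -> Result<(), String> + Send + Sync + Clone + 'static,
) {
    let joins: Vec<_> = tables
        .into_iter()
        .map(|table| {
            let open = open.clone();
            tokio::spawn(async move {
                if let Err(e) = open(table) {
                    // Individual open failures are logged, not propagated.
                    eprintln!("Failed to open or register table: {e}");
                }
            })
        })
        .collect();
    // Only task-join failures (panics) propagate; open errors were already logged above.
    let _ = futures::future::try_join_all(joins).await;
}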
-pub struct RemoteCatalogManager { - node_id: u64, - engine_manager: TableEngineManagerRef, - system_table_requests: Mutex>, - region_alive_keepers: Arc, - memory_catalog_manager: Arc, - table_metadata_manager: TableMetadataManagerRef, -} - -impl RemoteCatalogManager { - pub fn new( - engine_manager: TableEngineManagerRef, - node_id: u64, - region_alive_keepers: Arc, - table_metadata_manager: TableMetadataManagerRef, - ) -> Self { - Self { - engine_manager, - node_id, - system_table_requests: Default::default(), - region_alive_keepers, - memory_catalog_manager: MemoryCatalogManager::with_default_setup(), - table_metadata_manager, - } - } - - async fn initiate_catalogs(&self) -> Result<()> { - let tables = self - .table_metadata_manager - .datanode_table_manager() - .tables(self.node_id) - .try_collect::>() - .await - .context(TableMetadataManagerSnafu)?; - - let joins = tables - .into_iter() - .map(|datanode_table_value| { - let engine_manager = self.engine_manager.clone(); - let memory_catalog_manager = self.memory_catalog_manager.clone(); - let table_metadata_manager = self.table_metadata_manager.clone(); - let region_alive_keepers = self.region_alive_keepers.clone(); - common_runtime::spawn_bg(async move { - let table_id = datanode_table_value.table_id; - if let Err(e) = open_and_register_table( - engine_manager, - datanode_table_value, - memory_catalog_manager, - table_metadata_manager, - region_alive_keepers, - ) - .await - { - // Note that we don't return error here if table opened failed. This is because - // we don't want those broken tables to impede the startup of Datanode. - // However, this could be changed in the future. - error!(e; "Failed to open or register table, id = {table_id}") - } - }) - }) - .collect::>(); - - let _ = futures::future::try_join_all(joins) - .await - .context(ParallelOpenTableSnafu)?; - Ok(()) - } -} - -async fn open_and_register_table( - engine_manager: TableEngineManagerRef, - datanode_table_value: DatanodeTableValue, - memory_catalog_manager: Arc, - table_metadata_manager: TableMetadataManagerRef, - region_alive_keepers: Arc, -) -> Result<()> { - let context = EngineContext {}; - - let table_id = datanode_table_value.table_id; - let region_numbers = datanode_table_value.regions; - - let table_info_value = table_metadata_manager - .table_info_manager() - .get(table_id) - .await - .context(TableMetadataManagerSnafu)? - .context(TableNotFoundSnafu { - table_info: format!("table id: {table_id}"), - })?; - let table_info = &table_info_value.table_info; - let catalog_name = table_info.catalog_name.clone(); - let schema_name = table_info.schema_name.clone(); - let table_name = table_info.name.clone(); - - let request = OpenTableRequest { - catalog_name: catalog_name.clone(), - schema_name: schema_name.clone(), - table_name: table_name.clone(), - table_id, - region_numbers: region_numbers.clone(), - }; - let engine = - engine_manager - .engine(&table_info.meta.engine) - .context(TableEngineNotFoundSnafu { - engine_name: &table_info.meta.engine, - })?; - - let table_ident = TableIdent { - catalog: catalog_name, - schema: schema_name, - table: table_name, - table_id, - engine: table_info.meta.engine.clone(), - }; - - let table = engine - .open_table(&context, request) - .await - .with_context(|_| OpenTableSnafu { - table_info: table_ident.to_string(), - })? 
- .with_context(|| TableNotFoundSnafu { - table_info: table_ident.to_string(), - })?; - info!("Successfully opened table, {table_ident}"); - - if !memory_catalog_manager - .catalog_exist(&table_ident.catalog) - .await? - { - memory_catalog_manager.register_catalog_sync(table_ident.catalog.clone())?; - } - - if !memory_catalog_manager - .schema_exist(&table_ident.catalog, &table_ident.schema) - .await? - { - memory_catalog_manager.register_schema_sync(RegisterSchemaRequest { - catalog: table_ident.catalog.clone(), - schema: table_ident.schema.clone(), - })?; - } - - let request = RegisterTableRequest { - catalog: table_ident.catalog.clone(), - schema: table_ident.schema.clone(), - table_name: table_ident.table.clone(), - table_id, - table, - }; - let registered = - register_table(&memory_catalog_manager, ®ion_alive_keepers, request).await?; - ensure!( - registered, - TableExistsSnafu { - table: table_ident.to_string(), - } - ); - info!("Successfully registered table, {table_ident}"); - Ok(()) -} - -async fn register_table( - memory_catalog_manager: &Arc, - region_alive_keepers: &Arc, - request: RegisterTableRequest, -) -> Result { - let table = request.table.clone(); - - let registered = memory_catalog_manager.register_table_sync(request)?; - - if registered { - let table_info = table.table_info(); - let table_ident = TableIdent { - catalog: table_info.catalog_name.clone(), - schema: table_info.schema_name.clone(), - table: table_info.name.clone(), - table_id: table_info.table_id(), - engine: table_info.meta.engine.clone(), - }; - region_alive_keepers - .register_table(table_ident, table, memory_catalog_manager.clone()) - .await?; - } - - Ok(registered) -} - -#[async_trait] -impl CatalogManager for RemoteCatalogManager { - async fn start(&self) -> Result<()> { - self.initiate_catalogs().await?; - - let mut system_table_requests = self.system_table_requests.lock().await; - let engine = self - .engine_manager - .engine(MITO_ENGINE) - .context(TableEngineNotFoundSnafu { - engine_name: MITO_ENGINE, - })?; - handle_system_table_request(self, engine, &mut system_table_requests).await?; - info!("All system table opened"); - Ok(()) - } - - async fn register_table(&self, request: RegisterTableRequest) -> Result { - register_table( - &self.memory_catalog_manager, - &self.region_alive_keepers, - request, - ) - .await - } - - async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<()> { - let Some(table) = self - .memory_catalog_manager - .table(&request.catalog, &request.schema, &request.table_name) - .await? 
- else { - return Ok(()); - }; - - let table_info = table.table_info(); - let table_ident = TableIdent { - catalog: request.catalog.clone(), - schema: request.schema.clone(), - table: request.table_name.clone(), - table_id: table_info.ident.table_id, - engine: table_info.meta.engine.clone(), - }; - if let Some(keeper) = self - .region_alive_keepers - .deregister_table(&table_ident) - .await - { - warn!( - "Table {} is deregistered from region alive keepers", - keeper.table_ident(), - ); - } - - self.memory_catalog_manager.deregister_table(request).await - } - - async fn register_schema(&self, request: RegisterSchemaRequest) -> Result { - self.memory_catalog_manager.register_schema_sync(request) - } - - async fn deregister_schema(&self, _request: DeregisterSchemaRequest) -> Result { - UnimplementedSnafu { - operation: "deregister schema", - } - .fail() - } - - async fn rename_table(&self, request: RenameTableRequest) -> Result { - self.memory_catalog_manager.rename_table(request).await - } - - async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { - let catalog_name = request.create_table_request.catalog_name.clone(); - let schema_name = request.create_table_request.schema_name.clone(); - - let mut requests = self.system_table_requests.lock().await; - requests.push(request); - increment_gauge!( - crate::metrics::METRIC_CATALOG_MANAGER_TABLE_COUNT, - 1.0, - &[crate::metrics::db_label(&catalog_name, &schema_name)], - ); - Ok(()) - } - - async fn schema_exist(&self, catalog: &str, schema: &str) -> Result { - if !self.catalog_exist(catalog).await? { - return Ok(false); - } - - if self - .memory_catalog_manager - .schema_exist(catalog, schema) - .await? - { - return Ok(true); - } - - let remote_schema_exists = self - .table_metadata_manager - .schema_manager() - .exist(SchemaNameKey::new(catalog, schema)) - .await - .context(TableMetadataManagerSnafu)?; - // Create schema locally if remote schema exists. Since local schema is managed by memory - // catalog manager, creating a local schema is relatively cheap (just a HashMap). - // Besides, if this method ("schema_exist) is called, it's very likely that someone wants to - // create a table in this schema. We should create the schema now. - if remote_schema_exists - && self - .memory_catalog_manager - .register_schema(RegisterSchemaRequest { - catalog: catalog.to_string(), - schema: schema.to_string(), - }) - .await? - { - info!("register schema '{catalog}/{schema}' on demand"); - } - - Ok(remote_schema_exists) - } - - async fn table( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - ) -> Result> { - self.memory_catalog_manager - .table(catalog_name, schema_name, table_name) - .await - } - - async fn catalog_exist(&self, catalog: &str) -> Result { - if self.memory_catalog_manager.catalog_exist(catalog).await? { - return Ok(true); - } - - let key = CatalogNameKey::new(catalog); - let remote_catalog_exists = self - .table_metadata_manager - .catalog_manager() - .exist(key) - .await - .context(TableMetadataManagerSnafu)?; - - // Create catalog locally if remote catalog exists. Since local catalog is managed by memory - // catalog manager, creating a local catalog is relatively cheap (just a HashMap). - // Besides, if this method ("catalog_exist) is called, it's very likely that someone wants to - // create a table in this catalog. We should create the catalog now. - if remote_catalog_exists - && self - .memory_catalog_manager - .clone() - .register_catalog(catalog.to_string()) - .await? 
- { - info!("register catalog '{catalog}' on demand"); - } - - Ok(remote_catalog_exists) - } - - async fn table_exist(&self, catalog: &str, schema: &str, table: &str) -> Result { - if !self.catalog_exist(catalog).await? { - return Ok(false); - } - - if !self.schema_exist(catalog, schema).await? { - return Ok(false); - } - - self.memory_catalog_manager - .table_exist(catalog, schema, table) - .await - } - - async fn catalog_names(&self) -> Result> { - self.memory_catalog_manager.catalog_names().await - } - - async fn schema_names(&self, catalog_name: &str) -> Result> { - self.memory_catalog_manager.schema_names(catalog_name).await - } - - async fn table_names(&self, catalog_name: &str, schema_name: &str) -> Result> { - self.memory_catalog_manager - .table_names(catalog_name, schema_name) - .await - } - - async fn register_catalog(self: Arc, name: String) -> Result { - self.memory_catalog_manager.register_catalog_sync(name) - } - - fn as_any(&self) -> &dyn Any { - self - } -} diff --git a/src/catalog/src/remote/region_alive_keeper.rs b/src/catalog/src/remote/region_alive_keeper.rs deleted file mode 100644 index fb7c6b151f..0000000000 --- a/src/catalog/src/remote/region_alive_keeper.rs +++ /dev/null @@ -1,865 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; -use std::future::Future; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; - -use async_trait::async_trait; -use common_meta::error::InvalidProtoMsgSnafu; -use common_meta::heartbeat::handler::{ - HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, -}; -use common_meta::ident::TableIdent; -use common_meta::RegionIdent; -use common_telemetry::{debug, error, info, warn}; -use snafu::{OptionExt, ResultExt}; -use store_api::storage::{RegionId, RegionNumber}; -use table::engine::manager::TableEngineManagerRef; -use table::engine::{CloseTableResult, EngineContext, TableEngineRef}; -use table::metadata::TableId; -use table::requests::CloseTableRequest; -use table::TableRef; -use tokio::sync::{mpsc, oneshot, Mutex}; -use tokio::task::JoinHandle; -use tokio::time::{Duration, Instant}; - -use crate::error::{Result, TableEngineNotFoundSnafu}; -use crate::local::MemoryCatalogManager; -use crate::DeregisterTableRequest; - -/// [RegionAliveKeepers] manages all [RegionAliveKeeper] in a scope of tables. -pub struct RegionAliveKeepers { - table_engine_manager: TableEngineManagerRef, - keepers: Arc>>>, - heartbeat_interval_millis: u64, - started: AtomicBool, - - /// The epoch when [RegionAliveKeepers] is created. It's used to get a monotonically non-decreasing - /// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically - /// non-decreasing). The heartbeat request will carry the duration since this epoch, and the - /// duration acts like an "invariant point" for region's keep alive lease. 
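// Editor's sketch of how the `epoch` documented here turns the two relative durations that
// Metasrv sends back (see the heartbeat handler further down) into an absolute countdown
// deadline. Names are illustrative, not the project's API.
use tokio::time::{Duration, Instant};

fn lease_deadline(epoch: Instant, duration_since_epoch_ms: u64, lease_seconds: u64) -> Instant {
    // The instant at which the heartbeat that earned this lease was submitted.
    let heartbeat_sent = epoch + Duration::from_millis(duration_since_epoch_ms);
    // The region may stay open until `lease_seconds` after that instant.
    heartbeat_sent + Duration::from_secs(lease_seconds)
}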
- epoch: Instant, -} - -impl RegionAliveKeepers { - pub fn new( - table_engine_manager: TableEngineManagerRef, - heartbeat_interval_millis: u64, - ) -> Self { - Self { - table_engine_manager, - keepers: Arc::new(Mutex::new(HashMap::new())), - heartbeat_interval_millis, - started: AtomicBool::new(false), - epoch: Instant::now(), - } - } - - pub async fn find_keeper(&self, table_id: TableId) -> Option> { - self.keepers.lock().await.get(&table_id).cloned() - } - - pub async fn register_table( - &self, - table_ident: TableIdent, - table: TableRef, - catalog_manager: Arc, - ) -> Result<()> { - let table_id = table_ident.table_id; - let keeper = self.find_keeper(table_id).await; - if keeper.is_some() { - return Ok(()); - } - - let table_engine = self - .table_engine_manager - .engine(&table_ident.engine) - .context(TableEngineNotFoundSnafu { - engine_name: &table_ident.engine, - })?; - - let keeper = Arc::new(RegionAliveKeeper::new( - table_engine, - catalog_manager, - table_ident.clone(), - self.heartbeat_interval_millis, - )); - for r in table.table_info().meta.region_numbers.iter() { - keeper.register_region(*r).await; - } - - let mut keepers = self.keepers.lock().await; - let _ = keepers.insert(table_id, keeper.clone()); - - if self.started.load(Ordering::Relaxed) { - keeper.start().await; - - info!("RegionAliveKeeper for table {table_ident} is started!"); - } else { - info!("RegionAliveKeeper for table {table_ident} is registered but not started yet!"); - } - Ok(()) - } - - pub async fn deregister_table( - &self, - table_ident: &TableIdent, - ) -> Option> { - let table_id = table_ident.table_id; - self.keepers.lock().await.remove(&table_id).map(|x| { - info!("Deregister RegionAliveKeeper for table {table_ident}"); - x - }) - } - - pub async fn register_region(&self, region_ident: &RegionIdent) { - let table_id = region_ident.table_ident.table_id; - let Some(keeper) = self.find_keeper(table_id).await else { - // Alive keeper could be affected by lagging msg, just warn and ignore. - warn!("Alive keeper for region {region_ident} is not found!"); - return; - }; - keeper.register_region(region_ident.region_number).await - } - - pub async fn deregister_region(&self, region_ident: &RegionIdent) { - let table_id = region_ident.table_ident.table_id; - let Some(keeper) = self.find_keeper(table_id).await else { - // Alive keeper could be affected by lagging msg, just warn and ignore. 
- warn!("Alive keeper for region {region_ident} is not found!"); - return; - }; - let _ = keeper.deregister_region(region_ident.region_number).await; - } - - pub async fn start(&self) { - let keepers = self.keepers.lock().await; - for keeper in keepers.values() { - keeper.start().await; - } - self.started.store(true, Ordering::Relaxed); - - info!( - "RegionAliveKeepers for tables {:?} are started!", - keepers.keys().map(|x| x.to_string()).collect::>(), - ); - } - - pub fn epoch(&self) -> Instant { - self.epoch - } -} - -#[async_trait] -impl HeartbeatResponseHandler for RegionAliveKeepers { - fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { - ctx.response.region_lease.is_some() - } - - async fn handle( - &self, - ctx: &mut HeartbeatResponseHandlerContext, - ) -> common_meta::error::Result { - let region_lease = ctx - .response - .region_lease - .as_ref() - .context(InvalidProtoMsgSnafu { - err_msg: "'region_lease' is missing in heartbeat response", - })?; - let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch); - let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds); - for raw_region_id in ®ion_lease.region_ids { - let region_id = RegionId::from_u64(*raw_region_id); - let table_id = region_id.table_id(); - let Some(keeper) = self.keepers.lock().await.get(&table_id).cloned() else { - // Alive keeper could be affected by lagging msg, just warn and ignore. - warn!("Alive keeper for table {table_id} is not found!"); - continue; - }; - - // TODO(jeremy): refactor this, use region_id - keeper - .keep_lived(vec![region_id.region_number()], deadline) - .await; - } - Ok(HandleControl::Continue) - } -} - -/// [RegionAliveKeeper] starts a countdown for each region in a table. When deadline is reached, -/// the region will be closed. -/// The deadline is controlled by Metasrv. It works like "lease" for regions: a Datanode submits its -/// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this -/// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to -/// countdown. -pub struct RegionAliveKeeper { - catalog_manager: Arc, - table_engine: TableEngineRef, - table_ident: TableIdent, - countdown_task_handles: Arc>>>, - heartbeat_interval_millis: u64, - started: AtomicBool, -} - -impl RegionAliveKeeper { - fn new( - table_engine: TableEngineRef, - catalog_manager: Arc, - table_ident: TableIdent, - heartbeat_interval_millis: u64, - ) -> Self { - Self { - catalog_manager, - table_engine, - table_ident, - countdown_task_handles: Arc::new(Mutex::new(HashMap::new())), - heartbeat_interval_millis, - started: AtomicBool::new(false), - } - } - - async fn find_handle(&self, region: &RegionNumber) -> Option> { - self.countdown_task_handles - .lock() - .await - .get(region) - .cloned() - } - - async fn register_region(&self, region: RegionNumber) { - if self.find_handle(®ion).await.is_some() { - return; - } - - let countdown_task_handles = Arc::downgrade(&self.countdown_task_handles); - let on_task_finished = async move { - if let Some(x) = countdown_task_handles.upgrade() { - let _ = x.lock().await.remove(®ion); - } // Else the countdown task handles map could be dropped because the keeper is dropped. 
- }; - let catalog_manager = self.catalog_manager.clone(); - let ident = self.table_ident.clone(); - let handle = Arc::new(CountdownTaskHandle::new( - self.table_engine.clone(), - self.table_ident.clone(), - region, - move |result: Option| { - if matches!(result, Some(CloseTableResult::Released(_))) { - let result = catalog_manager.deregister_table_sync(DeregisterTableRequest { - catalog: ident.catalog.to_string(), - schema: ident.schema.to_string(), - table_name: ident.table.to_string(), - }); - - info!( - "Deregister table: {} after countdown task finished, result: {result:?}", - ident.table_id - ); - } else { - debug!("Countdown task returns: {result:?}"); - } - on_task_finished - }, - )); - - let mut handles = self.countdown_task_handles.lock().await; - let _ = handles.insert(region, handle.clone()); - - if self.started.load(Ordering::Relaxed) { - handle.start(self.heartbeat_interval_millis).await; - - info!( - "Region alive countdown for region {region} in table {} is started!", - self.table_ident - ); - } else { - info!( - "Region alive countdown for region {region} in table {} is registered but not started yet!", - self.table_ident - ); - } - } - - async fn deregister_region(&self, region: RegionNumber) -> Option> { - self.countdown_task_handles - .lock() - .await - .remove(®ion) - .map(|x| { - info!( - "Deregister alive countdown for region {region} in table {}", - self.table_ident - ); - x - }) - } - - async fn start(&self) { - let handles = self.countdown_task_handles.lock().await; - for handle in handles.values() { - handle.start(self.heartbeat_interval_millis).await; - } - - self.started.store(true, Ordering::Relaxed); - info!( - "Region alive countdowns for regions {:?} in table {} are started!", - handles.keys().copied().collect::>(), - self.table_ident - ); - } - - async fn keep_lived(&self, designated_regions: Vec, deadline: Instant) { - for region in designated_regions { - if let Some(handle) = self.find_handle(®ion).await { - handle.reset_deadline(deadline).await; - } - // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it. - } - } - - pub async fn deadline(&self, region: RegionNumber) -> Option { - let mut deadline = None; - if let Some(handle) = self.find_handle(®ion).await { - let (s, r) = oneshot::channel(); - if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() { - deadline = r.await.ok() - } - } - deadline - } - - pub fn table_ident(&self) -> &TableIdent { - &self.table_ident - } -} - -#[derive(Debug)] -enum CountdownCommand { - Start(u64), - Reset(Instant), - Deadline(oneshot::Sender), -} - -struct CountdownTaskHandle { - tx: mpsc::Sender, - handler: JoinHandle<()>, - table_ident: TableIdent, - region: RegionNumber, -} - -impl CountdownTaskHandle { - /// Creates a new [CountdownTaskHandle] and starts the countdown task. - /// # Params - /// - `on_task_finished`: a callback to be invoked when the task is finished. Note that it will not - /// be invoked if the task is cancelled (by dropping the handle). This is because we want something - /// meaningful to be done when the task is finished, e.g. deregister the handle from the map. - /// While dropping the handle does not necessarily mean the task is finished. 
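// Editor's sketch (illustrative names, not the project's API) of the callback contract
// documented above for `CountdownTaskHandle::new`: `on_finished` runs only when the inner
// task completes on its own; aborting the task, as the handle's `Drop` does, skips it.
use std::future::Future;
use tokio::task::JoinHandle;

fn spawn_with_callback<T, Fut>(
    work: impl Future<Output = T> + Send + 'static,
    on_finished: impl FnOnce(T) -> Fut + Send + 'static,
) -> JoinHandle<()>
where
    T: Send + 'static,
    Fut: Future<Output = ()> + Send + 'static,
{
    tokio::spawn(async move {
        let result = work.await;
        // Reached only on normal completion; an aborted task never gets here.
        on_finished(result).await;
    })
}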
- fn new( - table_engine: TableEngineRef, - table_ident: TableIdent, - region: RegionNumber, - on_task_finished: impl FnOnce(Option) -> Fut + Send + 'static, - ) -> Self - where - Fut: Future + Send, - { - let (tx, rx) = mpsc::channel(1024); - - let mut countdown_task = CountdownTask { - table_engine, - table_ident: table_ident.clone(), - region, - rx, - }; - let handler = common_runtime::spawn_bg(async move { - let result = countdown_task.run().await; - on_task_finished(result).await; - }); - - Self { - tx, - handler, - table_ident, - region, - } - } - - async fn start(&self, heartbeat_interval_millis: u64) { - if let Err(e) = self - .tx - .send(CountdownCommand::Start(heartbeat_interval_millis)) - .await - { - warn!( - "Failed to start region alive keeper countdown: {e}. \ - Maybe the task is stopped due to region been closed." - ); - } - } - - async fn reset_deadline(&self, deadline: Instant) { - if let Err(e) = self.tx.send(CountdownCommand::Reset(deadline)).await { - warn!( - "Failed to reset region alive keeper deadline: {e}. \ - Maybe the task is stopped due to region been closed." - ); - } - } -} - -impl Drop for CountdownTaskHandle { - fn drop(&mut self) { - debug!( - "Aborting region alive countdown task for region {} in table {}", - self.region, self.table_ident, - ); - self.handler.abort(); - } -} - -struct CountdownTask { - table_engine: TableEngineRef, - table_ident: TableIdent, - region: RegionNumber, - rx: mpsc::Receiver, -} - -impl CountdownTask { - // returns true if - async fn run(&mut self) -> Option { - // 30 years. See `Instant::far_future`. - let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30); - - // Make sure the alive countdown is not gonna happen before heartbeat task is started (the - // "start countdown" command will be sent from heartbeat task). - let countdown = tokio::time::sleep_until(far_future); - tokio::pin!(countdown); - - let region = &self.region; - let table_ident = &self.table_ident; - loop { - tokio::select! { - command = self.rx.recv() => { - match command { - Some(CountdownCommand::Start(heartbeat_interval_millis)) => { - // Set first deadline in 4 heartbeats (roughly after 20 seconds from now if heartbeat - // interval is set to default 5 seconds), to make Datanode and Metasrv more tolerable to - // network or other jitters during startup. - let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4; - countdown.set(tokio::time::sleep_until(first_deadline)); - }, - Some(CountdownCommand::Reset(deadline)) => { - if countdown.deadline() < deadline { - debug!( - "Reset deadline of region {region} of table {table_ident} to approximately {} seconds later", - (deadline - Instant::now()).as_secs_f32(), - ); - countdown.set(tokio::time::sleep_until(deadline)); - } - // Else the countdown could be either: - // - not started yet; - // - during startup protection; - // - received a lagging heartbeat message. - // All can be safely ignored. - }, - None => { - info!( - "The handle of countdown task for region {region} of table {table_ident} \ - is dropped, RegionAliveKeeper out." - ); - break; - }, - Some(CountdownCommand::Deadline(tx)) => { - let _ = tx.send(countdown.deadline()); - } - } - } - () = &mut countdown => { - let result = self.close_region().await; - warn!( - "Region {region} of table {table_ident} is closed, result: {result:?}. 
\ - RegionAliveKeeper out.", - ); - return Some(result); - } - } - } - None - } - - async fn close_region(&self) -> CloseTableResult { - let ctx = EngineContext::default(); - let region = self.region; - let table_ident = &self.table_ident; - loop { - let request = CloseTableRequest { - catalog_name: table_ident.catalog.clone(), - schema_name: table_ident.schema.clone(), - table_name: table_ident.table.clone(), - table_id: table_ident.table_id, - region_numbers: vec![region], - flush: true, - }; - match self.table_engine.close_table(&ctx, request).await { - Ok(result) => return result, - // If region is failed to close, immediately retry. Maybe we should panic instead? - Err(e) => error!(e; - "Failed to close region {region} of table {table_ident}. \ - For the integrity of data, retry closing and retry without wait.", - ), - } - } - } -} - -#[cfg(test)] -mod test { - use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::Arc; - - use api::v1::meta::{HeartbeatResponse, RegionLease}; - use common_meta::heartbeat::mailbox::HeartbeatMailbox; - use datatypes::schema::RawSchema; - use table::engine::manager::MemoryTableEngineManager; - use table::engine::TableEngine; - use table::requests::{CreateTableRequest, TableOptions}; - use table::test_util::EmptyTable; - - use super::*; - use crate::remote::mock::MockTableEngine; - - async fn prepare_keepers() -> (TableIdent, RegionAliveKeepers) { - let table_engine = Arc::new(MockTableEngine::default()); - let table_engine_manager = Arc::new(MemoryTableEngineManager::new(table_engine)); - let keepers = RegionAliveKeepers::new(table_engine_manager, 5000); - - let catalog = "my_catalog"; - let schema = "my_schema"; - let table = "my_table"; - let table_ident = TableIdent { - catalog: catalog.to_string(), - schema: schema.to_string(), - table: table.to_string(), - table_id: 1, - engine: "MockTableEngine".to_string(), - }; - let table = EmptyTable::table(CreateTableRequest { - id: 1, - catalog_name: catalog.to_string(), - schema_name: schema.to_string(), - table_name: table.to_string(), - desc: None, - schema: RawSchema { - column_schemas: vec![], - timestamp_index: None, - version: 0, - }, - region_numbers: vec![1, 2, 3], - primary_key_indices: vec![], - create_if_not_exists: false, - table_options: TableOptions::default(), - engine: "MockTableEngine".to_string(), - }); - let catalog_manager = MemoryCatalogManager::new_with_table(table.clone()); - keepers - .register_table(table_ident.clone(), table, catalog_manager) - .await - .unwrap(); - assert!(keepers - .keepers - .lock() - .await - .contains_key(&table_ident.table_id)); - - (table_ident, keepers) - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_handle_heartbeat_response() { - let (table_ident, keepers) = prepare_keepers().await; - - keepers.start().await; - let startup_protection_until = Instant::now() + Duration::from_secs(21); - - let duration_since_epoch = (Instant::now() - keepers.epoch).as_millis() as _; - let lease_seconds = 100; - let response = HeartbeatResponse { - region_lease: Some(RegionLease { - region_ids: vec![ - RegionId::new(table_ident.table_id, 1).as_u64(), - RegionId::new(table_ident.table_id, 3).as_u64(), - ], // Not extending region 2's lease time. 
- duration_since_epoch, - lease_seconds, - }), - ..Default::default() - }; - let keep_alive_until = keepers.epoch - + Duration::from_millis(duration_since_epoch) - + Duration::from_secs(lease_seconds); - - let (tx, _) = mpsc::channel(8); - let mailbox = Arc::new(HeartbeatMailbox::new(tx)); - let mut ctx = HeartbeatResponseHandlerContext::new(mailbox, response); - - assert!(keepers.handle(&mut ctx).await.unwrap() == HandleControl::Continue); - - // sleep to wait for background task spawned in `handle` - tokio::time::sleep(Duration::from_secs(1)).await; - - async fn test( - keeper: &Arc, - region_number: RegionNumber, - startup_protection_until: Instant, - keep_alive_until: Instant, - is_kept_live: bool, - ) { - let deadline = keeper.deadline(region_number).await.unwrap(); - if is_kept_live { - assert!(deadline > startup_protection_until && deadline == keep_alive_until); - } else { - assert!(deadline <= startup_protection_until); - } - } - - let keeper = &keepers - .keepers - .lock() - .await - .get(&table_ident.table_id) - .cloned() - .unwrap(); - - // Test region 1 and 3 is kept lived. Their deadlines are updated to desired instant. - test(keeper, 1, startup_protection_until, keep_alive_until, true).await; - test(keeper, 3, startup_protection_until, keep_alive_until, true).await; - - // Test region 2 is not kept lived. It's deadline is not updated: still during startup protection period. - test(keeper, 2, startup_protection_until, keep_alive_until, false).await; - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_region_alive_keepers() { - let (table_ident, keepers) = prepare_keepers().await; - - keepers - .register_region(&RegionIdent { - cluster_id: 1, - datanode_id: 1, - table_ident: table_ident.clone(), - region_number: 4, - }) - .await; - - keepers.start().await; - for keeper in keepers.keepers.lock().await.values() { - let regions = { - let handles = keeper.countdown_task_handles.lock().await; - handles.keys().copied().collect::>() - }; - for region in regions { - // assert countdown tasks are started - let deadline = keeper.deadline(region).await.unwrap(); - assert!(deadline <= Instant::now() + Duration::from_secs(20)); - } - } - - keepers - .deregister_region(&RegionIdent { - cluster_id: 1, - datanode_id: 1, - table_ident: table_ident.clone(), - region_number: 1, - }) - .await; - let mut regions = keepers - .find_keeper(table_ident.table_id) - .await - .unwrap() - .countdown_task_handles - .lock() - .await - .keys() - .copied() - .collect::>(); - regions.sort(); - assert_eq!(regions, vec![2, 3, 4]); - - let keeper = keepers.deregister_table(&table_ident).await.unwrap(); - assert!(Arc::try_unwrap(keeper).is_ok(), "keeper is not dropped"); - assert!(keepers.keepers.lock().await.is_empty()); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_region_alive_keeper() { - let table_engine = Arc::new(MockTableEngine::default()); - let table_ident = TableIdent { - catalog: "my_catalog".to_string(), - schema: "my_schema".to_string(), - table: "my_table".to_string(), - table_id: 1024, - engine: "mito".to_string(), - }; - let catalog_manager = MemoryCatalogManager::with_default_setup(); - let keeper = RegionAliveKeeper::new(table_engine, catalog_manager, table_ident, 1000); - - let region = 1; - assert!(keeper.find_handle(®ion).await.is_none()); - keeper.register_region(region).await; - let _ = keeper.find_handle(®ion).await.unwrap(); - - let ten_seconds_later = || Instant::now() + Duration::from_secs(10); - - keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await; 
- assert!(keeper.find_handle(&2).await.is_none()); - assert!(keeper.find_handle(&3).await.is_none()); - - let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29); - // assert if keeper is not started, keep_lived is of no use - assert!(keeper.deadline(region).await.unwrap() > far_future); - - keeper.start().await; - keeper.keep_lived(vec![1, 2, 3], ten_seconds_later()).await; - // assert keep_lived works if keeper is started - assert!(keeper.deadline(region).await.unwrap() <= ten_seconds_later()); - - let handle = keeper.deregister_region(region).await.unwrap(); - assert!(Arc::try_unwrap(handle).is_ok(), "handle is not dropped"); - assert!(keeper.find_handle(®ion).await.is_none()); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_countdown_task_handle() { - let table_engine = Arc::new(MockTableEngine::default()); - let table_ident = TableIdent { - catalog: "my_catalog".to_string(), - schema: "my_schema".to_string(), - table: "my_table".to_string(), - table_id: 1024, - engine: "mito".to_string(), - }; - let finished = Arc::new(AtomicBool::new(false)); - let finished_clone = finished.clone(); - let handle = CountdownTaskHandle::new( - table_engine.clone(), - table_ident.clone(), - 1, - |_| async move { finished_clone.store(true, Ordering::Relaxed) }, - ); - let tx = handle.tx.clone(); - - // assert countdown task is running - tx.send(CountdownCommand::Start(5000)).await.unwrap(); - assert!(!finished.load(Ordering::Relaxed)); - - drop(handle); - tokio::time::sleep(Duration::from_secs(1)).await; - - // assert countdown task is stopped - assert!(tx - .try_send(CountdownCommand::Reset( - Instant::now() + Duration::from_secs(10) - )) - .is_err()); - // assert `on_task_finished` is not called (because the task is aborted by the handle's drop) - assert!(!finished.load(Ordering::Relaxed)); - - let finished = Arc::new(AtomicBool::new(false)); - let finished_clone = finished.clone(); - let handle = CountdownTaskHandle::new(table_engine, table_ident, 1, |_| async move { - finished_clone.store(true, Ordering::Relaxed) - }); - handle.tx.send(CountdownCommand::Start(100)).await.unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - // assert `on_task_finished` is called when task is finished normally - assert!(finished.load(Ordering::Relaxed)); - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_countdown_task_run() { - let ctx = &EngineContext::default(); - let catalog = "my_catalog"; - let schema = "my_schema"; - let table = "my_table"; - let table_id = 1; - let request = CreateTableRequest { - id: table_id, - catalog_name: catalog.to_string(), - schema_name: schema.to_string(), - table_name: table.to_string(), - desc: None, - schema: RawSchema { - column_schemas: vec![], - timestamp_index: None, - version: 0, - }, - region_numbers: vec![], - primary_key_indices: vec![], - create_if_not_exists: false, - table_options: TableOptions::default(), - engine: "mito".to_string(), - }; - - let table_engine = Arc::new(MockTableEngine::default()); - let _ = table_engine.create_table(ctx, request).await.unwrap(); - - let table_ident = TableIdent { - catalog: catalog.to_string(), - schema: schema.to_string(), - table: table.to_string(), - table_id, - engine: "mito".to_string(), - }; - let (tx, rx) = mpsc::channel(10); - let mut task = CountdownTask { - table_engine: table_engine.clone(), - table_ident, - region: 1, - rx, - }; - let _handle = common_runtime::spawn_bg(async move { - task.run().await; - }); - - async fn deadline(tx: &mpsc::Sender) -> Instant { - let 
(s, r) = oneshot::channel(); - tx.send(CountdownCommand::Deadline(s)).await.unwrap(); - r.await.unwrap() - } - - // if countdown task is not started, its deadline is set to far future - assert!(deadline(&tx).await > Instant::now() + Duration::from_secs(86400 * 365 * 29)); - - // start countdown in 250ms * 4 = 1s - tx.send(CountdownCommand::Start(250)).await.unwrap(); - // assert deadline is correctly set - assert!(deadline(&tx).await <= Instant::now() + Duration::from_secs(1)); - - // reset countdown in 1.5s - tx.send(CountdownCommand::Reset( - Instant::now() + Duration::from_millis(1500), - )) - .await - .unwrap(); - - // assert the table is closed after deadline is reached - assert!(table_engine.table_exists(ctx, table_id)); - // spare 500ms for the task to close the table - tokio::time::sleep(Duration::from_millis(2000)).await; - assert!(!table_engine.table_exists(ctx, table_id)); - } -} diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs deleted file mode 100644 index d7d943c467..0000000000 --- a/src/catalog/tests/remote_catalog_tests.rs +++ /dev/null @@ -1,453 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![feature(assert_matches)] - -#[cfg(test)] -mod tests { - use std::assert_matches::assert_matches; - use std::collections::HashSet; - use std::sync::Arc; - use std::time::Duration; - - use catalog::remote::mock::MockTableEngine; - use catalog::remote::region_alive_keeper::RegionAliveKeepers; - use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager}; - use catalog::{CatalogManager, RegisterSchemaRequest, RegisterTableRequest}; - use common_catalog::consts::{ - DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, INFORMATION_SCHEMA_NAME, MITO_ENGINE, - }; - use common_meta::helper::CatalogValue; - use common_meta::ident::TableIdent; - use common_meta::key::catalog_name::CatalogNameKey; - use common_meta::key::TableMetadataManager; - use common_meta::kv_backend::memory::MemoryKvBackend; - use common_meta::kv_backend::KvBackend; - use common_meta::rpc::store::{CompareAndPutRequest, PutRequest}; - use datatypes::schema::RawSchema; - use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef}; - use table::engine::{EngineContext, TableEngineRef}; - use table::requests::CreateTableRequest; - use table::test_util::EmptyTable; - use tokio::time::Instant; - - struct TestingComponents { - catalog_manager: Arc, - table_engine_manager: TableEngineManagerRef, - region_alive_keepers: Arc, - } - - impl TestingComponents { - fn table_engine(&self) -> TableEngineRef { - self.table_engine_manager.engine(MITO_ENGINE).unwrap() - } - } - - #[tokio::test] - async fn test_cached_backend() { - let backend = CachedMetaKvBackend::wrap(Arc::new(MemoryKvBackend::default())); - - let default_catalog_key = CatalogNameKey::new(DEFAULT_CATALOG_NAME).to_string(); - - let req = PutRequest::new() - .with_key(default_catalog_key.as_bytes()) - .with_value(CatalogValue.as_bytes().unwrap()); - 
backend.put(req).await.unwrap(); - - let ret = backend.get(b"__catalog_name/greptime").await.unwrap(); - let _ = ret.unwrap(); - - let req = CompareAndPutRequest::new() - .with_key(b"__catalog_name/greptime".to_vec()) - .with_expect(CatalogValue.as_bytes().unwrap()) - .with_value(b"123".to_vec()); - let _ = backend.compare_and_put(req).await.unwrap(); - - let ret = backend.get(b"__catalog_name/greptime").await.unwrap(); - assert_eq!(b"123", ret.as_ref().unwrap().value.as_slice()); - - let req = PutRequest::new() - .with_key(b"__catalog_name/greptime".to_vec()) - .with_value(b"1234".to_vec()); - let _ = backend.put(req).await; - - let ret = backend.get(b"__catalog_name/greptime").await.unwrap(); - assert_eq!(b"1234", ret.unwrap().value.as_slice()); - - backend - .delete(b"__catalog_name/greptime", false) - .await - .unwrap(); - - let ret = backend.get(b"__catalog_name/greptime").await.unwrap(); - assert!(ret.is_none()); - } - - async fn prepare_components(node_id: u64) -> TestingComponents { - let backend = Arc::new(MemoryKvBackend::default()); - - let req = PutRequest::new() - .with_key(b"__catalog_name/greptime".to_vec()) - .with_value(b"".to_vec()); - backend.put(req).await.unwrap(); - - let req = PutRequest::new() - .with_key(b"__schema_name/greptime-public".to_vec()) - .with_value(b"".to_vec()); - backend.put(req).await.unwrap(); - - let cached_backend = Arc::new(CachedMetaKvBackend::wrap(backend)); - - let table_engine = Arc::new(MockTableEngine::default()); - let engine_manager = Arc::new(MemoryTableEngineManager::alias( - MITO_ENGINE.to_string(), - table_engine, - )); - - let region_alive_keepers = Arc::new(RegionAliveKeepers::new(engine_manager.clone(), 5000)); - - let catalog_manager = RemoteCatalogManager::new( - engine_manager.clone(), - node_id, - region_alive_keepers.clone(), - Arc::new(TableMetadataManager::new(cached_backend)), - ); - catalog_manager.start().await.unwrap(); - - TestingComponents { - catalog_manager: Arc::new(catalog_manager), - table_engine_manager: engine_manager, - region_alive_keepers, - } - } - - #[tokio::test] - async fn test_remote_catalog_default() { - common_telemetry::init_default_ut_logging(); - let node_id = 42; - let TestingComponents { - catalog_manager, .. 
- } = prepare_components(node_id).await; - assert_eq!( - vec![DEFAULT_CATALOG_NAME.to_string()], - catalog_manager.catalog_names().await.unwrap() - ); - - let mut schema_names = catalog_manager - .schema_names(DEFAULT_CATALOG_NAME) - .await - .unwrap(); - schema_names.sort_unstable(); - assert_eq!( - vec![ - INFORMATION_SCHEMA_NAME.to_string(), - DEFAULT_SCHEMA_NAME.to_string() - ], - schema_names - ); - } - - #[tokio::test] - async fn test_remote_catalog_register_nonexistent() { - common_telemetry::init_default_ut_logging(); - let node_id = 42; - let components = prepare_components(node_id).await; - - // register a new table with an nonexistent catalog - let catalog_name = "nonexistent_catalog".to_string(); - let schema_name = "nonexistent_schema".to_string(); - let table_name = "fail_table".to_string(); - // this schema has no effect - let table_schema = RawSchema::new(vec![]); - let table = components - .table_engine() - .create_table( - &EngineContext {}, - CreateTableRequest { - id: 1, - catalog_name: catalog_name.clone(), - schema_name: schema_name.clone(), - table_name: table_name.clone(), - desc: None, - schema: table_schema, - region_numbers: vec![0], - primary_key_indices: vec![], - create_if_not_exists: false, - table_options: Default::default(), - engine: MITO_ENGINE.to_string(), - }, - ) - .await - .unwrap(); - let reg_req = RegisterTableRequest { - catalog: catalog_name, - schema: schema_name, - table_name, - table_id: 1, - table, - }; - let res = components.catalog_manager.register_table(reg_req).await; - - // because nonexistent_catalog does not exist yet. - assert_matches!( - res.err().unwrap(), - catalog::error::Error::CatalogNotFound { .. } - ); - } - - #[tokio::test] - async fn test_register_table() { - let node_id = 42; - let components = prepare_components(node_id).await; - let mut schema_names = components - .catalog_manager - .schema_names(DEFAULT_CATALOG_NAME) - .await - .unwrap(); - schema_names.sort_unstable(); - assert_eq!( - vec![ - INFORMATION_SCHEMA_NAME.to_string(), - DEFAULT_SCHEMA_NAME.to_string(), - ], - schema_names - ); - - // register a new table with an nonexistent catalog - let catalog_name = DEFAULT_CATALOG_NAME.to_string(); - let schema_name = DEFAULT_SCHEMA_NAME.to_string(); - let table_name = "test_table".to_string(); - let table_id = 1; - // this schema has no effect - let table_schema = RawSchema::new(vec![]); - let table = components - .table_engine() - .create_table( - &EngineContext {}, - CreateTableRequest { - id: table_id, - catalog_name: catalog_name.clone(), - schema_name: schema_name.clone(), - table_name: table_name.clone(), - desc: None, - schema: table_schema, - region_numbers: vec![0], - primary_key_indices: vec![], - create_if_not_exists: false, - table_options: Default::default(), - engine: MITO_ENGINE.to_string(), - }, - ) - .await - .unwrap(); - let reg_req = RegisterTableRequest { - catalog: catalog_name, - schema: schema_name, - table_name: table_name.clone(), - table_id, - table, - }; - assert!(components - .catalog_manager - .register_table(reg_req) - .await - .unwrap()); - assert_eq!( - vec![table_name], - components - .catalog_manager - .table_names(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) - .await - .unwrap() - ); - } - - #[tokio::test] - async fn test_register_catalog_schema_table() { - let node_id = 42; - let components = prepare_components(node_id).await; - - let catalog_name = "test_catalog".to_string(); - let schema_name = "nonexistent_schema".to_string(); - - // register catalog to catalog manager - 
assert!(components - .catalog_manager - .clone() - .register_catalog(catalog_name.clone()) - .await - .is_ok()); - assert_eq!( - HashSet::::from_iter(vec![ - DEFAULT_CATALOG_NAME.to_string(), - catalog_name.clone() - ]), - HashSet::from_iter(components.catalog_manager.catalog_names().await.unwrap()) - ); - - let table_to_register = components - .table_engine() - .create_table( - &EngineContext {}, - CreateTableRequest { - id: 2, - catalog_name: catalog_name.clone(), - schema_name: schema_name.clone(), - table_name: "".to_string(), - desc: None, - schema: RawSchema::new(vec![]), - region_numbers: vec![0], - primary_key_indices: vec![], - create_if_not_exists: false, - table_options: Default::default(), - engine: MITO_ENGINE.to_string(), - }, - ) - .await - .unwrap(); - - let reg_req = RegisterTableRequest { - catalog: catalog_name.clone(), - schema: schema_name.clone(), - table_name: " fail_table".to_string(), - table_id: 2, - table: table_to_register, - }; - // this register will fail since schema does not exist yet - assert_matches!( - components - .catalog_manager - .register_table(reg_req.clone()) - .await - .unwrap_err(), - catalog::error::Error::SchemaNotFound { .. } - ); - - let register_schema_request = RegisterSchemaRequest { - catalog: catalog_name.to_string(), - schema: schema_name.to_string(), - }; - assert!(components - .catalog_manager - .register_schema(register_schema_request) - .await - .expect("Register schema should not fail")); - assert!(components - .catalog_manager - .register_table(reg_req) - .await - .unwrap()); - - assert_eq!( - HashSet::from([schema_name.clone(), INFORMATION_SCHEMA_NAME.to_string()]), - components - .catalog_manager - .schema_names(&catalog_name) - .await - .unwrap() - .into_iter() - .collect() - ) - } - - #[tokio::test] - async fn test_register_table_before_and_after_region_alive_keeper_started() { - let components = prepare_components(42).await; - let catalog_manager = &components.catalog_manager; - let region_alive_keepers = &components.region_alive_keepers; - - let table_before = TableIdent { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: "table_before".to_string(), - table_id: 1, - engine: MITO_ENGINE.to_string(), - }; - let request = RegisterTableRequest { - catalog: table_before.catalog.clone(), - schema: table_before.schema.clone(), - table_name: table_before.table.clone(), - table_id: table_before.table_id, - table: EmptyTable::table(CreateTableRequest { - id: table_before.table_id, - catalog_name: table_before.catalog.clone(), - schema_name: table_before.schema.clone(), - table_name: table_before.table.clone(), - desc: None, - schema: RawSchema::new(vec![]), - region_numbers: vec![0], - primary_key_indices: vec![], - create_if_not_exists: false, - table_options: Default::default(), - engine: MITO_ENGINE.to_string(), - }), - }; - assert!(catalog_manager.register_table(request).await.unwrap()); - - let keeper = region_alive_keepers - .find_keeper(table_before.table_id) - .await - .unwrap(); - let deadline = keeper.deadline(0).await.unwrap(); - let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 29); - // assert region alive countdown is not started - assert!(deadline > far_future); - - region_alive_keepers.start().await; - - let table_after = TableIdent { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table: "table_after".to_string(), - table_id: 2, - engine: MITO_ENGINE.to_string(), - }; - let request = RegisterTableRequest { - 
catalog: table_after.catalog.clone(), - schema: table_after.schema.clone(), - table_name: table_after.table.clone(), - table_id: table_after.table_id, - table: EmptyTable::table(CreateTableRequest { - id: table_after.table_id, - catalog_name: table_after.catalog.clone(), - schema_name: table_after.schema.clone(), - table_name: table_after.table.clone(), - desc: None, - schema: RawSchema::new(vec![]), - region_numbers: vec![0], - primary_key_indices: vec![], - create_if_not_exists: false, - table_options: Default::default(), - engine: MITO_ENGINE.to_string(), - }), - }; - assert!(catalog_manager.register_table(request).await.unwrap()); - - let keeper = region_alive_keepers - .find_keeper(table_after.table_id) - .await - .unwrap(); - let deadline = keeper.deadline(0).await.unwrap(); - // assert countdown is started for the table registered after [RegionAliveKeepers] started - assert!(deadline <= Instant::now() + Duration::from_secs(20)); - - let keeper = region_alive_keepers - .find_keeper(table_before.table_id) - .await - .unwrap(); - let deadline = keeper.deadline(0).await.unwrap(); - // assert countdown is started for the table registered before [RegionAliveKeepers] started, too - assert!(deadline <= Instant::now() + Duration::from_secs(20)); - } -} diff --git a/src/datanode/src/alive_keeper.rs b/src/datanode/src/alive_keeper.rs new file mode 100644 index 0000000000..37d5ee0985 --- /dev/null +++ b/src/datanode/src/alive_keeper.rs @@ -0,0 +1,467 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::future::Future; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use async_trait::async_trait; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_meta::error::InvalidProtoMsgSnafu; +use common_meta::heartbeat::handler::{ + HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, +}; +use common_telemetry::{debug, error, info, trace, warn}; +use snafu::OptionExt; +use store_api::region_request::{RegionCloseRequest, RegionRequest}; +use store_api::storage::RegionId; +#[cfg(test)] +use tokio::sync::oneshot; +use tokio::sync::{mpsc, Mutex}; +use tokio::task::JoinHandle; +use tokio::time::{Duration, Instant}; + +use crate::region_server::RegionServer; + +const MAX_CLOSE_RETRY_TIMES: usize = 10; + +/// [RegionAliveKeeper] manages all [CountdownTaskHandle]s. +/// +/// [RegionAliveKeeper] starts a [CountdownTask] for each region. When deadline is reached, +/// the region will be closed. +/// The deadline is controlled by Metasrv. It works like "lease" for regions: a Datanode submits its +/// opened regions to Metasrv, in heartbeats. If Metasrv decides some region could be resided in this +/// Datanode, it will "extend" the region's "lease", with a deadline for [RegionAliveKeeper] to +/// countdown. 
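// Editor's condensed sketch of the single-region countdown loop implemented by the new
// `CountdownTask` below (the `close_region` closure stands in for the real RegionServer
// call): the deadline only moves when a command arrives, and reaching it closes the region
// and ends the task.
use tokio::sync::mpsc;
use tokio::time::{sleep_until, Duration, Instant};

enum Cmd {
    Start(u64),     // heartbeat interval in milliseconds
    Reset(Instant), // new deadline granted by Metasrv
}

async fn countdown_loop(mut rx: mpsc::Receiver<Cmd>, close_region: impl Fn()) {
    // Park the countdown far in the future until the heartbeat task sends `Start`.
    let countdown = sleep_until(Instant::now() + Duration::from_secs(86400 * 365 * 30));
    tokio::pin!(countdown);

    loop {
        tokio::select! {
            cmd = rx.recv() => match cmd {
                // First deadline: 4 heartbeat intervals, to tolerate startup jitter.
                Some(Cmd::Start(interval_ms)) => countdown
                    .as_mut()
                    .reset(Instant::now() + Duration::from_millis(interval_ms) * 4),
                // Only ever push the deadline further out; lagging leases are ignored.
                Some(Cmd::Reset(deadline)) if countdown.deadline() < deadline => {
                    countdown.as_mut().reset(deadline)
                }
                Some(Cmd::Reset(_)) => {}
                // All senders dropped: the keeper deregistered this region.
                None => break,
            },
            () = &mut countdown => {
                close_region();
                break;
            }
        }
    }
}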
+pub struct RegionAliveKeeper { + region_server: RegionServer, + tasks: Arc>>>, + heartbeat_interval_millis: u64, + started: AtomicBool, + + /// The epoch when [RegionAliveKeepers] is created. It's used to get a monotonically non-decreasing + /// elapsed time when submitting heartbeats to Metasrv (because [Instant] is monotonically + /// non-decreasing). The heartbeat request will carry the duration since this epoch, and the + /// duration acts like an "invariant point" for region's keep alive lease. + epoch: Instant, +} + +impl RegionAliveKeeper { + pub fn new(region_server: RegionServer, heartbeat_interval_millis: u64) -> Self { + Self { + region_server, + tasks: Arc::new(Mutex::new(HashMap::new())), + heartbeat_interval_millis, + started: AtomicBool::new(false), + epoch: Instant::now(), + } + } + + async fn find_handle(&self, region_id: RegionId) -> Option> { + self.tasks.lock().await.get(®ion_id).cloned() + } + + pub async fn register_region(&self, region_id: RegionId) { + if self.find_handle(region_id).await.is_some() { + return; + } + + let tasks = Arc::downgrade(&self.tasks); + let on_task_finished = async move { + if let Some(x) = tasks.upgrade() { + let _ = x.lock().await.remove(®ion_id); + } // Else the countdown task handles map could be dropped because the keeper is dropped. + }; + let handle = Arc::new(CountdownTaskHandle::new( + self.region_server.clone(), + region_id, + move |result: Option| { + info!( + "Deregister region: {region_id} after countdown task finished, result: {result:?}", + ); + on_task_finished + }, + )); + + let mut handles = self.tasks.lock().await; + let _ = handles.insert(region_id, handle.clone()); + + if self.started.load(Ordering::Relaxed) { + handle.start(self.heartbeat_interval_millis).await; + + info!("Region alive countdown for region {region_id} is started!",); + } else { + info!( + "Region alive countdown for region {region_id} is registered but not started yet!", + ); + } + } + + pub async fn deregister_region(&self, region_id: RegionId) { + if self.tasks.lock().await.remove(®ion_id).is_some() { + info!("Deregister alive countdown for region {region_id}") + } + } + + async fn keep_lived(&self, designated_regions: Vec, deadline: Instant) { + for region_id in designated_regions { + if let Some(handle) = self.find_handle(region_id).await { + handle.reset_deadline(deadline).await; + } + // Else the region alive keeper might be triggered by lagging messages, we can safely ignore it. 
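// Editor's usage sketch, assuming it lives in this module (so the private `keep_lived` is
// reachable) and that a `RegionServer` handle is available, e.g. the tests'
// `mock_region_server()`: register a region, start the keeper, then extend its lease the
// way the heartbeat handler does when Metasrv grants one.
async fn keep_region_alive_example(region_server: RegionServer) {
    let keeper = RegionAliveKeeper::new(region_server, 5000);
    let region_id = RegionId::new(1024, 1);

    keeper.register_region(region_id).await;
    keeper.start().await;

    // Pretend Metasrv granted a 20-second lease in the latest heartbeat response.
    let deadline = Instant::now() + Duration::from_secs(20);
    keeper.keep_lived(vec![region_id], deadline).await;
}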
+ } + } + + #[cfg(test)] + async fn deadline(&self, region_id: RegionId) -> Option { + let mut deadline = None; + if let Some(handle) = self.find_handle(region_id).await { + let (s, r) = oneshot::channel(); + if handle.tx.send(CountdownCommand::Deadline(s)).await.is_ok() { + deadline = r.await.ok() + } + } + deadline + } + + pub async fn start(&self) { + let tasks = self.tasks.lock().await; + for task in tasks.values() { + task.start(self.heartbeat_interval_millis).await; + } + self.started.store(true, Ordering::Relaxed); + + info!( + "RegionAliveKeeper is started with region {:?}", + tasks.keys().map(|x| x.to_string()).collect::>(), + ); + } + + pub fn epoch(&self) -> Instant { + self.epoch + } +} + +#[async_trait] +impl HeartbeatResponseHandler for RegionAliveKeeper { + fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { + ctx.response.region_lease.is_some() + } + + async fn handle( + &self, + ctx: &mut HeartbeatResponseHandlerContext, + ) -> common_meta::error::Result { + let region_lease = ctx + .response + .region_lease + .as_ref() + .context(InvalidProtoMsgSnafu { + err_msg: "'region_lease' is missing in heartbeat response", + })?; + let start_instant = self.epoch + Duration::from_millis(region_lease.duration_since_epoch); + let deadline = start_instant + Duration::from_secs(region_lease.lease_seconds); + let region_ids = region_lease + .region_ids + .iter() + .map(|id| RegionId::from_u64(*id)) + .collect(); + self.keep_lived(region_ids, deadline).await; + Ok(HandleControl::Continue) + } +} + +#[derive(Debug)] +enum CountdownCommand { + /// Start this countdown task. The first deadline will be set to + /// 4 * `heartbeat_interval_millis` + Start(u64), + /// Reset countdown deadline to the given instance. + Reset(Instant), + /// Returns the current deadline of the countdown task. + #[cfg(test)] + Deadline(oneshot::Sender), +} + +struct CountdownTaskHandle { + tx: mpsc::Sender, + handler: JoinHandle<()>, + region_id: RegionId, +} + +impl CountdownTaskHandle { + /// Creates a new [CountdownTaskHandle] and starts the countdown task. + /// # Params + /// - `on_task_finished`: a callback to be invoked when the task is finished. Note that it will not + /// be invoked if the task is cancelled (by dropping the handle). This is because we want something + /// meaningful to be done when the task is finished, e.g. deregister the handle from the map. + /// While dropping the handle does not necessarily mean the task is finished. + fn new( + region_server: RegionServer, + region_id: RegionId, + on_task_finished: impl FnOnce(Option) -> Fut + Send + 'static, + ) -> Self + where + Fut: Future + Send, + { + let (tx, rx) = mpsc::channel(1024); + + let mut countdown_task = CountdownTask { + region_server, + region_id, + rx, + }; + let handler = common_runtime::spawn_bg(async move { + let result = countdown_task.run().await; + on_task_finished(result).await; + }); + + Self { + tx, + handler, + region_id, + } + } + + async fn start(&self, heartbeat_interval_millis: u64) { + if let Err(e) = self + .tx + .send(CountdownCommand::Start(heartbeat_interval_millis)) + .await + { + warn!( + "Failed to start region alive keeper countdown: {e}. \ + Maybe the task is stopped due to region been closed." 
+#[derive(Debug)]
+enum CountdownCommand {
+    /// Start this countdown task. The first deadline will be set to
+    /// 4 * `heartbeat_interval_millis`.
+    Start(u64),
+    /// Reset the countdown deadline to the given instant.
+    Reset(Instant),
+    /// Returns the current deadline of the countdown task.
+    #[cfg(test)]
+    Deadline(oneshot::Sender<Instant>),
+}
+
+struct CountdownTaskHandle {
+    tx: mpsc::Sender<CountdownCommand>,
+    handler: JoinHandle<()>,
+    region_id: RegionId,
+}
+
+impl CountdownTaskHandle {
+    /// Creates a new [CountdownTaskHandle] and starts the countdown task.
+    ///
+    /// # Params
+    /// - `on_task_finished`: a callback invoked when the task finishes. Note that it will not be
+    ///   invoked if the task is cancelled (by dropping the handle). This is because we want
+    ///   something meaningful to be done when the task finishes, e.g. deregistering the handle
+    ///   from the map, while dropping the handle does not necessarily mean the task has finished.
+    fn new<Fut>(
+        region_server: RegionServer,
+        region_id: RegionId,
+        on_task_finished: impl FnOnce(Option<bool>) -> Fut + Send + 'static,
+    ) -> Self
+    where
+        Fut: Future<Output = ()> + Send,
+    {
+        let (tx, rx) = mpsc::channel(1024);
+
+        let mut countdown_task = CountdownTask {
+            region_server,
+            region_id,
+            rx,
+        };
+        let handler = common_runtime::spawn_bg(async move {
+            let result = countdown_task.run().await;
+            on_task_finished(result).await;
+        });
+
+        Self {
+            tx,
+            handler,
+            region_id,
+        }
+    }
+
+    async fn start(&self, heartbeat_interval_millis: u64) {
+        if let Err(e) = self
+            .tx
+            .send(CountdownCommand::Start(heartbeat_interval_millis))
+            .await
+        {
+            warn!(
+                "Failed to start region alive keeper countdown: {e}. \
+                Maybe the task is stopped because the region is closed."
+            );
+        }
+    }
+
+    #[cfg(test)]
+    async fn deadline(&self) -> Option<Instant> {
+        let (tx, rx) = oneshot::channel();
+        if self.tx.send(CountdownCommand::Deadline(tx)).await.is_ok() {
+            return rx.await.ok();
+        }
+        None
+    }
+
+    async fn reset_deadline(&self, deadline: Instant) {
+        if let Err(e) = self.tx.send(CountdownCommand::Reset(deadline)).await {
+            warn!(
+                "Failed to reset region alive keeper deadline: {e}. \
+                Maybe the task is stopped because the region is closed."
+            );
+        }
+    }
+}
+
+impl Drop for CountdownTaskHandle {
+    fn drop(&mut self) {
+        debug!(
+            "Aborting region alive countdown task for region {}",
+            self.region_id
+        );
+        self.handler.abort();
+    }
+}
+
+struct CountdownTask {
+    region_server: RegionServer,
+    region_id: RegionId,
+    rx: mpsc::Receiver<CountdownCommand>,
+}
+
+impl CountdownTask {
+    // Returns `Some(true)` if the region is closed successfully after the countdown fires,
+    // `Some(false)` if closing fails, or `None` if the task exits because its handle is dropped.
+    async fn run(&mut self) -> Option<bool> {
+        // 30 years. See `Instant::far_future`.
+        let far_future = Instant::now() + Duration::from_secs(86400 * 365 * 30);
+
+        // Make sure the alive countdown does not start before the heartbeat task is started
+        // (the "start countdown" command is sent from the heartbeat task).
+        let countdown = tokio::time::sleep_until(far_future);
+        tokio::pin!(countdown);
+
+        let region_id = self.region_id;
+        loop {
+            tokio::select! {
+                command = self.rx.recv() => {
+                    match command {
+                        Some(CountdownCommand::Start(heartbeat_interval_millis)) => {
+                            // Set the first deadline in 4 heartbeats (roughly 20 seconds from now
+                            // with the default 5-second heartbeat interval), to make Datanode and
+                            // Metasrv more tolerant of network or other jitters during startup.
+                            let first_deadline = Instant::now() + Duration::from_millis(heartbeat_interval_millis) * 4;
+                            countdown.set(tokio::time::sleep_until(first_deadline));
+                        },
+                        Some(CountdownCommand::Reset(deadline)) => {
+                            if countdown.deadline() < deadline {
+                                trace!(
+                                    "Reset deadline of region {region_id} to approximately {} seconds later",
+                                    (deadline - Instant::now()).as_secs_f32(),
+                                );
+                                countdown.set(tokio::time::sleep_until(deadline));
+                            }
+                            // Else the countdown could be either:
+                            // - not started yet;
+                            // - during startup protection;
+                            // - received a lagging heartbeat message.
+                            // All can be safely ignored.
+                        },
+                        None => {
+                            info!(
+                                "The handle of the countdown task for region {region_id} \
+                                is dropped, RegionAliveKeeper out."
+                            );
+                            break;
+                        },
+                        #[cfg(test)]
+                        Some(CountdownCommand::Deadline(tx)) => {
+                            let _ = tx.send(countdown.deadline());
+                        }
+                    }
+                }
+                () = &mut countdown => {
+                    let result = self.close_region().await;
+                    info!(
+                        "Region {region_id} is closed, result: {result:?}. \
+                        RegionAliveKeeper out."
+                    );
+                    return Some(result);
+                }
+            }
+        }
+        None
+    }
+
+    /// Returns whether the region is closed successfully.
+    async fn close_region(&self) -> bool {
+        for retry in 0..MAX_CLOSE_RETRY_TIMES {
+            let request = RegionRequest::Close(RegionCloseRequest {});
+            match self
+                .region_server
+                .handle_request(self.region_id, request)
+                .await
+            {
+                Ok(_) => return true,
+                Err(e) if e.status_code() == StatusCode::RegionNotFound => return true,
+                // If the region fails to close, retry immediately. Maybe we should panic instead?
+                Err(e) => error!(e;
+                    "Retry {retry}, failed to close region {}. \
+                    For the integrity of data, retry closing it without waiting.",
+                    self.region_id,
+                ),
+            }
+        }
+        false
+    }
+}
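
Reviewer note, not part of the patch: the countdown loop above is a resettable timer built from `tokio::select!` and a pinned `sleep_until`. A stripped-down sketch of the same pattern follows; all names are illustrative only.

use tokio::sync::mpsc;
use tokio::time::{sleep_until, Duration, Instant};

// Runs until the deadline fires or the command channel is closed.
async fn resettable_countdown(mut rx: mpsc::Receiver<Instant>) {
    let sleep = sleep_until(Instant::now() + Duration::from_secs(20));
    tokio::pin!(sleep);
    loop {
        tokio::select! {
            new_deadline = rx.recv() => match new_deadline {
                // Only move the deadline forward, like `CountdownCommand::Reset`.
                Some(d) if sleep.deadline() < d => sleep.set(sleep_until(d)),
                Some(_) => {} // lagging or duplicate message: ignore
                None => break, // sender dropped: stop the task
            },
            () = &mut sleep => {
                // Deadline reached: this is where the region would be closed.
                break;
            }
        }
    }
}
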
+#[cfg(test)]
+mod test {
+    use super::*;
+    use crate::tests::mock_region_server;
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn region_alive_keeper() {
+        let region_server = mock_region_server();
+        let alive_keeper = RegionAliveKeeper::new(region_server, 300);
+        let region_id = RegionId::new(1, 2);
+
+        // Register a region before starting.
+        alive_keeper.register_region(region_id).await;
+        assert!(alive_keeper.find_handle(region_id).await.is_some());
+
+        alive_keeper.start().await;
+
+        // The started alive keeper should assign a deadline to this region.
+        let deadline = alive_keeper.deadline(region_id).await.unwrap();
+        assert!(deadline >= Instant::now());
+
+        // Extend the lease, then sleep.
+        alive_keeper
+            .keep_lived(vec![region_id], Instant::now() + Duration::from_millis(500))
+            .await;
+        tokio::time::sleep(Duration::from_millis(500)).await;
+        assert!(alive_keeper.find_handle(region_id).await.is_some());
+        let deadline = alive_keeper.deadline(region_id).await.unwrap();
+        assert!(deadline >= Instant::now());
+
+        // Sleep to wait for the lease to expire.
+        tokio::time::sleep(Duration::from_millis(1000)).await;
+        assert!(alive_keeper.find_handle(region_id).await.is_none());
+    }
+
+    #[tokio::test(flavor = "multi_thread")]
+    async fn countdown_task() {
+        let region_server = mock_region_server();
+
+        let (tx, rx) = oneshot::channel();
+
+        let countdown_handle = CountdownTaskHandle::new(
+            region_server,
+            RegionId::new(9999, 2),
+            |result: Option<bool>| async move {
+                tx.send((Instant::now(), result)).unwrap();
+            },
+        );
+
+        // If the countdown task is not started, its deadline is set to the far future.
+        assert!(
+            countdown_handle.deadline().await.unwrap()
+                > Instant::now() + Duration::from_secs(86400 * 365 * 29)
+        );
+
+        // The first deadline should be set to 4 * heartbeat_interval_millis.
+        // We assert it to be greater than 3 * heartbeat_interval_millis to avoid a flaky test.
+        let heartbeat_interval_millis = 100;
+        countdown_handle.start(heartbeat_interval_millis).await;
+        assert!(
+            countdown_handle.deadline().await.unwrap()
+                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
+        );
+
+        // Reset the deadline: a nearer deadline will be ignored, ...
+        countdown_handle
+            .reset_deadline(Instant::now() + Duration::from_millis(heartbeat_interval_millis))
+            .await;
+        assert!(
+            countdown_handle.deadline().await.unwrap()
+                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 3)
+        );
+
+        // ... and only a farther deadline will be accepted.
+        countdown_handle
+            .reset_deadline(Instant::now() + Duration::from_millis(heartbeat_interval_millis * 5))
+            .await;
+        assert!(
+            countdown_handle.deadline().await.unwrap()
+                > Instant::now() + Duration::from_millis(heartbeat_interval_millis * 4)
+        );
+
+        // Wait for the countdown task to finish.
+        let before_await = Instant::now();
+        let (finish_instant, result) = rx.await.unwrap();
+        // The mock region server cannot close the region.
+        assert_eq!(result, Some(false));
+        // This task should finish after 5 * heartbeat_interval_millis;
+        // we assert with 4 times the interval to avoid flakiness.
+        assert!(
+            finish_instant > before_await + Duration::from_millis(heartbeat_interval_millis * 4)
+        );
+    }
+}
diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs
index c10e02aab9..3651419a88 100644
--- a/src/datanode/src/heartbeat.rs
+++ b/src/datanode/src/heartbeat.rs
@@ -17,7 +17,6 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use api::v1::meta::{HeartbeatRequest,
Peer, RegionStat}; -use catalog::remote::region_alive_keeper::RegionAliveKeepers; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::{ HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, @@ -27,11 +26,11 @@ use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::{debug, error, info, trace, warn}; use meta_client::client::{HeartbeatSender, MetaClient}; use snafu::{OptionExt, ResultExt}; -use table::engine::manager::MemoryTableEngineManager; use tokio::sync::mpsc; use tokio::time::Instant; use self::handler::RegionHeartbeatResponseHandler; +use crate::alive_keeper::RegionAliveKeeper; use crate::datanode::DatanodeOptions; use crate::error::{ self, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, Result, @@ -51,7 +50,7 @@ pub struct HeartbeatTask { region_server: RegionServer, interval: u64, resp_handler_executor: HeartbeatResponseHandlerExecutorRef, - region_alive_keepers: Arc, + region_alive_keeper: Arc, } impl Drop for HeartbeatTask { @@ -77,14 +76,14 @@ impl HeartbeatTask { let region_server = region_server.unwrap(); - let region_alive_keepers = Arc::new(RegionAliveKeepers::new( - Arc::new(MemoryTableEngineManager::new_empty()), + let region_alive_keeper = Arc::new(RegionAliveKeeper::new( + region_server.clone(), opts.heartbeat.interval_millis, )); let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![ Arc::new(ParseMailboxMessageHandler), Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())), - region_alive_keepers.clone(), + region_alive_keeper.clone(), ])); Ok(Self { @@ -98,7 +97,7 @@ impl HeartbeatTask { region_server, interval: opts.heartbeat.interval_millis, resp_handler_executor, - region_alive_keepers, + region_alive_keeper, }) } @@ -163,7 +162,7 @@ impl HeartbeatTask { let addr = resolve_addr(&self.server_addr, &self.server_hostname); info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."); - self.region_alive_keepers.start().await; + self.region_alive_keeper.start().await; let meta_client = self.meta_client.clone(); let region_server_clone = self.region_server.clone(); @@ -181,7 +180,7 @@ impl HeartbeatTask { ) .await?; - let epoch = self.region_alive_keepers.epoch(); + let epoch = self.region_alive_keeper.epoch(); common_runtime::spawn_bg(async move { let sleep = tokio::time::sleep(Duration::from_millis(0)); tokio::pin!(sleep); diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 8743d6d26a..320b0c0ccc 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -30,9 +30,6 @@ use store_api::storage::RegionId; use crate::error::Result; use crate::region_server::RegionServer; -pub mod close_region; -pub mod open_region; - /// Handler for [Instruction::OpenRegion] and [Instruction::CloseRegion]. #[derive(Clone)] pub struct RegionHeartbeatResponseHandler { diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs deleted file mode 100644 index b77d11aaff..0000000000 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ /dev/null @@ -1,235 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use async_trait::async_trait; -use catalog::remote::region_alive_keeper::RegionAliveKeepers; -use catalog::{CatalogManagerRef, DeregisterTableRequest}; -use common_catalog::format_full_table_name; -use common_meta::error::Result as MetaResult; -use common_meta::heartbeat::handler::{ - HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, -}; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; -use common_meta::RegionIdent; -use common_telemetry::{error, info, warn}; -use snafu::ResultExt; -use store_api::storage::RegionNumber; -use table::engine::manager::TableEngineManagerRef; -use table::engine::{CloseTableResult, EngineContext, TableReference}; -use table::requests::CloseTableRequest; - -use crate::error::{self, Result}; - -#[derive(Clone)] -pub struct CloseRegionHandler { - catalog_manager: CatalogManagerRef, - table_engine_manager: TableEngineManagerRef, - region_alive_keepers: Arc, -} - -#[async_trait] -impl HeartbeatResponseHandler for CloseRegionHandler { - fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { - matches!( - ctx.incoming_message.as_ref(), - Some((_, Instruction::CloseRegion { .. })) - ) - } - - async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { - let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() - else { - unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'"); - }; - - let mailbox = ctx.mailbox.clone(); - let self_ref = Arc::new(self.clone()); - let _handle = common_runtime::spawn_bg(async move { - let result = self_ref.close_region_inner(region_ident).await; - - if let Err(e) = mailbox - .send((meta, CloseRegionHandler::map_result(result))) - .await - { - error!(e; "Failed to send reply to mailbox"); - } - }); - - Ok(HandleControl::Done) - } -} - -impl CloseRegionHandler { - pub fn new( - catalog_manager: CatalogManagerRef, - table_engine_manager: TableEngineManagerRef, - region_alive_keepers: Arc, - ) -> Self { - Self { - catalog_manager, - table_engine_manager, - region_alive_keepers, - } - } - - fn map_result(result: Result) -> InstructionReply { - result.map_or_else( - |error| { - InstructionReply::CloseRegion(SimpleReply { - result: false, - error: Some(error.to_string()), - }) - }, - |result| { - InstructionReply::CloseRegion(SimpleReply { - result, - error: None, - }) - }, - ) - } - - /// Returns true if a table or target regions have been closed. - async fn regions_closed( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - region_numbers: &[RegionNumber], - ) -> Result { - if let Some(table) = self - .catalog_manager - .table(catalog_name, schema_name, table_name) - .await - .context(error::AccessCatalogSnafu)? 
- { - for r in region_numbers { - let region_exist = - table - .contains_region(*r) - .with_context(|_| error::CheckRegionSnafu { - table_name: format_full_table_name( - catalog_name, - schema_name, - table_name, - ), - region_number: *r, - })?; - if region_exist { - return Ok(false); - } - } - } - // Returns true if table not exist - Ok(true) - } - - async fn close_region_inner(&self, region_ident: RegionIdent) -> Result { - let table_ident = ®ion_ident.table_ident; - let engine_name = &table_ident.engine; - let engine = self - .table_engine_manager - .engine(engine_name) - .context(error::TableEngineNotFoundSnafu { engine_name })?; - let ctx = EngineContext::default(); - - let table_ref = &TableReference::full( - &table_ident.catalog, - &table_ident.schema, - &table_ident.table, - ); - let region_numbers = vec![region_ident.region_number]; - if self - .regions_closed( - table_ref.catalog, - table_ref.schema, - table_ref.table, - ®ion_numbers, - ) - .await? - { - return Ok(true); - } - - if engine - .get_table(&ctx, region_ident.table_ident.table_id) - .with_context(|_| error::GetTableSnafu { - table_name: table_ref.to_string(), - })? - .is_some() - { - return match engine - .close_table( - &ctx, - CloseTableRequest { - catalog_name: table_ref.catalog.to_string(), - schema_name: table_ref.schema.to_string(), - table_name: table_ref.table.to_string(), - region_numbers: region_numbers.clone(), - table_id: region_ident.table_ident.table_id, - flush: true, - }, - ) - .await - .with_context(|_| error::CloseTableSnafu { - table_name: table_ref.to_string(), - region_numbers: region_numbers.clone(), - })? { - CloseTableResult::NotFound | CloseTableResult::Released(_) => { - // Deregister table if The table released. - self.deregister_table(table_ref).await?; - - let _ = self - .region_alive_keepers - .deregister_table(table_ident) - .await; - - Ok(true) - } - CloseTableResult::PartialClosed(regions) => { - // Requires caller to update the region_numbers - info!( - "Close partial regions: {:?} in table: {}", - regions, table_ref - ); - - self.region_alive_keepers - .deregister_region(®ion_ident) - .await; - - Ok(true) - } - }; - } - - warn!("Trying to close a non-existing table: {}", table_ref); - // Table doesn't exist - Ok(true) - } - - async fn deregister_table(&self, table_ref: &TableReference<'_>) -> Result<()> { - self.catalog_manager - .deregister_table(DeregisterTableRequest { - catalog: table_ref.catalog.to_string(), - schema: table_ref.schema.to_string(), - table_name: table_ref.table.to_string(), - }) - .await - .with_context(|_| error::DeregisterTableSnafu { - table_name: table_ref.to_string(), - }) - } -} diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs deleted file mode 100644 index 3a78323ace..0000000000 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ /dev/null @@ -1,250 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -use std::sync::Arc; - -use async_trait::async_trait; -use catalog::error::{Error as CatalogError, Result as CatalogResult}; -use catalog::remote::region_alive_keeper::RegionAliveKeepers; -use catalog::{CatalogManagerRef, RegisterSchemaRequest, RegisterTableRequest}; -use common_catalog::format_full_table_name; -use common_meta::error::Result as MetaResult; -use common_meta::heartbeat::handler::{ - HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, -}; -use common_meta::instruction::{Instruction, InstructionReply, SimpleReply}; -use common_telemetry::{error, warn}; -use snafu::ResultExt; -use store_api::storage::RegionNumber; -use table::engine::manager::TableEngineManagerRef; -use table::engine::EngineContext; -use table::requests::OpenTableRequest; -use table::Table; - -use crate::error::{self, Result}; - -#[derive(Clone)] -pub struct OpenRegionHandler { - catalog_manager: CatalogManagerRef, - table_engine_manager: TableEngineManagerRef, - region_alive_keepers: Arc, -} - -#[async_trait] -impl HeartbeatResponseHandler for OpenRegionHandler { - fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { - matches!( - ctx.incoming_message, - Some((_, Instruction::OpenRegion { .. })) - ) - } - - async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { - let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() - else { - unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'"); - }; - - let mailbox = ctx.mailbox.clone(); - let self_ref = Arc::new(self.clone()); - - let region_alive_keepers = self.region_alive_keepers.clone(); - let _handle = common_runtime::spawn_bg(async move { - let table_ident = ®ion_ident.table_ident; - let request = OpenTableRequest { - catalog_name: table_ident.catalog.clone(), - schema_name: table_ident.schema.clone(), - table_name: table_ident.table.clone(), - table_id: table_ident.table_id, - region_numbers: vec![region_ident.region_number], - }; - let result = self_ref - .open_region_inner(table_ident.engine.clone(), request) - .await; - - if matches!(result, Ok(true)) { - region_alive_keepers.register_region(®ion_ident).await; - } - - if let Err(e) = mailbox - .send((meta, OpenRegionHandler::map_result(result))) - .await - { - error!(e; "Failed to send reply to mailbox"); - } - }); - Ok(HandleControl::Done) - } -} - -impl OpenRegionHandler { - pub fn new( - catalog_manager: CatalogManagerRef, - table_engine_manager: TableEngineManagerRef, - region_alive_keepers: Arc, - ) -> Self { - Self { - catalog_manager, - table_engine_manager, - region_alive_keepers, - } - } - - fn map_result(result: Result) -> InstructionReply { - result.map_or_else( - |error| { - InstructionReply::OpenRegion(SimpleReply { - result: false, - error: Some(error.to_string()), - }) - }, - |result| { - InstructionReply::OpenRegion(SimpleReply { - result, - error: None, - }) - }, - ) - } - - /// Returns true if a table or target regions have been opened. - async fn regions_opened( - &self, - catalog_name: &str, - schema_name: &str, - table_name: &str, - region_numbers: &[RegionNumber], - ) -> Result { - if let Some(table) = self - .catalog_manager - .table(catalog_name, schema_name, table_name) - .await - .context(error::AccessCatalogSnafu)? 
- { - for r in region_numbers { - let region_exist = - table - .contains_region(*r) - .with_context(|_| error::CheckRegionSnafu { - table_name: format_full_table_name( - catalog_name, - schema_name, - table_name, - ), - region_number: *r, - })?; - if !region_exist { - warn!( - "Failed to check table: {}, region: {} does not exist", - format_full_table_name(catalog_name, schema_name, table_name,), - r - ); - return Ok(false); - } - } - return Ok(true); - } - Ok(false) - } - - async fn register_table( - &self, - request: &OpenTableRequest, - table: Arc, - ) -> CatalogResult { - if !self - .catalog_manager - .catalog_exist(&request.catalog_name) - .await? - { - self.catalog_manager - .clone() - .register_catalog(request.catalog_name.to_string()) - .await?; - } - - if !self - .catalog_manager - .schema_exist(&request.catalog_name, &request.schema_name) - .await? - { - self.catalog_manager - .register_schema(RegisterSchemaRequest { - catalog: request.catalog_name.to_string(), - schema: request.schema_name.to_string(), - }) - .await?; - } - - let request = RegisterTableRequest { - catalog: request.catalog_name.to_string(), - schema: request.schema_name.to_string(), - table_name: request.table_name.to_string(), - table_id: request.table_id, - table, - }; - self.catalog_manager.register_table(request).await - } - - async fn open_region_inner(&self, engine: String, request: OpenTableRequest) -> Result { - let OpenTableRequest { - catalog_name, - schema_name, - table_name, - region_numbers, - .. - } = &request; - let engine = - self.table_engine_manager - .engine(&engine) - .context(error::TableEngineNotFoundSnafu { - engine_name: &engine, - })?; - let ctx = EngineContext::default(); - - if self - .regions_opened(catalog_name, schema_name, table_name, region_numbers) - .await? - { - return Ok(true); - } - - if let Some(table) = engine - .open_table(&ctx, request.clone()) - .await - .with_context(|_| error::OpenTableSnafu { - table_name: format_full_table_name(catalog_name, schema_name, table_name), - })? - { - let result = self.register_table(&request, table).await; - - match result { - Ok(_) | Err(CatalogError::TableExists { .. }) => Ok(true), - e => e.with_context(|_| error::RegisterTableSnafu { - table_name: format_full_table_name(catalog_name, schema_name, table_name), - }), - } - } else { - // Case 1: - // TODO(weny): Fix/Cleanup the broken table manifest - // The manifest writing operation should be atomic. - // Therefore, we won't meet this case, in theory. 
- - // Case 2: The target region was not found in table meta - - // Case 3: The table not exist - Ok(false) - } - } -} diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 0693f7ca48..9ce77c91a4 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -18,8 +18,7 @@ use std::time::Duration; use std::{fs, path}; use api::v1::meta::Role; -use catalog::remote::region_alive_keeper::RegionAliveKeepers; -use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager}; +use catalog::local::MemoryCatalogManager; use catalog::CatalogManagerRef; use common_base::Plugins; use common_catalog::consts::DEFAULT_CATALOG_NAME; @@ -29,7 +28,6 @@ use common_greptimedb_telemetry::GreptimeDBTelemetryTask; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use common_meta::heartbeat::handler::HandlerGroupExecutor; -use common_meta::key::TableMetadataManager; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::store::state_store::ObjectStateStore; use common_procedure::ProcedureManagerRef; @@ -51,7 +49,7 @@ use storage::scheduler::{LocalScheduler, SchedulerConfig}; use storage::EngineImpl; use store_api::logstore::LogStore; use store_api::path_utils::{CLUSTER_DIR, WAL_DIR}; -use table::engine::manager::{MemoryTableEngineManager, TableEngineManagerRef}; +use table::engine::manager::MemoryTableEngineManager; use table::engine::{TableEngine, TableEngineProcedureRef}; use table::requests::FlushTableRequest; use table::table::TableIdProviderRef; @@ -63,8 +61,6 @@ use crate::error::{ ShutdownInstanceSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; use crate::greptimedb_telemetry::get_greptimedb_telemetry_task; -use crate::heartbeat::handler::close_region::CloseRegionHandler; -use crate::heartbeat::handler::open_region::OpenRegionHandler; use crate::heartbeat::HeartbeatTask; use crate::row_inserter::RowInserter; use crate::sql::{SqlHandler, SqlRequest}; @@ -115,9 +111,6 @@ impl Instance { fn build_heartbeat_task( opts: &DatanodeOptions, meta_client: Option>, - catalog_manager: CatalogManagerRef, - engine_manager: TableEngineManagerRef, - region_alive_keepers: Option>, ) -> Result> { Ok(match opts.mode { Mode::Standalone => None, @@ -126,24 +119,8 @@ impl Instance { let _meta_client = meta_client.context(IncorrectInternalStateSnafu { state: "meta client is not provided when building heartbeat task", })?; - let region_alive_keepers = - region_alive_keepers.context(IncorrectInternalStateSnafu { - state: "region_alive_keepers is not provided when building heartbeat task", - })?; - let _handlers_executor = HandlerGroupExecutor::new(vec![ - Arc::new(ParseMailboxMessageHandler), - Arc::new(OpenRegionHandler::new( - catalog_manager.clone(), - engine_manager.clone(), - region_alive_keepers.clone(), - )), - Arc::new(CloseRegionHandler::new( - catalog_manager.clone(), - engine_manager, - region_alive_keepers.clone(), - )), - region_alive_keepers.clone(), - ]); + let _handlers_executor = + HandlerGroupExecutor::new(vec![Arc::new(ParseMailboxMessageHandler)]); todo!("remove this method") } @@ -199,7 +176,7 @@ impl Instance { ); // create remote catalog manager - let (catalog_manager, table_id_provider, region_alive_keepers) = match opts.mode { + let (catalog_manager, table_id_provider) = match opts.mode { Mode::Standalone => { let catalog = Arc::new( catalog::local::LocalCatalogManager::try_new(engine_manager.clone()) @@ -210,35 
+187,13 @@ impl Instance { ( catalog.clone() as CatalogManagerRef, Some(catalog as TableIdProviderRef), - None, ) } - Mode::Distributed => { - let meta_client = meta_client.clone().context(IncorrectInternalStateSnafu { - state: "meta client is not provided when creating distributed Datanode", - })?; - - let kv_backend = Arc::new(CachedMetaKvBackend::new(meta_client)); - - let region_alive_keepers = Arc::new(RegionAliveKeepers::new( - engine_manager.clone(), - opts.heartbeat.interval_millis, - )); - - let catalog_manager = Arc::new(RemoteCatalogManager::new( - engine_manager.clone(), - opts.node_id.context(MissingNodeIdSnafu)?, - region_alive_keepers.clone(), - Arc::new(TableMetadataManager::new(kv_backend)), - )); - - ( - catalog_manager as CatalogManagerRef, - None, - Some(region_alive_keepers), - ) - } + Mode::Distributed => ( + MemoryCatalogManager::with_default_setup() as CatalogManagerRef, + None, + ), }; let factory = @@ -285,13 +240,7 @@ impl Instance { greptimedb_telemetry_task, }); - let heartbeat_task = Instance::build_heartbeat_task( - opts, - meta_client, - catalog_manager, - engine_manager, - region_alive_keepers, - )?; + let heartbeat_task = Instance::build_heartbeat_task(opts, meta_client)?; Ok((instance, heartbeat_task)) } diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index f5b313188b..9e04bf59bf 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -15,6 +15,7 @@ #![feature(assert_matches)] #![feature(trait_upcasting)] +pub mod alive_keeper; pub mod datanode; pub mod error; mod greptimedb_telemetry; @@ -29,4 +30,5 @@ pub mod server; pub mod sql; mod store; #[cfg(test)] +#[allow(dead_code)] mod tests; diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 00a256b5e4..5969afb320 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -12,36 +12,40 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::assert_matches::assert_matches; +use std::any::Any; use std::sync::Arc; -use std::time::Duration; use api::v1::greptime_request::Request as GrpcRequest; use api::v1::meta::HeartbeatResponse; use api::v1::query_request::Query; use api::v1::QueryRequest; -use catalog::local::MemoryCatalogManager; -use catalog::remote::region_alive_keeper::RegionAliveKeepers; -use catalog::CatalogManagerRef; +use async_trait::async_trait; +use common_function::scalars::aggregate::AggregateFunctionMetaRef; +use common_function::scalars::FunctionRef; use common_meta::heartbeat::handler::{ - HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, + HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, }; use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; use common_meta::ident::TableIdent; -use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply}; +use common_meta::instruction::{Instruction, InstructionReply, RegionIdent}; +use common_query::prelude::ScalarUdf; use common_query::Output; +use common_runtime::Runtime; use datatypes::prelude::ConcreteDataType; +use query::dataframe::DataFrame; +use query::plan::LogicalPlan; +use query::planner::LogicalPlanner; +use query::query_engine::DescribeResult; +use query::QueryEngine; use servers::query_handler::grpc::GrpcQueryHandler; -use session::context::QueryContext; +use session::context::{QueryContext, QueryContextRef}; use table::engine::manager::TableEngineManagerRef; use table::TableRef; use test_util::MockInstance; use tokio::sync::mpsc::{self, Receiver}; -use tokio::time::Instant; -use crate::heartbeat::handler::close_region::CloseRegionHandler; -use crate::heartbeat::handler::open_region::OpenRegionHandler; use crate::instance::Instance; +use crate::region_server::RegionServer; pub(crate) mod test_util; @@ -50,203 +54,12 @@ struct HandlerTestGuard { mailbox: Arc, rx: Receiver<(MessageMeta, InstructionReply)>, engine_manager_ref: TableEngineManagerRef, - catalog_manager_ref: CatalogManagerRef, -} - -#[tokio::test] -async fn test_close_region_handler() { - let HandlerTestGuard { - instance, - mailbox, - mut rx, - engine_manager_ref, - catalog_manager_ref, - .. - } = prepare_handler_test("test_close_region_handler").await; - - let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( - CloseRegionHandler::new( - catalog_manager_ref.clone(), - engine_manager_ref.clone(), - Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone(), 5000)), - ), - )])); - - let _ = prepare_table(instance.inner()).await; - - // Closes demo table - handle_instruction( - executor.clone(), - mailbox.clone(), - close_region_instruction(), - ) - .await; - let (_, reply) = rx.recv().await.unwrap(); - assert_matches!( - reply, - InstructionReply::CloseRegion(SimpleReply { result: true, .. }) - ); - - assert_test_table_not_found(instance.inner()).await; - - // Closes demo table again - handle_instruction( - executor.clone(), - mailbox.clone(), - close_region_instruction(), - ) - .await; - let (_, reply) = rx.recv().await.unwrap(); - assert_matches!( - reply, - InstructionReply::CloseRegion(SimpleReply { result: true, .. 
}) - ); - - // Closes non-exist table - handle_instruction( - executor.clone(), - mailbox.clone(), - Instruction::CloseRegion(RegionIdent { - table_ident: TableIdent { - catalog: "greptime".to_string(), - schema: "public".to_string(), - table: "non-exist".to_string(), - table_id: 1025, - engine: "mito".to_string(), - }, - region_number: 0, - cluster_id: 1, - datanode_id: 2, - }), - ) - .await; - let (_, reply) = rx.recv().await.unwrap(); - assert_matches!( - reply, - InstructionReply::CloseRegion(SimpleReply { result: true, .. }) - ); -} - -#[tokio::test] -async fn test_open_region_handler() { - let HandlerTestGuard { - instance, - mailbox, - mut rx, - engine_manager_ref, - catalog_manager_ref, - .. - } = prepare_handler_test("test_open_region_handler").await; - - let region_alive_keepers = Arc::new(RegionAliveKeepers::new(engine_manager_ref.clone(), 5000)); - region_alive_keepers.start().await; - - let executor = Arc::new(HandlerGroupExecutor::new(vec![ - Arc::new(OpenRegionHandler::new( - catalog_manager_ref.clone(), - engine_manager_ref.clone(), - region_alive_keepers.clone(), - )), - Arc::new(CloseRegionHandler::new( - catalog_manager_ref.clone(), - engine_manager_ref.clone(), - region_alive_keepers.clone(), - )), - ])); - - let instruction = open_region_instruction(); - let Instruction::OpenRegion(region_ident) = instruction.clone() else { - unreachable!() - }; - let table_ident = ®ion_ident.table_ident; - - let table = prepare_table(instance.inner()).await; - - let dummy_catalog_manager = MemoryCatalogManager::with_default_setup(); - region_alive_keepers - .register_table(table_ident.clone(), table, dummy_catalog_manager) - .await - .unwrap(); - - // Opens a opened table - handle_instruction(executor.clone(), mailbox.clone(), instruction.clone()).await; - let (_, reply) = rx.recv().await.unwrap(); - assert_matches!( - reply, - InstructionReply::OpenRegion(SimpleReply { result: true, .. }) - ); - - let keeper = region_alive_keepers - .find_keeper(table_ident.table_id) - .await - .unwrap(); - let deadline = keeper.deadline(0).await.unwrap(); - assert!(deadline <= Instant::now() + Duration::from_secs(20)); - - // Opens a non-exist table - let non_exist_table_ident = TableIdent { - catalog: "foo".to_string(), - schema: "non-exist".to_string(), - table: "non-exist".to_string(), - table_id: 2024, - engine: "mito".to_string(), - }; - handle_instruction( - executor.clone(), - mailbox.clone(), - Instruction::OpenRegion(RegionIdent { - table_ident: non_exist_table_ident.clone(), - region_number: 0, - cluster_id: 1, - datanode_id: 2, - }), - ) - .await; - let (_, reply) = rx.recv().await.unwrap(); - assert_matches!( - reply, - InstructionReply::OpenRegion(SimpleReply { result: false, .. }) - ); - - assert!(region_alive_keepers - .find_keeper(non_exist_table_ident.table_id) - .await - .is_none()); - - // Closes demo table - handle_instruction( - executor.clone(), - mailbox.clone(), - close_region_instruction(), - ) - .await; - let (_, reply) = rx.recv().await.unwrap(); - assert_matches!( - reply, - InstructionReply::CloseRegion(SimpleReply { result: true, .. }) - ); - assert_test_table_not_found(instance.inner()).await; - - assert!(region_alive_keepers - .find_keeper(table_ident.table_id) - .await - .is_none()); - - // Opens demo table - handle_instruction(executor.clone(), mailbox.clone(), instruction).await; - let (_, reply) = rx.recv().await.unwrap(); - assert_matches!( - reply, - InstructionReply::OpenRegion(SimpleReply { result: true, .. 
}) - ); - assert_test_table_found(instance.inner()).await; } async fn prepare_handler_test(name: &str) -> HandlerTestGuard { let mock_instance = MockInstance::new(name).await; let instance = mock_instance.inner(); let engine_manager = instance.sql_handler().table_engine_manager().clone(); - let catalog_manager = instance.sql_handler().catalog_manager().clone(); let (tx, rx) = mpsc::channel(8); let mailbox = Arc::new(HeartbeatMailbox::new(tx)); @@ -255,7 +68,6 @@ async fn prepare_handler_test(name: &str) -> HandlerTestGuard { mailbox, rx, engine_manager_ref: engine_manager, - catalog_manager_ref: catalog_manager, } } @@ -346,3 +158,50 @@ async fn assert_test_table_found(instance: &Instance) { assert!(matches!(output, Output::AffectedRows(2))); } + +pub struct MockQueryEngine; + +#[async_trait] +impl QueryEngine for MockQueryEngine { + fn as_any(&self) -> &dyn Any { + self as _ + } + + fn planner(&self) -> Arc { + unimplemented!() + } + + fn name(&self) -> &str { + "MockQueryEngine" + } + + async fn describe(&self, _plan: LogicalPlan) -> query::error::Result { + unimplemented!() + } + + async fn execute( + &self, + _plan: LogicalPlan, + _query_ctx: QueryContextRef, + ) -> query::error::Result { + unimplemented!() + } + + fn register_udf(&self, _udf: ScalarUdf) {} + + fn register_aggregate_function(&self, _func: AggregateFunctionMetaRef) {} + + fn register_function(&self, _func: FunctionRef) {} + + fn read_table(&self, _table: TableRef) -> query::error::Result { + unimplemented!() + } +} + +/// Create a region server without any engine +pub fn mock_region_server() -> RegionServer { + RegionServer::new( + Arc::new(MockQueryEngine), + Arc::new(Runtime::builder().build().unwrap()), + ) +}