diff --git a/src/common/meta/src/instruction.rs b/src/common/meta/src/instruction.rs index 8ef3072303..efb1992234 100644 --- a/src/common/meta/src/instruction.rs +++ b/src/common/meta/src/instruction.rs @@ -47,6 +47,33 @@ impl Display for RegionIdent { } } +#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct TableIdent { + pub cluster_id: ClusterId, + pub datanode_id: DatanodeId, + pub catalog: String, + pub schema: String, + pub table: String, + pub table_id: u32, + pub engine: String, +} + +impl Display for TableIdent { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "TableIdent(datanode_id='{}.{}', table_id='{}', table_name='{}.{}.{}', table_engine='{}')", + self.cluster_id, + self.datanode_id, + self.table_id, + self.catalog, + self.schema, + self.table, + self.engine, + ) + } +} + #[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)] pub struct SimpleReply { pub result: bool, @@ -64,6 +91,7 @@ impl Display for SimpleReply { pub enum Instruction { OpenRegion(RegionIdent), CloseRegion(RegionIdent), + InvalidateTableCache(TableIdent), } impl Display for Instruction { @@ -71,6 +99,7 @@ impl Display for Instruction { match self { Self::OpenRegion(region) => write!(f, "Instruction::OpenRegion({})", region), Self::CloseRegion(region) => write!(f, "Instruction::CloseRegion({})", region), + Self::InvalidateTableCache(table) => write!(f, "Instruction::Invalidate({})", table), } } } @@ -80,6 +109,7 @@ impl Display for Instruction { pub enum InstructionReply { OpenRegion(SimpleReply), CloseRegion(SimpleReply), + InvalidateTableCache(SimpleReply), } impl Display for InstructionReply { @@ -87,6 +117,9 @@ impl Display for InstructionReply { match self { Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply), Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply), + Self::InvalidateTableCache(reply) => { + write!(f, "InstructionReply::Invalidate({})", reply) + } } } } diff --git a/src/datanode/src/heartbeat.rs b/src/datanode/src/heartbeat.rs index cb607e4b24..8b193f84e4 100644 --- a/src/datanode/src/heartbeat.rs +++ b/src/datanode/src/heartbeat.rs @@ -41,7 +41,7 @@ pub struct HeartbeatTask { meta_client: Arc, catalog_manager: CatalogManagerRef, interval: u64, - heartbeat_response_handler_exector: HeartbeatResponseHandlerExecutorRef, + resp_handler_executor: HeartbeatResponseHandlerExecutorRef, } impl Drop for HeartbeatTask { @@ -58,7 +58,7 @@ impl HeartbeatTask { server_hostname: Option, meta_client: Arc, catalog_manager: CatalogManagerRef, - heartbeat_response_handler_exector: HeartbeatResponseHandlerExecutorRef, + resp_handler_executor: HeartbeatResponseHandlerExecutorRef, ) -> Self { Self { node_id, @@ -68,7 +68,7 @@ impl HeartbeatTask { meta_client, catalog_manager, interval: 5_000, // default interval is set to 5 secs - heartbeat_response_handler_exector, + resp_handler_executor, } } @@ -95,7 +95,7 @@ impl HeartbeatTask { let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res); if let Err(e) = Self::handle_response(ctx, handler_executor.clone()) { - error!(e;"Error while handling heartbeat response"); + error!(e; "Error while handling heartbeat response"); } if !running.load(Ordering::Acquire) { info!("Heartbeat task shutdown"); @@ -134,7 +134,7 @@ impl HeartbeatTask { let meta_client = self.meta_client.clone(); let catalog_manager_clone = self.catalog_manager.clone(); - let handler_executor = self.heartbeat_response_handler_exector.clone(); + let handler_executor = self.resp_handler_executor.clone(); let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16); let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx)); diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs index ce9061e2bd..08a6b4a047 100644 --- a/src/datanode/src/heartbeat/handler/close_region.rs +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -74,7 +74,7 @@ impl HeartbeatResponseHandler for CloseRegionHandler { .send((meta, CloseRegionHandler::map_result(result))) .await { - error!(e;"Failed to send reply to mailbox"); + error!(e; "Failed to send reply to mailbox"); } }); diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index 5965a474ad..a7d5ccc6fc 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -24,6 +24,12 @@ use store_api::storage::RegionId; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to handle heartbeat response, source: {}", source))] + HandleHeartbeatResponse { + location: Location, + source: common_meta::error::Error, + }, + #[snafu(display("{source}"))] External { #[snafu(backtrace)] @@ -541,6 +547,8 @@ impl ErrorExt for Error { Error::NotSupported { .. } => StatusCode::Unsupported, + Error::HandleHeartbeatResponse { source, .. } => source.status_code(), + Error::RuntimeResource { source, .. } => source.status_code(), Error::ExecutePromql { source, .. } => source.status_code(), diff --git a/src/frontend/src/heartbeat.rs b/src/frontend/src/heartbeat.rs index 4ff1bf1120..c069e5c548 100644 --- a/src/frontend/src/heartbeat.rs +++ b/src/frontend/src/heartbeat.rs @@ -13,30 +13,46 @@ // limitations under the License. use std::sync::Arc; -use std::time::Duration; -use api::v1::meta::{HeartbeatRequest, HeartbeatResponse}; +use api::v1::meta::HeartbeatRequest; +use common_meta::heartbeat::handler::{ + HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef, +}; +use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage}; +use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message; use common_telemetry::tracing::trace; use common_telemetry::{error, info}; use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient}; use snafu::ResultExt; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Receiver; +use tokio::time::{Duration, Instant}; use crate::error; use crate::error::Result; +pub mod handler; + #[derive(Clone)] pub struct HeartbeatTask { meta_client: Arc, report_interval: u64, retry_interval: u64, + resp_handler_executor: HeartbeatResponseHandlerExecutorRef, } impl HeartbeatTask { - pub fn new(meta_client: Arc, report_interval: u64, retry_interval: u64) -> Self { + pub fn new( + meta_client: Arc, + report_interval: u64, + retry_interval: u64, + resp_handler_executor: HeartbeatResponseHandlerExecutorRef, + ) -> Self { HeartbeatTask { meta_client, report_interval, retry_interval, + resp_handler_executor, } } @@ -49,21 +65,30 @@ impl HeartbeatTask { info!("A heartbeat connection has been established with metasrv"); - self.start_handle_resp_stream(resp_stream); + let (outgoing_tx, outgoing_rx) = mpsc::channel(16); + let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx)); - self.start_heartbeat_report(req_sender); + self.start_handle_resp_stream(resp_stream, mailbox); + + self.start_heartbeat_report(req_sender, outgoing_rx); Ok(()) } - fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream) { + fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) { let capture_self = self.clone(); let retry_interval = self.retry_interval; common_runtime::spawn_bg(async move { loop { match resp_stream.message().await { - Ok(Some(resp)) => capture_self.handle_response(resp).await, + Ok(Some(resp)) => { + trace!("Received a heartbeat response: {:?}", resp); + let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp); + if let Err(e) = capture_self.handle_response(ctx) { + error!(e; "Error while handling heartbeat response"); + } + } Ok(None) => break, Err(e) => { error!(e; "Occur error while reading heartbeat response"); @@ -79,27 +104,61 @@ impl HeartbeatTask { }); } - fn start_heartbeat_report(&self, req_sender: HeartbeatSender) { + fn start_heartbeat_report( + &self, + req_sender: HeartbeatSender, + mut outgoing_rx: Receiver, + ) { let report_interval = self.report_interval; common_runtime::spawn_bg(async move { + let sleep = tokio::time::sleep(Duration::from_millis(0)); + tokio::pin!(sleep); + loop { - let req = HeartbeatRequest::default(); + let req = tokio::select! { + message = outgoing_rx.recv() => { + if let Some(message) = message { + match outgoing_message_to_mailbox_message(message) { + Ok(message) => { + let req = HeartbeatRequest { + mailbox_message: Some(message), + ..Default::default() + }; + Some(req) + } + Err(e) => { + error!(e;"Failed to encode mailbox messages!"); + None + } + } + } else { + // Receives None that means Sender was dropped, we need to break the current loop + break + } + } + _ = &mut sleep => { + sleep.as_mut().reset(Instant::now() + Duration::from_secs(report_interval)); + Some(HeartbeatRequest::default()) + } + }; - if let Err(e) = req_sender.send(req.clone()).await { - error!(e; "Failed to send heartbeat to metasrv"); - break; - } else { - trace!("Send a heartbeat request to metasrv, content: {:?}", req); + if let Some(req) = req { + if let Err(e) = req_sender.send(req.clone()).await { + error!(e; "Failed to send heartbeat to metasrv"); + break; + } else { + trace!("Send a heartbeat request to metasrv, content: {:?}", req); + } } - - tokio::time::sleep(Duration::from_secs(report_interval)).await; } }); } - async fn handle_response(&self, resp: HeartbeatResponse) { - trace!("Received a heartbeat response: {:?}", resp); + fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> { + self.resp_handler_executor + .handle(ctx) + .context(error::HandleHeartbeatResponseSnafu) } async fn start_with_retry(&self, retry_interval: Duration) { diff --git a/src/frontend/src/heartbeat/handler.rs b/src/frontend/src/heartbeat/handler.rs new file mode 100644 index 0000000000..8e4a6c80c3 --- /dev/null +++ b/src/frontend/src/heartbeat/handler.rs @@ -0,0 +1,17 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod invalidate_table_cache; +#[cfg(test)] +mod tests; diff --git a/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs new file mode 100644 index 0000000000..9dfc9455c3 --- /dev/null +++ b/src/frontend/src/heartbeat/handler/invalidate_table_cache.rs @@ -0,0 +1,92 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use catalog::helper::TableGlobalKey; +use catalog::remote::KvCacheInvalidatorRef; +use common_meta::error::Result as MetaResult; +use common_meta::heartbeat::handler::{ + HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, +}; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent}; +use common_telemetry::error; + +#[derive(Clone)] +pub struct InvalidateTableCacheHandler { + backend_cache_invalidtor: KvCacheInvalidatorRef, +} + +impl HeartbeatResponseHandler for InvalidateTableCacheHandler { + fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { + matches!( + ctx.incoming_message.as_ref(), + Some((_, Instruction::InvalidateTableCache { .. })) + ) + } + + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult { + // TODO(weny): considers introducing a macro + let Some((meta, Instruction::InvalidateTableCache(table_ident))) = ctx.incoming_message.take() else { + unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'"); + }; + + let mailbox = ctx.mailbox.clone(); + let self_ref = self.clone(); + let TableIdent { + catalog, + schema, + table, + .. + } = table_ident; + + common_runtime::spawn_bg(async move { + self_ref.invalidate_table(&catalog, &schema, &table).await; + + if let Err(e) = mailbox + .send(( + meta, + InstructionReply::InvalidateTableCache(SimpleReply { + result: true, + error: None, + }), + )) + .await + { + error!(e; "Failed to send reply to mailbox"); + } + }); + + Ok(HandleControl::Done) + } +} + +impl InvalidateTableCacheHandler { + pub fn new(backend_cache_invalidtor: KvCacheInvalidatorRef) -> Self { + Self { + backend_cache_invalidtor, + } + } + + async fn invalidate_table(&self, catalog_name: &str, schema_name: &str, table_name: &str) { + let tg_key = TableGlobalKey { + catalog_name: catalog_name.to_string(), + schema_name: schema_name.to_string(), + table_name: table_name.to_string(), + } + .to_string(); + + let tg_key = tg_key.as_bytes(); + + self.backend_cache_invalidtor.invalidate_key(tg_key).await; + } +} diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs new file mode 100644 index 0000000000..9e24984c24 --- /dev/null +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -0,0 +1,129 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::assert_matches::assert_matches; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use api::v1::meta::HeartbeatResponse; +use catalog::helper::TableGlobalKey; +use catalog::remote::KvCacheInvalidator; +use common_meta::heartbeat::handler::{ + HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, +}; +use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; +use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent}; +use tokio::sync::mpsc; + +use super::invalidate_table_cache::InvalidateTableCacheHandler; + +pub struct MockKvCacheInvalidtor { + inner: Mutex, i32>>, +} + +#[async_trait::async_trait] +impl KvCacheInvalidator for MockKvCacheInvalidtor { + async fn invalidate_key(&self, key: &[u8]) { + self.inner.lock().unwrap().remove(key); + } +} + +#[tokio::test] +async fn test_invalidate_table_cache_handler() { + let table_key = TableGlobalKey { + catalog_name: "test".to_string(), + schema_name: "greptime".to_string(), + table_name: "foo_table".to_string(), + }; + + let inner = HashMap::from([(table_key.to_string().as_bytes().to_vec(), 1)]); + let backend = Arc::new(MockKvCacheInvalidtor { + inner: Mutex::new(inner), + }); + + let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( + InvalidateTableCacheHandler::new(backend.clone()), + )])); + + let (tx, mut rx) = mpsc::channel(8); + let mailbox = Arc::new(HeartbeatMailbox::new(tx)); + + // removes a valid key + handle_instruction( + executor.clone(), + mailbox.clone(), + Instruction::InvalidateTableCache(TableIdent { + cluster_id: 1, + datanode_id: 2, + catalog: "test".to_string(), + schema: "greptime".to_string(), + table: "foo_table".to_string(), + table_id: 0, + engine: "mito".to_string(), + }), + ); + + let (_, reply) = rx.recv().await.unwrap(); + assert_matches!( + reply, + InstructionReply::InvalidateTableCache(SimpleReply { result: true, .. }) + ); + assert!(!backend + .inner + .lock() + .unwrap() + .contains_key(table_key.to_string().as_bytes())); + + // removes a invalid key + handle_instruction( + executor, + mailbox, + Instruction::InvalidateTableCache(TableIdent { + cluster_id: 1, + datanode_id: 2, + catalog: "test".to_string(), + schema: "greptime".to_string(), + table: "not_found".to_string(), + table_id: 0, + engine: "mito".to_string(), + }), + ); + + let (_, reply) = rx.recv().await.unwrap(); + assert_matches!( + reply, + InstructionReply::InvalidateTableCache(SimpleReply { result: true, .. }) + ); +} + +pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta { + MessageMeta { + id, + subject: subject.to_string(), + to: to.to_string(), + from: from.to_string(), + } +} + +fn handle_instruction( + executor: Arc, + mailbox: Arc, + instruction: Instruction, +) { + let response = HeartbeatResponse::default(); + let mut ctx: HeartbeatResponseHandlerContext = + HeartbeatResponseHandlerContext::new(mailbox, response); + ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction)); + executor.handle(ctx).unwrap(); +} diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index a550ac57e1..8927f79b95 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -36,6 +36,8 @@ use common_base::Plugins; use common_catalog::consts::MITO_ENGINE; use common_error::ext::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; +use common_meta::heartbeat::handler::HandlerGroupExecutor; use common_query::Output; use common_telemetry::logging::{debug, info}; use common_telemetry::timer; @@ -75,6 +77,7 @@ use crate::error::{ }; use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory}; use crate::frontend::FrontendOptions; +use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler; use crate::heartbeat::HeartbeatTask; use crate::instance::standalone::StandaloneGrpcQueryHandler; use crate::metrics; @@ -143,7 +146,7 @@ impl Instance { let mut catalog_manager = FrontendCatalogManager::new( meta_backend.clone(), - meta_backend, + meta_backend.clone(), partition_manager, datanode_clients.clone(), ); @@ -173,7 +176,17 @@ impl Instance { plugins.insert::(statement_executor.clone()); - let heartbeat_task = Some(HeartbeatTask::new(meta_client, 5, 5)); + let handlers_executor = HandlerGroupExecutor::new(vec![ + Arc::new(ParseMailboxMessageHandler::default()), + Arc::new(InvalidateTableCacheHandler::new(meta_backend)), + ]); + + let heartbeat_task = Some(HeartbeatTask::new( + meta_client, + 5, + 5, + Arc::new(handlers_executor), + )); Ok(Instance { catalog_manager,