feat: add invalidate table cache handler (#1633)

* feat: add invalidate table cache handler

* feat: setup invalidate table cache handler for frontend

* test: add test for invalidate table cache handler

* chore: apply suggestions from CR

* chore: apply suggestions from CR

* fix: fix report_interval unit
This commit is contained in:
Weny Xu
2023-05-25 18:45:45 +09:00
committed by GitHub
parent 8a7998cd25
commit 953793143b
9 changed files with 377 additions and 26 deletions

View File

@@ -47,6 +47,33 @@ impl Display for RegionIdent {
}
}
#[derive(Eq, Hash, PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct TableIdent {
pub cluster_id: ClusterId,
pub datanode_id: DatanodeId,
pub catalog: String,
pub schema: String,
pub table: String,
pub table_id: u32,
pub engine: String,
}
impl Display for TableIdent {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(
f,
"TableIdent(datanode_id='{}.{}', table_id='{}', table_name='{}.{}.{}', table_engine='{}')",
self.cluster_id,
self.datanode_id,
self.table_id,
self.catalog,
self.schema,
self.table,
self.engine,
)
}
}
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
pub struct SimpleReply {
pub result: bool,
@@ -64,6 +91,7 @@ impl Display for SimpleReply {
pub enum Instruction {
OpenRegion(RegionIdent),
CloseRegion(RegionIdent),
InvalidateTableCache(TableIdent),
}
impl Display for Instruction {
@@ -71,6 +99,7 @@ impl Display for Instruction {
match self {
Self::OpenRegion(region) => write!(f, "Instruction::OpenRegion({})", region),
Self::CloseRegion(region) => write!(f, "Instruction::CloseRegion({})", region),
Self::InvalidateTableCache(table) => write!(f, "Instruction::Invalidate({})", table),
}
}
}
@@ -80,6 +109,7 @@ impl Display for Instruction {
pub enum InstructionReply {
OpenRegion(SimpleReply),
CloseRegion(SimpleReply),
InvalidateTableCache(SimpleReply),
}
impl Display for InstructionReply {
@@ -87,6 +117,9 @@ impl Display for InstructionReply {
match self {
Self::OpenRegion(reply) => write!(f, "InstructionReply::OpenRegion({})", reply),
Self::CloseRegion(reply) => write!(f, "InstructionReply::CloseRegion({})", reply),
Self::InvalidateTableCache(reply) => {
write!(f, "InstructionReply::Invalidate({})", reply)
}
}
}
}

View File

@@ -41,7 +41,7 @@ pub struct HeartbeatTask {
meta_client: Arc<MetaClient>,
catalog_manager: CatalogManagerRef,
interval: u64,
heartbeat_response_handler_exector: HeartbeatResponseHandlerExecutorRef,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
}
impl Drop for HeartbeatTask {
@@ -58,7 +58,7 @@ impl HeartbeatTask {
server_hostname: Option<String>,
meta_client: Arc<MetaClient>,
catalog_manager: CatalogManagerRef,
heartbeat_response_handler_exector: HeartbeatResponseHandlerExecutorRef,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
Self {
node_id,
@@ -68,7 +68,7 @@ impl HeartbeatTask {
meta_client,
catalog_manager,
interval: 5_000, // default interval is set to 5 secs
heartbeat_response_handler_exector,
resp_handler_executor,
}
}
@@ -95,7 +95,7 @@ impl HeartbeatTask {
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
if let Err(e) = Self::handle_response(ctx, handler_executor.clone()) {
error!(e;"Error while handling heartbeat response");
error!(e; "Error while handling heartbeat response");
}
if !running.load(Ordering::Acquire) {
info!("Heartbeat task shutdown");
@@ -134,7 +134,7 @@ impl HeartbeatTask {
let meta_client = self.meta_client.clone();
let catalog_manager_clone = self.catalog_manager.clone();
let handler_executor = self.heartbeat_response_handler_exector.clone();
let handler_executor = self.resp_handler_executor.clone();
let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));

View File

@@ -74,7 +74,7 @@ impl HeartbeatResponseHandler for CloseRegionHandler {
.send((meta, CloseRegionHandler::map_result(result)))
.await
{
error!(e;"Failed to send reply to mailbox");
error!(e; "Failed to send reply to mailbox");
}
});

View File

@@ -24,6 +24,12 @@ use store_api::storage::RegionId;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to handle heartbeat response, source: {}", source))]
HandleHeartbeatResponse {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("{source}"))]
External {
#[snafu(backtrace)]
@@ -541,6 +547,8 @@ impl ErrorExt for Error {
Error::NotSupported { .. } => StatusCode::Unsupported,
Error::HandleHeartbeatResponse { source, .. } => source.status_code(),
Error::RuntimeResource { source, .. } => source.status_code(),
Error::ExecutePromql { source, .. } => source.status_code(),

View File

@@ -13,30 +13,46 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use api::v1::meta::{HeartbeatRequest, HeartbeatResponse};
use api::v1::meta::HeartbeatRequest;
use common_meta::heartbeat::handler::{
HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
use common_telemetry::tracing::trace;
use common_telemetry::{error, info};
use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
use snafu::ResultExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use tokio::time::{Duration, Instant};
use crate::error;
use crate::error::Result;
pub mod handler;
#[derive(Clone)]
pub struct HeartbeatTask {
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
}
impl HeartbeatTask {
pub fn new(meta_client: Arc<MetaClient>, report_interval: u64, retry_interval: u64) -> Self {
pub fn new(
meta_client: Arc<MetaClient>,
report_interval: u64,
retry_interval: u64,
resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
) -> Self {
HeartbeatTask {
meta_client,
report_interval,
retry_interval,
resp_handler_executor,
}
}
@@ -49,21 +65,30 @@ impl HeartbeatTask {
info!("A heartbeat connection has been established with metasrv");
self.start_handle_resp_stream(resp_stream);
let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
self.start_heartbeat_report(req_sender);
self.start_handle_resp_stream(resp_stream, mailbox);
self.start_heartbeat_report(req_sender, outgoing_rx);
Ok(())
}
fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream) {
fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
let capture_self = self.clone();
let retry_interval = self.retry_interval;
common_runtime::spawn_bg(async move {
loop {
match resp_stream.message().await {
Ok(Some(resp)) => capture_self.handle_response(resp).await,
Ok(Some(resp)) => {
trace!("Received a heartbeat response: {:?}", resp);
let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
if let Err(e) = capture_self.handle_response(ctx) {
error!(e; "Error while handling heartbeat response");
}
}
Ok(None) => break,
Err(e) => {
error!(e; "Occur error while reading heartbeat response");
@@ -79,27 +104,61 @@ impl HeartbeatTask {
});
}
fn start_heartbeat_report(&self, req_sender: HeartbeatSender) {
fn start_heartbeat_report(
&self,
req_sender: HeartbeatSender,
mut outgoing_rx: Receiver<OutgoingMessage>,
) {
let report_interval = self.report_interval;
common_runtime::spawn_bg(async move {
let sleep = tokio::time::sleep(Duration::from_millis(0));
tokio::pin!(sleep);
loop {
let req = HeartbeatRequest::default();
let req = tokio::select! {
message = outgoing_rx.recv() => {
if let Some(message) = message {
match outgoing_message_to_mailbox_message(message) {
Ok(message) => {
let req = HeartbeatRequest {
mailbox_message: Some(message),
..Default::default()
};
Some(req)
}
Err(e) => {
error!(e;"Failed to encode mailbox messages!");
None
}
}
} else {
// Receives None that means Sender was dropped, we need to break the current loop
break
}
}
_ = &mut sleep => {
sleep.as_mut().reset(Instant::now() + Duration::from_secs(report_interval));
Some(HeartbeatRequest::default())
}
};
if let Err(e) = req_sender.send(req.clone()).await {
error!(e; "Failed to send heartbeat to metasrv");
break;
} else {
trace!("Send a heartbeat request to metasrv, content: {:?}", req);
if let Some(req) = req {
if let Err(e) = req_sender.send(req.clone()).await {
error!(e; "Failed to send heartbeat to metasrv");
break;
} else {
trace!("Send a heartbeat request to metasrv, content: {:?}", req);
}
}
tokio::time::sleep(Duration::from_secs(report_interval)).await;
}
});
}
async fn handle_response(&self, resp: HeartbeatResponse) {
trace!("Received a heartbeat response: {:?}", resp);
fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
self.resp_handler_executor
.handle(ctx)
.context(error::HandleHeartbeatResponseSnafu)
}
async fn start_with_retry(&self, retry_interval: Duration) {

View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod invalidate_table_cache;
#[cfg(test)]
mod tests;

View File

@@ -0,0 +1,92 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use catalog::helper::TableGlobalKey;
use catalog::remote::KvCacheInvalidatorRef;
use common_meta::error::Result as MetaResult;
use common_meta::heartbeat::handler::{
HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent};
use common_telemetry::error;
#[derive(Clone)]
pub struct InvalidateTableCacheHandler {
backend_cache_invalidtor: KvCacheInvalidatorRef,
}
impl HeartbeatResponseHandler for InvalidateTableCacheHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
Some((_, Instruction::InvalidateTableCache { .. }))
)
}
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
// TODO(weny): considers introducing a macro
let Some((meta, Instruction::InvalidateTableCache(table_ident))) = ctx.incoming_message.take() else {
unreachable!("InvalidateTableCacheHandler: should be guarded by 'is_acceptable'");
};
let mailbox = ctx.mailbox.clone();
let self_ref = self.clone();
let TableIdent {
catalog,
schema,
table,
..
} = table_ident;
common_runtime::spawn_bg(async move {
self_ref.invalidate_table(&catalog, &schema, &table).await;
if let Err(e) = mailbox
.send((
meta,
InstructionReply::InvalidateTableCache(SimpleReply {
result: true,
error: None,
}),
))
.await
{
error!(e; "Failed to send reply to mailbox");
}
});
Ok(HandleControl::Done)
}
}
impl InvalidateTableCacheHandler {
pub fn new(backend_cache_invalidtor: KvCacheInvalidatorRef) -> Self {
Self {
backend_cache_invalidtor,
}
}
async fn invalidate_table(&self, catalog_name: &str, schema_name: &str, table_name: &str) {
let tg_key = TableGlobalKey {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: table_name.to_string(),
}
.to_string();
let tg_key = tg_key.as_bytes();
self.backend_cache_invalidtor.invalidate_key(tg_key).await;
}
}

View File

@@ -0,0 +1,129 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::assert_matches::assert_matches;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use api::v1::meta::HeartbeatResponse;
use catalog::helper::TableGlobalKey;
use catalog::remote::KvCacheInvalidator;
use common_meta::heartbeat::handler::{
HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor,
};
use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta};
use common_meta::instruction::{Instruction, InstructionReply, SimpleReply, TableIdent};
use tokio::sync::mpsc;
use super::invalidate_table_cache::InvalidateTableCacheHandler;
pub struct MockKvCacheInvalidtor {
inner: Mutex<HashMap<Vec<u8>, i32>>,
}
#[async_trait::async_trait]
impl KvCacheInvalidator for MockKvCacheInvalidtor {
async fn invalidate_key(&self, key: &[u8]) {
self.inner.lock().unwrap().remove(key);
}
}
#[tokio::test]
async fn test_invalidate_table_cache_handler() {
let table_key = TableGlobalKey {
catalog_name: "test".to_string(),
schema_name: "greptime".to_string(),
table_name: "foo_table".to_string(),
};
let inner = HashMap::from([(table_key.to_string().as_bytes().to_vec(), 1)]);
let backend = Arc::new(MockKvCacheInvalidtor {
inner: Mutex::new(inner),
});
let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new(
InvalidateTableCacheHandler::new(backend.clone()),
)]));
let (tx, mut rx) = mpsc::channel(8);
let mailbox = Arc::new(HeartbeatMailbox::new(tx));
// removes a valid key
handle_instruction(
executor.clone(),
mailbox.clone(),
Instruction::InvalidateTableCache(TableIdent {
cluster_id: 1,
datanode_id: 2,
catalog: "test".to_string(),
schema: "greptime".to_string(),
table: "foo_table".to_string(),
table_id: 0,
engine: "mito".to_string(),
}),
);
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::InvalidateTableCache(SimpleReply { result: true, .. })
);
assert!(!backend
.inner
.lock()
.unwrap()
.contains_key(table_key.to_string().as_bytes()));
// removes a invalid key
handle_instruction(
executor,
mailbox,
Instruction::InvalidateTableCache(TableIdent {
cluster_id: 1,
datanode_id: 2,
catalog: "test".to_string(),
schema: "greptime".to_string(),
table: "not_found".to_string(),
table_id: 0,
engine: "mito".to_string(),
}),
);
let (_, reply) = rx.recv().await.unwrap();
assert_matches!(
reply,
InstructionReply::InvalidateTableCache(SimpleReply { result: true, .. })
);
}
pub fn test_message_meta(id: u64, subject: &str, to: &str, from: &str) -> MessageMeta {
MessageMeta {
id,
subject: subject.to_string(),
to: to.to_string(),
from: from.to_string(),
}
}
fn handle_instruction(
executor: Arc<dyn HeartbeatResponseHandlerExecutor>,
mailbox: Arc<HeartbeatMailbox>,
instruction: Instruction,
) {
let response = HeartbeatResponse::default();
let mut ctx: HeartbeatResponseHandlerContext =
HeartbeatResponseHandlerContext::new(mailbox, response);
ctx.incoming_message = Some((test_message_meta(1, "hi", "foo", "bar"), instruction));
executor.handle(ctx).unwrap();
}

View File

@@ -36,6 +36,8 @@ use common_base::Plugins;
use common_catalog::consts::MITO_ENGINE;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
use common_meta::heartbeat::handler::HandlerGroupExecutor;
use common_query::Output;
use common_telemetry::logging::{debug, info};
use common_telemetry::timer;
@@ -75,6 +77,7 @@ use crate::error::{
};
use crate::expr_factory::{CreateExprFactoryRef, DefaultCreateExprFactory};
use crate::frontend::FrontendOptions;
use crate::heartbeat::handler::invalidate_table_cache::InvalidateTableCacheHandler;
use crate::heartbeat::HeartbeatTask;
use crate::instance::standalone::StandaloneGrpcQueryHandler;
use crate::metrics;
@@ -143,7 +146,7 @@ impl Instance {
let mut catalog_manager = FrontendCatalogManager::new(
meta_backend.clone(),
meta_backend,
meta_backend.clone(),
partition_manager,
datanode_clients.clone(),
);
@@ -173,7 +176,17 @@ impl Instance {
plugins.insert::<StatementExecutorRef>(statement_executor.clone());
let heartbeat_task = Some(HeartbeatTask::new(meta_client, 5, 5));
let handlers_executor = HandlerGroupExecutor::new(vec![
Arc::new(ParseMailboxMessageHandler::default()),
Arc::new(InvalidateTableCacheHandler::new(meta_backend)),
]);
let heartbeat_task = Some(HeartbeatTask::new(
meta_client,
5,
5,
Arc::new(handlers_executor),
));
Ok(Instance {
catalog_manager,