diff --git a/Cargo.lock b/Cargo.lock index 5248a61d4e..fa2584dab2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1517,6 +1517,7 @@ dependencies = [ "api", "arrow-flight", "async-stream", + "async-trait", "common-base", "common-catalog", "common-error", diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index aa37d26a3d..b77f515b97 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -11,6 +11,7 @@ testing = [] api = { workspace = true } arrow-flight.workspace = true async-stream.workspace = true +async-trait.workspace = true common-base = { workspace = true } common-catalog = { workspace = true } common-error = { workspace = true } diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index eeda1510c8..e503555dd2 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -13,12 +13,15 @@ // limitations under the License. use std::fmt::{Debug, Formatter}; +use std::sync::Arc; use std::time::Duration; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; +use common_meta::datanode_manager::{Datanode, DatanodeManager}; use common_meta::peer::Peer; use moka::future::{Cache, CacheBuilder}; +use crate::region::RegionRequester; use crate::Client; pub struct DatanodeClients { @@ -40,6 +43,15 @@ impl Debug for DatanodeClients { } } +#[async_trait::async_trait] +impl DatanodeManager for DatanodeClients { + async fn datanode(&self, datanode: &Peer) -> Arc { + let client = self.get_client(datanode).await; + + Arc::new(RegionRequester::new(client)) + } +} + impl DatanodeClients { pub fn new(config: ChannelConfig) -> Self { Self { diff --git a/src/client/src/region.rs b/src/client/src/region.rs index bef7404b1b..2ebb80ae55 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -14,15 +14,18 @@ use api::v1::region::{region_request, RegionRequest, RegionRequestHeader, RegionResponse}; use api::v1::ResponseHeader; +use async_trait::async_trait; +use common_error::ext::BoxedError; use common_error::status_code::StatusCode; +use common_meta::datanode_manager::{AffectedRows, Datanode}; +use common_meta::error::{self as meta_error, Result as MetaResult}; use common_telemetry::timer; -use snafu::OptionExt; +use snafu::{location, Location, OptionExt}; +use crate::error::Error::FlightGet; use crate::error::{IllegalDatabaseResponseSnafu, Result, ServerSnafu}; use crate::{metrics, Client}; -type AffectedRows = u64; - #[derive(Debug)] pub struct RegionRequester { trace_id: u64, @@ -30,6 +33,24 @@ pub struct RegionRequester { client: Client, } +#[async_trait] +impl Datanode for RegionRequester { + async fn handle(&self, request: region_request::Body) -> MetaResult { + self.handle_inner(request).await.map_err(|err| { + if matches!(err, FlightGet { .. }) { + meta_error::Error::RetryLater { + source: BoxedError::new(err), + } + } else { + meta_error::Error::OperateRegion { + source: BoxedError::new(err), + location: location!(), + } + } + }) + } +} + impl RegionRequester { pub fn new(client: Client) -> Self { // TODO(LFC): Pass in trace_id and span_id from some context when we have it. @@ -40,7 +61,7 @@ impl RegionRequester { } } - pub async fn handle(self, request: region_request::Body) -> Result { + async fn handle_inner(&self, request: region_request::Body) -> Result { let request_type = request.as_ref().to_string(); let request = RegionRequest { @@ -67,6 +88,10 @@ impl RegionRequester { Ok(affected_rows) } + + pub async fn handle(&self, request: region_request::Body) -> Result { + self.handle_inner(request).await + } } fn check_response_header(header: Option) -> Result<()> { diff --git a/src/common/meta/src/cache_invalidator.rs b/src/common/meta/src/cache_invalidator.rs new file mode 100644 index 0000000000..993ac3b098 --- /dev/null +++ b/src/common/meta/src/cache_invalidator.rs @@ -0,0 +1,31 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use crate::error::Result; +use crate::ident::TableIdent; + +/// Places context of invalidating cache. e.g., span id, trace id etc. +pub struct Context { + pub subject: Option, +} + +#[async_trait::async_trait] +pub trait CacheInvalidator: Send + Sync { + // Invalidates table cache + async fn invalidate_table(&self, ctx: &Context, table_ident: TableIdent) -> Result<()>; +} + +pub type CacheInvalidatorRef = Arc; diff --git a/src/common/meta/src/datanode_manager.rs b/src/common/meta/src/datanode_manager.rs new file mode 100644 index 0000000000..18e70f94c9 --- /dev/null +++ b/src/common/meta/src/datanode_manager.rs @@ -0,0 +1,38 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use api::v1::region::region_request; + +use crate::error::Result; +use crate::peer::Peer; + +pub type AffectedRows = u64; + +#[async_trait::async_trait] +pub trait Datanode: Send + Sync { + /// Handles DML, and DDL requests. + async fn handle(&self, request: region_request::Body) -> Result; +} + +pub type DatanodeRef = Arc; + +#[async_trait::async_trait] +pub trait DatanodeManager: Send + Sync { + /// Retrieves a target `datanode`. + async fn datanode(&self, datanode: &Peer) -> DatanodeRef; +} + +pub type DatanodeManagerRef = Arc; diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 97a28fe084..4662f28d20 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -150,6 +150,15 @@ pub enum Error { #[snafu(display("Invalid heartbeat response, location: {}", location))] InvalidHeartbeatResponse { location: Location }, + + #[snafu(display("{}", source))] + OperateRegion { + location: Location, + source: BoxedError, + }, + + #[snafu(display("Retry later, source: {}", source))] + RetryLater { source: BoxedError }, } pub type Result = std::result::Result; @@ -185,8 +194,9 @@ impl ErrorExt for Error { | ConvertRawKey { .. } | DecodeProto { .. } => StatusCode::Unexpected, + RetryLater { source, .. } => source.status_code(), + OperateRegion { source, .. } => source.status_code(), MetaSrv { source, .. } => source.status_code(), - InvalidCatalogValue { source, .. } => source.status_code(), } } @@ -195,3 +205,17 @@ impl ErrorExt for Error { self } } + +impl Error { + /// Creates a new [Error::RetryLater] error from source `err`. + pub fn retry_later(err: E) -> Error { + Error::RetryLater { + source: BoxedError::new(err), + } + } + + /// Determine whether it is a retry later type through [StatusCode] + pub fn is_retry_later(&self) -> bool { + matches!(self, Error::RetryLater { .. }) + } +} diff --git a/src/common/meta/src/lib.rs b/src/common/meta/src/lib.rs index e052afeb2a..f3df2c74eb 100644 --- a/src/common/meta/src/lib.rs +++ b/src/common/meta/src/lib.rs @@ -14,6 +14,8 @@ #![feature(btree_extract_if)] +pub mod cache_invalidator; +pub mod datanode_manager; pub mod error; pub mod heartbeat; // TODO(weny): Removes it diff --git a/src/meta-srv/src/cache_invalidator.rs b/src/meta-srv/src/cache_invalidator.rs new file mode 100644 index 0000000000..be3cab10ec --- /dev/null +++ b/src/meta-srv/src/cache_invalidator.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::meta::MailboxMessage; +use common_error::ext::BoxedError; +use common_meta::cache_invalidator::{CacheInvalidator, Context}; +use common_meta::error::{self as meta_error, Result as MetaResult}; +use common_meta::ident::TableIdent; +use common_meta::instruction::Instruction; +use snafu::ResultExt; + +use crate::metasrv::MetasrvInfo; +use crate::service::mailbox::{BroadcastChannel, MailboxRef}; + +const DEFAULT_SUBJECT: &str = "Invalidate table"; + +pub struct MetasrvCacheInvalidator { + mailbox: MailboxRef, + // Metasrv infos + info: MetasrvInfo, +} + +impl MetasrvCacheInvalidator { + pub fn new(mailbox: MailboxRef, info: MetasrvInfo) -> Self { + Self { mailbox, info } + } +} + +#[async_trait::async_trait] +impl CacheInvalidator for MetasrvCacheInvalidator { + async fn invalidate_table(&self, ctx: &Context, table_ident: TableIdent) -> MetaResult<()> { + let instruction = Instruction::InvalidateTableCache(table_ident); + let subject = &ctx + .subject + .clone() + .unwrap_or_else(|| DEFAULT_SUBJECT.to_string()); + + let msg = &MailboxMessage::json_message( + subject, + &format!("Metasrv@{}", self.info.server_addr), + "Frontend broadcast", + common_time::util::current_time_millis(), + &instruction, + ) + .with_context(|_| meta_error::SerdeJsonSnafu)?; + + self.mailbox + .broadcast(&BroadcastChannel::Frontend, msg) + .await + .map_err(BoxedError::new) + .context(meta_error::MetaSrvSnafu) + } +} diff --git a/src/meta-srv/src/ddl.rs b/src/meta-srv/src/ddl.rs index b7cc193283..6ca38790aa 100644 --- a/src/meta-srv/src/ddl.rs +++ b/src/meta-srv/src/ddl.rs @@ -15,6 +15,8 @@ use std::sync::Arc; use client::client_manager::DatanodeClients; +use common_meta::cache_invalidator::CacheInvalidatorRef; +use common_meta::datanode_manager::DatanodeManagerRef; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_route::TableRouteValue; use common_meta::key::TableMetadataManagerRef; @@ -32,23 +34,24 @@ use crate::error::{ use crate::procedure::alter_table::AlterTableProcedure; use crate::procedure::create_table::CreateTableProcedure; use crate::procedure::drop_table::DropTableProcedure; -use crate::service::mailbox::MailboxRef; pub type DdlManagerRef = Arc; pub struct DdlManager { procedure_manager: ProcedureManagerRef, datanode_clients: Arc, - pub(crate) mailbox: MailboxRef, - pub(crate) server_addr: String, + + pub(crate) cache_invalidator: CacheInvalidatorRef, pub(crate) table_metadata_manager: TableMetadataManagerRef, } #[derive(Clone)] pub(crate) struct DdlContext { + // TODO(weny): removes it pub(crate) datanode_clients: Arc, - pub(crate) mailbox: MailboxRef, - pub(crate) server_addr: String, + + pub(crate) datanode_manager: DatanodeManagerRef, + pub(crate) cache_invalidator: CacheInvalidatorRef, pub(crate) table_metadata_manager: TableMetadataManagerRef, } @@ -56,24 +59,23 @@ impl DdlManager { pub(crate) fn new( procedure_manager: ProcedureManagerRef, datanode_clients: Arc, - mailbox: MailboxRef, - server_addr: String, + cache_invalidator: CacheInvalidatorRef, table_metadata_manager: TableMetadataManagerRef, ) -> Self { Self { procedure_manager, datanode_clients, - mailbox, - server_addr, + cache_invalidator, table_metadata_manager, } } pub(crate) fn create_context(&self) -> DdlContext { DdlContext { + datanode_manager: self.datanode_clients.clone(), datanode_clients: self.datanode_clients.clone(), - mailbox: self.mailbox.clone(), - server_addr: self.server_addr.clone(), + cache_invalidator: self.cache_invalidator.clone(), + table_metadata_manager: self.table_metadata_manager.clone(), } } diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index f608bd90b1..03d815c1d6 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -26,6 +26,19 @@ use crate::pubsub::Message; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error { + #[snafu(display("Failed to invalidate table cache: {}", source))] + InvalidateTableCache { + location: Location, + source: common_meta::error::Error, + }, + + #[snafu(display("Failed to operate region on peer:{}, source: {}", peer, source))] + OperateRegion { + location: Location, + peer: Peer, + source: BoxedError, + }, + #[snafu(display("Failed to list catalogs: {}", source))] ListCatalogs { location: Location, @@ -595,6 +608,7 @@ impl ErrorExt for Error { | Error::ConvertRawTableInfo { .. } | Error::BuildTableMeta { .. } => StatusCode::Unexpected, Error::TableNotFound { .. } => StatusCode::TableNotFound, + Error::InvalidateTableCache { source, .. } => source.status_code(), Error::Table { source, .. } => source.status_code(), Error::RequestDatanode { source, .. } => source.status_code(), Error::InvalidCatalogValue { source, .. } => source.status_code(), @@ -612,6 +626,7 @@ impl ErrorExt for Error { Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted, Error::RegisterProcedureLoader { source, .. } => source.status_code(), + Error::OperateRegion { source, .. } => source.status_code(), Error::TableRouteConversion { source, .. } | Error::ConvertProtoData { source, .. } diff --git a/src/meta-srv/src/lib.rs b/src/meta-srv/src/lib.rs index 31b0a68888..a7329e5353 100644 --- a/src/meta-srv/src/lib.rs +++ b/src/meta-srv/src/lib.rs @@ -16,6 +16,7 @@ #![feature(result_flattening)] pub mod bootstrap; +mod cache_invalidator; pub mod cluster; pub mod ddl; pub mod election; diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs index ca3d67dbc4..2f1992bad8 100644 --- a/src/meta-srv/src/metasrv.rs +++ b/src/meta-srv/src/metasrv.rs @@ -97,6 +97,10 @@ impl MetaSrvOptions { } } +pub struct MetasrvInfo { + pub server_addr: String, +} + // Options for datanode. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct DatanodeOptions { diff --git a/src/meta-srv/src/metasrv/builder.rs b/src/meta-srv/src/metasrv/builder.rs index c82d711c51..b027522065 100644 --- a/src/meta-srv/src/metasrv/builder.rs +++ b/src/meta-srv/src/metasrv/builder.rs @@ -22,6 +22,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef}; use common_procedure::local::{LocalManager, ManagerConfig}; use common_procedure::ProcedureManagerRef; +use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef}; use crate::ddl::{DdlManager, DdlManagerRef}; use crate::error::Result; @@ -38,7 +39,7 @@ use crate::lock::memory::MemLock; use crate::lock::DistLockRef; use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef}; use crate::metasrv::{ - ElectionRef, MetaSrv, MetaSrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ, + ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ, }; use crate::procedure::region_failover::RegionFailoverManager; use crate::procedure::state_store::MetaStateStore; @@ -329,12 +330,17 @@ fn build_ddl_manager( .tcp_nodelay(options.datanode.client_options.tcp_nodelay); Arc::new(DatanodeClients::new(datanode_client_channel_config)) }); + let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new( + mailbox.clone(), + MetasrvInfo { + server_addr: options.server_addr.clone(), + }, + )); // TODO(weny): considers to modify the default config of procedure manager Arc::new(DdlManager::new( procedure_manager.clone(), datanode_clients, - mailbox.clone(), - options.server_addr.clone(), + cache_invalidator, table_metadata_manager.clone(), )) } diff --git a/src/meta-srv/src/procedure/alter_table.rs b/src/meta-srv/src/procedure/alter_table.rs index 8e5df33181..916f1ea551 100644 --- a/src/meta-srv/src/procedure/alter_table.rs +++ b/src/meta-srv/src/procedure/alter_table.rs @@ -14,11 +14,10 @@ use std::vec; -use api::v1::meta::MailboxMessage; use async_trait::async_trait; use client::Database; +use common_meta::cache_invalidator::Context; use common_meta::ident::TableIdent; -use common_meta::instruction::Instruction; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_name::TableNameKey; use common_meta::key::table_route::TableRouteValue; @@ -40,7 +39,6 @@ use table::requests::{AlterKind, AlterTableRequest}; use crate::ddl::DdlContext; use crate::error::{self, Result, TableMetadataManagerSnafu}; use crate::procedure::utils::handle_request_datanode_error; -use crate::service::mailbox::BroadcastChannel; // TODO(weny): removes in following PRs. #[allow(dead_code)] @@ -265,23 +263,18 @@ impl AlterTableProcedure { table_id: self.data.table_id(), engine: self.data.table_info().meta.engine.to_string(), }; - let instruction = Instruction::InvalidateTableCache(table_ident); - - let msg = &MailboxMessage::json_message( - "Invalidate table cache by alter table procedure", - &format!("Metasrv@{}", self.context.server_addr), - "Frontend broadcast", - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| error::SerializeToJsonSnafu { - input: instruction.to_string(), - })?; self.context - .mailbox - .broadcast(&BroadcastChannel::Frontend, msg) - .await?; + .cache_invalidator + .invalidate_table( + &Context { + subject: Some("Invalidate table cache by alter table procedure".to_string()), + }, + table_ident, + ) + .await + .context(error::InvalidateTableCacheSnafu)?; + self.data.state = AlterTableState::DatanodeAlterTable; Ok(Status::executing(true)) } diff --git a/src/meta-srv/src/procedure/create_table.rs b/src/meta-srv/src/procedure/create_table.rs index 8fbe4323f9..3b536428da 100644 --- a/src/meta-srv/src/procedure/create_table.rs +++ b/src/meta-srv/src/procedure/create_table.rs @@ -16,15 +16,9 @@ use api::v1::region::region_request::Body as PbRegionRequest; use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest}; use api::v1::SemanticType; use async_trait::async_trait; -use client::region::RegionRequester; -use client::Database; -use common_catalog::consts::MITO2_ENGINE; -use common_error::ext::ErrorExt; -use common_error::status_code::StatusCode; use common_meta::key::table_name::TableNameKey; use common_meta::rpc::ddl::CreateTableTask; use common_meta::rpc::router::{find_leader_regions, find_leaders, RegionRoute}; -use common_meta::table_name::TableName; use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; use common_telemetry::info; @@ -36,10 +30,10 @@ use strum::AsRefStr; use table::engine::TableReference; use table::metadata::{RawTableInfo, TableId}; -use super::utils::{handle_request_datanode_error, handle_retry_error}; use crate::ddl::DdlContext; use crate::error::{self, PrimaryKeyNotFoundSnafu, Result, TableMetadataManagerSnafu}; use crate::metrics; +use crate::procedure::utils::{handle_operate_region_error, handle_retry_error}; pub struct CreateTableProcedure { context: DdlContext, @@ -69,10 +63,6 @@ impl CreateTableProcedure { }) } - fn table_name(&self) -> TableName { - self.creator.data.task.table_name() - } - pub fn table_info(&self) -> &RawTableInfo { &self.creator.data.task.table_info } @@ -111,11 +101,7 @@ impl CreateTableProcedure { return Ok(Status::Done); } - self.creator.data.state = if expr.engine == MITO2_ENGINE { - CreateTableState::DatanodeCreateRegions - } else { - CreateTableState::DatanodeCreateTable - }; + self.creator.data.state = CreateTableState::DatanodeCreateRegions; Ok(Status::executing(true)) } @@ -190,7 +176,7 @@ impl CreateTableProcedure { let mut create_region_tasks = Vec::with_capacity(leaders.len()); for datanode in leaders { - let clients = self.context.datanode_clients.clone(); + let manager = self.context.datanode_manager.clone(); let regions = find_leader_regions(region_routes, &datanode); let requests = regions @@ -209,11 +195,10 @@ impl CreateTableProcedure { create_region_tasks.push(async move { for request in requests { - let client = clients.get_client(&datanode).await; - let requester = RegionRequester::new(client); + let requester = manager.datanode(&datanode).await; if let Err(err) = requester.handle(request).await { - return Err(handle_request_datanode_error(datanode)(err)); + return Err(handle_operate_region_error(datanode)(err)); } } Ok(()) @@ -244,44 +229,6 @@ impl CreateTableProcedure { Ok(Status::Done) } - - async fn on_datanode_create_table(&mut self) -> Result { - let region_routes = &self.creator.data.region_routes; - let table_name = self.table_name(); - let clients = self.context.datanode_clients.clone(); - let leaders = find_leaders(region_routes); - let mut joins = Vec::with_capacity(leaders.len()); - let table_id = self.table_id(); - - for datanode in leaders { - let client = clients.get_client(&datanode).await; - let client = Database::new(&table_name.catalog_name, &table_name.schema_name, client); - - let regions = find_leader_regions(region_routes, &datanode); - let mut create_expr_for_region = self.creator.data.task.create_table.clone(); - create_expr_for_region.region_numbers = regions; - create_expr_for_region.table_id = Some(api::v1::TableId { id: table_id }); - - joins.push(common_runtime::spawn_bg(async move { - if let Err(err) = client.create(create_expr_for_region).await { - if err.status_code() != StatusCode::TableAlreadyExists { - return Err(handle_request_datanode_error(datanode)(err)); - } - } - Ok(()) - })); - } - - let _r = join_all(joins) - .await - .into_iter() - .map(|e| e.context(error::JoinSnafu).flatten()) - .collect::>>()?; - - self.creator.data.state = CreateTableState::CreateMetadata; - - Ok(Status::executing(true)) - } } #[async_trait] @@ -300,7 +247,6 @@ impl Procedure for CreateTableProcedure { match state { CreateTableState::Prepare => self.on_prepare().await, - CreateTableState::DatanodeCreateTable => self.on_datanode_create_table().await, CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await, CreateTableState::CreateMetadata => self.on_create_metadata().await, } @@ -344,9 +290,7 @@ impl TableCreator { enum CreateTableState { /// Prepares to create the table Prepare, - /// Datanode creates the table - DatanodeCreateTable, - /// Create regions on the Datanode + /// Creates regions on the Datanode DatanodeCreateRegions, /// Creates metadata CreateMetadata, @@ -372,6 +316,7 @@ mod test { use std::sync::{Arc, Mutex}; use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr}; + use common_catalog::consts::MITO2_ENGINE; use super::*; use crate::procedure::utils::mock::EchoRegionServer; diff --git a/src/meta-srv/src/procedure/drop_table.rs b/src/meta-srv/src/procedure/drop_table.rs index 9a71a548df..29428fd31b 100644 --- a/src/meta-srv/src/procedure/drop_table.rs +++ b/src/meta-srv/src/procedure/drop_table.rs @@ -12,17 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -use api::v1::meta::MailboxMessage; use api::v1::region::{region_request, DropRequest as PbDropRegionRequest}; -use api::v1::DropTableExpr; use async_trait::async_trait; use client::region::RegionRequester; -use client::Database; -use common_catalog::consts::MITO2_ENGINE; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; +use common_meta::cache_invalidator::Context; use common_meta::ident::TableIdent; -use common_meta::instruction::Instruction; use common_meta::key::table_info::TableInfoValue; use common_meta::key::table_name::TableNameKey; use common_meta::key::table_route::TableRouteValue; @@ -47,7 +43,7 @@ use crate::ddl::DdlContext; use crate::error::{self, Result, TableMetadataManagerSnafu}; use crate::metrics; use crate::procedure::utils::handle_request_datanode_error; -use crate::service::mailbox::BroadcastChannel; + pub struct DropTableProcedure { context: DdlContext, data: DropTableData, @@ -132,29 +128,19 @@ impl DropTableProcedure { table_id: self.data.task.table_id, engine: engine.to_string(), }; - let instruction = Instruction::InvalidateTableCache(table_ident); - - let msg = &MailboxMessage::json_message( - "Invalidate Table Cache by dropping table procedure", - &format!("Metasrv@{}", self.context.server_addr), - "Frontend broadcast", - common_time::util::current_time_millis(), - &instruction, - ) - .with_context(|_| error::SerializeToJsonSnafu { - input: instruction.to_string(), - })?; self.context - .mailbox - .broadcast(&BroadcastChannel::Frontend, msg) - .await?; + .cache_invalidator + .invalidate_table( + &Context { + subject: Some("Invalidate Table Cache by dropping table procedure".to_string()), + }, + table_ident, + ) + .await + .context(error::InvalidateTableCacheSnafu)?; - self.data.state = if engine == MITO2_ENGINE { - DropTableState::DatanodeDropRegions - } else { - DropTableState::DatanodeDropTable - }; + self.data.state = DropTableState::DatanodeDropRegions; Ok(Status::executing(true)) } @@ -203,50 +189,6 @@ impl DropTableProcedure { Ok(Status::Done) } - - /// Executes drop table instruction on datanode. - async fn on_datanode_drop_table(&mut self) -> Result { - let region_routes = &self.data.region_routes(); - - let table_ref = self.data.table_ref(); - let table_id = self.data.task.table_id; - - let clients = self.context.datanode_clients.clone(); - let leaders = find_leaders(region_routes); - let mut joins = Vec::with_capacity(leaders.len()); - - let expr = DropTableExpr { - catalog_name: table_ref.catalog.to_string(), - schema_name: table_ref.schema.to_string(), - table_name: table_ref.table.to_string(), - table_id: Some(api::v1::TableId { id: table_id }), - }; - - for datanode in leaders { - debug!("Dropping table {table_ref} on Datanode {datanode:?}"); - - let client = clients.get_client(&datanode).await; - let client = Database::new(table_ref.catalog, table_ref.schema, client); - let expr = expr.clone(); - joins.push(common_runtime::spawn_bg(async move { - if let Err(err) = client.drop_table(expr).await { - // TODO(weny): add tests for `TableNotFound` - if err.status_code() != StatusCode::TableNotFound { - return Err(handle_request_datanode_error(datanode)(err)); - } - } - Ok(()) - })); - } - - let _r = join_all(joins) - .await - .into_iter() - .map(|e| e.context(error::JoinSnafu).flatten()) - .collect::>>()?; - - Ok(Status::Done) - } } #[async_trait] @@ -267,7 +209,6 @@ impl Procedure for DropTableProcedure { DropTableState::Prepare => self.on_prepare().await, DropTableState::RemoveMetadata => self.on_remove_metadata().await, DropTableState::InvalidateTableCache => self.on_broadcast().await, - DropTableState::DatanodeDropTable => self.on_datanode_drop_table().await, DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await, } .map_err(handle_retry_error) @@ -343,9 +284,7 @@ enum DropTableState { RemoveMetadata, /// Invalidates Table Cache InvalidateTableCache, - /// Datanode drops the table - DatanodeDropTable, - /// Drop regions on Datanode + /// Drops regions on Datanode DatanodeDropRegions, } diff --git a/src/meta-srv/src/procedure/utils.rs b/src/meta-srv/src/procedure/utils.rs index c85c4e7639..1ebc611254 100644 --- a/src/meta-srv/src/procedure/utils.rs +++ b/src/meta-srv/src/procedure/utils.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use common_error::ext::BoxedError; use common_meta::peer::Peer; use common_procedure::error::Error as ProcedureError; use snafu::{location, Location}; @@ -35,6 +36,25 @@ pub fn handle_request_datanode_error(datanode: Peer) -> impl FnOnce(client::erro } } +pub fn handle_operate_region_error( + datanode: Peer, +) -> impl FnOnce(common_meta::error::Error) -> Error { + move |err| { + if matches!(err, common_meta::error::Error::RetryLater { .. }) { + error::RetryLaterSnafu { + reason: format!("Failed to execute operation on datanode, source: {}", err), + } + .build() + } else { + error::Error::OperateRegion { + location: location!(), + peer: datanode, + source: BoxedError::new(err), + } + } + } +} + pub fn handle_retry_error(e: Error) -> ProcedureError { if matches!(e, error::Error::RetryLater { .. }) { ProcedureError::retry_later(e) @@ -145,8 +165,10 @@ pub mod test_data { use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType}; use table::requests::TableOptions; + use crate::cache_invalidator::MetasrvCacheInvalidator; use crate::ddl::DdlContext; use crate::handler::{HeartbeatMailbox, Pushers}; + use crate::metasrv::MetasrvInfo; use crate::sequence::Sequence; use crate::service::store::kv::KvBackendAdapter; use crate::service::store::memory::MemStore; @@ -220,10 +242,16 @@ pub mod test_data { let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence); let kv_backend = KvBackendAdapter::wrap(kv_store); + let clients = Arc::new(DatanodeClients::default()); DdlContext { - datanode_clients: Arc::new(DatanodeClients::default()), - mailbox, - server_addr: "127.0.0.1:4321".to_string(), + datanode_clients: clients.clone(), + datanode_manager: clients, + cache_invalidator: Arc::new(MetasrvCacheInvalidator::new( + mailbox, + MetasrvInfo { + server_addr: "127.0.0.1:4321".to_string(), + }, + )), table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)), } }