mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-22 16:00:38 +00:00
refactor: refactor: ddl context (#2301)
* refactor: refactor: ddl context * refactor: remove unused code * chore: apply suggestions from CR
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1517,6 +1517,7 @@ dependencies = [
|
||||
"api",
|
||||
"arrow-flight",
|
||||
"async-stream",
|
||||
"async-trait",
|
||||
"common-base",
|
||||
"common-catalog",
|
||||
"common-error",
|
||||
|
||||
@@ -11,6 +11,7 @@ testing = []
|
||||
api = { workspace = true }
|
||||
arrow-flight.workspace = true
|
||||
async-stream.workspace = true
|
||||
async-trait.workspace = true
|
||||
common-base = { workspace = true }
|
||||
common-catalog = { workspace = true }
|
||||
common-error = { workspace = true }
|
||||
|
||||
@@ -13,12 +13,15 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
|
||||
use common_meta::datanode_manager::{Datanode, DatanodeManager};
|
||||
use common_meta::peer::Peer;
|
||||
use moka::future::{Cache, CacheBuilder};
|
||||
|
||||
use crate::region::RegionRequester;
|
||||
use crate::Client;
|
||||
|
||||
pub struct DatanodeClients {
|
||||
@@ -40,6 +43,15 @@ impl Debug for DatanodeClients {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl DatanodeManager for DatanodeClients {
|
||||
async fn datanode(&self, datanode: &Peer) -> Arc<dyn Datanode> {
|
||||
let client = self.get_client(datanode).await;
|
||||
|
||||
Arc::new(RegionRequester::new(client))
|
||||
}
|
||||
}
|
||||
|
||||
impl DatanodeClients {
|
||||
pub fn new(config: ChannelConfig) -> Self {
|
||||
Self {
|
||||
|
||||
@@ -14,15 +14,18 @@
|
||||
|
||||
use api::v1::region::{region_request, RegionRequest, RegionRequestHeader, RegionResponse};
|
||||
use api::v1::ResponseHeader;
|
||||
use async_trait::async_trait;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_meta::datanode_manager::{AffectedRows, Datanode};
|
||||
use common_meta::error::{self as meta_error, Result as MetaResult};
|
||||
use common_telemetry::timer;
|
||||
use snafu::OptionExt;
|
||||
use snafu::{location, Location, OptionExt};
|
||||
|
||||
use crate::error::Error::FlightGet;
|
||||
use crate::error::{IllegalDatabaseResponseSnafu, Result, ServerSnafu};
|
||||
use crate::{metrics, Client};
|
||||
|
||||
type AffectedRows = u64;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct RegionRequester {
|
||||
trace_id: u64,
|
||||
@@ -30,6 +33,24 @@ pub struct RegionRequester {
|
||||
client: Client,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Datanode for RegionRequester {
|
||||
async fn handle(&self, request: region_request::Body) -> MetaResult<AffectedRows> {
|
||||
self.handle_inner(request).await.map_err(|err| {
|
||||
if matches!(err, FlightGet { .. }) {
|
||||
meta_error::Error::RetryLater {
|
||||
source: BoxedError::new(err),
|
||||
}
|
||||
} else {
|
||||
meta_error::Error::OperateRegion {
|
||||
source: BoxedError::new(err),
|
||||
location: location!(),
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl RegionRequester {
|
||||
pub fn new(client: Client) -> Self {
|
||||
// TODO(LFC): Pass in trace_id and span_id from some context when we have it.
|
||||
@@ -40,7 +61,7 @@ impl RegionRequester {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn handle(self, request: region_request::Body) -> Result<AffectedRows> {
|
||||
async fn handle_inner(&self, request: region_request::Body) -> Result<AffectedRows> {
|
||||
let request_type = request.as_ref().to_string();
|
||||
|
||||
let request = RegionRequest {
|
||||
@@ -67,6 +88,10 @@ impl RegionRequester {
|
||||
|
||||
Ok(affected_rows)
|
||||
}
|
||||
|
||||
pub async fn handle(&self, request: region_request::Body) -> Result<AffectedRows> {
|
||||
self.handle_inner(request).await
|
||||
}
|
||||
}
|
||||
|
||||
fn check_response_header(header: Option<ResponseHeader>) -> Result<()> {
|
||||
|
||||
31
src/common/meta/src/cache_invalidator.rs
Normal file
31
src/common/meta/src/cache_invalidator.rs
Normal file
@@ -0,0 +1,31 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::ident::TableIdent;
|
||||
|
||||
/// Places context of invalidating cache. e.g., span id, trace id etc.
|
||||
pub struct Context {
|
||||
pub subject: Option<String>,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait CacheInvalidator: Send + Sync {
|
||||
// Invalidates table cache
|
||||
async fn invalidate_table(&self, ctx: &Context, table_ident: TableIdent) -> Result<()>;
|
||||
}
|
||||
|
||||
pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
|
||||
38
src/common/meta/src/datanode_manager.rs
Normal file
38
src/common/meta/src/datanode_manager.rs
Normal file
@@ -0,0 +1,38 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::v1::region::region_request;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::peer::Peer;
|
||||
|
||||
pub type AffectedRows = u64;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait Datanode: Send + Sync {
|
||||
/// Handles DML, and DDL requests.
|
||||
async fn handle(&self, request: region_request::Body) -> Result<AffectedRows>;
|
||||
}
|
||||
|
||||
pub type DatanodeRef = Arc<dyn Datanode>;
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait DatanodeManager: Send + Sync {
|
||||
/// Retrieves a target `datanode`.
|
||||
async fn datanode(&self, datanode: &Peer) -> DatanodeRef;
|
||||
}
|
||||
|
||||
pub type DatanodeManagerRef = Arc<dyn DatanodeManager>;
|
||||
@@ -150,6 +150,15 @@ pub enum Error {
|
||||
|
||||
#[snafu(display("Invalid heartbeat response, location: {}", location))]
|
||||
InvalidHeartbeatResponse { location: Location },
|
||||
|
||||
#[snafu(display("{}", source))]
|
||||
OperateRegion {
|
||||
location: Location,
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Retry later, source: {}", source))]
|
||||
RetryLater { source: BoxedError },
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -185,8 +194,9 @@ impl ErrorExt for Error {
|
||||
| ConvertRawKey { .. }
|
||||
| DecodeProto { .. } => StatusCode::Unexpected,
|
||||
|
||||
RetryLater { source, .. } => source.status_code(),
|
||||
OperateRegion { source, .. } => source.status_code(),
|
||||
MetaSrv { source, .. } => source.status_code(),
|
||||
|
||||
InvalidCatalogValue { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
@@ -195,3 +205,17 @@ impl ErrorExt for Error {
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Error {
|
||||
/// Creates a new [Error::RetryLater] error from source `err`.
|
||||
pub fn retry_later<E: ErrorExt + Send + Sync + 'static>(err: E) -> Error {
|
||||
Error::RetryLater {
|
||||
source: BoxedError::new(err),
|
||||
}
|
||||
}
|
||||
|
||||
/// Determine whether it is a retry later type through [StatusCode]
|
||||
pub fn is_retry_later(&self) -> bool {
|
||||
matches!(self, Error::RetryLater { .. })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
|
||||
#![feature(btree_extract_if)]
|
||||
|
||||
pub mod cache_invalidator;
|
||||
pub mod datanode_manager;
|
||||
pub mod error;
|
||||
pub mod heartbeat;
|
||||
// TODO(weny): Removes it
|
||||
|
||||
64
src/meta-srv/src/cache_invalidator.rs
Normal file
64
src/meta-srv/src/cache_invalidator.rs
Normal file
@@ -0,0 +1,64 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::cache_invalidator::{CacheInvalidator, Context};
|
||||
use common_meta::error::{self as meta_error, Result as MetaResult};
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::instruction::Instruction;
|
||||
use snafu::ResultExt;
|
||||
|
||||
use crate::metasrv::MetasrvInfo;
|
||||
use crate::service::mailbox::{BroadcastChannel, MailboxRef};
|
||||
|
||||
const DEFAULT_SUBJECT: &str = "Invalidate table";
|
||||
|
||||
pub struct MetasrvCacheInvalidator {
|
||||
mailbox: MailboxRef,
|
||||
// Metasrv infos
|
||||
info: MetasrvInfo,
|
||||
}
|
||||
|
||||
impl MetasrvCacheInvalidator {
|
||||
pub fn new(mailbox: MailboxRef, info: MetasrvInfo) -> Self {
|
||||
Self { mailbox, info }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl CacheInvalidator for MetasrvCacheInvalidator {
|
||||
async fn invalidate_table(&self, ctx: &Context, table_ident: TableIdent) -> MetaResult<()> {
|
||||
let instruction = Instruction::InvalidateTableCache(table_ident);
|
||||
let subject = &ctx
|
||||
.subject
|
||||
.clone()
|
||||
.unwrap_or_else(|| DEFAULT_SUBJECT.to_string());
|
||||
|
||||
let msg = &MailboxMessage::json_message(
|
||||
subject,
|
||||
&format!("Metasrv@{}", self.info.server_addr),
|
||||
"Frontend broadcast",
|
||||
common_time::util::current_time_millis(),
|
||||
&instruction,
|
||||
)
|
||||
.with_context(|_| meta_error::SerdeJsonSnafu)?;
|
||||
|
||||
self.mailbox
|
||||
.broadcast(&BroadcastChannel::Frontend, msg)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(meta_error::MetaSrvSnafu)
|
||||
}
|
||||
}
|
||||
@@ -15,6 +15,8 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use client::client_manager::DatanodeClients;
|
||||
use common_meta::cache_invalidator::CacheInvalidatorRef;
|
||||
use common_meta::datanode_manager::DatanodeManagerRef;
|
||||
use common_meta::key::table_info::TableInfoValue;
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
use common_meta::key::TableMetadataManagerRef;
|
||||
@@ -32,23 +34,24 @@ use crate::error::{
|
||||
use crate::procedure::alter_table::AlterTableProcedure;
|
||||
use crate::procedure::create_table::CreateTableProcedure;
|
||||
use crate::procedure::drop_table::DropTableProcedure;
|
||||
use crate::service::mailbox::MailboxRef;
|
||||
|
||||
pub type DdlManagerRef = Arc<DdlManager>;
|
||||
|
||||
pub struct DdlManager {
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
datanode_clients: Arc<DatanodeClients>,
|
||||
pub(crate) mailbox: MailboxRef,
|
||||
pub(crate) server_addr: String,
|
||||
|
||||
pub(crate) cache_invalidator: CacheInvalidatorRef,
|
||||
pub(crate) table_metadata_manager: TableMetadataManagerRef,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct DdlContext {
|
||||
// TODO(weny): removes it
|
||||
pub(crate) datanode_clients: Arc<DatanodeClients>,
|
||||
pub(crate) mailbox: MailboxRef,
|
||||
pub(crate) server_addr: String,
|
||||
|
||||
pub(crate) datanode_manager: DatanodeManagerRef,
|
||||
pub(crate) cache_invalidator: CacheInvalidatorRef,
|
||||
pub(crate) table_metadata_manager: TableMetadataManagerRef,
|
||||
}
|
||||
|
||||
@@ -56,24 +59,23 @@ impl DdlManager {
|
||||
pub(crate) fn new(
|
||||
procedure_manager: ProcedureManagerRef,
|
||||
datanode_clients: Arc<DatanodeClients>,
|
||||
mailbox: MailboxRef,
|
||||
server_addr: String,
|
||||
cache_invalidator: CacheInvalidatorRef,
|
||||
table_metadata_manager: TableMetadataManagerRef,
|
||||
) -> Self {
|
||||
Self {
|
||||
procedure_manager,
|
||||
datanode_clients,
|
||||
mailbox,
|
||||
server_addr,
|
||||
cache_invalidator,
|
||||
table_metadata_manager,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn create_context(&self) -> DdlContext {
|
||||
DdlContext {
|
||||
datanode_manager: self.datanode_clients.clone(),
|
||||
datanode_clients: self.datanode_clients.clone(),
|
||||
mailbox: self.mailbox.clone(),
|
||||
server_addr: self.server_addr.clone(),
|
||||
cache_invalidator: self.cache_invalidator.clone(),
|
||||
|
||||
table_metadata_manager: self.table_metadata_manager.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,19 @@ use crate::pubsub::Message;
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum Error {
|
||||
#[snafu(display("Failed to invalidate table cache: {}", source))]
|
||||
InvalidateTableCache {
|
||||
location: Location,
|
||||
source: common_meta::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to operate region on peer:{}, source: {}", peer, source))]
|
||||
OperateRegion {
|
||||
location: Location,
|
||||
peer: Peer,
|
||||
source: BoxedError,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to list catalogs: {}", source))]
|
||||
ListCatalogs {
|
||||
location: Location,
|
||||
@@ -595,6 +608,7 @@ impl ErrorExt for Error {
|
||||
| Error::ConvertRawTableInfo { .. }
|
||||
| Error::BuildTableMeta { .. } => StatusCode::Unexpected,
|
||||
Error::TableNotFound { .. } => StatusCode::TableNotFound,
|
||||
Error::InvalidateTableCache { source, .. } => source.status_code(),
|
||||
Error::Table { source, .. } => source.status_code(),
|
||||
Error::RequestDatanode { source, .. } => source.status_code(),
|
||||
Error::InvalidCatalogValue { source, .. } => source.status_code(),
|
||||
@@ -612,6 +626,7 @@ impl ErrorExt for Error {
|
||||
Error::RegionFailoverCandidatesNotFound { .. } => StatusCode::RuntimeResourcesExhausted,
|
||||
|
||||
Error::RegisterProcedureLoader { source, .. } => source.status_code(),
|
||||
Error::OperateRegion { source, .. } => source.status_code(),
|
||||
|
||||
Error::TableRouteConversion { source, .. }
|
||||
| Error::ConvertProtoData { source, .. }
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
#![feature(result_flattening)]
|
||||
|
||||
pub mod bootstrap;
|
||||
mod cache_invalidator;
|
||||
pub mod cluster;
|
||||
pub mod ddl;
|
||||
pub mod election;
|
||||
|
||||
@@ -97,6 +97,10 @@ impl MetaSrvOptions {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct MetasrvInfo {
|
||||
pub server_addr: String,
|
||||
}
|
||||
|
||||
// Options for datanode.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
|
||||
pub struct DatanodeOptions {
|
||||
|
||||
@@ -22,6 +22,7 @@ use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
|
||||
use common_procedure::local::{LocalManager, ManagerConfig};
|
||||
use common_procedure::ProcedureManagerRef;
|
||||
|
||||
use crate::cache_invalidator::MetasrvCacheInvalidator;
|
||||
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
|
||||
use crate::ddl::{DdlManager, DdlManagerRef};
|
||||
use crate::error::Result;
|
||||
@@ -38,7 +39,7 @@ use crate::lock::memory::MemLock;
|
||||
use crate::lock::DistLockRef;
|
||||
use crate::metadata_service::{DefaultMetadataService, MetadataServiceRef};
|
||||
use crate::metasrv::{
|
||||
ElectionRef, MetaSrv, MetaSrvOptions, SelectorContext, SelectorRef, TABLE_ID_SEQ,
|
||||
ElectionRef, MetaSrv, MetaSrvOptions, MetasrvInfo, SelectorContext, SelectorRef, TABLE_ID_SEQ,
|
||||
};
|
||||
use crate::procedure::region_failover::RegionFailoverManager;
|
||||
use crate::procedure::state_store::MetaStateStore;
|
||||
@@ -329,12 +330,17 @@ fn build_ddl_manager(
|
||||
.tcp_nodelay(options.datanode.client_options.tcp_nodelay);
|
||||
Arc::new(DatanodeClients::new(datanode_client_channel_config))
|
||||
});
|
||||
let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
|
||||
mailbox.clone(),
|
||||
MetasrvInfo {
|
||||
server_addr: options.server_addr.clone(),
|
||||
},
|
||||
));
|
||||
// TODO(weny): considers to modify the default config of procedure manager
|
||||
Arc::new(DdlManager::new(
|
||||
procedure_manager.clone(),
|
||||
datanode_clients,
|
||||
mailbox.clone(),
|
||||
options.server_addr.clone(),
|
||||
cache_invalidator,
|
||||
table_metadata_manager.clone(),
|
||||
))
|
||||
}
|
||||
|
||||
@@ -14,11 +14,10 @@
|
||||
|
||||
use std::vec;
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use async_trait::async_trait;
|
||||
use client::Database;
|
||||
use common_meta::cache_invalidator::Context;
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::instruction::Instruction;
|
||||
use common_meta::key::table_info::TableInfoValue;
|
||||
use common_meta::key::table_name::TableNameKey;
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
@@ -40,7 +39,6 @@ use table::requests::{AlterKind, AlterTableRequest};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result, TableMetadataManagerSnafu};
|
||||
use crate::procedure::utils::handle_request_datanode_error;
|
||||
use crate::service::mailbox::BroadcastChannel;
|
||||
|
||||
// TODO(weny): removes in following PRs.
|
||||
#[allow(dead_code)]
|
||||
@@ -265,23 +263,18 @@ impl AlterTableProcedure {
|
||||
table_id: self.data.table_id(),
|
||||
engine: self.data.table_info().meta.engine.to_string(),
|
||||
};
|
||||
let instruction = Instruction::InvalidateTableCache(table_ident);
|
||||
|
||||
let msg = &MailboxMessage::json_message(
|
||||
"Invalidate table cache by alter table procedure",
|
||||
&format!("Metasrv@{}", self.context.server_addr),
|
||||
"Frontend broadcast",
|
||||
common_time::util::current_time_millis(),
|
||||
&instruction,
|
||||
)
|
||||
.with_context(|_| error::SerializeToJsonSnafu {
|
||||
input: instruction.to_string(),
|
||||
})?;
|
||||
|
||||
self.context
|
||||
.mailbox
|
||||
.broadcast(&BroadcastChannel::Frontend, msg)
|
||||
.await?;
|
||||
.cache_invalidator
|
||||
.invalidate_table(
|
||||
&Context {
|
||||
subject: Some("Invalidate table cache by alter table procedure".to_string()),
|
||||
},
|
||||
table_ident,
|
||||
)
|
||||
.await
|
||||
.context(error::InvalidateTableCacheSnafu)?;
|
||||
|
||||
self.data.state = AlterTableState::DatanodeAlterTable;
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
|
||||
@@ -16,15 +16,9 @@ use api::v1::region::region_request::Body as PbRegionRequest;
|
||||
use api::v1::region::{ColumnDef, CreateRequest as PbCreateRegionRequest};
|
||||
use api::v1::SemanticType;
|
||||
use async_trait::async_trait;
|
||||
use client::region::RegionRequester;
|
||||
use client::Database;
|
||||
use common_catalog::consts::MITO2_ENGINE;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_meta::key::table_name::TableNameKey;
|
||||
use common_meta::rpc::ddl::CreateTableTask;
|
||||
use common_meta::rpc::router::{find_leader_regions, find_leaders, RegionRoute};
|
||||
use common_meta::table_name::TableName;
|
||||
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
|
||||
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
|
||||
use common_telemetry::info;
|
||||
@@ -36,10 +30,10 @@ use strum::AsRefStr;
|
||||
use table::engine::TableReference;
|
||||
use table::metadata::{RawTableInfo, TableId};
|
||||
|
||||
use super::utils::{handle_request_datanode_error, handle_retry_error};
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::error::{self, PrimaryKeyNotFoundSnafu, Result, TableMetadataManagerSnafu};
|
||||
use crate::metrics;
|
||||
use crate::procedure::utils::{handle_operate_region_error, handle_retry_error};
|
||||
|
||||
pub struct CreateTableProcedure {
|
||||
context: DdlContext,
|
||||
@@ -69,10 +63,6 @@ impl CreateTableProcedure {
|
||||
})
|
||||
}
|
||||
|
||||
fn table_name(&self) -> TableName {
|
||||
self.creator.data.task.table_name()
|
||||
}
|
||||
|
||||
pub fn table_info(&self) -> &RawTableInfo {
|
||||
&self.creator.data.task.table_info
|
||||
}
|
||||
@@ -111,11 +101,7 @@ impl CreateTableProcedure {
|
||||
return Ok(Status::Done);
|
||||
}
|
||||
|
||||
self.creator.data.state = if expr.engine == MITO2_ENGINE {
|
||||
CreateTableState::DatanodeCreateRegions
|
||||
} else {
|
||||
CreateTableState::DatanodeCreateTable
|
||||
};
|
||||
self.creator.data.state = CreateTableState::DatanodeCreateRegions;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
@@ -190,7 +176,7 @@ impl CreateTableProcedure {
|
||||
let mut create_region_tasks = Vec::with_capacity(leaders.len());
|
||||
|
||||
for datanode in leaders {
|
||||
let clients = self.context.datanode_clients.clone();
|
||||
let manager = self.context.datanode_manager.clone();
|
||||
|
||||
let regions = find_leader_regions(region_routes, &datanode);
|
||||
let requests = regions
|
||||
@@ -209,11 +195,10 @@ impl CreateTableProcedure {
|
||||
|
||||
create_region_tasks.push(async move {
|
||||
for request in requests {
|
||||
let client = clients.get_client(&datanode).await;
|
||||
let requester = RegionRequester::new(client);
|
||||
let requester = manager.datanode(&datanode).await;
|
||||
|
||||
if let Err(err) = requester.handle(request).await {
|
||||
return Err(handle_request_datanode_error(datanode)(err));
|
||||
return Err(handle_operate_region_error(datanode)(err));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -244,44 +229,6 @@ impl CreateTableProcedure {
|
||||
|
||||
Ok(Status::Done)
|
||||
}
|
||||
|
||||
async fn on_datanode_create_table(&mut self) -> Result<Status> {
|
||||
let region_routes = &self.creator.data.region_routes;
|
||||
let table_name = self.table_name();
|
||||
let clients = self.context.datanode_clients.clone();
|
||||
let leaders = find_leaders(region_routes);
|
||||
let mut joins = Vec::with_capacity(leaders.len());
|
||||
let table_id = self.table_id();
|
||||
|
||||
for datanode in leaders {
|
||||
let client = clients.get_client(&datanode).await;
|
||||
let client = Database::new(&table_name.catalog_name, &table_name.schema_name, client);
|
||||
|
||||
let regions = find_leader_regions(region_routes, &datanode);
|
||||
let mut create_expr_for_region = self.creator.data.task.create_table.clone();
|
||||
create_expr_for_region.region_numbers = regions;
|
||||
create_expr_for_region.table_id = Some(api::v1::TableId { id: table_id });
|
||||
|
||||
joins.push(common_runtime::spawn_bg(async move {
|
||||
if let Err(err) = client.create(create_expr_for_region).await {
|
||||
if err.status_code() != StatusCode::TableAlreadyExists {
|
||||
return Err(handle_request_datanode_error(datanode)(err));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}));
|
||||
}
|
||||
|
||||
let _r = join_all(joins)
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|e| e.context(error::JoinSnafu).flatten())
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
self.creator.data.state = CreateTableState::CreateMetadata;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -300,7 +247,6 @@ impl Procedure for CreateTableProcedure {
|
||||
|
||||
match state {
|
||||
CreateTableState::Prepare => self.on_prepare().await,
|
||||
CreateTableState::DatanodeCreateTable => self.on_datanode_create_table().await,
|
||||
CreateTableState::DatanodeCreateRegions => self.on_datanode_create_regions().await,
|
||||
CreateTableState::CreateMetadata => self.on_create_metadata().await,
|
||||
}
|
||||
@@ -344,9 +290,7 @@ impl TableCreator {
|
||||
enum CreateTableState {
|
||||
/// Prepares to create the table
|
||||
Prepare,
|
||||
/// Datanode creates the table
|
||||
DatanodeCreateTable,
|
||||
/// Create regions on the Datanode
|
||||
/// Creates regions on the Datanode
|
||||
DatanodeCreateRegions,
|
||||
/// Creates metadata
|
||||
CreateMetadata,
|
||||
@@ -372,6 +316,7 @@ mod test {
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use api::v1::{ColumnDataType, ColumnDef as PbColumnDef, CreateTableExpr};
|
||||
use common_catalog::consts::MITO2_ENGINE;
|
||||
|
||||
use super::*;
|
||||
use crate::procedure::utils::mock::EchoRegionServer;
|
||||
|
||||
@@ -12,17 +12,13 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use api::v1::meta::MailboxMessage;
|
||||
use api::v1::region::{region_request, DropRequest as PbDropRegionRequest};
|
||||
use api::v1::DropTableExpr;
|
||||
use async_trait::async_trait;
|
||||
use client::region::RegionRequester;
|
||||
use client::Database;
|
||||
use common_catalog::consts::MITO2_ENGINE;
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_meta::cache_invalidator::Context;
|
||||
use common_meta::ident::TableIdent;
|
||||
use common_meta::instruction::Instruction;
|
||||
use common_meta::key::table_info::TableInfoValue;
|
||||
use common_meta::key::table_name::TableNameKey;
|
||||
use common_meta::key::table_route::TableRouteValue;
|
||||
@@ -47,7 +43,7 @@ use crate::ddl::DdlContext;
|
||||
use crate::error::{self, Result, TableMetadataManagerSnafu};
|
||||
use crate::metrics;
|
||||
use crate::procedure::utils::handle_request_datanode_error;
|
||||
use crate::service::mailbox::BroadcastChannel;
|
||||
|
||||
pub struct DropTableProcedure {
|
||||
context: DdlContext,
|
||||
data: DropTableData,
|
||||
@@ -132,29 +128,19 @@ impl DropTableProcedure {
|
||||
table_id: self.data.task.table_id,
|
||||
engine: engine.to_string(),
|
||||
};
|
||||
let instruction = Instruction::InvalidateTableCache(table_ident);
|
||||
|
||||
let msg = &MailboxMessage::json_message(
|
||||
"Invalidate Table Cache by dropping table procedure",
|
||||
&format!("Metasrv@{}", self.context.server_addr),
|
||||
"Frontend broadcast",
|
||||
common_time::util::current_time_millis(),
|
||||
&instruction,
|
||||
)
|
||||
.with_context(|_| error::SerializeToJsonSnafu {
|
||||
input: instruction.to_string(),
|
||||
})?;
|
||||
|
||||
self.context
|
||||
.mailbox
|
||||
.broadcast(&BroadcastChannel::Frontend, msg)
|
||||
.await?;
|
||||
.cache_invalidator
|
||||
.invalidate_table(
|
||||
&Context {
|
||||
subject: Some("Invalidate Table Cache by dropping table procedure".to_string()),
|
||||
},
|
||||
table_ident,
|
||||
)
|
||||
.await
|
||||
.context(error::InvalidateTableCacheSnafu)?;
|
||||
|
||||
self.data.state = if engine == MITO2_ENGINE {
|
||||
DropTableState::DatanodeDropRegions
|
||||
} else {
|
||||
DropTableState::DatanodeDropTable
|
||||
};
|
||||
self.data.state = DropTableState::DatanodeDropRegions;
|
||||
|
||||
Ok(Status::executing(true))
|
||||
}
|
||||
@@ -203,50 +189,6 @@ impl DropTableProcedure {
|
||||
|
||||
Ok(Status::Done)
|
||||
}
|
||||
|
||||
/// Executes drop table instruction on datanode.
|
||||
async fn on_datanode_drop_table(&mut self) -> Result<Status> {
|
||||
let region_routes = &self.data.region_routes();
|
||||
|
||||
let table_ref = self.data.table_ref();
|
||||
let table_id = self.data.task.table_id;
|
||||
|
||||
let clients = self.context.datanode_clients.clone();
|
||||
let leaders = find_leaders(region_routes);
|
||||
let mut joins = Vec::with_capacity(leaders.len());
|
||||
|
||||
let expr = DropTableExpr {
|
||||
catalog_name: table_ref.catalog.to_string(),
|
||||
schema_name: table_ref.schema.to_string(),
|
||||
table_name: table_ref.table.to_string(),
|
||||
table_id: Some(api::v1::TableId { id: table_id }),
|
||||
};
|
||||
|
||||
for datanode in leaders {
|
||||
debug!("Dropping table {table_ref} on Datanode {datanode:?}");
|
||||
|
||||
let client = clients.get_client(&datanode).await;
|
||||
let client = Database::new(table_ref.catalog, table_ref.schema, client);
|
||||
let expr = expr.clone();
|
||||
joins.push(common_runtime::spawn_bg(async move {
|
||||
if let Err(err) = client.drop_table(expr).await {
|
||||
// TODO(weny): add tests for `TableNotFound`
|
||||
if err.status_code() != StatusCode::TableNotFound {
|
||||
return Err(handle_request_datanode_error(datanode)(err));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}));
|
||||
}
|
||||
|
||||
let _r = join_all(joins)
|
||||
.await
|
||||
.into_iter()
|
||||
.map(|e| e.context(error::JoinSnafu).flatten())
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
Ok(Status::Done)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -267,7 +209,6 @@ impl Procedure for DropTableProcedure {
|
||||
DropTableState::Prepare => self.on_prepare().await,
|
||||
DropTableState::RemoveMetadata => self.on_remove_metadata().await,
|
||||
DropTableState::InvalidateTableCache => self.on_broadcast().await,
|
||||
DropTableState::DatanodeDropTable => self.on_datanode_drop_table().await,
|
||||
DropTableState::DatanodeDropRegions => self.on_datanode_drop_regions().await,
|
||||
}
|
||||
.map_err(handle_retry_error)
|
||||
@@ -343,9 +284,7 @@ enum DropTableState {
|
||||
RemoveMetadata,
|
||||
/// Invalidates Table Cache
|
||||
InvalidateTableCache,
|
||||
/// Datanode drops the table
|
||||
DatanodeDropTable,
|
||||
/// Drop regions on Datanode
|
||||
/// Drops regions on Datanode
|
||||
DatanodeDropRegions,
|
||||
}
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::peer::Peer;
|
||||
use common_procedure::error::Error as ProcedureError;
|
||||
use snafu::{location, Location};
|
||||
@@ -35,6 +36,25 @@ pub fn handle_request_datanode_error(datanode: Peer) -> impl FnOnce(client::erro
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_operate_region_error(
|
||||
datanode: Peer,
|
||||
) -> impl FnOnce(common_meta::error::Error) -> Error {
|
||||
move |err| {
|
||||
if matches!(err, common_meta::error::Error::RetryLater { .. }) {
|
||||
error::RetryLaterSnafu {
|
||||
reason: format!("Failed to execute operation on datanode, source: {}", err),
|
||||
}
|
||||
.build()
|
||||
} else {
|
||||
error::Error::OperateRegion {
|
||||
location: location!(),
|
||||
peer: datanode,
|
||||
source: BoxedError::new(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_retry_error(e: Error) -> ProcedureError {
|
||||
if matches!(e, error::Error::RetryLater { .. }) {
|
||||
ProcedureError::retry_later(e)
|
||||
@@ -145,8 +165,10 @@ pub mod test_data {
|
||||
use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
|
||||
use table::requests::TableOptions;
|
||||
|
||||
use crate::cache_invalidator::MetasrvCacheInvalidator;
|
||||
use crate::ddl::DdlContext;
|
||||
use crate::handler::{HeartbeatMailbox, Pushers};
|
||||
use crate::metasrv::MetasrvInfo;
|
||||
use crate::sequence::Sequence;
|
||||
use crate::service::store::kv::KvBackendAdapter;
|
||||
use crate::service::store::memory::MemStore;
|
||||
@@ -220,10 +242,16 @@ pub mod test_data {
|
||||
let mailbox = HeartbeatMailbox::create(Pushers::default(), mailbox_sequence);
|
||||
|
||||
let kv_backend = KvBackendAdapter::wrap(kv_store);
|
||||
let clients = Arc::new(DatanodeClients::default());
|
||||
DdlContext {
|
||||
datanode_clients: Arc::new(DatanodeClients::default()),
|
||||
mailbox,
|
||||
server_addr: "127.0.0.1:4321".to_string(),
|
||||
datanode_clients: clients.clone(),
|
||||
datanode_manager: clients,
|
||||
cache_invalidator: Arc::new(MetasrvCacheInvalidator::new(
|
||||
mailbox,
|
||||
MetasrvInfo {
|
||||
server_addr: "127.0.0.1:4321".to_string(),
|
||||
},
|
||||
)),
|
||||
table_metadata_manager: Arc::new(TableMetadataManager::new(kv_backend)),
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user