fix: fix procedure loaders not found issue (#2824)

This commit is contained in:
Weny Xu
2023-11-27 19:50:28 +09:00
committed by GitHub
parent 6100cb335a
commit 7547e7ebdf
9 changed files with 150 additions and 27 deletions

View File

@@ -42,5 +42,6 @@ tonic.workspace = true
[dev-dependencies]
chrono.workspace = true
common-procedure = { workspace = true, features = ["testing"] }
datatypes.workspace = true
hyper = { version = "0.14", features = ["full"] }

View File

@@ -43,9 +43,9 @@ use crate::rpc::ddl::{
TruncateTableTask,
};
use crate::rpc::router::RegionRoute;
pub type DdlManagerRef = Arc<DdlManager>;
/// The [DdlManager] provides the ability to execute Ddl.
pub struct DdlManager {
procedure_manager: ProcedureManagerRef,
datanode_manager: DatanodeManagerRef,
@@ -55,26 +55,31 @@ pub struct DdlManager {
}
impl DdlManager {
pub fn new(
/// Returns a new [DdlManager] with all Ddl [BoxedProcedureLoader](common_procedure::procedure::BoxedProcedureLoader)s registered.
pub fn try_new(
procedure_manager: ProcedureManagerRef,
datanode_clients: DatanodeManagerRef,
cache_invalidator: CacheInvalidatorRef,
table_metadata_manager: TableMetadataManagerRef,
table_meta_allocator: TableMetadataAllocatorRef,
) -> Self {
Self {
) -> Result<Self> {
let manager = Self {
procedure_manager,
datanode_manager: datanode_clients,
cache_invalidator,
table_metadata_manager,
table_meta_allocator,
}
};
manager.register_loaders()?;
Ok(manager)
}
/// Returns the [TableMetadataManagerRef].
pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
&self.table_metadata_manager
}
/// Returns the [DdlContext]
pub fn create_context(&self) -> DdlContext {
DdlContext {
datanode_manager: self.datanode_manager.clone(),
@@ -83,7 +88,7 @@ impl DdlManager {
}
}
pub fn try_start(&self) -> Result<()> {
fn register_loaders(&self) -> Result<()> {
let context = self.create_context();
self.procedure_manager
@@ -142,6 +147,7 @@ impl DdlManager {
}
#[tracing::instrument(skip_all)]
/// Submits and executes an alter table task.
pub async fn submit_alter_table_task(
&self,
cluster_id: u64,
@@ -159,6 +165,7 @@ impl DdlManager {
}
#[tracing::instrument(skip_all)]
/// Submits and executes a create table task.
pub async fn submit_create_table_task(
&self,
cluster_id: u64,
@@ -176,6 +183,7 @@ impl DdlManager {
}
#[tracing::instrument(skip_all)]
/// Submits and executes a drop table task.
pub async fn submit_drop_table_task(
&self,
cluster_id: u64,
@@ -199,6 +207,7 @@ impl DdlManager {
}
#[tracing::instrument(skip_all)]
/// Submits and executes a truncate table task.
pub async fn submit_truncate_table_task(
&self,
cluster_id: u64,
@@ -416,3 +425,80 @@ impl DdlTaskExecutor for DdlManager {
.await
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::meta::Partition;
use common_procedure::local::LocalManager;
use table::metadata::{RawTableInfo, TableId};
use super::DdlManager;
use crate::cache_invalidator::DummyCacheInvalidator;
use crate::datanode_manager::{DatanodeManager, DatanodeRef};
use crate::ddl::alter_table::AlterTableProcedure;
use crate::ddl::create_table::CreateTableProcedure;
use crate::ddl::drop_table::DropTableProcedure;
use crate::ddl::truncate_table::TruncateTableProcedure;
use crate::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
use crate::error::Result;
use crate::key::TableMetadataManager;
use crate::kv_backend::memory::MemoryKvBackend;
use crate::peer::Peer;
use crate::rpc::router::RegionRoute;
use crate::state_store::KvStateStore;
/// A dummy implemented [DatanodeManager].
pub struct DummyDatanodeManager;
#[async_trait::async_trait]
impl DatanodeManager for DummyDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
unimplemented!()
}
}
/// A dummy implemented [TableMetadataAllocator].
pub struct DummyTableMetadataAllocator;
#[async_trait::async_trait]
impl TableMetadataAllocator for DummyTableMetadataAllocator {
async fn create(
&self,
_ctx: &TableMetadataAllocatorContext,
_table_info: &mut RawTableInfo,
_partitions: &[Partition],
) -> Result<(TableId, Vec<RegionRoute>)> {
unimplemented!()
}
}
#[test]
fn test_try_new() {
let kv_backend = Arc::new(MemoryKvBackend::new());
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let state_store = Arc::new(KvStateStore::new(kv_backend));
let procedure_manager = Arc::new(LocalManager::new(Default::default(), state_store));
let _ = DdlManager::try_new(
procedure_manager.clone(),
Arc::new(DummyDatanodeManager),
Arc::new(DummyCacheInvalidator),
table_metadata_manager,
Arc::new(DummyTableMetadataAllocator),
);
let expected_loaders = vec![
CreateTableProcedure::TYPE_NAME,
AlterTableProcedure::TYPE_NAME,
DropTableProcedure::TYPE_NAME,
TruncateTableProcedure::TYPE_NAME,
];
for loader in expected_loaders {
assert!(procedure_manager.contains_loader(loader));
}
}
}

View File

@@ -4,6 +4,10 @@ version.workspace = true
edition.workspace = true
license.workspace = true
[features]
testing=[]
[dependencies]
async-stream.workspace = true
async-trait.workspace = true

View File

@@ -527,6 +527,13 @@ impl LocalManager {
Ok(())
}
#[cfg(any(test, feature = "testing"))]
/// Returns true if contains a specified loader.
pub fn contains_loader(&self, name: &str) -> bool {
let loaders = self.manager_ctx.loaders.lock().unwrap();
loaders.contains_key(name)
}
}
#[async_trait]

View File

@@ -26,6 +26,12 @@ use store_api::storage::RegionNumber;
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Failed to init ddl manager"))]
InitDdlManager {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to invalidate table cache"))]
InvalidateTableCache {
location: Location,
@@ -319,7 +325,9 @@ impl ErrorExt for Error {
Error::ParseSql { source, .. } => source.status_code(),
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::InvalidateTableCache { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}
Error::Table { source, .. }
| Error::CopyTable { source, .. }

View File

@@ -328,13 +328,16 @@ impl Instance {
let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
let cache_invalidator = Arc::new(DummyCacheInvalidator);
let ddl_executor = Arc::new(DdlManager::new(
procedure_manager,
datanode_manager,
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
));
let ddl_executor = Arc::new(
DdlManager::try_new(
procedure_manager,
datanode_manager,
cache_invalidator.clone(),
table_metadata_manager.clone(),
Arc::new(StandaloneTableMetadataCreator::new(kv_backend.clone())),
)
.context(error::InitDdlManagerSnafu)?,
);
let statement_executor = Arc::new(StatementExecutor::new(
catalog_manager.clone(),

View File

@@ -43,6 +43,12 @@ pub enum Error {
region_id: RegionId,
},
#[snafu(display("Failed to init ddl manager"))]
InitDdlManager {
location: Location,
source: common_meta::error::Error,
},
#[snafu(display("Failed to create default catalog and schema"))]
InitMetadata {
location: Location,
@@ -685,7 +691,9 @@ impl ErrorExt for Error {
| Error::UpdateTableRoute { source, .. }
| Error::GetFullTableInfo { source, .. } => source.status_code(),
Error::InitMetadata { source, .. } => source.status_code(),
Error::InitMetadata { source, .. } | Error::InitDdlManager { source, .. } => {
source.status_code()
}
Error::Other { source, .. } => source.status_code(),
}

View File

@@ -28,10 +28,11 @@ use common_meta::sequence::{Sequence, SequenceRef};
use common_meta::state_store::KvStateStore;
use common_procedure::local::{LocalManager, ManagerConfig};
use common_procedure::ProcedureManagerRef;
use snafu::ResultExt;
use crate::cache_invalidator::MetasrvCacheInvalidator;
use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
use crate::error::Result;
use crate::error::{self, Result};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::handler::check_leader_handler::CheckLeaderHandler;
use crate::handler::collect_stats_handler::CollectStatsHandler;
@@ -196,8 +197,7 @@ impl MetaSrvBuilder {
&table_metadata_manager,
(&selector, &selector_ctx),
&table_id_sequence,
);
let _ = ddl_manager.try_start();
)?;
let opening_region_keeper = Arc::new(OpeningRegionKeeper::default());
let handler_group = match handler_group {
@@ -330,7 +330,7 @@ fn build_ddl_manager(
table_metadata_manager: &TableMetadataManagerRef,
(selector, selector_ctx): (&SelectorRef, &SelectorContext),
table_id_sequence: &SequenceRef,
) -> DdlManagerRef {
) -> Result<DdlManagerRef> {
let datanode_clients = datanode_clients.unwrap_or_else(|| {
let datanode_client_channel_config = ChannelConfig::new()
.timeout(Duration::from_millis(
@@ -355,12 +355,15 @@ fn build_ddl_manager(
table_id_sequence.clone(),
));
Arc::new(DdlManager::new(
procedure_manager.clone(),
datanode_clients,
cache_invalidator,
table_metadata_manager.clone(),
table_meta_allocator,
Ok(Arc::new(
DdlManager::try_new(
procedure_manager.clone(),
datanode_clients,
cache_invalidator,
table_metadata_manager.clone(),
table_meta_allocator,
)
.context(error::InitDdlManagerSnafu)?,
))
}

View File

@@ -99,10 +99,10 @@ impl GreptimeDbStandaloneBuilder {
.init()
.await
.unwrap();
procedure_manager.start().await.unwrap();
let instance = Instance::try_new_standalone(
kv_backend,
procedure_manager,
procedure_manager.clone(),
catalog_manager,
plugins,
datanode.region_server(),
@@ -110,6 +110,9 @@ impl GreptimeDbStandaloneBuilder {
.await
.unwrap();
// Ensures all loaders are registered.
procedure_manager.start().await.unwrap();
test_util::prepare_another_catalog_and_schema(&instance).await;
instance.start().await.unwrap();