test: add plugins to datanode initialization

This commit is contained in:
shuiyisong
2023-06-12 17:32:49 +08:00
parent 91456daf99
commit b39682075e
4 changed files with 25 additions and 6 deletions

View File

@@ -382,7 +382,7 @@ pub struct Datanode {
impl Datanode {
pub async fn new(opts: DatanodeOptions) -> Result<Datanode> {
let instance = Arc::new(Instance::with_opts(&opts).await?);
let instance = Arc::new(Instance::with_opts(&opts, Default::default()).await?);
let services = match opts.mode {
Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?),
Mode::Standalone => None,

View File

@@ -22,6 +22,7 @@ use catalog::remote::region_alive_keeper::RegionAliveKeepers;
use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager};
use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest};
use common_base::paths::{CLUSTER_DIR, WAL_DIR};
use common_base::Plugins;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_error::prelude::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -85,7 +86,7 @@ pub struct Instance {
pub type InstanceRef = Arc<Instance>;
impl Instance {
pub async fn with_opts(opts: &DatanodeOptions) -> Result<Self> {
pub async fn with_opts(opts: &DatanodeOptions, plugins: Arc<Plugins>) -> Result<Self> {
let meta_client = match opts.mode {
Mode::Standalone => None,
Mode::Distributed => {
@@ -102,13 +103,14 @@ impl Instance {
let compaction_scheduler = create_compaction_scheduler(opts);
Self::new(opts, meta_client, compaction_scheduler).await
Self::new(opts, meta_client, compaction_scheduler, plugins).await
}
pub(crate) async fn new(
opts: &DatanodeOptions,
meta_client: Option<Arc<MetaClient>>,
compaction_scheduler: CompactionSchedulerRef<RaftEngineLogStore>,
plugins: Arc<Plugins>,
) -> Result<Self> {
let object_store = store::new_object_store(&opts.storage.store).await?;
let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?);
@@ -234,7 +236,14 @@ impl Instance {
}
};
let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
// let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
let factory = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
false,
None,
None,
plugins,
);
let query_engine = factory.query_engine();
let procedure_manager =

View File

@@ -32,7 +32,13 @@ impl Instance {
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let compaction_scheduler = Arc::new(NoopCompactionScheduler::default());
Instance::new(opts, Some(meta_client), compaction_scheduler).await
Instance::new(
opts,
Some(meta_client),
compaction_scheduler,
Default::default(),
)
.await
}
}

View File

@@ -65,7 +65,11 @@ impl MockStandaloneInstance {
pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance {
let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name);
let dn_instance = Arc::new(DatanodeInstance::with_opts(&opts).await.unwrap());
let dn_instance = Arc::new(
DatanodeInstance::with_opts(&opts, Default::default())
.await
.unwrap(),
);
let frontend_instance = Instance::try_new_standalone(dn_instance.clone())
.await