diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 54d777d3f6..7d3fc30972 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -382,7 +382,7 @@ pub struct Datanode { impl Datanode { pub async fn new(opts: DatanodeOptions) -> Result { - let instance = Arc::new(Instance::with_opts(&opts).await?); + let instance = Arc::new(Instance::with_opts(&opts, Default::default()).await?); let services = match opts.mode { Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?), Mode::Standalone => None, diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 5e5a63006f..884ca31e47 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -22,6 +22,7 @@ use catalog::remote::region_alive_keeper::RegionAliveKeepers; use catalog::remote::{CachedMetaKvBackend, RemoteCatalogManager}; use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; use common_base::paths::{CLUSTER_DIR, WAL_DIR}; +use common_base::Plugins; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -85,7 +86,7 @@ pub struct Instance { pub type InstanceRef = Arc; impl Instance { - pub async fn with_opts(opts: &DatanodeOptions) -> Result { + pub async fn with_opts(opts: &DatanodeOptions, plugins: Arc) -> Result { let meta_client = match opts.mode { Mode::Standalone => None, Mode::Distributed => { @@ -102,13 +103,14 @@ impl Instance { let compaction_scheduler = create_compaction_scheduler(opts); - Self::new(opts, meta_client, compaction_scheduler).await + Self::new(opts, meta_client, compaction_scheduler, plugins).await } pub(crate) async fn new( opts: &DatanodeOptions, meta_client: Option>, compaction_scheduler: CompactionSchedulerRef, + plugins: Arc, ) -> Result { let object_store = store::new_object_store(&opts.storage.store).await?; let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?); @@ -234,7 +236,14 @@ impl Instance { } }; - let factory = QueryEngineFactory::new(catalog_manager.clone(), false); + // let factory = QueryEngineFactory::new(catalog_manager.clone(), false); + let factory = QueryEngineFactory::new_with_plugins( + catalog_manager.clone(), + false, + None, + None, + plugins, + ); let query_engine = factory.query_engine(); let procedure_manager = diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 43153e8e15..81af9b0cde 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -32,7 +32,13 @@ impl Instance { pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result { let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); - Instance::new(opts, Some(meta_client), compaction_scheduler).await + Instance::new( + opts, + Some(meta_client), + compaction_scheduler, + Default::default(), + ) + .await } } diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 06442b4d0a..ada759064e 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -65,7 +65,11 @@ impl MockStandaloneInstance { pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance { let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name); - let dn_instance = Arc::new(DatanodeInstance::with_opts(&opts).await.unwrap()); + let dn_instance = Arc::new( + DatanodeInstance::with_opts(&opts, Default::default()) + .await + .unwrap(), + ); let frontend_instance = Instance::try_new_standalone(dn_instance.clone()) .await