From dea87b7e57efde412a6458a9e8dcaeaff2e9c90e Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Wed, 13 Aug 2025 17:49:51 +0800 Subject: [PATCH] refactor: use DummyCatalog to construct query engine for datanode (#6723) * refactor: use DummyCatalog to construct query engine for datanode Signed-off-by: Zhenchi * fix clippy Signed-off-by: Zhenchi * move to query/dummy_catalog Signed-off-by: Zhenchi --------- Signed-off-by: Zhenchi --- Cargo.lock | 1 - src/catalog/src/memory/manager.rs | 2 +- src/datanode/Cargo.toml | 1 - src/datanode/src/datanode.rs | 5 +- src/datanode/src/error.rs | 16 ----- src/datanode/src/region_server.rs | 2 +- src/query/src/dummy_catalog.rs | 112 +++++++++++++++++++++++++++++- 7 files changed, 115 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index be714e8d92..c36b5917e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3880,7 +3880,6 @@ dependencies = [ "async-trait", "bytes", "cache", - "catalog", "client", "common-base", "common-config", diff --git a/src/catalog/src/memory/manager.rs b/src/catalog/src/memory/manager.rs index a377542af6..4c3fbd4c23 100644 --- a/src/catalog/src/memory/manager.rs +++ b/src/catalog/src/memory/manager.rs @@ -38,7 +38,7 @@ use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, Regis type SchemaEntries = HashMap>; -/// Simple in-memory list of catalogs +/// Simple in-memory list of catalogs used for tests. 
#[derive(Clone)] pub struct MemoryCatalogManager { /// Collection of catalogs containing schemas and ultimately Tables diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index 6d21a428cf..0abbefa76e 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -16,7 +16,6 @@ api.workspace = true arrow-flight.workspace = true async-trait.workspace = true bytes.workspace = true -catalog.workspace = true client.workspace = true common-base.workspace = true common-config.workspace = true diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index a4d1e138e3..b45f7c95ed 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -18,7 +18,6 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; -use catalog::memory::MemoryCatalogManager; use common_base::Plugins; use common_error::ext::BoxedError; use common_greptimedb_telemetry::GreptimeDBTelemetryTask; @@ -44,7 +43,7 @@ use mito2::config::MitoConfig; use mito2::engine::{MitoEngine, MitoEngineBuilder}; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; use object_store::util::normalize_dir; -use query::dummy_catalog::TableProviderFactoryRef; +use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef}; use query::QueryEngineFactory; use servers::export_metrics::ExportMetricsTask; use servers::server::ServerHandlers; @@ -377,7 +376,7 @@ impl DatanodeBuilder { let query_engine_factory = QueryEngineFactory::new_with_plugins( // query engine in datanode only executes plan with resolved table source. 
- MemoryCatalogManager::with_default_setup(), + DummyCatalogManager::arc(), None, None, None, diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index aa15cf8428..a6d7600b31 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -79,13 +79,6 @@ pub enum Error { source: common_query::error::Error, }, - #[snafu(display("Catalog not found: {}", name))] - CatalogNotFound { - name: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Schema not found: {}", name))] SchemaNotFound { name: String, @@ -159,13 +152,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to access catalog"))] - Catalog { - #[snafu(implicit)] - location: Location, - source: catalog::error::Error, - }, - #[snafu(display("Failed to initialize meta client"))] MetaClientInit { #[snafu(implicit)] @@ -429,12 +415,10 @@ impl ErrorExt for Error { InvalidSql { .. } | IllegalPrimaryKeysDef { .. } | MissingTimestampColumn { .. } - | CatalogNotFound { .. } | SchemaNotFound { .. } | SchemaExists { .. } | MissingNodeId { .. } | ColumnNoneDefaultValue { .. } - | Catalog { .. } | MissingRequiredField { .. } | RegionEngineNotFound { .. } | ParseAddr { .. 
} diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 8c2edac648..ad2d13031b 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -248,7 +248,7 @@ impl RegionServer { None }; - let ctx: Option = request.header.as_ref().map(|h| h.into()); + let ctx = request.header.as_ref().map(|h| h.into()); let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build())); let provider = self diff --git a/src/query/src/dummy_catalog.rs b/src/query/src/dummy_catalog.rs index 547bb7016e..d439ab77b1 100644 --- a/src/query/src/dummy_catalog.rs +++ b/src/query/src/dummy_catalog.rs @@ -20,6 +20,8 @@ use std::sync::{Arc, Mutex}; use api::v1::SemanticType; use async_trait::async_trait; +use catalog::error::Result as CatalogResult; +use catalog::{CatalogManager, CatalogManagerRef}; use common_recordbatch::filter::SimpleFilterEvaluator; use common_recordbatch::OrderOption; use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session}; @@ -28,12 +30,15 @@ use datafusion::physical_plan::ExecutionPlan; use datafusion_common::DataFusionError; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; use datatypes::arrow::datatypes::SchemaRef; -use session::context::QueryContextRef; +use futures::stream::BoxStream; +use session::context::{QueryContext, QueryContextRef}; use snafu::ResultExt; use store_api::metadata::RegionMetadataRef; use store_api::region_engine::RegionEngineRef; use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector}; +use table::metadata::{TableId, TableInfoRef}; use table::table::scan::RegionScanExec; +use table::TableRef; use crate::error::{GetRegionMetadataSnafu, Result}; @@ -322,3 +327,108 @@ pub trait TableProviderFactory: Send + Sync { } pub type TableProviderFactoryRef = Arc; + +/// A dummy catalog manager that always returns empty results. 
+///
+/// Used to fill the arg of `QueryEngineFactory::new_with_plugins` in datanode.
+pub struct DummyCatalogManager;
+
+impl DummyCatalogManager {
+    /// Returns a new `CatalogManagerRef` instance.
+    pub fn arc() -> CatalogManagerRef {
+        Arc::new(Self)
+    }
+}
+
+#[async_trait::async_trait]
+impl CatalogManager for DummyCatalogManager {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
+        Ok(vec![]) // the dummy manager lists no catalogs
+    }
+
+    async fn schema_names(
+        &self,
+        _catalog: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<Vec<String>> {
+        Ok(vec![]) // ... and no schemas, whatever catalog is asked for
+    }
+
+    async fn table_names(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<Vec<String>> {
+        Ok(vec![]) // ... and no tables
+    }
+
+    async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
+        Ok(false) // existence checks are always negative
+    }
+
+    async fn schema_exists(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<bool> {
+        Ok(false)
+    }
+
+    async fn table_exists(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _table: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<bool> {
+        Ok(false)
+    }
+
+    async fn table(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _table_name: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<Option<TableRef>> {
+        Ok(None) // lookups never resolve to a table
+    }
+
+    async fn table_id(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _table_name: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<Option<TableId>> {
+        Ok(None)
+    }
+
+    async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
+        Ok(None)
+    }
+
+    async fn tables_by_ids(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _table_ids: &[TableId],
+    ) -> CatalogResult<Vec<TableRef>> {
+        Ok(vec![])
+    }
+
+    fn tables<'a>(
+        &'a self,
+        _catalog: &'a str,
+        _schema: &'a str,
+        _query_ctx: Option<&'a QueryContext>,
+    ) -> BoxStream<'a, CatalogResult<TableRef>> {
+        Box::pin(futures::stream::empty()) // a stream that ends without yielding any table
+    }
+}