refactor: use DummyCatalog to construct query engine for datanode (#6723)

* refactor: use DummyCatalog to construct query engine for datanode

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* move to query/dummy_catalog

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi authored 2025-08-13 17:49:51 +08:00, committed by GitHub
parent a678b4dfd6
commit dea87b7e57
7 changed files with 115 additions and 24 deletions

Cargo.lock (generated)

@@ -3880,7 +3880,6 @@ dependencies = [
  "async-trait",
  "bytes",
  "cache",
- "catalog",
  "client",
  "common-base",
  "common-config",


@@ -38,7 +38,7 @@ use crate::{CatalogManager, DeregisterTableRequest, RegisterSchemaRequest, Regis
 
 type SchemaEntries = HashMap<String, HashMap<String, TableRef>>;
 
-/// Simple in-memory list of catalogs
+/// Simple in-memory list of catalogs used for tests.
 #[derive(Clone)]
 pub struct MemoryCatalogManager {
     /// Collection of catalogs containing schemas and ultimately Tables


@@ -16,7 +16,6 @@ api.workspace = true
 arrow-flight.workspace = true
 async-trait.workspace = true
 bytes.workspace = true
-catalog.workspace = true
 client.workspace = true
 common-base.workspace = true
 common-config.workspace = true


@@ -18,7 +18,6 @@ use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
 
-use catalog::memory::MemoryCatalogManager;
 use common_base::Plugins;
 use common_error::ext::BoxedError;
 use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
@@ -44,7 +43,7 @@ use mito2::config::MitoConfig;
 use mito2::engine::{MitoEngine, MitoEngineBuilder};
 use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
 use object_store::util::normalize_dir;
-use query::dummy_catalog::TableProviderFactoryRef;
+use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
 use query::QueryEngineFactory;
 use servers::export_metrics::ExportMetricsTask;
 use servers::server::ServerHandlers;
@@ -377,7 +376,7 @@ impl DatanodeBuilder {
         let query_engine_factory = QueryEngineFactory::new_with_plugins(
             // query engine in datanode only executes plan with resolved table source.
-            MemoryCatalogManager::with_default_setup(),
+            DummyCatalogManager::arc(),
             None,
             None,
             None,
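The comment in the hunk above is the key to this change: the datanode's query engine only executes plans whose table sources are already resolved, so the catalog manager handed to the factory is never actually consulted and a no-op stand-in is enough. Below is a minimal, self-contained sketch of that null-object pattern; the names (CatalogLookup, NoopCatalog, EngineFactory) are hypothetical illustrations, not GreptimeDB's actual API.

use std::sync::Arc;

// Hypothetical stand-in for the catalog capability the engine factory accepts.
trait CatalogLookup: Send + Sync {
    fn table_names(&self, schema: &str) -> Vec<String>;
}

// Null object: always answers "nothing here". Sufficient for a caller that
// only executes plans with already-resolved table sources.
struct NoopCatalog;

impl CatalogLookup for NoopCatalog {
    fn table_names(&self, _schema: &str) -> Vec<String> {
        Vec::new()
    }
}

// Hypothetical factory that stores the catalog handle, mirroring how the
// real factory takes a catalog manager as its first argument.
struct EngineFactory {
    catalog: Arc<dyn CatalogLookup>,
}

impl EngineFactory {
    fn new(catalog: Arc<dyn CatalogLookup>) -> Self {
        Self { catalog }
    }
}

fn main() {
    // The refactor swaps a fully populated in-memory catalog for the no-op one.
    let factory = EngineFactory::new(Arc::new(NoopCatalog));
    assert!(factory.catalog.table_names("public").is_empty());
}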


@@ -79,13 +79,6 @@ pub enum Error {
         source: common_query::error::Error,
     },
 
-    #[snafu(display("Catalog not found: {}", name))]
-    CatalogNotFound {
-        name: String,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("Schema not found: {}", name))]
     SchemaNotFound {
         name: String,
@@ -159,13 +152,6 @@ pub enum Error {
         location: Location,
     },
 
-    #[snafu(display("Failed to access catalog"))]
-    Catalog {
-        #[snafu(implicit)]
-        location: Location,
-        source: catalog::error::Error,
-    },
-
     #[snafu(display("Failed to initialize meta client"))]
     MetaClientInit {
         #[snafu(implicit)]
@@ -429,12 +415,10 @@ impl ErrorExt for Error {
             InvalidSql { .. }
             | IllegalPrimaryKeysDef { .. }
             | MissingTimestampColumn { .. }
-            | CatalogNotFound { .. }
             | SchemaNotFound { .. }
             | SchemaExists { .. }
             | MissingNodeId { .. }
             | ColumnNoneDefaultValue { .. }
-            | Catalog { .. }
             | MissingRequiredField { .. }
             | RegionEngineNotFound { .. }
             | ParseAddr { .. }


@@ -248,7 +248,7 @@ impl RegionServer {
             None
         };
 
-        let ctx: Option<session::context::QueryContext> = request.header.as_ref().map(|h| h.into());
+        let ctx = request.header.as_ref().map(|h| h.into());
 
         let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
         let provider = self


@@ -20,6 +20,8 @@ use std::sync::{Arc, Mutex};
 use api::v1::SemanticType;
 use async_trait::async_trait;
+use catalog::error::Result as CatalogResult;
+use catalog::{CatalogManager, CatalogManagerRef};
 use common_recordbatch::filter::SimpleFilterEvaluator;
 use common_recordbatch::OrderOption;
 use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
@@ -28,12 +30,15 @@ use datafusion::physical_plan::ExecutionPlan;
 use datafusion_common::DataFusionError;
 use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datatypes::arrow::datatypes::SchemaRef;
-use session::context::QueryContextRef;
+use futures::stream::BoxStream;
+use session::context::{QueryContext, QueryContextRef};
 use snafu::ResultExt;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::RegionEngineRef;
 use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
+use table::metadata::{TableId, TableInfoRef};
 use table::table::scan::RegionScanExec;
+use table::TableRef;
 
 use crate::error::{GetRegionMetadataSnafu, Result};
@@ -322,3 +327,108 @@ pub trait TableProviderFactory: Send + Sync {
 }
 
 pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;
+
+/// A dummy catalog manager that always returns empty results.
+///
+/// Used to fill the arg of `QueryEngineFactory::new_with_plugins` in datanode.
+pub struct DummyCatalogManager;
+
+impl DummyCatalogManager {
+    /// Returns a new `CatalogManagerRef` instance.
+    pub fn arc() -> CatalogManagerRef {
+        Arc::new(Self)
+    }
+}
+
+#[async_trait::async_trait]
+impl CatalogManager for DummyCatalogManager {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    async fn catalog_names(&self) -> CatalogResult<Vec<String>> {
+        Ok(vec![])
+    }
+
+    async fn schema_names(
+        &self,
+        _catalog: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<Vec<String>> {
+        Ok(vec![])
+    }
+
+    async fn table_names(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<Vec<String>> {
+        Ok(vec![])
+    }
+
+    async fn catalog_exists(&self, _catalog: &str) -> CatalogResult<bool> {
+        Ok(false)
+    }
+
+    async fn schema_exists(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<bool> {
+        Ok(false)
+    }
+
+    async fn table_exists(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _table: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<bool> {
+        Ok(false)
+    }
+
+    async fn table(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _table_name: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<Option<TableRef>> {
+        Ok(None)
+    }
+
+    async fn table_id(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _table_name: &str,
+        _query_ctx: Option<&QueryContext>,
+    ) -> CatalogResult<Option<TableId>> {
+        Ok(None)
+    }
+
+    async fn table_info_by_id(&self, _table_id: TableId) -> CatalogResult<Option<TableInfoRef>> {
+        Ok(None)
+    }
+
+    async fn tables_by_ids(
+        &self,
+        _catalog: &str,
+        _schema: &str,
+        _table_ids: &[TableId],
+    ) -> CatalogResult<Vec<TableRef>> {
+        Ok(vec![])
+    }
+
+    fn tables<'a>(
+        &'a self,
+        _catalog: &'a str,
+        _schema: &'a str,
+        _query_ctx: Option<&'a QueryContext>,
+    ) -> BoxStream<'a, CatalogResult<TableRef>> {
+        Box::pin(futures::stream::empty())
+    }
+}
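For reference, a hedged usage sketch of the new manager: it assumes the repository's `catalog` and `query` crates plus a `tokio` test runtime, the method names come from the impl above, and the catalog/schema/table strings are purely illustrative.

use catalog::CatalogManager;
use query::dummy_catalog::DummyCatalogManager;

#[tokio::test]
async fn dummy_catalog_answers_empty() {
    let manager = DummyCatalogManager::arc();

    // Every lookup is a no-op: no catalogs, no schemas, no tables.
    assert!(manager.catalog_names().await.unwrap().is_empty());
    assert!(!manager.catalog_exists("greptime").await.unwrap());
    assert!(manager
        .table("greptime", "public", "metrics", None)
        .await
        .unwrap()
        .is_none());
}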