feat: add InformationExtension.inspect_datanode for datanode inspection (#6921)

* feat: add InformationExtension.inspect_datanode for datanode inspection

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* aggregate results from all datanodes

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix fmt

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix unreleased mito engine

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2025-09-09 11:29:04 +08:00
committed by GitHub
parent 9fe84f6fbd
commit 264d05d20e
10 changed files with 653 additions and 66 deletions

View File

@@ -297,6 +297,20 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to handle query"))]
HandleQuery {
source: common_meta::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to project schema"))]
ProjectSchema {
source: datatypes::error::Error,
#[snafu(implicit)]
location: Location,
},
}
impl Error {
@@ -369,6 +383,8 @@ impl ErrorExt for Error {
Error::FrontendNotFound { .. } | Error::MetaClientMissing { .. } => {
StatusCode::Unexpected
}
Error::HandleQuery { source, .. } => source.status_code(),
Error::ProjectSchema { source, .. } => source.status_code(),
}
}

View File

@@ -14,25 +14,34 @@
use api::v1::meta::ProcedureStatus;
use common_error::ext::BoxedError;
use common_meta::cluster::{ClusterInfo, NodeInfo};
use common_meta::cluster::{ClusterInfo, NodeInfo, Role};
use common_meta::datanode::RegionStat;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::node_manager::DatanodeManagerRef;
use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor};
use common_meta::rpc::procedure;
use common_procedure::{ProcedureInfo, ProcedureState};
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::util::ChainedRecordBatchStream;
use meta_client::MetaClientRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::error;
use crate::information_schema::InformationExtension;
use crate::information_schema::{DatanodeInspectRequest, InformationExtension};
pub struct DistributedInformationExtension {
meta_client: MetaClientRef,
datanode_manager: DatanodeManagerRef,
}
impl DistributedInformationExtension {
pub fn new(meta_client: MetaClientRef) -> Self {
Self { meta_client }
pub fn new(meta_client: MetaClientRef, datanode_manager: DatanodeManagerRef) -> Self {
Self {
meta_client,
datanode_manager,
}
}
}
@@ -98,4 +107,39 @@ impl InformationExtension for DistributedInformationExtension {
.map_err(BoxedError::new)
.context(crate::error::ListFlowStatsSnafu)
}
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
// Aggregate results from all datanodes
let nodes = self
.meta_client
.list_nodes(Some(Role::Datanode))
.await
.map_err(BoxedError::new)
.context(crate::error::ListNodesSnafu)?;
let plan = request
.build_plan()
.context(crate::error::DatafusionSnafu)?;
let mut streams = Vec::with_capacity(nodes.len());
for node in nodes {
let client = self.datanode_manager.datanode(&node.peer).await;
let stream = client
.handle_query(QueryRequest {
plan: plan.clone(),
region_id: RegionId::default(),
header: None,
})
.await
.context(crate::error::HandleQuerySnafu)?;
streams.push(stream);
}
let chained =
ChainedRecordBatchStream::new(streams).context(crate::error::CreateRecordBatchSnafu)?;
Ok(Box::pin(chained))
}
}

View File

@@ -41,10 +41,13 @@ use common_meta::key::flow::flow_state::FlowStat;
use common_meta::kv_backend::KvBackendRef;
use common_procedure::ProcedureInfo;
use common_recordbatch::SendableRecordBatchStream;
use datafusion::error::DataFusionError;
use datafusion::logical_expr::LogicalPlan;
use datatypes::schema::SchemaRef;
use lazy_static::lazy_static;
use paste::paste;
use process_list::InformationSchemaProcessList;
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::{ScanRequest, TableId};
use table::TableRef;
use table::metadata::TableType;
@@ -409,8 +412,43 @@ pub trait InformationExtension {
/// Get the flow statistics. If no flownode is available, return `None`.
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error>;
/// Inspects the datanode.
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error>;
}
/// A request to inspect datanode-internal state.
///
/// It pairs the kind of information to fetch with the scan options to apply,
/// and is turned into a logical plan by `DatanodeInspectRequest::build_plan`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DatanodeInspectRequest {
/// Which kind of information to fetch from the datanode.
pub kind: DatanodeInspectKind,
/// Pushdown scan configuration (projection/predicate/limit) for the returned stream.
/// This allows server-side filtering to reduce I/O and network costs.
pub scan: ScanRequest,
}
/// The kind of information a `DatanodeInspectRequest` asks for.
///
/// Each variant selects the entry type whose `build_plan` is used to build
/// the logical plan for the request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatanodeInspectKind {
/// List SST entries recorded in the manifest (planned via `ManifestSstEntry`).
SstManifest,
/// List SST entries discovered in the storage layer (planned via `StorageSstEntry`).
SstStorage,
}
impl DatanodeInspectRequest {
    /// Consumes the request and builds the logical plan that answers it.
    ///
    /// The plan is produced by the entry type matching `kind`, with `scan`
    /// handed over unchanged; any error from that builder is returned as is.
    pub fn build_plan(self) -> std::result::Result<LogicalPlan, DataFusionError> {
        let Self { kind, scan } = self;
        match kind {
            DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(scan),
            DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(scan),
        }
    }
}
pub struct NoopInformationExtension;
#[async_trait::async_trait]
@@ -432,4 +470,11 @@ impl InformationExtension for NoopInformationExtension {
async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
Ok(None)
}
/// No-op inspection: ignores the request and always yields an empty stream.
async fn inspect_datanode(
    &self,
    _request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
    let no_batches = common_recordbatch::RecordBatches::empty();
    Ok(no_batches.as_stream())
}
}

View File

@@ -341,8 +341,18 @@ impl StartCommand {
.build(),
);
let information_extension =
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
// The flownode's frontend-to-datanode channel needs no timeout:
// some queries are expected to take a long time.
let channel_config = ChannelConfig {
timeout: None,
..Default::default()
};
let client = Arc::new(NodeClients::new(channel_config));
let information_extension = Arc::new(DistributedInformationExtension::new(
meta_client.clone(),
client.clone(),
));
let catalog_manager = KvBackendCatalogManagerBuilder::new(
information_extension,
cached_meta_backend.clone(),
@@ -398,14 +408,6 @@ impl StartCommand {
flownode.setup_services(services);
let flownode = flownode;
// flownode's frontend to datanode need not timeout.
// Some queries are expected to take long time.
let channel_config = ChannelConfig {
timeout: None,
..Default::default()
};
let client = Arc::new(NodeClients::new(channel_config));
let invoker = FrontendInvoker::build_from(
flownode.flow_engine().streaming_engine(),
catalog_manager.clone(),

View File

@@ -378,8 +378,24 @@ impl StartCommand {
.build(),
);
let information_extension =
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
// The frontend-to-datanode channel needs no timeout:
// some queries are expected to take a long time.
let mut channel_config = ChannelConfig {
timeout: None,
tcp_nodelay: opts.datanode.client.tcp_nodelay,
connect_timeout: Some(opts.datanode.client.connect_timeout),
..Default::default()
};
if opts.grpc.flight_compression.transport_compression() {
channel_config.accept_compression = true;
channel_config.send_compression = true;
}
let client = Arc::new(NodeClients::new(channel_config));
let information_extension = Arc::new(DistributedInformationExtension::new(
meta_client.clone(),
client.clone(),
));
let process_manager = Arc::new(ProcessManager::new(
addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
@@ -413,26 +429,12 @@ impl StartCommand {
);
let heartbeat_task = Some(heartbeat_task);
// frontend to datanode need not timeout.
// Some queries are expected to take long time.
let mut channel_config = ChannelConfig {
timeout: None,
tcp_nodelay: opts.datanode.client.tcp_nodelay,
connect_timeout: Some(opts.datanode.client.connect_timeout),
..Default::default()
};
if opts.grpc.flight_compression.transport_compression() {
channel_config.accept_compression = true;
channel_config.send_compression = true;
}
let client = NodeClients::new(channel_config);
let instance = FrontendBuilder::new(
opts.clone(),
cached_meta_backend.clone(),
layered_cache_registry.clone(),
catalog_manager,
Arc::new(client),
client,
meta_client,
process_manager,
)

View File

@@ -19,10 +19,11 @@ use std::{fs, path};
use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtension;
use catalog::information_schema::{DatanodeInspectRequest, InformationExtension};
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::SendableRecordBatchStream;
use client::api::v1::meta::RegionRole;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
@@ -48,6 +49,7 @@ use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_options::memory::MemoryOptions;
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_query::request::QueryRequest;
use common_telemetry::info;
use common_telemetry::logging::{
DEFAULT_LOGGING_DIR, LoggingOptions, SlowQueryOptions, TracingOptions,
@@ -80,6 +82,7 @@ use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::sync::RwLock;
use tracing_appender::non_blocking::WorkerGuard;
@@ -856,6 +859,25 @@ impl InformationExtension for StandaloneInformationExtension {
.await,
))
}
async fn inspect_datanode(
&self,
request: DatanodeInspectRequest,
) -> std::result::Result<SendableRecordBatchStream, Self::Error> {
let req = QueryRequest {
plan: request
.build_plan()
.context(catalog::error::DatafusionSnafu)?,
region_id: RegionId::default(),
header: None,
};
self.region_server
.handle_read(req)
.await
.map_err(BoxedError::new)
.context(catalog::error::InternalSnafu)
}
}
#[cfg(test)]

View File

@@ -315,6 +315,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to list SST entries from storage"))]
ListStorageSsts {
#[snafu(implicit)]
location: Location,
source: mito2::error::Error,
},
#[snafu(display("Failed to serialize options to TOML"))]
TomlFormat {
#[snafu(implicit)]
@@ -453,6 +460,7 @@ impl ErrorExt for Error {
FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
BuildMetricEngine { source, .. } => source.status_code(),
ListStorageSsts { source, .. } => source.status_code(),
ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => {
StatusCode::RegionBusy
}

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod catalog;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
@@ -39,13 +41,11 @@ use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
use common_telemetry::{debug, error, info, warn};
use dashmap::DashMap;
use datafusion::datasource::{TableProvider, provider_as_source};
use datafusion::error::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_expr::{LogicalPlan, TableSource};
use datafusion::datasource::TableProvider;
use datafusion_common::tree_node::TreeNode;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
use mito2::engine::MITO_ENGINE_NAME;
use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
use prost::Message;
use query::QueryEngineRef;
pub use query::dummy_catalog::{
@@ -82,6 +82,7 @@ use crate::error::{
Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder};
#[derive(Clone)]
pub struct RegionServer {
@@ -215,10 +216,11 @@ impl RegionServer {
};
let region_id = RegionId::from_u64(request.region_id);
let provider = self
.table_provider(region_id, Some(query_ctx.clone()))
.await?;
let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
let catalog_list = Arc::new(NameAwareCatalogList::new(
self.clone(),
region_id,
query_ctx.clone(),
));
if query_ctx.explain_verbose() {
common_telemetry::info!("Handle remote read for region: {}", region_id);
@@ -259,33 +261,15 @@ impl RegionServer {
let ctx = request.header.as_ref().map(|h| h.into());
let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build()));
let provider = self
.table_provider(request.region_id, Some(query_ctx.clone()))
let injector_builder = NameAwareDataSourceInjectorBuilder::from_plan(&request.plan)
.context(DataFusionSnafu)?;
let mut injector = injector_builder
.build(self, request.region_id, query_ctx.clone())
.await?;
struct RegionDataSourceInjector {
source: Arc<dyn TableSource>,
}
impl TreeNodeRewriter for RegionDataSourceInjector {
type Node = LogicalPlan;
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
Ok(match node {
LogicalPlan::TableScan(mut scan) => {
scan.source = self.source.clone();
Transformed::yes(LogicalPlan::TableScan(scan))
}
_ => Transformed::no(node),
})
}
}
let plan = request
.plan
.rewrite(&mut RegionDataSourceInjector {
source: provider_as_source(provider),
})
.rewrite(&mut injector)
.context(DataFusionSnafu)?
.data;
@@ -697,6 +681,10 @@ struct RegionServerInner {
parallelism: Option<RegionServerParallelism>,
// The topic stats reporter.
topic_stats_reporter: RwLock<Option<Box<dyn TopicStatsReporter>>>,
// HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the
// server with a concrete engine; acceptable for now to fetch Mito-specific
// info (e.g., list SSTs). Consider a diagnostics trait later.
mito_engine: RwLock<Option<MitoEngine>>,
}
struct RegionServerParallelism {
@@ -763,11 +751,18 @@ impl RegionServerInner {
table_provider_factory,
parallelism,
topic_stats_reporter: RwLock::new(None),
mito_engine: RwLock::new(None),
}
}
pub fn register_engine(&self, engine: RegionEngineRef) {
let engine_name = engine.name();
if engine_name == MITO_ENGINE_NAME
&& let Some(mito_engine) = engine.as_any().downcast_ref::<MitoEngine>()
{
*self.mito_engine.write().unwrap() = Some(mito_engine.clone());
}
info!("Region Engine {engine_name} is registered");
self.engines
.write()
@@ -1286,6 +1281,7 @@ impl RegionServerInner {
self.region_map.clear();
info!("closed {num_regions} regions");
drop(self.mito_engine.write().unwrap().take());
let engines = self.engines.write().unwrap().drain().collect::<Vec<_>>();
for (engine_name, engine) in engines {
engine

View File

@@ -0,0 +1,450 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use datafusion::catalog::{
CatalogProvider, CatalogProviderList, MemTable, SchemaProvider, TableProvider,
};
use datafusion::datasource::provider_as_source;
use datafusion::error as df_error;
use datafusion::error::Result as DfResult;
use datafusion_common::DataFusionError;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter};
use datafusion_expr::{LogicalPlan, TableSource};
use futures::TryStreamExt;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};
use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry};
use store_api::storage::RegionId;
use crate::error::{DataFusionSnafu, ListStorageSstsSnafu, Result, UnexpectedSnafu};
use crate::region_server::RegionServer;
/// Kinds of reserved internal tables.
///
/// A kind is recognized from a reserved table name (see `from_table_name`)
/// and mapped to the `TableProvider` that serves it (see `table_provider`).
#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)]
enum InternalTableKind {
// Inspection table backed by `RegionServer::inspect_sst_manifest_provider`.
InspectSstManifest,
// Inspection table backed by `RegionServer::inspect_sst_storage_provider`.
InspectSstStorage,
}
impl InternalTableKind {
    /// Maps a table name to its internal table kind.
    ///
    /// The comparison ignores ASCII case. Returns `None` when `name` is not
    /// one of the reserved inspection table names.
    pub fn from_table_name(name: &str) -> Option<Self> {
        let matches = |reserved: &str| name.eq_ignore_ascii_case(reserved);

        if matches(ManifestSstEntry::reserved_table_name_for_inspection()) {
            Some(Self::InspectSstManifest)
        } else if matches(StorageSstEntry::reserved_table_name_for_inspection()) {
            Some(Self::InspectSstStorage)
        } else {
            None
        }
    }

    /// Builds the `TableProvider` serving this internal table from `server`.
    pub async fn table_provider(&self, server: &RegionServer) -> Result<Arc<dyn TableProvider>> {
        match *self {
            Self::InspectSstManifest => server.inspect_sst_manifest_provider().await,
            Self::InspectSstStorage => server.inspect_sst_storage_provider().await,
        }
    }
}
impl RegionServer {
/// Expose SSTs listed in Manifest as an in-memory table for inspection.
pub async fn inspect_sst_manifest_provider(&self) -> Result<Arc<dyn TableProvider>> {
let mito = {
let guard = self.inner.mito_engine.read().unwrap();
guard.as_ref().cloned().context(UnexpectedSnafu {
violated: "mito engine not available",
})?
};
let entries = mito.all_ssts_from_manifest().collect::<Vec<_>>();
let schema = ManifestSstEntry::schema().arrow_schema().clone();
let batch = ManifestSstEntry::to_record_batch(&entries)
.map_err(DataFusionError::from)
.context(DataFusionSnafu)?;
let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
Ok(Arc::new(table))
}
/// Expose SSTs found in storage as an in-memory table for inspection.
pub async fn inspect_sst_storage_provider(&self) -> Result<Arc<dyn TableProvider>> {
let mito = {
let guard = self.inner.mito_engine.read().unwrap();
guard.as_ref().cloned().context(UnexpectedSnafu {
violated: "mito engine not available",
})?
};
let entries = mito
.all_ssts_from_storage()
.try_collect::<Vec<_>>()
.await
.context(ListStorageSstsSnafu)?;
let schema = StorageSstEntry::schema().arrow_schema().clone();
let batch = StorageSstEntry::to_record_batch(&entries)
.map_err(DataFusionError::from)
.context(DataFusionSnafu)?;
let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?;
Ok(Arc::new(table))
}
}
/// A catalog list that resolves `TableProvider` by table name:
/// - For reserved internal names, return inspection providers;
/// - Otherwise, fall back to the Region provider.
///
/// Catalog and schema names are ignored: every lookup ends up at the same
/// schema provider.
#[derive(Clone, Debug)]
pub(crate) struct NameAwareCatalogList {
// The single catalog handed out for every catalog name.
catalog: NameAwareCatalogProvider,
}
impl NameAwareCatalogList {
    /// Creates a catalog list whose non-reserved table lookups resolve to the
    /// provider of `region_id` on `server`, using `query_ctx`.
    pub fn new(server: RegionServer, region_id: RegionId, query_ctx: QueryContextRef) -> Self {
        Self {
            catalog: NameAwareCatalogProvider {
                schema: NameAwareSchemaProvider {
                    server,
                    region_id,
                    query_ctx,
                },
            },
        }
    }
}
impl CatalogProviderList for NameAwareCatalogList {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Registration is not supported: the arguments are dropped and `None`
    /// is returned.
    fn register_catalog(
        &self,
        _name: String,
        _catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        None
    }

    /// No catalog names are advertised.
    fn catalog_names(&self) -> Vec<String> {
        Vec::new()
    }

    /// Every catalog name resolves to a clone of the same name-aware catalog.
    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
        let provider: Arc<dyn CatalogProvider> = Arc::new(self.catalog.clone());
        Some(provider)
    }
}
/// Catalog provider that hands out the same name-aware schema for every
/// schema name.
#[derive(Clone, Debug)]
struct NameAwareCatalogProvider {
// The single schema handed out for every schema name.
schema: NameAwareSchemaProvider,
}
impl CatalogProvider for NameAwareCatalogProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// No schema names are advertised.
    fn schema_names(&self) -> Vec<String> {
        Vec::new()
    }

    /// Every schema name resolves to a clone of the same name-aware schema.
    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let provider: Arc<dyn SchemaProvider> = Arc::new(self.schema.clone());
        Some(provider)
    }
}
/// Schema provider that resolves reserved table names to inspection
/// providers and every other name to the provider of one region.
#[derive(Clone)]
struct NameAwareSchemaProvider {
// Server used to build both inspection and region providers.
server: RegionServer,
// Region whose provider is returned for non-reserved table names.
region_id: RegionId,
// Query context passed along when requesting the region provider.
query_ctx: QueryContextRef,
}
impl std::fmt::Debug for NameAwareSchemaProvider {
    /// Prints only the type name; none of the fields are formatted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("NameAwareSchemaProvider")
    }
}
#[async_trait::async_trait]
impl SchemaProvider for NameAwareSchemaProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// No table names are advertised.
    fn table_names(&self) -> Vec<String> {
        Vec::new()
    }

    /// Resolves `name` to a table provider.
    ///
    /// Reserved inspection names get their dedicated provider; any other name
    /// gets the provider of the configured region. Failures from either path
    /// are wrapped as external DataFusion errors.
    async fn table(&self, name: &str) -> DfResult<Option<Arc<dyn TableProvider>>> {
        match InternalTableKind::from_table_name(name) {
            // Reserved name: serve the matching inspection provider.
            Some(kind) => {
                let inspection = kind
                    .table_provider(&self.server)
                    .await
                    .map_err(|e| df_error::DataFusionError::External(Box::new(e)))?;
                Ok(Some(inspection))
            }
            // Any other name: fall back to the region provider.
            None => {
                let region = self
                    .server
                    .table_provider(self.region_id, Some(self.query_ctx.clone()))
                    .await
                    .map_err(|e| df_error::DataFusionError::External(Box::new(e)))?;
                Ok(Some(region))
            }
        }
    }

    /// Reports every table name as existing.
    fn table_exist(&self, _name: &str) -> bool {
        true
    }
}
/// Builds a `NameAwareDataSourceInjector` from a logical plan.
///
/// It scans the plan to determine:
/// - whether a Region `TableSource` is required, and
/// - which internal inspection sources are referenced.
///
/// The findings are recorded by `from_plan`; `build` then creates only the
/// sources that were found to be needed.
pub(crate) struct NameAwareDataSourceInjectorBuilder {
/// Whether the plan requires a Region `TableSource`.
need_region_provider: bool,
/// Internal table kinds referenced by the plan, without duplicates, in the
/// order they were first encountered.
reserved_table_needed: Vec<InternalTableKind>,
}
impl NameAwareDataSourceInjectorBuilder {
/// Walk the `LogicalPlan` to determine whether a Region source is needed,
/// and collect the kinds of internal sources required.
pub fn from_plan(plan: &LogicalPlan) -> DfResult<Self> {
let mut need_region_provider = false;
let mut reserved_table_needed = Vec::new();
plan.apply(|node| {
if let LogicalPlan::TableScan(ts) = node {
let name = ts.table_name.to_string();
if let Some(kind) = InternalTableKind::from_table_name(&name) {
if !reserved_table_needed.contains(&kind) {
reserved_table_needed.push(kind);
}
} else {
// Any normal table scan implies a Region source is needed.
need_region_provider = true;
}
}
Ok(TreeNodeRecursion::Continue)
})?;
Ok(Self {
need_region_provider,
reserved_table_needed,
})
}
pub async fn build(
self,
server: &RegionServer,
region_id: RegionId,
query_ctx: QueryContextRef,
) -> Result<NameAwareDataSourceInjector> {
let region = if self.need_region_provider {
let provider = server.table_provider(region_id, Some(query_ctx)).await?;
Some(provider_as_source(provider))
} else {
None
};
let mut reserved_sources = HashMap::new();
for kind in &self.reserved_table_needed {
let provider = kind.table_provider(server).await?;
reserved_sources.insert(*kind, provider_as_source(provider));
}
Ok(NameAwareDataSourceInjector {
reserved_sources,
region_source: region,
})
}
}
/// Rewrites `LogicalPlan` to inject proper data sources for `TableScan`.
/// Uses internal sources for reserved tables; otherwise uses the Region source.
///
/// A scan that has to use the Region source while none is set makes the
/// rewrite fail with a plan error.
pub(crate) struct NameAwareDataSourceInjector {
/// Sources for reserved internal tables, keyed by kind.
reserved_sources: HashMap<InternalTableKind, Arc<dyn TableSource>>,
/// Optional Region-level source used for normal tables.
region_source: Option<Arc<dyn TableSource>>,
}
impl TreeNodeRewriter for NameAwareDataSourceInjector {
type Node = LogicalPlan;
fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
Ok(match node {
LogicalPlan::TableScan(mut scan) => {
let name = scan.table_name.to_string();
if let Some(kind) = InternalTableKind::from_table_name(&name)
&& let Some(source) = self.reserved_sources.get(&kind)
{
// Matched a reserved internal table: rewrite to its dedicated source.
scan.source = source.clone();
} else {
let Some(region) = &self.region_source else {
// Region source required but not constructed; this is unexpected.
return Err(datafusion::error::DataFusionError::Plan(
"region provider not available".to_string(),
));
};
// Normal table: rewrite to the Region source.
scan.source = region.clone();
}
Transformed::yes(LogicalPlan::TableScan(scan))
}
_ => Transformed::no(node),
})
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::catalog::MemTable as DfMemTable;
    use datafusion_common::tree_node::TreeNode;
    use datafusion_expr::{LogicalPlanBuilder, table_scan};
    use datatypes::arrow::array::Int32Array;
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::arrow::record_batch::RecordBatch;

    // Brings the items under test (and the parent's imports) into scope.
    use super::*;

    /// Single nullable `Int32` column named `a`.
    fn test_schema() -> Schema {
        Schema::new(vec![Field::new("a", DataType::Int32, true)])
    }

    /// In-memory table over `test_schema()` holding one batch with zero rows.
    fn empty_mem_table() -> Arc<DfMemTable> {
        let schema = Arc::new(test_schema());
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(Vec::<i32>::new()))],
        )
        .unwrap();
        Arc::new(DfMemTable::try_new(schema, vec![vec![batch]]).unwrap())
    }

    /// Plan consisting of a single unprojected scan of `table_name`.
    fn scan_plan(table_name: &str) -> LogicalPlan {
        table_scan(Some(table_name), &test_schema(), None)
            .unwrap()
            .build()
            .unwrap()
    }

    /// Asserts that `plan` is a `TableScan` whose source is the very same
    /// allocation as `expected`.
    ///
    /// `Arc::ptr_eq` is used instead of comparing `*const dyn TableSource`
    /// with `std::ptr::eq`: the latter also compares vtable pointers, which
    /// is not a reliable identity check for trait objects.
    fn assert_scan_source(plan: LogicalPlan, expected: &Arc<dyn TableSource>) {
        let LogicalPlan::TableScan(scan) = plan else {
            panic!("expected TableScan after rewrite");
        };
        assert!(Arc::ptr_eq(&scan.source, expected));
    }

    #[test]
    fn test_injector_builder_from_plan_flags() {
        let reserved = ManifestSstEntry::reserved_table_name_for_inspection();

        // plan1: reserved table scan only
        let plan1 = scan_plan(reserved);
        let b1 = NameAwareDataSourceInjectorBuilder::from_plan(&plan1).unwrap();
        assert!(!b1.need_region_provider);
        assert_eq!(
            b1.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );

        // plan2: normal table scan only
        let plan2 = scan_plan("normal_table");
        let b2 = NameAwareDataSourceInjectorBuilder::from_plan(&plan2).unwrap();
        assert!(b2.need_region_provider);
        assert!(b2.reserved_table_needed.is_empty());

        // plan3: both reserved and normal (via UNION)
        let plan3 = LogicalPlanBuilder::from(scan_plan(reserved))
            .union(scan_plan("normal_table"))
            .unwrap()
            .build()
            .unwrap();
        let b3 = NameAwareDataSourceInjectorBuilder::from_plan(&plan3).unwrap();
        assert!(b3.need_region_provider);
        assert_eq!(
            b3.reserved_table_needed,
            vec![InternalTableKind::InspectSstManifest]
        );
    }

    #[test]
    fn test_rewriter_replaces_with_reserved_source() {
        let plan = scan_plan(ManifestSstEntry::reserved_table_name_for_inspection());
        let source = provider_as_source(empty_mem_table());

        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: HashMap::from([(
                InternalTableKind::InspectSstManifest,
                source.clone(),
            )]),
            region_source: None,
        };

        let new_plan = plan.rewrite(&mut injector).unwrap().data;
        assert_scan_source(new_plan, &source);
    }

    #[test]
    fn test_rewriter_replaces_with_region_source_for_normal() {
        let plan = scan_plan("normal_table");
        let region_source = provider_as_source(empty_mem_table());

        let mut injector = NameAwareDataSourceInjector {
            reserved_sources: HashMap::new(),
            region_source: Some(region_source.clone()),
        };

        let new_plan = plan.rewrite(&mut injector).unwrap().data;
        assert_scan_source(new_plan, &region_source);
    }
}

View File

@@ -389,8 +389,10 @@ impl GreptimeDbClusterBuilder {
.build(),
);
let information_extension =
Arc::new(DistributedInformationExtension::new(meta_client.clone()));
let information_extension = Arc::new(DistributedInformationExtension::new(
meta_client.clone(),
datanode_clients.clone(),
));
let catalog_manager = KvBackendCatalogManagerBuilder::new(
information_extension,
cached_meta_backend.clone(),