diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 30edab94df..e7c6efc018 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -297,6 +297,20 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to handle query"))] + HandleQuery { + source: common_meta::error::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to project schema"))] + ProjectSchema { + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, } impl Error { @@ -369,6 +383,8 @@ impl ErrorExt for Error { Error::FrontendNotFound { .. } | Error::MetaClientMissing { .. } => { StatusCode::Unexpected } + Error::HandleQuery { source, .. } => source.status_code(), + Error::ProjectSchema { source, .. } => source.status_code(), } } diff --git a/src/catalog/src/information_extension.rs b/src/catalog/src/information_extension.rs index e481d469bd..bcb5056bad 100644 --- a/src/catalog/src/information_extension.rs +++ b/src/catalog/src/information_extension.rs @@ -14,25 +14,34 @@ use api::v1::meta::ProcedureStatus; use common_error::ext::BoxedError; -use common_meta::cluster::{ClusterInfo, NodeInfo}; +use common_meta::cluster::{ClusterInfo, NodeInfo, Role}; use common_meta::datanode::RegionStat; use common_meta::key::flow::flow_state::FlowStat; +use common_meta::node_manager::DatanodeManagerRef; use common_meta::procedure_executor::{ExecutorContext, ProcedureExecutor}; use common_meta::rpc::procedure; use common_procedure::{ProcedureInfo, ProcedureState}; +use common_query::request::QueryRequest; +use common_recordbatch::SendableRecordBatchStream; +use common_recordbatch::util::ChainedRecordBatchStream; use meta_client::MetaClientRef; use snafu::ResultExt; +use store_api::storage::RegionId; use crate::error; -use crate::information_schema::InformationExtension; +use crate::information_schema::{DatanodeInspectRequest, InformationExtension}; pub struct DistributedInformationExtension { 
meta_client: MetaClientRef, + datanode_manager: DatanodeManagerRef, } impl DistributedInformationExtension { - pub fn new(meta_client: MetaClientRef) -> Self { - Self { meta_client } + pub fn new(meta_client: MetaClientRef, datanode_manager: DatanodeManagerRef) -> Self { + Self { + meta_client, + datanode_manager, + } } } @@ -98,4 +107,39 @@ impl InformationExtension for DistributedInformationExtension { .map_err(BoxedError::new) .context(crate::error::ListFlowStatsSnafu) } + + async fn inspect_datanode( + &self, + request: DatanodeInspectRequest, + ) -> std::result::Result { + // Aggregate results from all datanodes + let nodes = self + .meta_client + .list_nodes(Some(Role::Datanode)) + .await + .map_err(BoxedError::new) + .context(crate::error::ListNodesSnafu)?; + + let plan = request + .build_plan() + .context(crate::error::DatafusionSnafu)?; + + let mut streams = Vec::with_capacity(nodes.len()); + for node in nodes { + let client = self.datanode_manager.datanode(&node.peer).await; + let stream = client + .handle_query(QueryRequest { + plan: plan.clone(), + region_id: RegionId::default(), + header: None, + }) + .await + .context(crate::error::HandleQuerySnafu)?; + streams.push(stream); + } + + let chained = + ChainedRecordBatchStream::new(streams).context(crate::error::CreateRecordBatchSnafu)?; + Ok(Box::pin(chained)) + } } diff --git a/src/catalog/src/system_schema/information_schema.rs b/src/catalog/src/system_schema/information_schema.rs index 724859c904..00313a78e6 100644 --- a/src/catalog/src/system_schema/information_schema.rs +++ b/src/catalog/src/system_schema/information_schema.rs @@ -41,10 +41,13 @@ use common_meta::key::flow::flow_state::FlowStat; use common_meta::kv_backend::KvBackendRef; use common_procedure::ProcedureInfo; use common_recordbatch::SendableRecordBatchStream; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::LogicalPlan; use datatypes::schema::SchemaRef; use lazy_static::lazy_static; use paste::paste; use 
process_list::InformationSchemaProcessList; +use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry}; use store_api::storage::{ScanRequest, TableId}; use table::TableRef; use table::metadata::TableType; @@ -409,8 +412,43 @@ pub trait InformationExtension { /// Get the flow statistics. If no flownode is available, return `None`. async fn flow_stats(&self) -> std::result::Result, Self::Error>; + + /// Inspects the datanode. + async fn inspect_datanode( + &self, + request: DatanodeInspectRequest, + ) -> std::result::Result; } +/// The request to inspect the datanode. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DatanodeInspectRequest { + /// Kind to fetch from datanode. + pub kind: DatanodeInspectKind, + + /// Pushdown scan configuration (projection/predicate/limit) for the returned stream. + /// This allows server-side filtering to reduce I/O and network costs. + pub scan: ScanRequest, +} + +/// The kind of the datanode inspect request. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum DatanodeInspectKind { + /// List SST entries recorded in manifest + SstManifest, + /// List SST entries discovered in storage layer + SstStorage, +} + +impl DatanodeInspectRequest { + /// Builds a logical plan for the datanode inspect request. 
+ pub fn build_plan(self) -> std::result::Result { + match self.kind { + DatanodeInspectKind::SstManifest => ManifestSstEntry::build_plan(self.scan), + DatanodeInspectKind::SstStorage => StorageSstEntry::build_plan(self.scan), + } + } +} pub struct NoopInformationExtension; #[async_trait::async_trait] @@ -432,4 +470,11 @@ impl InformationExtension for NoopInformationExtension { async fn flow_stats(&self) -> std::result::Result, Self::Error> { Ok(None) } + + async fn inspect_datanode( + &self, + _request: DatanodeInspectRequest, + ) -> std::result::Result { + Ok(common_recordbatch::RecordBatches::empty().as_stream()) + } } diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs index f62fb7919e..500e9bfa89 100644 --- a/src/cmd/src/flownode.rs +++ b/src/cmd/src/flownode.rs @@ -341,8 +341,18 @@ impl StartCommand { .build(), ); - let information_extension = - Arc::new(DistributedInformationExtension::new(meta_client.clone())); + // flownode's frontend to datanode need not timeout. + // Some queries are expected to take long time. + let channel_config = ChannelConfig { + timeout: None, + ..Default::default() + }; + let client = Arc::new(NodeClients::new(channel_config)); + + let information_extension = Arc::new(DistributedInformationExtension::new( + meta_client.clone(), + client.clone(), + )); let catalog_manager = KvBackendCatalogManagerBuilder::new( information_extension, cached_meta_backend.clone(), @@ -398,14 +408,6 @@ impl StartCommand { flownode.setup_services(services); let flownode = flownode; - // flownode's frontend to datanode need not timeout. - // Some queries are expected to take long time. 
- let channel_config = ChannelConfig { - timeout: None, - ..Default::default() - }; - let client = Arc::new(NodeClients::new(channel_config)); - let invoker = FrontendInvoker::build_from( flownode.flow_engine().streaming_engine(), catalog_manager.clone(), diff --git a/src/cmd/src/frontend.rs b/src/cmd/src/frontend.rs index cab9912203..d147fc648e 100644 --- a/src/cmd/src/frontend.rs +++ b/src/cmd/src/frontend.rs @@ -378,8 +378,24 @@ impl StartCommand { .build(), ); - let information_extension = - Arc::new(DistributedInformationExtension::new(meta_client.clone())); + // frontend to datanode need not timeout. + // Some queries are expected to take long time. + let mut channel_config = ChannelConfig { + timeout: None, + tcp_nodelay: opts.datanode.client.tcp_nodelay, + connect_timeout: Some(opts.datanode.client.connect_timeout), + ..Default::default() + }; + if opts.grpc.flight_compression.transport_compression() { + channel_config.accept_compression = true; + channel_config.send_compression = true; + } + let client = Arc::new(NodeClients::new(channel_config)); + + let information_extension = Arc::new(DistributedInformationExtension::new( + meta_client.clone(), + client.clone(), + )); let process_manager = Arc::new(ProcessManager::new( addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)), @@ -413,26 +429,12 @@ impl StartCommand { ); let heartbeat_task = Some(heartbeat_task); - // frontend to datanode need not timeout. - // Some queries are expected to take long time. 
- let mut channel_config = ChannelConfig { - timeout: None, - tcp_nodelay: opts.datanode.client.tcp_nodelay, - connect_timeout: Some(opts.datanode.client.connect_timeout), - ..Default::default() - }; - if opts.grpc.flight_compression.transport_compression() { - channel_config.accept_compression = true; - channel_config.send_compression = true; - } - let client = NodeClients::new(channel_config); - let instance = FrontendBuilder::new( opts.clone(), cached_meta_backend.clone(), layered_cache_registry.clone(), catalog_manager, - Arc::new(client), + client, meta_client, process_manager, ) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index a6362ca156..9ee7d6b728 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -19,10 +19,11 @@ use std::{fs, path}; use async_trait::async_trait; use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry}; -use catalog::information_schema::InformationExtension; +use catalog::information_schema::{DatanodeInspectRequest, InformationExtension}; use catalog::kvbackend::KvBackendCatalogManagerBuilder; use catalog::process_manager::ProcessManager; use clap::Parser; +use client::SendableRecordBatchStream; use client::api::v1::meta::RegionRole; use common_base::Plugins; use common_base::readable_size::ReadableSize; @@ -48,6 +49,7 @@ use common_meta::sequence::SequenceBuilder; use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator}; use common_options::memory::MemoryOptions; use common_procedure::{ProcedureInfo, ProcedureManagerRef}; +use common_query::request::QueryRequest; use common_telemetry::info; use common_telemetry::logging::{ DEFAULT_LOGGING_DIR, LoggingOptions, SlowQueryOptions, TracingOptions, @@ -80,6 +82,7 @@ use servers::grpc::GrpcOptions; use servers::http::HttpOptions; use servers::tls::{TlsMode, TlsOption}; use snafu::ResultExt; +use store_api::storage::RegionId; use tokio::sync::RwLock; use 
tracing_appender::non_blocking::WorkerGuard; @@ -856,6 +859,25 @@ impl InformationExtension for StandaloneInformationExtension { .await, )) } + + async fn inspect_datanode( + &self, + request: DatanodeInspectRequest, + ) -> std::result::Result { + let req = QueryRequest { + plan: request + .build_plan() + .context(catalog::error::DatafusionSnafu)?, + region_id: RegionId::default(), + header: None, + }; + + self.region_server + .handle_read(req) + .await + .map_err(BoxedError::new) + .context(catalog::error::InternalSnafu) + } } #[cfg(test)] diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index a6d7600b31..a2e6f674e2 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -315,6 +315,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to list SST entries from storage"))] + ListStorageSsts { + #[snafu(implicit)] + location: Location, + source: mito2::error::Error, + }, + #[snafu(display("Failed to serialize options to TOML"))] TomlFormat { #[snafu(implicit)] @@ -453,6 +460,7 @@ impl ErrorExt for Error { FindLogicalRegions { source, .. } => source.status_code(), BuildMitoEngine { source, .. } => source.status_code(), BuildMetricEngine { source, .. } => source.status_code(), + ListStorageSsts { source, .. } => source.status_code(), ConcurrentQueryLimiterClosed { .. } | ConcurrentQueryLimiterTimeout { .. } => { StatusCode::RegionBusy } diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index fdf981b97e..b7a2087ef5 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+mod catalog; + use std::collections::HashMap; use std::fmt::Debug; use std::ops::Deref; @@ -39,13 +41,11 @@ use common_telemetry::tracing::{self, info_span}; use common_telemetry::tracing_context::{FutureExt, TracingContext}; use common_telemetry::{debug, error, info, warn}; use dashmap::DashMap; -use datafusion::datasource::{TableProvider, provider_as_source}; -use datafusion::error::Result as DfResult; -use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; -use datafusion_expr::{LogicalPlan, TableSource}; +use datafusion::datasource::TableProvider; +use datafusion_common::tree_node::TreeNode; use futures_util::future::try_join_all; use metric_engine::engine::MetricEngine; -use mito2::engine::MITO_ENGINE_NAME; +use mito2::engine::{MITO_ENGINE_NAME, MitoEngine}; use prost::Message; use query::QueryEngineRef; pub use query::dummy_catalog::{ @@ -82,6 +82,7 @@ use crate::error::{ Result, SerializeJsonSnafu, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu, }; use crate::event_listener::RegionServerEventListenerRef; +use crate::region_server::catalog::{NameAwareCatalogList, NameAwareDataSourceInjectorBuilder}; #[derive(Clone)] pub struct RegionServer { @@ -215,10 +216,11 @@ impl RegionServer { }; let region_id = RegionId::from_u64(request.region_id); - let provider = self - .table_provider(region_id, Some(query_ctx.clone())) - .await?; - let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider)); + let catalog_list = Arc::new(NameAwareCatalogList::new( + self.clone(), + region_id, + query_ctx.clone(), + )); if query_ctx.explain_verbose() { common_telemetry::info!("Handle remote read for region: {}", region_id); @@ -259,33 +261,15 @@ impl RegionServer { let ctx = request.header.as_ref().map(|h| h.into()); let query_ctx = Arc::new(ctx.unwrap_or_else(|| QueryContextBuilder::default().build())); - let provider = self - .table_provider(request.region_id, Some(query_ctx.clone())) + let injector_builder = 
NameAwareDataSourceInjectorBuilder::from_plan(&request.plan) + .context(DataFusionSnafu)?; + let mut injector = injector_builder + .build(self, request.region_id, query_ctx.clone()) .await?; - struct RegionDataSourceInjector { - source: Arc, - } - - impl TreeNodeRewriter for RegionDataSourceInjector { - type Node = LogicalPlan; - - fn f_up(&mut self, node: Self::Node) -> DfResult> { - Ok(match node { - LogicalPlan::TableScan(mut scan) => { - scan.source = self.source.clone(); - Transformed::yes(LogicalPlan::TableScan(scan)) - } - _ => Transformed::no(node), - }) - } - } - let plan = request .plan - .rewrite(&mut RegionDataSourceInjector { - source: provider_as_source(provider), - }) + .rewrite(&mut injector) .context(DataFusionSnafu)? .data; @@ -697,6 +681,10 @@ struct RegionServerInner { parallelism: Option, // The topic stats reporter. topic_stats_reporter: RwLock>>, + // HACK(zhongzc): Direct MitoEngine handle for diagnostics. This couples the + // server with a concrete engine; acceptable for now to fetch Mito-specific + // info (e.g., list SSTs). Consider a diagnostics trait later. 
+ mito_engine: RwLock>, } struct RegionServerParallelism { @@ -763,11 +751,18 @@ impl RegionServerInner { table_provider_factory, parallelism, topic_stats_reporter: RwLock::new(None), + mito_engine: RwLock::new(None), } } pub fn register_engine(&self, engine: RegionEngineRef) { let engine_name = engine.name(); + if engine_name == MITO_ENGINE_NAME + && let Some(mito_engine) = engine.as_any().downcast_ref::() + { + *self.mito_engine.write().unwrap() = Some(mito_engine.clone()); + } + info!("Region Engine {engine_name} is registered"); self.engines .write() @@ -1286,6 +1281,7 @@ impl RegionServerInner { self.region_map.clear(); info!("closed {num_regions} regions"); + drop(self.mito_engine.write().unwrap().take()); let engines = self.engines.write().unwrap().drain().collect::>(); for (engine_name, engine) in engines { engine diff --git a/src/datanode/src/region_server/catalog.rs b/src/datanode/src/region_server/catalog.rs new file mode 100644 index 0000000000..0d750ae881 --- /dev/null +++ b/src/datanode/src/region_server/catalog.rs @@ -0,0 +1,450 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::collections::HashMap; +use std::sync::Arc; + +use datafusion::catalog::{ + CatalogProvider, CatalogProviderList, MemTable, SchemaProvider, TableProvider, +}; +use datafusion::datasource::provider_as_source; +use datafusion::error as df_error; +use datafusion::error::Result as DfResult; +use datafusion_common::DataFusionError; +use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeRewriter}; +use datafusion_expr::{LogicalPlan, TableSource}; +use futures::TryStreamExt; +use session::context::QueryContextRef; +use snafu::{OptionExt, ResultExt}; +use store_api::sst_entry::{ManifestSstEntry, StorageSstEntry}; +use store_api::storage::RegionId; + +use crate::error::{DataFusionSnafu, ListStorageSstsSnafu, Result, UnexpectedSnafu}; +use crate::region_server::RegionServer; + +/// Reserved internal table kinds used. +/// These are recognized by reserved table names and mapped to providers. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Copy)] +enum InternalTableKind { + InspectSstManifest, + InspectSstStorage, +} + +impl InternalTableKind { + /// Determine if the name is a reserved internal table (case-insensitive). + pub fn from_table_name(name: &str) -> Option { + if name.eq_ignore_ascii_case(ManifestSstEntry::reserved_table_name_for_inspection()) { + return Some(Self::InspectSstManifest); + } + if name.eq_ignore_ascii_case(StorageSstEntry::reserved_table_name_for_inspection()) { + return Some(Self::InspectSstStorage); + } + None + } + + /// Return the `TableProvider` for the internal table. + pub async fn table_provider(&self, server: &RegionServer) -> Result> { + match self { + Self::InspectSstManifest => server.inspect_sst_manifest_provider().await, + Self::InspectSstStorage => server.inspect_sst_storage_provider().await, + } + } +} + +impl RegionServer { + /// Expose SSTs listed in Manifest as an in-memory table for inspection. 
+ pub async fn inspect_sst_manifest_provider(&self) -> Result> { + let mito = { + let guard = self.inner.mito_engine.read().unwrap(); + guard.as_ref().cloned().context(UnexpectedSnafu { + violated: "mito engine not available", + })? + }; + + let entries = mito.all_ssts_from_manifest().collect::>(); + let schema = ManifestSstEntry::schema().arrow_schema().clone(); + let batch = ManifestSstEntry::to_record_batch(&entries) + .map_err(DataFusionError::from) + .context(DataFusionSnafu)?; + + let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?; + Ok(Arc::new(table)) + } + + /// Expose SSTs found in storage as an in-memory table for inspection. + pub async fn inspect_sst_storage_provider(&self) -> Result> { + let mito = { + let guard = self.inner.mito_engine.read().unwrap(); + guard.as_ref().cloned().context(UnexpectedSnafu { + violated: "mito engine not available", + })? + }; + let entries = mito + .all_ssts_from_storage() + .try_collect::>() + .await + .context(ListStorageSstsSnafu)?; + let schema = StorageSstEntry::schema().arrow_schema().clone(); + let batch = StorageSstEntry::to_record_batch(&entries) + .map_err(DataFusionError::from) + .context(DataFusionSnafu)?; + + let table = MemTable::try_new(schema, vec![vec![batch]]).context(DataFusionSnafu)?; + Ok(Arc::new(table)) + } +} + +/// A catalog list that resolves `TableProvider` by table name: +/// - For reserved internal names, return inspection providers; +/// - Otherwise, fall back to the Region provider. +#[derive(Clone, Debug)] +pub(crate) struct NameAwareCatalogList { + catalog: NameAwareCatalogProvider, +} + +impl NameAwareCatalogList { + /// Creates the catalog list. 
+ pub fn new(server: RegionServer, region_id: RegionId, query_ctx: QueryContextRef) -> Self { + let schema_provider = NameAwareSchemaProvider { + server, + region_id, + query_ctx, + }; + let catalog = NameAwareCatalogProvider { + schema: schema_provider, + }; + Self { catalog } + } +} + +impl CatalogProviderList for NameAwareCatalogList { + fn as_any(&self) -> &dyn std::any::Any { + self + } + fn register_catalog( + &self, + _name: String, + _catalog: Arc, + ) -> Option> { + None + } + fn catalog_names(&self) -> Vec { + vec![] + } + fn catalog(&self, _name: &str) -> Option> { + Some(Arc::new(self.catalog.clone())) + } +} + +#[derive(Clone, Debug)] +struct NameAwareCatalogProvider { + schema: NameAwareSchemaProvider, +} + +impl CatalogProvider for NameAwareCatalogProvider { + fn as_any(&self) -> &dyn std::any::Any { + self + } + fn schema_names(&self) -> Vec { + vec![] + } + fn schema(&self, _name: &str) -> Option> { + Some(Arc::new(self.schema.clone())) + } +} + +#[derive(Clone)] +struct NameAwareSchemaProvider { + server: RegionServer, + region_id: RegionId, + query_ctx: QueryContextRef, +} + +impl std::fmt::Debug for NameAwareSchemaProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "NameAwareSchemaProvider") + } +} + +#[async_trait::async_trait] +impl SchemaProvider for NameAwareSchemaProvider { + fn as_any(&self) -> &dyn std::any::Any { + self + } + fn table_names(&self) -> Vec { + vec![] + } + + async fn table(&self, name: &str) -> DfResult>> { + // Resolve inspect providers by reserved names. + if let Some(kind) = InternalTableKind::from_table_name(name) { + return kind + .table_provider(&self.server) + .await + .map(Some) + .map_err(|e| df_error::DataFusionError::External(Box::new(e))); + } + + // Fallback to region provider for any other table name. 
+ let provider = self + .server + .table_provider(self.region_id, Some(self.query_ctx.clone())) + .await + .map_err(|e| df_error::DataFusionError::External(Box::new(e)))?; + Ok(Some(provider)) + } + + fn table_exist(&self, _name: &str) -> bool { + true + } +} +/// Builds a `NameAwareDataSourceInjector` from a logical plan. +/// +/// It scans the plan to determine: +/// - whether a Region `TableSource` is required, and +/// - which internal inspection sources are referenced. +pub(crate) struct NameAwareDataSourceInjectorBuilder { + /// Whether the plan requires a Region `TableSource`. + need_region_provider: bool, + /// Internal table kinds referenced by the plan. + reserved_table_needed: Vec, +} + +impl NameAwareDataSourceInjectorBuilder { + /// Walk the `LogicalPlan` to determine whether a Region source is needed, + /// and collect the kinds of internal sources required. + pub fn from_plan(plan: &LogicalPlan) -> DfResult { + let mut need_region_provider = false; + let mut reserved_table_needed = Vec::new(); + plan.apply(|node| { + if let LogicalPlan::TableScan(ts) = node { + let name = ts.table_name.to_string(); + if let Some(kind) = InternalTableKind::from_table_name(&name) { + if !reserved_table_needed.contains(&kind) { + reserved_table_needed.push(kind); + } + } else { + // Any normal table scan implies a Region source is needed. 
+ need_region_provider = true; + } + } + Ok(TreeNodeRecursion::Continue) + })?; + + Ok(Self { + need_region_provider, + reserved_table_needed, + }) + } + + pub async fn build( + self, + server: &RegionServer, + region_id: RegionId, + query_ctx: QueryContextRef, + ) -> Result { + let region = if self.need_region_provider { + let provider = server.table_provider(region_id, Some(query_ctx)).await?; + Some(provider_as_source(provider)) + } else { + None + }; + + let mut reserved_sources = HashMap::new(); + for kind in &self.reserved_table_needed { + let provider = kind.table_provider(server).await?; + reserved_sources.insert(*kind, provider_as_source(provider)); + } + + Ok(NameAwareDataSourceInjector { + reserved_sources, + region_source: region, + }) + } +} + +/// Rewrites `LogicalPlan` to inject proper data sources for `TableScan`. +/// Uses internal sources for reserved tables; otherwise uses the Region source. +pub(crate) struct NameAwareDataSourceInjector { + /// Sources for reserved internal tables, keyed by kind. + reserved_sources: HashMap>, + /// Optional Region-level source used for normal tables. + region_source: Option>, +} + +impl TreeNodeRewriter for NameAwareDataSourceInjector { + type Node = LogicalPlan; + + fn f_up(&mut self, node: Self::Node) -> DfResult> { + Ok(match node { + LogicalPlan::TableScan(mut scan) => { + let name = scan.table_name.to_string(); + if let Some(kind) = InternalTableKind::from_table_name(&name) + && let Some(source) = self.reserved_sources.get(&kind) + { + // Matched a reserved internal table: rewrite to its dedicated source. + scan.source = source.clone(); + } else { + let Some(region) = &self.region_source else { + // Region source required but not constructed; this is unexpected. + return Err(datafusion::error::DataFusionError::Plan( + "region provider not available".to_string(), + )); + }; + // Normal table: rewrite to the Region source. 
+ scan.source = region.clone(); + } + Transformed::yes(LogicalPlan::TableScan(scan)) + } + _ => Transformed::no(node), + }) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use datafusion::catalog::MemTable as DfMemTable; + use datafusion_common::tree_node::TreeNode; + use datafusion_expr::{LogicalPlanBuilder, table_scan}; + use datatypes::arrow::array::Int32Array; + use datatypes::arrow::datatypes::{DataType, Field, Schema}; + use datatypes::arrow::record_batch::RecordBatch; + + use super::*; // bring rewrite() into scope + + fn test_schema() -> Schema { + Schema::new(vec![Field::new("a", DataType::Int32, true)]) + } + + fn empty_mem_table() -> Arc { + let schema = Arc::new(test_schema()); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(Vec::::new()))], + ) + .unwrap(); + Arc::new(DfMemTable::try_new(schema, vec![vec![batch]]).unwrap()) + } + + #[test] + fn test_injector_builder_from_plan_flags() { + let schema = test_schema(); + let reserved = ManifestSstEntry::reserved_table_name_for_inspection(); + // plan1: reserved table scan only + let plan1 = table_scan(Some(reserved), &schema, None) + .unwrap() + .build() + .unwrap(); + let b1 = NameAwareDataSourceInjectorBuilder::from_plan(&plan1).unwrap(); + assert!(!b1.need_region_provider); + assert_eq!( + b1.reserved_table_needed, + vec![InternalTableKind::InspectSstManifest] + ); + + // plan2: normal table scan only + let plan2 = table_scan(Some("normal_table"), &schema, None) + .unwrap() + .build() + .unwrap(); + let b2 = NameAwareDataSourceInjectorBuilder::from_plan(&plan2).unwrap(); + assert!(b2.need_region_provider); + assert!(b2.reserved_table_needed.is_empty()); + + // plan3: both reserved and normal (via UNION) + let p_res = table_scan(Some(reserved), &schema, None) + .unwrap() + .build() + .unwrap(); + let p_norm = table_scan(Some("normal_table"), &schema, None) + .unwrap() + .build() + .unwrap(); + let plan3 = LogicalPlanBuilder::from(p_res) + 
.union(LogicalPlanBuilder::from(p_norm).build().unwrap()) + .unwrap() + .build() + .unwrap(); + let b3 = NameAwareDataSourceInjectorBuilder::from_plan(&plan3).unwrap(); + assert!(b3.need_region_provider); + assert_eq!( + b3.reserved_table_needed, + vec![InternalTableKind::InspectSstManifest] + ); + } + + #[test] + fn test_rewriter_replaces_with_reserved_source() { + let schema = test_schema(); + let table_name = ManifestSstEntry::reserved_table_name_for_inspection(); + let plan = table_scan(Some(table_name), &schema, None) + .unwrap() + .build() + .unwrap(); + + let provider = empty_mem_table(); + let source = provider_as_source(provider); + + let mut injector = NameAwareDataSourceInjector { + reserved_sources: { + let mut m = HashMap::new(); + m.insert(InternalTableKind::InspectSstManifest, source.clone()); + m + }, + region_source: None, + }; + + let transformed = plan.rewrite(&mut injector).unwrap(); + let new_plan = transformed.data; + + if let LogicalPlan::TableScan(scan) = new_plan { + // Compare the underlying Arc ptrs to ensure replacement happened + let src_ptr = Arc::as_ptr(&scan.source); + let want_ptr = Arc::as_ptr(&source); + assert!(std::ptr::eq(src_ptr, want_ptr)); + } else { + panic!("expected TableScan after rewrite"); + } + } + + #[test] + fn test_rewriter_replaces_with_region_source_for_normal() { + let schema = test_schema(); + let plan = table_scan(Some("normal_table"), &schema, None) + .unwrap() + .build() + .unwrap(); + + let provider = empty_mem_table(); + let region_source = provider_as_source(provider); + + let mut injector = NameAwareDataSourceInjector { + reserved_sources: HashMap::new(), + region_source: Some(region_source.clone()), + }; + + let transformed = plan.rewrite(&mut injector).unwrap(); + let new_plan = transformed.data; + + if let LogicalPlan::TableScan(scan) = new_plan { + let src_ptr = Arc::as_ptr(&scan.source); + let want_ptr = Arc::as_ptr(&region_source); + assert!(std::ptr::eq(src_ptr, want_ptr)); + } else { + 
panic!("expected TableScan after rewrite"); + } + } +} diff --git a/tests-integration/src/cluster.rs b/tests-integration/src/cluster.rs index e73f02b467..1676d40e99 100644 --- a/tests-integration/src/cluster.rs +++ b/tests-integration/src/cluster.rs @@ -389,8 +389,10 @@ impl GreptimeDbClusterBuilder { .build(), ); - let information_extension = - Arc::new(DistributedInformationExtension::new(meta_client.clone())); + let information_extension = Arc::new(DistributedInformationExtension::new( + meta_client.clone(), + datanode_clients.clone(), + )); let catalog_manager = KvBackendCatalogManagerBuilder::new( information_extension, cached_meta_backend.clone(),