diff --git a/Cargo.lock b/Cargo.lock index 689b92c80d..ca4617e506 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7221,6 +7221,7 @@ name = "query" version = "0.4.0-nightly" dependencies = [ "ahash 0.8.3", + "api", "approx_eq", "arc-swap", "arrow", diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs index e43a6fd69d..23a67ebae1 100644 --- a/src/client/src/lib.rs +++ b/src/client/src/lib.rs @@ -19,7 +19,6 @@ pub mod error; pub mod load_balance; mod metrics; pub mod region; -pub mod region_handler; mod stream_insert; pub use api; diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index fabedf7e4e..bffefabd47 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -12,12 +12,12 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod distributed; mod grpc; mod influxdb; mod opentsdb; mod otlp; mod prom_store; +mod region_query; mod script; mod standalone; use std::collections::HashMap; @@ -82,7 +82,7 @@ use sql::statements::statement::Statement; use sqlparser::ast::ObjectName; pub use standalone::StandaloneDatanodeManager; -use self::distributed::DistRegionRequestHandler; +use self::region_query::FrontendRegionQueryHandler; use self::standalone::StandaloneTableMetadataCreator; use crate::error::{ self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu, @@ -158,14 +158,14 @@ impl Instance { ); let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone())); - let region_request_handler = DistRegionRequestHandler::arc( + let region_query_handler = FrontendRegionQueryHandler::arc( partition_manager.clone(), catalog_manager.datanode_manager().clone(), ); let query_engine = QueryEngineFactory::new_with_plugins( catalog_manager.clone(), - Some(region_request_handler.clone()), + Some(region_query_handler.clone()), true, plugins.clone(), ) @@ -298,12 +298,12 @@ impl Instance { let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone())); let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server)); - let region_request_handler = - DistRegionRequestHandler::arc(partition_manager.clone(), datanode_manager.clone()); + let region_query_handler = + FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone()); let query_engine = QueryEngineFactory::new_with_plugins( catalog_manager.clone(), - Some(region_request_handler), + Some(region_query_handler), true, plugins.clone(), ) diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/region_query.rs similarity index 84% rename from src/frontend/src/instance/distributed.rs rename to src/frontend/src/instance/region_query.rs index a14264a3ee..7ba88dabc3 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/region_query.rs @@ -16,23 +16,23 @@ use std::sync::Arc; use api::v1::region::QueryRequest; use async_trait::async_trait; -use client::error::{HandleRequestSnafu, Result as ClientResult}; -use client::region_handler::RegionRequestHandler; use common_error::ext::BoxedError; use common_meta::datanode_manager::DatanodeManagerRef; use common_recordbatch::SendableRecordBatchStream; use partition::manager::PartitionRuleManagerRef; +use query::error::{RegionQuerySnafu, Result as QueryResult}; +use query::region_query::RegionQueryHandler; use snafu::{OptionExt, ResultExt}; use store_api::storage::RegionId; use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, RequestQuerySnafu, Result}; -pub(crate) struct DistRegionRequestHandler { +pub(crate) struct FrontendRegionQueryHandler { partition_manager: PartitionRuleManagerRef, datanode_manager: DatanodeManagerRef, } -impl DistRegionRequestHandler { +impl FrontendRegionQueryHandler { pub fn arc( partition_manager: PartitionRuleManagerRef, datanode_manager: DatanodeManagerRef, @@ -45,16 +45,16 @@ impl DistRegionRequestHandler { } #[async_trait] -impl RegionRequestHandler for DistRegionRequestHandler { - async fn do_get(&self, request: QueryRequest) -> ClientResult { +impl RegionQueryHandler for FrontendRegionQueryHandler { + async fn do_get(&self, request: QueryRequest) -> QueryResult { self.do_get_inner(request) .await .map_err(BoxedError::new) - .context(HandleRequestSnafu) + .context(RegionQuerySnafu) } } -impl DistRegionRequestHandler { +impl FrontendRegionQueryHandler { async fn do_get_inner(&self, request: QueryRequest) -> Result { let region_id = RegionId::from_u64(request.region_id); diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs index 9e3881ecf5..12228d1336 100644 --- a/src/frontend/src/instance/standalone.rs +++ b/src/frontend/src/instance/standalone.rs @@ -17,9 +17,7 @@ use std::sync::Arc; use api::v1::meta::Partition; use api::v1::region::{QueryRequest, RegionRequest, RegionResponse}; use async_trait::async_trait; -use client::error::{HandleRequestSnafu, Result as ClientResult}; use client::region::check_response_header; -use client::region_handler::RegionRequestHandler; use common_error::ext::BoxedError; use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef}; use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext}; @@ -39,11 +37,21 @@ use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result}; const TABLE_ID_SEQ: &str = "table_id"; -pub(crate) struct StandaloneRegionRequestHandler { +pub struct StandaloneDatanodeManager(pub RegionServer); + +#[async_trait] +impl DatanodeManager for StandaloneDatanodeManager { + async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { + RegionInvoker::arc(self.0.clone()) + } +} + +/// Relative to [client::region::RegionRequester] +struct RegionInvoker { region_server: RegionServer, } -impl StandaloneRegionRequestHandler { +impl RegionInvoker { pub fn arc(region_server: RegionServer) -> Arc { Arc::new(Self { region_server }) } @@ -61,18 +69,7 @@ impl StandaloneRegionRequestHandler { } #[async_trait] -impl RegionRequestHandler for StandaloneRegionRequestHandler { - async fn do_get(&self, request: QueryRequest) -> ClientResult { - self.region_server - .handle_read(request) - .await - .map_err(BoxedError::new) - .context(HandleRequestSnafu) - } -} - -#[async_trait] -impl Datanode for StandaloneRegionRequestHandler { +impl Datanode for RegionInvoker { async fn handle(&self, request: RegionRequest) -> MetaResult { let response = self .handle_inner(request) @@ -94,15 +91,6 @@ impl Datanode for StandaloneRegionRequestHandler { } } -pub struct StandaloneDatanodeManager(pub RegionServer); - -#[async_trait] -impl DatanodeManager for StandaloneDatanodeManager { - async fn datanode(&self, _datanode: &Peer) -> DatanodeRef { - StandaloneRegionRequestHandler::arc(self.0.clone()) - } -} - pub(crate) struct StandaloneTableMetadataCreator { table_id_sequence: SequenceRef, } diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index 19d642e077..a3cfcae789 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -6,6 +6,7 @@ license.workspace = true [dependencies] ahash = { version = "0.8", features = ["compile-time-rng"] } +api.workspace = true arc-swap = "1.0" arrow-schema.workspace = true arrow.workspace = true diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 2fe69f0cc4..e2a9b9c825 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use async_stream::stream; -use client::region_handler::RegionRequestHandlerRef; use common_base::bytes::Bytes; use common_error::ext::BoxedError; use common_meta::table_name::TableName; @@ -41,6 +40,7 @@ use snafu::ResultExt; use store_api::storage::RegionId; use crate::error::ConvertSchemaSnafu; +use crate::region_query::RegionQueryHandlerRef; #[derive(Debug, Hash, PartialEq, Eq, Clone)] pub struct MergeScanLogicalPlan { @@ -113,7 +113,7 @@ pub struct MergeScanExec { substrait_plan: Bytes, schema: SchemaRef, arrow_schema: ArrowSchemaRef, - request_handler: RegionRequestHandlerRef, + region_query_handler: RegionQueryHandlerRef, metric: ExecutionPlanMetricsSet, } @@ -133,7 +133,7 @@ impl MergeScanExec { regions: Vec, substrait_plan: Bytes, arrow_schema: &ArrowSchema, - request_handler: RegionRequestHandlerRef, + region_query_handler: RegionQueryHandlerRef, ) -> Result { let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema); let schema_without_metadata = @@ -144,7 +144,7 @@ impl MergeScanExec { substrait_plan, schema: schema_without_metadata, arrow_schema: arrow_schema_without_metadata, - request_handler, + region_query_handler, metric: ExecutionPlanMetricsSet::new(), }) } @@ -152,7 +152,7 @@ impl MergeScanExec { pub fn to_stream(&self, _context: Arc) -> Result { let substrait_plan = self.substrait_plan.to_vec(); let regions = self.regions.clone(); - let request_handler = self.request_handler.clone(); + let region_query_handler = self.region_query_handler.clone(); let metric = MergeScanMetric::new(&self.metric); let stream = Box::pin(stream!({ @@ -166,7 +166,7 @@ impl MergeScanExec { region_id: region_id.into(), plan: substrait_plan.clone(), }; - let mut stream = request_handler + let mut stream = region_query_handler .do_get(request) .await .map_err(BoxedError::new) diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs index 1a71e04349..c3e0cca94e 100644 --- a/src/query/src/dist_plan/planner.rs +++ b/src/query/src/dist_plan/planner.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use async_trait::async_trait; use catalog::CatalogManagerRef; -use client::region_handler::RegionRequestHandlerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_meta::table_name::TableName; use datafusion::common::Result; @@ -39,20 +38,21 @@ use table::table::adapter::DfTableProviderAdapter; use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan}; use crate::error; use crate::error::{CatalogSnafu, TableNotFoundSnafu}; +use crate::region_query::RegionQueryHandlerRef; pub struct DistExtensionPlanner { catalog_manager: CatalogManagerRef, - request_handler: RegionRequestHandlerRef, + region_query_handler: RegionQueryHandlerRef, } impl DistExtensionPlanner { pub fn new( catalog_manager: CatalogManagerRef, - request_handler: RegionRequestHandlerRef, + region_query_handler: RegionQueryHandlerRef, ) -> Self { Self { catalog_manager, - request_handler, + region_query_handler, } } } @@ -108,7 +108,7 @@ impl ExtensionPlanner for DistExtensionPlanner { regions, substrait_plan, &schema, - self.request_handler.clone(), + self.region_query_handler.clone(), )?; Ok(Some(Arc::new(merge_scan_plan) as _)) } diff --git a/src/query/src/error.rs b/src/query/src/error.rs index ced384a68b..f3b6b94914 100644 --- a/src/query/src/error.rs +++ b/src/query/src/error.rs @@ -257,6 +257,12 @@ pub enum Error { #[snafu(display("Column schema has no default value, column: {}", column))] ColumnSchemaNoDefault { column: String, location: Location }, + + #[snafu(display("Region query error, source: {}", source))] + RegionQuery { + source: BoxedError, + location: Location, + }, } impl ErrorExt for Error { @@ -303,6 +309,7 @@ impl ErrorExt for Error { RemoteRequest { source, .. } => source.status_code(), UnexpectedOutputKind { .. } => StatusCode::Unexpected, CreateSchema { source, .. } => source.status_code(), + RegionQuery { source, .. } => source.status_code(), } } diff --git a/src/query/src/lib.rs b/src/query/src/lib.rs index 891edbbe7d..dfdbe9a208 100644 --- a/src/query/src/lib.rs +++ b/src/query/src/lib.rs @@ -30,6 +30,7 @@ pub mod plan; pub mod planner; pub mod query_engine; mod range_select; +pub mod region_query; pub mod sql; pub use crate::datafusion::DfContextProviderAdapter; diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs index 3ac4bff86b..b0dde22c36 100644 --- a/src/query/src/query_engine.rs +++ b/src/query/src/query_engine.rs @@ -21,7 +21,6 @@ use std::sync::Arc; use async_trait::async_trait; use catalog::CatalogManagerRef; -use client::region_handler::RegionRequestHandlerRef; use common_base::Plugins; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY}; @@ -39,6 +38,7 @@ use crate::plan::LogicalPlan; use crate::planner::LogicalPlanner; pub use crate::query_engine::context::QueryEngineContext; pub use crate::query_engine::state::QueryEngineState; +use crate::region_query::RegionQueryHandlerRef; pub type SqlStatementExecutorRef = Arc; @@ -87,12 +87,12 @@ pub struct QueryEngineFactory { impl QueryEngineFactory { pub fn new( catalog_manager: CatalogManagerRef, - request_handler: Option, + region_query_handler: Option, with_dist_planner: bool, ) -> Self { Self::new_with_plugins( catalog_manager, - request_handler, + region_query_handler, with_dist_planner, Default::default(), ) @@ -100,13 +100,13 @@ impl QueryEngineFactory { pub fn new_with_plugins( catalog_manager: CatalogManagerRef, - request_handler: Option, + region_query_handler: Option, with_dist_planner: bool, plugins: Arc, ) -> Self { let state = Arc::new(QueryEngineState::new( catalog_manager, - request_handler, + region_query_handler, with_dist_planner, plugins.clone(), )); diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index f4d6b51631..93916db861 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -18,7 +18,6 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use catalog::CatalogManagerRef; -use client::region_handler::RegionRequestHandlerRef; use common_base::Plugins; use common_function::scalars::aggregate::AggregateFunctionMetaRef; use common_query::physical_plan::SessionContext; @@ -48,6 +47,7 @@ use crate::optimizer::string_normalization::StringNormalizationRule; use crate::optimizer::type_conversion::TypeConversionRule; use crate::query_engine::options::QueryOptions; use crate::range_select::planner::RangeSelectPlanner; +use crate::region_query::RegionQueryHandlerRef; /// Query engine global state // TODO(yingwen): This QueryEngineState still relies on datafusion, maybe we can define a trait for it, @@ -72,7 +72,7 @@ impl fmt::Debug for QueryEngineState { impl QueryEngineState { pub fn new( catalog_list: CatalogManagerRef, - request_handler: Option, + region_query_handler: Option, with_dist_planner: bool, plugins: Arc, ) -> Self { @@ -113,7 +113,7 @@ impl QueryEngineState { .with_analyzer_rules(analyzer.rules) .with_query_planner(Arc::new(DfQueryPlanner::new( catalog_list.clone(), - request_handler, + region_query_handler, ))) .with_optimizer_rules(optimizer.rules) .with_physical_optimizer_rules(physical_optimizers); @@ -221,14 +221,14 @@ impl QueryPlanner for DfQueryPlanner { impl DfQueryPlanner { fn new( catalog_manager: CatalogManagerRef, - request_handler: Option, + region_query_handler: Option, ) -> Self { let mut planners: Vec> = vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)]; - if let Some(request_handler) = request_handler { + if let Some(region_query_handler) = region_query_handler { planners.push(Arc::new(DistExtensionPlanner::new( catalog_manager, - request_handler, + region_query_handler, ))); } Self { diff --git a/src/client/src/region_handler.rs b/src/query/src/region_query.rs similarity index 89% rename from src/client/src/region_handler.rs rename to src/query/src/region_query.rs index a3977d8fd6..ef6e494dfc 100644 --- a/src/client/src/region_handler.rs +++ b/src/query/src/region_query.rs @@ -21,9 +21,9 @@ use common_recordbatch::SendableRecordBatchStream; use crate::error::Result; #[async_trait] -pub trait RegionRequestHandler: Send + Sync { +pub trait RegionQueryHandler: Send + Sync { // TODO(ruihang): add trace id and span id in the request. async fn do_get(&self, request: QueryRequest) -> Result; } -pub type RegionRequestHandlerRef = Arc; +pub type RegionQueryHandlerRef = Arc;