refactor: RegionRequestHandler -> RegionQueryHandler (#2439)

* refactor: RegionRequestHandler -> RegionQueryHandler

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: rename FrontendRegionQueryHandler

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: private RegionInvoker

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Authored by Zhenchi on 2023-09-19 16:19:58 +08:00, committed by GitHub.
parent 5805e8d4b6, commit deac284973
13 changed files with 62 additions and 65 deletions
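
Before the per-file hunks, a minimal sketch of what the rename amounts to at the trait level. QueryRequest, RecordBatchStream, and QueryError below are local stand-ins for the real api::v1::region::QueryRequest, common_recordbatch::SendableRecordBatchStream, and query error types, so the snippet stays self-contained; only the trait name, the do_get shape, and the Ref alias mirror the diff.

use std::sync::Arc;

use async_trait::async_trait;

// Local stand-ins so the sketch compiles on its own.
pub struct QueryRequest;
pub struct RecordBatchStream;
pub struct QueryError;

// Before: the trait lived in the `client` crate as `RegionRequestHandler`,
// and `do_get` returned a client-side Result.
//
// After: it lives in the `query` crate, renamed; the method shape stays the
// same, only the name, home crate, and error type change.
#[async_trait]
pub trait RegionQueryHandler: Send + Sync {
    async fn do_get(&self, request: QueryRequest) -> Result<RecordBatchStream, QueryError>;
}

pub type RegionQueryHandlerRef = Arc<dyn RegionQueryHandler>;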

Cargo.lock (generated)

@@ -7221,6 +7221,7 @@ name = "query"
version = "0.4.0-nightly"
dependencies = [
"ahash 0.8.3",
"api",
"approx_eq",
"arc-swap",
"arrow",


@@ -19,7 +19,6 @@ pub mod error;
pub mod load_balance;
mod metrics;
pub mod region;
pub mod region_handler;
mod stream_insert;
pub use api;


@@ -12,12 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod distributed;
mod grpc;
mod influxdb;
mod opentsdb;
mod otlp;
mod prom_store;
mod region_query;
mod script;
mod standalone;
use std::collections::HashMap;
@@ -82,7 +82,7 @@ use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
pub use standalone::StandaloneDatanodeManager;
use self::distributed::DistRegionRequestHandler;
use self::region_query::FrontendRegionQueryHandler;
use self::standalone::StandaloneTableMetadataCreator;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, MissingMetasrvOptsSnafu,
@@ -158,14 +158,14 @@ impl Instance {
);
let partition_manager = Arc::new(PartitionRuleManager::new(meta_backend.clone()));
let region_request_handler = DistRegionRequestHandler::arc(
let region_query_handler = FrontendRegionQueryHandler::arc(
partition_manager.clone(),
catalog_manager.datanode_manager().clone(),
);
let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_request_handler.clone()),
Some(region_query_handler.clone()),
true,
plugins.clone(),
)
@@ -298,12 +298,12 @@ impl Instance {
let partition_manager = Arc::new(PartitionRuleManager::new(kv_backend.clone()));
let datanode_manager = Arc::new(StandaloneDatanodeManager(region_server));
let region_request_handler =
DistRegionRequestHandler::arc(partition_manager.clone(), datanode_manager.clone());
let region_query_handler =
FrontendRegionQueryHandler::arc(partition_manager.clone(), datanode_manager.clone());
let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
Some(region_request_handler),
Some(region_query_handler),
true,
plugins.clone(),
)
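
The hunk above interleaves the old and new wiring; consolidated, the new frontend setup looks roughly like the sketch below. Everything here (CatalogManager, PartitionRuleManager, DatanodeManager, Plugins, QueryEngine, and the two constructors) is a local placeholder that only mimics the argument shapes visible in the diff, not the real frontend/query crates.

use std::sync::Arc;

// Placeholder types standing in for the real frontend/query crates.
struct CatalogManager;
struct PartitionRuleManager;
struct DatanodeManager;
struct Plugins;
struct QueryEngine;

struct FrontendRegionQueryHandler;

impl FrontendRegionQueryHandler {
    // Same constructor shape as in the diff: partition rule manager plus
    // datanode manager in, Arc'd handler out.
    fn arc(
        _partition_manager: Arc<PartitionRuleManager>,
        _datanode_manager: Arc<DatanodeManager>,
    ) -> Arc<Self> {
        Arc::new(Self)
    }
}

struct QueryEngineFactory;

impl QueryEngineFactory {
    // Same optional-handler shape as the real factory: `Some(handler)`
    // enables distributed planning, `None` leaves it out.
    fn new_with_plugins(
        _catalog_manager: Arc<CatalogManager>,
        _region_query_handler: Option<Arc<FrontendRegionQueryHandler>>,
        _with_dist_planner: bool,
        _plugins: Arc<Plugins>,
    ) -> QueryEngine {
        QueryEngine
    }
}

fn main() {
    let catalog_manager = Arc::new(CatalogManager);
    let partition_manager = Arc::new(PartitionRuleManager);
    let datanode_manager = Arc::new(DatanodeManager);

    // The handler is built once and handed to the query engine; the engine
    // only sees the query-side trait, not the full region request surface.
    let region_query_handler =
        FrontendRegionQueryHandler::arc(partition_manager, datanode_manager);

    let _query_engine = QueryEngineFactory::new_with_plugins(
        catalog_manager,
        Some(region_query_handler),
        true,
        Arc::new(Plugins),
    );
}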


@@ -16,23 +16,23 @@ use std::sync::Arc;
use api::v1::region::QueryRequest;
use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region_handler::RegionRequestHandler;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::DatanodeManagerRef;
use common_recordbatch::SendableRecordBatchStream;
use partition::manager::PartitionRuleManagerRef;
use query::error::{RegionQuerySnafu, Result as QueryResult};
use query::region_query::RegionQueryHandler;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use crate::error::{FindDatanodeSnafu, FindTableRouteSnafu, RequestQuerySnafu, Result};
pub(crate) struct DistRegionRequestHandler {
pub(crate) struct FrontendRegionQueryHandler {
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
}
impl DistRegionRequestHandler {
impl FrontendRegionQueryHandler {
pub fn arc(
partition_manager: PartitionRuleManagerRef,
datanode_manager: DatanodeManagerRef,
@@ -45,16 +45,16 @@ impl DistRegionRequestHandler {
}
#[async_trait]
impl RegionRequestHandler for DistRegionRequestHandler {
async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
impl RegionQueryHandler for FrontendRegionQueryHandler {
async fn do_get(&self, request: QueryRequest) -> QueryResult<SendableRecordBatchStream> {
self.do_get_inner(request)
.await
.map_err(BoxedError::new)
.context(HandleRequestSnafu)
.context(RegionQuerySnafu)
}
}
impl DistRegionRequestHandler {
impl FrontendRegionQueryHandler {
async fn do_get_inner(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
let region_id = RegionId::from_u64(request.region_id);


@@ -17,9 +17,7 @@ use std::sync::Arc;
use api::v1::meta::Partition;
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region::check_response_header;
use client::region_handler::RegionRequestHandler;
use common_error::ext::BoxedError;
use common_meta::datanode_manager::{AffectedRows, Datanode, DatanodeManager, DatanodeRef};
use common_meta::ddl::{TableMetadataAllocator, TableMetadataAllocatorContext};
@@ -39,11 +37,21 @@ use crate::error::{InvalidRegionRequestSnafu, InvokeRegionServerSnafu, Result};
const TABLE_ID_SEQ: &str = "table_id";
pub(crate) struct StandaloneRegionRequestHandler {
pub struct StandaloneDatanodeManager(pub RegionServer);
#[async_trait]
impl DatanodeManager for StandaloneDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
RegionInvoker::arc(self.0.clone())
}
}
/// The standalone counterpart of [client::region::RegionRequester]
struct RegionInvoker {
region_server: RegionServer,
}
impl StandaloneRegionRequestHandler {
impl RegionInvoker {
pub fn arc(region_server: RegionServer) -> Arc<Self> {
Arc::new(Self { region_server })
}
@@ -61,18 +69,7 @@ impl StandaloneRegionRequestHandler {
}
#[async_trait]
impl RegionRequestHandler for StandaloneRegionRequestHandler {
async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
self.region_server
.handle_read(request)
.await
.map_err(BoxedError::new)
.context(HandleRequestSnafu)
}
}
#[async_trait]
impl Datanode for StandaloneRegionRequestHandler {
impl Datanode for RegionInvoker {
async fn handle(&self, request: RegionRequest) -> MetaResult<AffectedRows> {
let response = self
.handle_inner(request)
@@ -94,15 +91,6 @@ impl Datanode for StandaloneRegionRequestHandler {
}
}
pub struct StandaloneDatanodeManager(pub RegionServer);
#[async_trait]
impl DatanodeManager for StandaloneDatanodeManager {
async fn datanode(&self, _datanode: &Peer) -> DatanodeRef {
StandaloneRegionRequestHandler::arc(self.0.clone())
}
}
pub(crate) struct StandaloneTableMetadataCreator {
table_id_sequence: SequenceRef,
}
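
On the standalone path the query-side handler is gone: the DatanodeManager now hands out a private RegionInvoker that wraps the in-process RegionServer, and callers only ever see it behind the Datanode trait. A compressed, self-contained sketch of that pattern follows; RegionServer, RegionRequest, Peer, MetaError, and both trait definitions are local stand-ins, and the real handle body (forwarding to RegionServer and checking the response header) is elided.

use std::sync::Arc;

use async_trait::async_trait;

// Local stand-ins for the real datanode types.
#[derive(Clone)]
struct RegionServer;
struct RegionRequest;
struct Peer;
struct MetaError;
type AffectedRows = u64;

#[async_trait]
trait Datanode: Send + Sync {
    async fn handle(&self, request: RegionRequest) -> Result<AffectedRows, MetaError>;
}

#[async_trait]
trait DatanodeManager: Send + Sync {
    async fn datanode(&self, peer: &Peer) -> Arc<dyn Datanode>;
}

// Private invoker wrapping the in-process RegionServer; exposed only as a
// `Datanode` trait object returned by the manager.
struct RegionInvoker {
    region_server: RegionServer,
}

impl RegionInvoker {
    fn arc(region_server: RegionServer) -> Arc<Self> {
        Arc::new(Self { region_server })
    }
}

#[async_trait]
impl Datanode for RegionInvoker {
    async fn handle(&self, _request: RegionRequest) -> Result<AffectedRows, MetaError> {
        // The real implementation forwards to the region server and checks
        // the response header; elided in this sketch.
        Ok(0)
    }
}

struct StandaloneDatanodeManager(RegionServer);

#[async_trait]
impl DatanodeManager for StandaloneDatanodeManager {
    async fn datanode(&self, _peer: &Peer) -> Arc<dyn Datanode> {
        RegionInvoker::arc(self.0.clone())
    }
}

fn main() {
    let _manager = StandaloneDatanodeManager(RegionServer);
}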


@@ -6,6 +6,7 @@ license.workspace = true
[dependencies]
ahash = { version = "0.8", features = ["compile-time-rng"] }
api.workspace = true
arc-swap = "1.0"
arrow-schema.workspace = true
arrow.workspace = true


@@ -17,7 +17,6 @@ use std::sync::Arc;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use async_stream::stream;
use client::region_handler::RegionRequestHandlerRef;
use common_base::bytes::Bytes;
use common_error::ext::BoxedError;
use common_meta::table_name::TableName;
@@ -41,6 +40,7 @@ use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::error::ConvertSchemaSnafu;
use crate::region_query::RegionQueryHandlerRef;
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
@@ -113,7 +113,7 @@ pub struct MergeScanExec {
substrait_plan: Bytes,
schema: SchemaRef,
arrow_schema: ArrowSchemaRef,
request_handler: RegionRequestHandlerRef,
region_query_handler: RegionQueryHandlerRef,
metric: ExecutionPlanMetricsSet,
}
@@ -133,7 +133,7 @@ impl MergeScanExec {
regions: Vec<RegionId>,
substrait_plan: Bytes,
arrow_schema: &ArrowSchema,
request_handler: RegionRequestHandlerRef,
region_query_handler: RegionQueryHandlerRef,
) -> Result<Self> {
let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
let schema_without_metadata =
@@ -144,7 +144,7 @@ impl MergeScanExec {
substrait_plan,
schema: schema_without_metadata,
arrow_schema: arrow_schema_without_metadata,
request_handler,
region_query_handler,
metric: ExecutionPlanMetricsSet::new(),
})
}
@@ -152,7 +152,7 @@ impl MergeScanExec {
pub fn to_stream(&self, _context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
let substrait_plan = self.substrait_plan.to_vec();
let regions = self.regions.clone();
let request_handler = self.request_handler.clone();
let region_query_handler = self.region_query_handler.clone();
let metric = MergeScanMetric::new(&self.metric);
let stream = Box::pin(stream!({
@@ -166,7 +166,7 @@ impl MergeScanExec {
region_id: region_id.into(),
plan: substrait_plan.clone(),
};
let mut stream = request_handler
let mut stream = region_query_handler
.do_get(request)
.await
.map_err(BoxedError::new)
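
For context, the merge-scan stream asks the renamed handler for one stream per target region, each request carrying the serialized substrait plan. A stripped-down sketch of that loop follows; the handler trait, request, stream, and error types are stubbed locally, and the real code additionally decodes record batches and records metrics.

use std::sync::Arc;

use async_trait::async_trait;

// Local stand-ins for the real query types.
struct QueryRequest {
    region_id: u64,
    plan: Vec<u8>,
}
struct RecordBatchStream;
struct QueryError;

#[async_trait]
trait RegionQueryHandler: Send + Sync {
    async fn do_get(&self, request: QueryRequest) -> Result<RecordBatchStream, QueryError>;
}

type RegionQueryHandlerRef = Arc<dyn RegionQueryHandler>;

// One remote scan per region: build a request carrying the serialized plan,
// then ask the handler for that region's stream.
async fn scan_regions(
    regions: &[u64],
    substrait_plan: &[u8],
    region_query_handler: RegionQueryHandlerRef,
) -> Result<Vec<RecordBatchStream>, QueryError> {
    let mut streams = Vec::with_capacity(regions.len());
    for &region_id in regions {
        let request = QueryRequest {
            region_id,
            plan: substrait_plan.to_vec(),
        };
        let stream = region_query_handler.do_get(request).await?;
        streams.push(stream);
    }
    Ok(streams)
}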


@@ -18,7 +18,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use client::region_handler::RegionRequestHandlerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::table_name::TableName;
use datafusion::common::Result;
@@ -39,20 +38,21 @@ use table::table::adapter::DfTableProviderAdapter;
use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan};
use crate::error;
use crate::error::{CatalogSnafu, TableNotFoundSnafu};
use crate::region_query::RegionQueryHandlerRef;
pub struct DistExtensionPlanner {
catalog_manager: CatalogManagerRef,
request_handler: RegionRequestHandlerRef,
region_query_handler: RegionQueryHandlerRef,
}
impl DistExtensionPlanner {
pub fn new(
catalog_manager: CatalogManagerRef,
request_handler: RegionRequestHandlerRef,
region_query_handler: RegionQueryHandlerRef,
) -> Self {
Self {
catalog_manager,
request_handler,
region_query_handler,
}
}
}
@@ -108,7 +108,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
regions,
substrait_plan,
&schema,
self.request_handler.clone(),
self.region_query_handler.clone(),
)?;
Ok(Some(Arc::new(merge_scan_plan) as _))
}


@@ -257,6 +257,12 @@ pub enum Error {
#[snafu(display("Column schema has no default value, column: {}", column))]
ColumnSchemaNoDefault { column: String, location: Location },
#[snafu(display("Region query error, source: {}", source))]
RegionQuery {
source: BoxedError,
location: Location,
},
}
impl ErrorExt for Error {
@@ -303,6 +309,7 @@ impl ErrorExt for Error {
RemoteRequest { source, .. } => source.status_code(),
UnexpectedOutputKind { .. } => StatusCode::Unexpected,
CreateSchema { source, .. } => source.status_code(),
RegionQuery { source, .. } => source.status_code(),
}
}


@@ -30,6 +30,7 @@ pub mod plan;
pub mod planner;
pub mod query_engine;
mod range_select;
pub mod region_query;
pub mod sql;
pub use crate::datafusion::DfContextProviderAdapter;


@@ -21,7 +21,6 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use client::region_handler::RegionRequestHandlerRef;
use common_base::Plugins;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY};
@@ -39,6 +38,7 @@ use crate::plan::LogicalPlan;
use crate::planner::LogicalPlanner;
pub use crate::query_engine::context::QueryEngineContext;
pub use crate::query_engine::state::QueryEngineState;
use crate::region_query::RegionQueryHandlerRef;
pub type SqlStatementExecutorRef = Arc<dyn SqlStatementExecutor>;
@@ -87,12 +87,12 @@ pub struct QueryEngineFactory {
impl QueryEngineFactory {
pub fn new(
catalog_manager: CatalogManagerRef,
request_handler: Option<RegionRequestHandlerRef>,
region_query_handler: Option<RegionQueryHandlerRef>,
with_dist_planner: bool,
) -> Self {
Self::new_with_plugins(
catalog_manager,
request_handler,
region_query_handler,
with_dist_planner,
Default::default(),
)
@@ -100,13 +100,13 @@ impl QueryEngineFactory {
pub fn new_with_plugins(
catalog_manager: CatalogManagerRef,
request_handler: Option<RegionRequestHandlerRef>,
region_query_handler: Option<RegionQueryHandlerRef>,
with_dist_planner: bool,
plugins: Arc<Plugins>,
) -> Self {
let state = Arc::new(QueryEngineState::new(
catalog_manager,
request_handler,
region_query_handler,
with_dist_planner,
plugins.clone(),
));
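
The handler stays optional end to end: the factory forwards it into QueryEngineState, and, as the next hunk's DfQueryPlanner::new shows, the DistExtensionPlanner is only registered when a handler was supplied. A small self-contained sketch of that conditional registration; the planner and catalog types are local placeholders, and only the Option-gated push mirrors the diff.

use std::sync::Arc;

// Local placeholders for the planner machinery.
struct CatalogManager;
struct PromExtensionPlanner;
struct RangeSelectPlanner;
struct DistExtensionPlanner {
    _catalog_manager: Arc<CatalogManager>,
    _region_query_handler: RegionQueryHandlerRef,
}

trait ExtensionPlanner: Send + Sync {}
impl ExtensionPlanner for PromExtensionPlanner {}
impl ExtensionPlanner for RangeSelectPlanner {}
impl ExtensionPlanner for DistExtensionPlanner {}

trait RegionQueryHandler: Send + Sync {}
type RegionQueryHandlerRef = Arc<dyn RegionQueryHandler>;

// Mirrors DfQueryPlanner::new: the distributed planner is appended only when
// a region query handler was supplied.
fn build_planners(
    catalog_manager: Arc<CatalogManager>,
    region_query_handler: Option<RegionQueryHandlerRef>,
) -> Vec<Arc<dyn ExtensionPlanner>> {
    let mut planners: Vec<Arc<dyn ExtensionPlanner>> =
        vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
    if let Some(region_query_handler) = region_query_handler {
        planners.push(Arc::new(DistExtensionPlanner {
            _catalog_manager: catalog_manager,
            _region_query_handler: region_query_handler,
        }));
    }
    planners
}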


@@ -18,7 +18,6 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use client::region_handler::RegionRequestHandlerRef;
use common_base::Plugins;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::physical_plan::SessionContext;
@@ -48,6 +47,7 @@ use crate::optimizer::string_normalization::StringNormalizationRule;
use crate::optimizer::type_conversion::TypeConversionRule;
use crate::query_engine::options::QueryOptions;
use crate::range_select::planner::RangeSelectPlanner;
use crate::region_query::RegionQueryHandlerRef;
/// Query engine global state
// TODO(yingwen): This QueryEngineState still relies on datafusion, maybe we can define a trait for it,
@@ -72,7 +72,7 @@ impl fmt::Debug for QueryEngineState {
impl QueryEngineState {
pub fn new(
catalog_list: CatalogManagerRef,
request_handler: Option<RegionRequestHandlerRef>,
region_query_handler: Option<RegionQueryHandlerRef>,
with_dist_planner: bool,
plugins: Arc<Plugins>,
) -> Self {
@@ -113,7 +113,7 @@ impl QueryEngineState {
.with_analyzer_rules(analyzer.rules)
.with_query_planner(Arc::new(DfQueryPlanner::new(
catalog_list.clone(),
request_handler,
region_query_handler,
)))
.with_optimizer_rules(optimizer.rules)
.with_physical_optimizer_rules(physical_optimizers);
@@ -221,14 +221,14 @@ impl QueryPlanner for DfQueryPlanner {
impl DfQueryPlanner {
fn new(
catalog_manager: CatalogManagerRef,
request_handler: Option<RegionRequestHandlerRef>,
region_query_handler: Option<RegionQueryHandlerRef>,
) -> Self {
let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
if let Some(request_handler) = request_handler {
if let Some(region_query_handler) = region_query_handler {
planners.push(Arc::new(DistExtensionPlanner::new(
catalog_manager,
request_handler,
region_query_handler,
)));
}
Self {


@@ -21,9 +21,9 @@ use common_recordbatch::SendableRecordBatchStream;
use crate::error::Result;
#[async_trait]
pub trait RegionRequestHandler: Send + Sync {
pub trait RegionQueryHandler: Send + Sync {
// TODO(ruihang): add trace id and span id in the request.
async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}
pub type RegionRequestHandlerRef = Arc<dyn RegionRequestHandler>;
pub type RegionQueryHandlerRef = Arc<dyn RegionQueryHandler>;
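
To close, a minimal implementer of the renamed trait, showing the shape a backend plugs in. Apart from the do_get signature (request in, fallible stream out) taken from the hunk above, everything here (NoopRegionQueryHandler, the stand-in request/stream/error types, and the tokio main) is illustrative only.

use std::sync::Arc;

use async_trait::async_trait;

// Local stand-ins for the real request/stream/error types.
pub struct QueryRequest {
    pub region_id: u64,
}
pub struct RecordBatchStream;
#[derive(Debug)]
pub struct QueryError(pub String);

#[async_trait]
pub trait RegionQueryHandler: Send + Sync {
    async fn do_get(&self, request: QueryRequest) -> Result<RecordBatchStream, QueryError>;
}

pub type RegionQueryHandlerRef = Arc<dyn RegionQueryHandler>;

// A trivial handler that "serves" every region with an empty stream.
pub struct NoopRegionQueryHandler;

#[async_trait]
impl RegionQueryHandler for NoopRegionQueryHandler {
    async fn do_get(&self, _request: QueryRequest) -> Result<RecordBatchStream, QueryError> {
        Ok(RecordBatchStream)
    }
}

#[tokio::main]
async fn main() -> Result<(), QueryError> {
    // Callers hold the handler behind the Ref alias, exactly as the query
    // engine does with the real trait object.
    let handler: RegionQueryHandlerRef = Arc::new(NoopRegionQueryHandler);
    let _stream = handler.do_get(QueryRequest { region_id: 42 }).await?;
    Ok(())
}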