mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-06 05:12:54 +00:00
refactor: remove substrait ser/de for region query in standalone (#3812)
* refactor: remove substrait serde for region query in standalone
* fix ci
* move QueryRequest to common-query
* format code
* format toml file
* chore: format toml

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
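In short: in standalone mode the frontend and the datanode share one process, so encoding every region query's DataFusion logical plan to substrait bytes and immediately decoding them back was pure overhead. The diff below introduces an in-process QueryRequest in common-query that carries the plan itself; substrait ser/de now happens only on the real gRPC path. A rough sketch of the two request shapes involved (these struct names are illustrative, not from the codebase):

use datafusion_expr::LogicalPlan;

// Wire shape (greptime-proto's api::v1::region::QueryRequest, unchanged):
// the plan crosses process boundaries as substrait bytes.
struct WireQueryRequest {
    region_id: u64,
    plan: Vec<u8>, // substrait-encoded logical plan
}

// In-process shape (the new common_query::request::QueryRequest, added
// below): standalone hands the plan to the region server directly.
struct InProcessQueryRequest {
    region_id: u64,
    plan: LogicalPlan,
}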
Cargo.lock (generated, 6 changed lines)
@@ -1560,6 +1560,7 @@ dependencies = [
  "parking_lot 0.12.3",
  "prometheus",
  "prost 0.12.6",
+ "query",
  "rand",
  "serde_json",
  "snafu 0.8.3",
@@ -1955,11 +1956,13 @@ dependencies = [
  "common-macro",
  "common-procedure",
  "common-procedure-test",
+ "common-query",
  "common-recordbatch",
  "common-telemetry",
  "common-time",
  "common-wal",
  "datafusion-common 38.0.0",
+ "datafusion-expr 38.0.0",
  "datatypes",
  "derive_builder 0.12.0",
  "etcd-client",
@@ -2048,6 +2051,7 @@ dependencies = [
  "sqlparser 0.45.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=54a267ac89c09b11c0c88934690530807185d3e7)",
  "sqlparser_derive 0.1.1",
  "statrs",
+ "store-api",
  "tokio",
 ]

@@ -3215,6 +3219,7 @@ dependencies = [
  "session",
  "snafu 0.8.3",
  "store-api",
+ "substrait 0.8.1",
  "table",
  "tokio",
  "toml 0.8.13",
@@ -10244,7 +10249,6 @@ dependencies = [
  "common-base",
  "common-error",
  "common-macro",
- "common-query",
  "common-recordbatch",
  "common-wal",
  "datafusion-expr 38.0.0",

@@ -31,9 +31,11 @@ moka = { workspace = true, features = ["future"] }
 parking_lot = "0.12"
 prometheus.workspace = true
 prost.workspace = true
+query.workspace = true
 rand.workspace = true
 serde_json.workspace = true
 snafu.workspace = true
+substrait.workspace = true
 tokio.workspace = true
 tokio-stream = { workspace = true, features = ["net"] }
 tonic.workspace = true
@@ -42,7 +44,6 @@ tonic.workspace = true
 common-grpc-expr.workspace = true
 datanode.workspace = true
 derive-new = "0.5"
-substrait.workspace = true
 tracing = "0.1"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }

@@ -15,7 +15,7 @@
 use std::sync::Arc;

 use api::region::RegionResponse;
-use api::v1::region::{QueryRequest, RegionRequest};
+use api::v1::region::RegionRequest;
 use api::v1::ResponseHeader;
 use arc_swap::ArcSwapOption;
 use arrow_flight::Ticket;
@@ -26,12 +26,15 @@ use common_error::status_code::StatusCode;
 use common_grpc::flight::{FlightDecoder, FlightMessage};
 use common_meta::error::{self as meta_error, Result as MetaResult};
 use common_meta::node_manager::Datanode;
+use common_query::request::QueryRequest;
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
 use common_telemetry::error;
 use common_telemetry::tracing_context::TracingContext;
 use prost::Message;
+use query::query_engine::DefaultSerializer;
 use snafu::{location, Location, OptionExt, ResultExt};
+use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
 use tokio_stream::StreamExt;

 use crate::error::{
@@ -63,6 +66,17 @@ impl Datanode for RegionRequester {
     }

     async fn handle_query(&self, request: QueryRequest) -> MetaResult<SendableRecordBatchStream> {
+        let plan = DFLogicalSubstraitConvertor
+            .encode(&request.plan, DefaultSerializer)
+            .map_err(BoxedError::new)
+            .context(meta_error::ExternalSnafu)?
+            .to_vec();
+        let request = api::v1::region::QueryRequest {
+            header: request.header,
+            region_id: request.region_id.as_u64(),
+            plan,
+        };
+
         let ticket = Ticket {
             ticket: request.encode_to_vec().into(),
         };

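With the conversion moved into this client, the distributed wire contract is unchanged: the client substrait-encodes the plan, wraps it in the proto QueryRequest, and frames that in an Arrow Flight Ticket, which the datanode's FlightCraft implementation decodes (see the region_server.rs hunks below). A minimal sketch of that framing step, assuming only the arrow-flight and prost crates:

use arrow_flight::Ticket;
use prost::Message;

// Pack any prost message into a Flight ticket, the way handle_query does
// above; the receiving side reverses this with `Message::decode`.
fn to_ticket<M: Message>(msg: &M) -> Ticket {
    Ticket {
        ticket: msg.encode_to_vec().into(),
    }
}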
@@ -25,11 +25,13 @@ common-grpc-expr.workspace = true
 common-macro.workspace = true
 common-procedure.workspace = true
 common-procedure-test.workspace = true
+common-query.workspace = true
 common-recordbatch.workspace = true
 common-telemetry.workspace = true
 common-time.workspace = true
 common-wal.workspace = true
 datafusion-common.workspace = true
+datafusion-expr.workspace = true
 datatypes.workspace = true
 derive_builder.workspace = true
 etcd-client.workspace = true

@@ -131,9 +131,10 @@ mod tests {
     use std::sync::Arc;

     use api::region::RegionResponse;
-    use api::v1::region::{QueryRequest, RegionRequest};
+    use api::v1::region::RegionRequest;
     use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
     use common_error::ext::BoxedError;
+    use common_query::request::QueryRequest;
     use common_recordbatch::SendableRecordBatchStream;
     use table::table_name::TableName;

@@ -13,9 +13,10 @@
 // limitations under the License.

 use api::region::RegionResponse;
-use api::v1::region::{QueryRequest, RegionRequest};
+use api::v1::region::RegionRequest;
 use common_error::ext::{BoxedError, ErrorExt, StackError};
 use common_error::status_code::StatusCode;
+use common_query::request::QueryRequest;
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::debug;
 use snafu::{ResultExt, Snafu};

@@ -16,8 +16,9 @@ use std::sync::Arc;

 use api::region::RegionResponse;
 use api::v1::flow::{FlowRequest, FlowResponse};
-use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
+use api::v1::region::{InsertRequests, RegionRequest};
 pub use common_base::AffectedRows;
+use common_query::request::QueryRequest;
 use common_recordbatch::SendableRecordBatchStream;

 use crate::error::Result;

@@ -16,8 +16,9 @@ use std::sync::Arc;

 use api::region::RegionResponse;
 use api::v1::flow::{FlowRequest, FlowResponse};
-use api::v1::region::{InsertRequests, QueryRequest, RegionRequest};
+use api::v1::region::{InsertRequests, RegionRequest};
 pub use common_base::AffectedRows;
+use common_query::request::QueryRequest;
 use common_recordbatch::SendableRecordBatchStream;

 use crate::cache_invalidator::DummyCacheInvalidator;

@@ -27,6 +27,7 @@ snafu.workspace = true
 sqlparser.workspace = true
 sqlparser_derive = "0.1"
 statrs = "0.16"
+store-api.workspace = true

 [dev-dependencies]
 common-base.workspace = true

@@ -17,6 +17,7 @@ pub mod error;
 mod function;
 pub mod logical_plan;
 pub mod prelude;
+pub mod request;
 mod signature;
 #[cfg(any(test, feature = "testing"))]
 pub mod test_util;

src/common/query/src/request.rs (new file, 29 lines)
@@ -0,0 +1,29 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::region::RegionRequestHeader;
+use datafusion_expr::LogicalPlan;
+use store_api::storage::RegionId;
+
+/// The query request to be handled by the RegionServer (Datanode).
+pub struct QueryRequest {
+    /// The header of this request. Often to store some context of the query. None means all to defaults.
+    pub header: Option<RegionRequestHeader>,
+
+    /// The id of the region to be queried.
+    pub region_id: RegionId,
+
+    /// The form of the query: a logical plan.
+    pub plan: LogicalPlan,
+}

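For orientation, a minimal sketch of constructing one of these requests; the empty relation stands in for a real optimized plan, and the ids are made up:

use common_query::request::QueryRequest;
use datafusion_expr::LogicalPlanBuilder;
use store_api::storage::RegionId;

fn example_request() -> QueryRequest {
    // A trivial plan; real callers pass the frontend's optimized plan.
    let plan = LogicalPlanBuilder::empty(false).build().unwrap();
    QueryRequest {
        header: None,                      // defaults for query context / tracing
        region_id: RegionId::new(1024, 0), // table id 1024, region number 0
        plan,
    }
}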
@@ -20,7 +20,7 @@ use datafusion::logical_expr::LogicalPlan;
 use crate::error::Result;
 use crate::logical_plan::SubstraitPlanDecoder;

-/// Dummy `[SubstraitPlanDecoder]` for test.
+/// Dummy [`SubstraitPlanDecoder`] for test.
 pub struct DummyDecoder;

 impl DummyDecoder {

@@ -57,6 +57,7 @@ servers.workspace = true
 session.workspace = true
 snafu.workspace = true
 store-api.workspace = true
+substrait.workspace = true
 table.workspace = true
 tokio.workspace = true
 toml.workspace = true

@@ -394,6 +394,14 @@ pub enum Error {
         location: Location,
         source: BoxedError,
     },
+
+    #[snafu(display("DataFusion"))]
+    DataFusion {
+        #[snafu(source)]
+        error: datafusion::error::DataFusionError,
+        #[snafu(implicit)]
+        location: Location,
+    },
 }

 pub type Result<T> = std::result::Result<T, Error>;
@@ -446,7 +454,8 @@ impl ErrorExt for Error {
             | IncorrectInternalState { .. }
             | ShutdownInstance { .. }
             | RegionEngineNotFound { .. }
-            | UnsupportedOutput { .. } => StatusCode::Internal,
+            | UnsupportedOutput { .. }
+            | DataFusion { .. } => StatusCode::Internal,

             RegionNotFound { .. } => StatusCode::RegionNotFound,
             RegionNotReady { .. } => StatusCode::RegionNotReady,

@@ -18,13 +18,14 @@ use std::ops::Deref;
 use std::sync::{Arc, RwLock};

 use api::region::RegionResponse;
-use api::v1::region::{region_request, QueryRequest, RegionResponse as RegionResponseV1};
+use api::v1::region::{region_request, RegionResponse as RegionResponseV1};
 use api::v1::{ResponseHeader, Status};
 use arrow_flight::{FlightData, Ticket};
 use async_trait::async_trait;
 use bytes::Bytes;
 use common_error::ext::BoxedError;
 use common_error::status_code::StatusCode;
+use common_query::request::QueryRequest;
 use common_query::OutputData;
 use common_recordbatch::SendableRecordBatchStream;
 use common_runtime::Runtime;
@@ -32,6 +33,10 @@ use common_telemetry::tracing::{self, info_span};
 use common_telemetry::tracing_context::{FutureExt, TracingContext};
 use common_telemetry::{error, info, warn};
 use dashmap::DashMap;
+use datafusion::datasource::{provider_as_source, TableProvider};
+use datafusion::error::Result as DfResult;
+use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
+use datafusion_expr::{LogicalPlan, TableSource};
 use futures_util::future::try_join_all;
 use metric_engine::engine::MetricEngine;
 use mito2::engine::MITO_ENGINE_NAME;
@@ -44,7 +49,7 @@ use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as S
 use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
 use servers::grpc::region_server::RegionServerHandler;
 use session::context::{QueryContextBuilder, QueryContextRef};
-use snafu::{OptionExt, ResultExt};
+use snafu::{ensure, OptionExt, ResultExt};
 use store_api::metric_engine_consts::{
     FILE_ENGINE_NAME, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME,
 };
@@ -56,10 +61,10 @@ use store_api::storage::RegionId;
 use tonic::{Request, Response, Result as TonicResult};

 use crate::error::{
-    self, BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
-    FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu,
-    NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result,
-    StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
+    self, BuildRegionRequestsSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
+    ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
+    HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
+    RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
 };
 use crate::event_listener::RegionServerEventListenerRef;

@@ -138,9 +143,94 @@ impl RegionServer {
         self.inner.handle_request(region_id, request).await
     }

+    async fn table_provider(&self, region_id: RegionId) -> Result<Arc<dyn TableProvider>> {
+        let status = self
+            .inner
+            .region_map
+            .get(&region_id)
+            .context(RegionNotFoundSnafu { region_id })?
+            .clone();
+        ensure!(
+            matches!(status, RegionEngineWithStatus::Ready(_)),
+            RegionNotReadySnafu { region_id }
+        );
+
+        self.inner
+            .table_provider_factory
+            .create(region_id, status.into_engine())
+            .await
+            .context(ExecuteLogicalPlanSnafu)
+    }
+
+    /// Handle reads from remote. They're often query requests received by our Arrow Flight service.
+    pub async fn handle_remote_read(
+        &self,
+        request: api::v1::region::QueryRequest,
+    ) -> Result<SendableRecordBatchStream> {
+        let region_id = RegionId::from_u64(request.region_id);
+        let provider = self.table_provider(region_id).await?;
+        let catalog_list = Arc::new(DummyCatalogList::with_table_provider(provider));
+
+        let query_ctx: QueryContextRef = request
+            .header
+            .as_ref()
+            .map(|h| Arc::new(h.into()))
+            .unwrap_or_else(|| Arc::new(QueryContextBuilder::default().build()));
+
+        let decoder = self
+            .inner
+            .query_engine
+            .engine_context(query_ctx)
+            .new_plan_decoder()
+            .context(NewPlanDecoderSnafu)?;
+
+        let plan = decoder
+            .decode(Bytes::from(request.plan), catalog_list, false)
+            .await
+            .context(DecodeLogicalPlanSnafu)?;
+
+        self.inner
+            .handle_read(QueryRequest {
+                header: request.header,
+                region_id,
+                plan,
+            })
+            .await
+    }
+
     #[tracing::instrument(skip_all)]
     pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
-        self.inner.handle_read(request).await
+        let provider = self.table_provider(request.region_id).await?;
+
+        struct RegionDataSourceInjector {
+            source: Arc<dyn TableSource>,
+        }
+
+        impl TreeNodeRewriter for RegionDataSourceInjector {
+            type Node = LogicalPlan;
+
+            fn f_up(&mut self, node: Self::Node) -> DfResult<Transformed<Self::Node>> {
+                Ok(match node {
+                    LogicalPlan::TableScan(mut scan) => {
+                        scan.source = self.source.clone();
+                        Transformed::yes(LogicalPlan::TableScan(scan))
+                    }
+                    _ => Transformed::no(node),
+                })
+            }
+        }
+
+        let plan = request
+            .plan
+            .rewrite(&mut RegionDataSourceInjector {
+                source: provider_as_source(provider),
+            })
+            .context(DataFusionSnafu)?
+            .data;
+
+        self.inner
+            .handle_read(QueryRequest { plan, ..request })
+            .await
     }

     /// Returns all opened and reportable regions.

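The rewritten handle_read above is the standalone fast path: rather than decoding substrait bytes against a dummy catalog, it splices the region's own TableProvider into the plan's TableScan nodes. For readers new to the datafusion-common 38 tree_node API it relies on, a self-contained sketch of the same bottom-up f_up traversal (the counter here is illustrative, not from the codebase):

use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_expr::LogicalPlan;

/// Walks a plan bottom-up and counts TableScan nodes without changing them.
struct ScanCounter {
    scans: usize,
}

impl TreeNodeRewriter for ScanCounter {
    type Node = LogicalPlan;

    fn f_up(&mut self, node: LogicalPlan) -> datafusion_common::Result<Transformed<LogicalPlan>> {
        if matches!(node, LogicalPlan::TableScan(_)) {
            self.scans += 1;
        }
        // Transformed::no marks the node unchanged; yes would flag a rewrite.
        Ok(Transformed::no(node))
    }
}

Usage mirrors the injector: `let plan = plan.rewrite(&mut counter)?.data;` returns the (here unchanged) plan, and `.data` unwraps the Transformed wrapper.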
@@ -302,7 +392,7 @@ impl FlightCraft for RegionServer {
         request: Request<Ticket>,
     ) -> TonicResult<Response<TonicStream<FlightData>>> {
         let ticket = request.into_inner().ticket;
-        let request = QueryRequest::decode(ticket.as_ref())
+        let request = api::v1::region::QueryRequest::decode(ticket.as_ref())
             .context(servers_error::InvalidFlightTicketSnafu)?;
         let tracing_context = request
             .header
@@ -311,7 +401,7 @@ impl FlightCraft for RegionServer {
             .unwrap_or_default();

         let result = self
-            .handle_read(request)
+            .handle_remote_read(request)
             .trace(tracing_context.attach(info_span!("RegionServer::handle_read")))
             .await?;

@@ -339,10 +429,6 @@ impl RegionEngineWithStatus {
             RegionEngineWithStatus::Ready(engine) => engine,
         }
     }
-
-    pub fn is_registering(&self) -> bool {
-        matches!(self, Self::Registering(_))
-    }
 }

 impl Deref for RegionEngineWithStatus {

@@ -741,51 +827,16 @@ impl RegionServerInner {
     pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
         // TODO(ruihang): add metrics and set trace id

-        let QueryRequest {
-            header,
-            region_id,
-            plan,
-        } = request;
-        let region_id = RegionId::from_u64(region_id);
-
         // Build query context from gRPC header
-        let ctx: QueryContextRef = header
+        let query_ctx: QueryContextRef = request
+            .header
             .as_ref()
             .map(|h| Arc::new(h.into()))
             .unwrap_or_else(|| QueryContextBuilder::default().build().into());

-        // build dummy catalog list
-        let region_status = self
-            .region_map
-            .get(&region_id)
-            .with_context(|| RegionNotFoundSnafu { region_id })?
-            .clone();
-
-        if region_status.is_registering() {
-            return error::RegionNotReadySnafu { region_id }.fail();
-        }
-
-        let table_provider = self
-            .table_provider_factory
-            .create(region_id, region_status.into_engine())
-            .await
-            .context(ExecuteLogicalPlanSnafu)?;
-
-        let catalog_list = Arc::new(DummyCatalogList::with_table_provider(table_provider));
-        let query_engine_ctx = self.query_engine.engine_context(ctx.clone());
-        let plan_decoder = query_engine_ctx
-            .new_plan_decoder()
-            .context(NewPlanDecoderSnafu)?;
-
-        // decode substrait plan to logical plan and execute it
-        let logical_plan = plan_decoder
-            .decode(Bytes::from(plan), catalog_list, false)
-            .await
-            .context(DecodeLogicalPlanSnafu)?;
-
         let result = self
             .query_engine
-            .execute(logical_plan.into(), ctx)
+            .execute(request.plan.into(), query_ctx)
             .await
             .context(ExecuteLogicalPlanSnafu)?;

@@ -14,16 +14,15 @@

 use std::sync::Arc;

-use api::v1::region::QueryRequest;
 use async_trait::async_trait;
 use common_error::ext::BoxedError;
 use common_meta::node_manager::NodeManagerRef;
+use common_query::request::QueryRequest;
 use common_recordbatch::SendableRecordBatchStream;
 use partition::manager::PartitionRuleManagerRef;
 use query::error::{RegionQuerySnafu, Result as QueryResult};
 use query::region_query::RegionQueryHandler;
 use snafu::ResultExt;
-use store_api::storage::RegionId;

 use crate::error::{FindTableRouteSnafu, RequestQuerySnafu, Result};

@@ -56,7 +55,7 @@ impl RegionQueryHandler for FrontendRegionQueryHandler {

 impl FrontendRegionQueryHandler {
     async fn do_get_inner(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
-        let region_id = RegionId::from_u64(request.region_id);
+        let region_id = request.region_id;

         let peer = &self
             .partition_manager

@@ -15,13 +15,14 @@
 use std::sync::Arc;

 use api::region::RegionResponse;
-use api::v1::region::{QueryRequest, RegionRequest, RegionResponse as RegionResponseV1};
+use api::v1::region::{RegionRequest, RegionResponse as RegionResponseV1};
 use async_trait::async_trait;
 use client::region::check_response_header;
 use common_error::ext::BoxedError;
 use common_meta::error::{self as meta_error, Result as MetaResult};
 use common_meta::node_manager::{Datanode, DatanodeRef, FlownodeRef, NodeManager};
 use common_meta::peer::Peer;
+use common_query::request::QueryRequest;
 use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::tracing;
 use common_telemetry::tracing_context::{FutureExt, TracingContext};

@@ -18,7 +18,6 @@ use std::sync::Arc;

 use api::v1::meta::{HeartbeatRequest, Role};
 use async_trait::async_trait;
-use common_catalog::consts::default_engine;
 use common_meta::RegionIdent;

 use crate::error::Result;
@@ -91,8 +90,7 @@ impl HeartbeatHandler for RegionFailureHandler {
                     datanode_id: stat.id,
                     table_id: region_id.table_id(),
                     region_number: region_id.region_number(),
-                    // TODO(LFC): Use the actual table engine (maybe retrieve from heartbeat).
-                    engine: default_engine().to_string(),
+                    engine: x.engine.clone(),
                 }
             })
             .collect(),
@@ -109,6 +107,7 @@ impl HeartbeatHandler for RegionFailureHandler {
 mod tests {
     use std::assert_matches::assert_matches;

+    use common_catalog::consts::default_engine;
     use common_meta::key::MAINTENANCE_KEY;
     use store_api::region_engine::RegionRole;
     use store_api::storage::RegionId;

@@ -18,10 +18,10 @@ use std::time::Duration;

 use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use async_stream::stream;
-use common_base::bytes::Bytes;
 use common_catalog::parse_catalog_and_schema_from_db_string;
 use common_error::ext::BoxedError;
 use common_plugins::GREPTIME_EXEC_READ_COST;
+use common_query::request::QueryRequest;
 use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::{
@@ -40,7 +40,7 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
 use datafusion_physical_expr::EquivalenceProperties;
 use datatypes::schema::{Schema, SchemaRef};
 use futures_util::StreamExt;
-use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
+use greptime_proto::v1::region::RegionRequestHeader;
 use greptime_proto::v1::QueryContext;
 use meter_core::data::ReadItem;
 use meter_macros::read_meter;
@@ -125,7 +125,7 @@ impl MergeScanLogicalPlan {
 pub struct MergeScanExec {
     table: TableName,
     regions: Vec<RegionId>,
-    substrait_plan: Bytes,
+    plan: LogicalPlan,
     schema: SchemaRef,
     arrow_schema: ArrowSchemaRef,
     region_query_handler: RegionQueryHandlerRef,
@@ -150,7 +150,7 @@ impl MergeScanExec {
     pub fn new(
         table: TableName,
         regions: Vec<RegionId>,
-        substrait_plan: Bytes,
+        plan: LogicalPlan,
         arrow_schema: &ArrowSchema,
         region_query_handler: RegionQueryHandlerRef,
         query_ctx: QueryContextRef,
@@ -166,7 +166,7 @@ impl MergeScanExec {
         Ok(Self {
             table,
             regions,
-            substrait_plan,
+            plan,
             schema: schema_without_metadata,
             arrow_schema: arrow_schema_without_metadata,
             region_query_handler,
@@ -178,7 +178,6 @@ impl MergeScanExec {
     }

     pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
-        let substrait_plan = self.substrait_plan.to_vec();
         let regions = self.regions.clone();
         let region_query_handler = self.region_query_handler.clone();
         let metric = MergeScanMetric::new(&self.metric);
@@ -192,6 +191,7 @@ impl MergeScanExec {
         let extensions = self.query_ctx.extensions();

         let sub_sgate_metrics_moved = self.sub_stage_metrics.clone();
+        let plan = self.plan.clone();
         let stream = Box::pin(stream!({
             MERGE_SCAN_REGIONS.observe(regions.len() as f64);
             let _finish_timer = metric.finish_time().timer();
@@ -210,8 +210,8 @@ impl MergeScanExec {
                         extensions: extensions.clone(),
                     }),
                 }),
-                region_id: region_id.into(),
-                plan: substrait_plan.clone(),
+                region_id,
+                plan: plan.clone(),
             };
             let mut stream = region_query_handler
                 .do_get(request)

@@ -24,22 +24,19 @@ use datafusion::datasource::DefaultTableSource;
 use datafusion::execution::context::SessionState;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
-use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor};
+use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
-use datafusion_common::TableReference;
 use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
 use datafusion_optimizer::analyzer::Analyzer;
 use session::context::QueryContext;
 use snafu::{OptionExt, ResultExt};
 use store_api::storage::RegionId;
-use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
 pub use table::metadata::TableType;
 use table::table::adapter::DfTableProviderAdapter;
 use table::table_name::TableName;

 use crate::dist_plan::merge_scan::{MergeScanExec, MergeScanLogicalPlan};
 use crate::error;
 use crate::error::{CatalogSnafu, TableNotFoundSnafu};
-use crate::query_engine::DefaultSerializer;
 use crate::region_query::RegionQueryHandlerRef;

 pub struct DistExtensionPlanner {

@@ -99,13 +96,6 @@ impl ExtensionPlanner for DistExtensionPlanner {

             // TODO(ruihang): generate different execution plans for different variant merge operation
             let schema = optimized_plan.schema().as_ref().into();
+            // Pass down the original plan, allow execution nodes to do their optimization
-            let amended_plan = Self::plan_with_full_table_name(input_plan.clone(), &table_name)?;
-            let substrait_plan = DFLogicalSubstraitConvertor
-                .encode(&amended_plan, DefaultSerializer)
-                .context(error::EncodeSubstraitLogicalPlanSnafu)?
-                .into();

             let query_ctx = session_state
                 .config()
                 .get_extension()
@@ -113,7 +103,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
             let merge_scan_plan = MergeScanExec::new(
                 table_name,
                 regions,
-                substrait_plan,
+                input_plan.clone(),
                 &schema,
                 self.region_query_handler.clone(),
                 query_ctx,
@@ -130,12 +120,6 @@ impl DistExtensionPlanner {
         Ok(extractor.table_name)
     }

-    /// Apply the fully resolved table name to the TableScan plan
-    fn plan_with_full_table_name(plan: LogicalPlan, name: &TableName) -> Result<LogicalPlan> {
-        plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name))
-            .map(|x| x.data)
-    }
-
     async fn get_regions(&self, table_name: &TableName) -> Result<Vec<RegionId>> {
         let table = self
             .catalog_manager

@@ -230,24 +214,3 @@ impl TreeNodeVisitor<'_> for TableNameExtractor {
             }
         }
     }
-
-struct TableNameRewriter;
-
-impl TableNameRewriter {
-    fn rewrite_table_name(
-        plan: LogicalPlan,
-        name: &TableName,
-    ) -> datafusion_common::Result<Transformed<LogicalPlan>> {
-        Ok(match plan {
-            LogicalPlan::TableScan(mut table_scan) => {
-                table_scan.table_name = TableReference::full(
-                    name.catalog_name.clone(),
-                    name.schema_name.clone(),
-                    name.table_name.clone(),
-                );
-                Transformed::yes(LogicalPlan::TableScan(table_scan))
-            }
-            _ => Transformed::no(plan),
-        })
-    }
-}

@@ -140,13 +140,6 @@ pub enum Error {
         location: Location,
     },

-    #[snafu(display("Failed to encode Substrait logical plan"))]
-    EncodeSubstraitLogicalPlan {
-        source: substrait::error::Error,
-        #[snafu(implicit)]
-        location: Location,
-    },
-
     #[snafu(display("General SQL error"))]
     Sql {
         #[snafu(implicit)]
@@ -340,7 +333,6 @@ impl ErrorExt for Error {
             | ColumnSchemaNoDefault { .. } => StatusCode::InvalidArguments,

             BuildBackend { .. } | ListObjects { .. } => StatusCode::StorageUnavailable,
-            EncodeSubstraitLogicalPlan { source, .. } => source.status_code(),

             ParseFileFormat { source, .. } | InferSchema { source, .. } => source.status_code(),

@@ -14,8 +14,8 @@

 use std::sync::Arc;

-use api::v1::region::QueryRequest;
 use async_trait::async_trait;
+use common_query::request::QueryRequest;
 use common_recordbatch::SendableRecordBatchStream;

 use crate::error::Result;

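This trait is the seam the refactor pivots on: MergeScanExec hands a LogicalPlan-bearing QueryRequest to a RegionQueryHandler, and only the distributed implementation (FrontendRegionQueryHandler, via the client's RegionRequester) ever serializes it to substrait. A sketch of the trait shape implied by the imports above; the exact body and the crate-local Result alias are assumptions, not shown in the diff:

use async_trait::async_trait;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;

use crate::error::Result;

#[async_trait]
pub trait RegionQueryHandler: Send + Sync {
    // Standalone: executed in-process against the region server.
    // Distributed: substrait-encoded by the client before the gRPC call.
    async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}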
@@ -14,7 +14,6 @@ async-trait.workspace = true
 common-base.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
-common-query.workspace = true
 common-recordbatch.workspace = true
 common-wal.workspace = true
 datafusion-expr.workspace = true

@@ -75,7 +75,8 @@ impl ColumnMetadata {
             column_def.datatype_extension.clone(),
         )
         .into();
-        ColumnSchema::new(column_def.name, data_type, column_def.is_nullable)
+        ColumnSchema::new(&column_def.name, data_type, column_def.is_nullable)
             .with_time_index(column_def.semantic_type() == SemanticType::Timestamp)
             .with_default_constraint(default_constrain)
             .context(ConvertDatatypesSnafu)
     }

@@ -21,14 +21,13 @@ use std::sync::{Arc, Mutex};
 use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
 use api::region::RegionResponse;
 use async_trait::async_trait;
-use common_error::ext::BoxedError;
-use common_query::error::ExecuteRepeatedlySnafu;
+use common_error::ext::{BoxedError, PlainError};
+use common_error::status_code::StatusCode;
 use common_recordbatch::SendableRecordBatchStream;
 use datafusion_physical_plan::{DisplayAs, DisplayFormatType};
 use datatypes::schema::SchemaRef;
 use futures::future::join_all;
 use serde::{Deserialize, Serialize};
-use snafu::OptionExt;
 use tokio::sync::Semaphore;

 use crate::logstore::entry;
@@ -295,10 +294,12 @@ impl RegionScanner for SinglePartitionScanner {

     fn scan_partition(&self, _partition: usize) -> Result<SendableRecordBatchStream, BoxedError> {
         let mut stream = self.stream.lock().unwrap();
-        stream
-            .take()
-            .context(ExecuteRepeatedlySnafu)
-            .map_err(BoxedError::new)
+        stream.take().ok_or_else(|| {
+            BoxedError::new(PlainError::new(
+                "Not expected to run ExecutionPlan more than once".to_string(),
+                StatusCode::Unexpected,
+            ))
+        })
     }
 }

||||
@@ -647,8 +647,6 @@ pub struct TableInfo {
     /// Id and version of the table.
     #[builder(default, setter(into))]
     pub ident: TableIdent,
-
-    // TODO(LFC): Remove the catalog, schema and table names from TableInfo.
     /// Name of the table.
     #[builder(setter(into))]
     pub name: String,

@@ -555,7 +555,7 @@ CREATE TABLE {table_name} (
         let region_id = RegionId::new(table_id, *region);

         let stream = region_server
-            .handle_read(RegionQueryRequest {
+            .handle_remote_read(RegionQueryRequest {
                 region_id: region_id.as_u64(),
                 plan: plan.to_vec(),
                 ..Default::default()

@@ -249,7 +249,7 @@ mod tests {
         let region_id = RegionId::new(table_id, *region);

         let stream = region_server
-            .handle_read(QueryRequest {
+            .handle_remote_read(QueryRequest {
                 region_id: region_id.as_u64(),
                 plan: plan.to_vec(),
                 ..Default::default()

@@ -31,9 +31,9 @@ explain analyze SELECT count(*) FROM system_metrics;
 +-+-+-+
 | 0_| 0_|_MergeScanExec: REDACTED
 |_|_|_|
-| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[COUNT(greptime.public.system_REDACTED
+| 1_| 0_|_AggregateExec: mode=Final, gby=[], aggr=[COUNT(system_REDACTED
 |_|_|_CoalescePartitionsExec REDACTED
-|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(greptime.public.system_REDACTED
+|_|_|_AggregateExec: mode=Partial, gby=[], aggr=[COUNT(system_REDACTED
 |_|_|_RepartitionExec: partitioning=REDACTED
 |_|_|_SinglePartitionScanner: <SendableRecordBatchStream> REDACTED
 |_|_|_|