feat: read/write works in distributed mode 🎉 (#2327)

* add do_get method to RegionRequestHandler

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move RegionRequestHandler to client crate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use RegionRequestHandler in MergeScan

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* minor fix

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* ignore tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix format

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2023-09-05 07:39:14 -05:00
Commit: eb7116ab56 (parent: 5f7d48f107)
34 changed files with 309 additions and 137 deletions

Cargo.lock (generated)

@@ -1539,6 +1539,7 @@ dependencies = [
"parking_lot",
"prost",
"rand",
"session",
"snafu",
"substrait 0.4.0-nightly",
"substrait 0.7.5",
@@ -3231,6 +3232,7 @@ name = "frontend"
version = "0.4.0-nightly"
dependencies = [
"api",
"arrow-flight",
"async-compat",
"async-stream",
"async-trait",


@@ -837,10 +837,10 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
Value::Date(v) => Some(ValueData::DateValue(v.val())),
Value::DateTime(v) => Some(ValueData::DatetimeValue(v.val())),
Value::Timestamp(v) => Some(match v.unit() {
-TimeUnit::Second => ValueData::TimeSecondValue(v.value()),
-TimeUnit::Millisecond => ValueData::TimeMillisecondValue(v.value()),
-TimeUnit::Microsecond => ValueData::TimeMicrosecondValue(v.value()),
-TimeUnit::Nanosecond => ValueData::TimeNanosecondValue(v.value()),
+TimeUnit::Second => ValueData::TsSecondValue(v.value()),
+TimeUnit::Millisecond => ValueData::TsMillisecondValue(v.value()),
+TimeUnit::Microsecond => ValueData::TsMicrosecondValue(v.value()),
+TimeUnit::Nanosecond => ValueData::TsNanosecondValue(v.value()),
}),
Value::Time(v) => Some(match v.unit() {
TimeUnit::Second => ValueData::TimeSecondValue(v.value()),
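Note: this hunk is the write-path fix — `Value::Timestamp` used to be encoded with the `Time*Value` oneof variants, which are the encoding for `Value::Time` (kept unchanged in the arm above), so timestamps and times-of-day were indistinguishable on the wire. A minimal sketch of the collision, assuming the prost-generated `ValueData` oneof used above:

```rust
// Before the fix, a timestamp and a time with the same unit and raw
// value produced the identical wire representation:
let ts_old = ValueData::TimeSecondValue(1); // Value::Timestamp(1s), wrong variant
let time = ValueData::TimeSecondValue(1);   // Value::Time(1s), correct variant
assert_eq!(ts_old, time);                   // receiver cannot tell them apart

// After the fix, timestamps get their dedicated variants:
let ts_new = ValueData::TsSecondValue(1);
```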


@@ -30,6 +30,7 @@ moka = { version = "0.9", features = ["future"] }
parking_lot = "0.12"
prost.workspace = true
rand.workspace = true
session = { workspace = true }
snafu.workspace = true
tokio-stream = { version = "0.1", features = ["net"] }
tokio.workspace = true


@@ -33,6 +33,16 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display(
"Failure occurs during handling request, location: {}, source: {}",
location,
source
))]
HandleRequest {
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to convert FlightData, source: {}", source))]
ConvertFlightData {
location: Location,
@@ -85,7 +95,9 @@ impl ErrorExt for Error {
| Error::ClientStreaming { .. } => StatusCode::Internal,
Error::Server { code, .. } => *code,
-Error::FlightGet { source, .. } => source.status_code(),
+Error::FlightGet { source, .. } | Error::HandleRequest { source, .. } => {
+source.status_code()
+}
Error::CreateChannel { source, .. } | Error::ConvertFlightData { source, .. } => {
source.status_code()
}
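The new `HandleRequest` variant is the extension point this PR relies on: implementations of `RegionRequestHandler` living outside the client crate box their own error type and wrap it here. The wrapping pattern, as it appears at the frontend call sites later in this diff:

```rust
// Sketch of the call-site pattern (see DistRegionRequestHandler below):
self.handle_inner(request, ctx)
    .await
    .map_err(BoxedError::new)    // erase the frontend's concrete error type
    .context(HandleRequestSnafu) // wrap as client::error::Error::HandleRequest
```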


@@ -19,6 +19,7 @@ pub mod error;
pub mod load_balance;
mod metrics;
pub mod region;
pub mod region_handler;
mod stream_insert;
pub use api;


@@ -14,17 +14,26 @@
use api::v1::region::{RegionRequest, RegionResponse};
use api::v1::ResponseHeader;
+use arrow_flight::Ticket;
+use async_stream::stream;
use async_trait::async_trait;
-use common_error::ext::BoxedError;
+use common_error::ext::{BoxedError, ErrorExt};
+use common_error::status_code::StatusCode;
+use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::datanode_manager::{AffectedRows, Datanode};
use common_meta::error::{self as meta_error, Result as MetaResult};
-use common_telemetry::timer;
-use snafu::{location, Location, OptionExt};
+use common_recordbatch::error::ExternalSnafu;
+use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
+use common_telemetry::{error, timer};
+use snafu::{location, Location, OptionExt, ResultExt};
+use tokio_stream::StreamExt;
-use crate::error::Error::FlightGet;
-use crate::error::{IllegalDatabaseResponseSnafu, MissingFieldSnafu, Result, ServerSnafu};
-use crate::{metrics, Client};
+use crate::error::{
+self, ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
+MissingFieldSnafu, Result, ServerSnafu,
+};
+use crate::{metrics, Client, Error};
#[derive(Debug)]
pub struct RegionRequester {
@@ -54,6 +63,77 @@ impl RegionRequester {
Self { client }
}
pub async fn do_get(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
let mut flight_client = self.client.make_flight_client()?;
let response = flight_client
.mut_inner()
.do_get(ticket)
.await
.map_err(|e| {
let tonic_code = e.code();
let e: error::Error = e.into();
let code = e.status_code();
let msg = e.to_string();
let error = Error::FlightGet {
tonic_code,
addr: flight_client.addr().to_string(),
source: BoxedError::new(ServerSnafu { code, msg }.build()),
};
error!(
e; "Failed to do Flight get, addr: {}, code: {}",
flight_client.addr(),
tonic_code
);
error
})?;
let flight_data_stream = response.into_inner();
let mut decoder = FlightDecoder::default();
let mut flight_message_stream = flight_data_stream.map(move |flight_data| {
flight_data
.map_err(Error::from)
.and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu))
});
let Some(first_flight_message) = flight_message_stream.next().await else {
return IllegalFlightMessagesSnafu {
reason: "Expect the response not to be empty",
}
.fail();
};
let FlightMessage::Schema(schema) = first_flight_message? else {
return IllegalFlightMessagesSnafu {
reason: "Expect schema to be the first flight message",
}
.fail();
};
let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let FlightMessage::Recordbatch(record_batch) = flight_message else {
yield IllegalFlightMessagesSnafu {
reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"
}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
};
yield Ok(record_batch);
}
}));
let record_batch_stream = RecordBatchStreamAdaptor {
schema,
stream,
output_ordering: None,
};
Ok(Box::pin(record_batch_stream))
}
async fn handle_inner(&self, request: RegionRequest) -> Result<AffectedRows> {
let request_type = request
.body
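A hedged usage sketch of the new `do_get` (the `client`, `region_id`, and `plan_bytes` bindings plus the `process` helper are placeholders, not part of this patch): the region query is encoded into a Flight `Ticket`, and results arrive as a `SendableRecordBatchStream`, which is exactly how `DistRegionRequestHandler::do_get_inner` drives it further down.

```rust
use api::v1::region::QueryRequest;
use arrow_flight::Ticket;
use prost::Message;
use tokio_stream::StreamExt;

let request = QueryRequest {
    region_id: region_id.into(), // RegionId -> u64
    plan: plan_bytes,            // encoded substrait plan
};
let ticket = Ticket {
    ticket: request.encode_to_vec().into(),
};
let mut stream = RegionRequester::new(client).do_get(ticket).await?;
while let Some(batch) = stream.next().await {
    process(batch?); // each item is a Result<RecordBatch>
}
```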


@@ -14,8 +14,9 @@
use std::sync::Arc;
-use api::v1::region::{region_request, RegionResponse};
+use api::v1::region::{region_request, QueryRequest, RegionResponse};
use async_trait::async_trait;
+use common_recordbatch::SendableRecordBatchStream;
use session::context::QueryContextRef;
use crate::error::Result;
@@ -27,6 +28,9 @@ pub trait RegionRequestHandler: Send + Sync {
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<RegionResponse>;
// TODO(ruihang): add trace id and span id in the request.
async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
}
pub type RegionRequestHandlerRef = Arc<dyn RegionRequestHandler>;
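With the trait in the client crate, the query crate can depend on the abstraction without knowing about frontends or datanodes. A hypothetical no-op implementation (not part of this PR, e.g. as a test double) only has two methods to fill in:

```rust
struct NoopHandler;

#[async_trait]
impl RegionRequestHandler for NoopHandler {
    async fn handle(
        &self,
        _request: region_request::Body,
        _ctx: QueryContextRef,
    ) -> Result<RegionResponse> {
        Ok(RegionResponse::default()) // prost messages implement Default
    }

    async fn do_get(&self, _request: QueryRequest) -> Result<SendableRecordBatchStream> {
        unimplemented!("a real test double would return an empty stream")
    }
}
```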


@@ -269,9 +269,8 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
let plugins: Arc<Plugins> = Default::default();
let state = Arc::new(QueryEngineState::new(
catalog_list,
+None,
false,
-None,
-None,
plugins.clone(),
));


@@ -406,9 +406,8 @@ impl Datanode {
let query_engine_factory = QueryEngineFactory::new_with_plugins(
// query engine in datanode only executes plan with resolved table source.
MemoryCatalogManager::with_default_setup(),
+None,
false,
-None,
-None,
plugins,
);
let query_engine = query_engine_factory.query_engine();


@@ -241,13 +241,8 @@ impl Instance {
}
};
-let factory = QueryEngineFactory::new_with_plugins(
-catalog_manager.clone(),
-false,
-None,
-None,
-plugins,
-);
+let factory =
+QueryEngineFactory::new_with_plugins(catalog_manager.clone(), None, false, plugins);
let query_engine = factory.query_engine();
let procedure_manager = create_procedure_manager(
opts.node_id.unwrap_or(0),


@@ -179,6 +179,7 @@ impl RegionServerInner {
pub fn register_engine(&self, engine: RegionEngineRef) {
let engine_name = engine.name();
info!("Region Engine {engine_name} is registered");
self.engines
.write()
.unwrap()


@@ -11,6 +11,7 @@ testing = []
[dependencies]
api = { workspace = true }
arrow-flight.workspace = true
async-compat = "0.2"
async-stream.workspace = true
async-trait = "0.1"


@@ -22,6 +22,7 @@ use api::v1::{
AlterExpr, ColumnSchema, DdlRequest, InsertRequests, RowInsertRequest, RowInsertRequests,
};
use catalog::CatalogManagerRef;
use client::region_handler::RegionRequestHandlerRef;
use common_catalog::consts::default_engine;
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
use common_query::Output;
@@ -36,10 +37,9 @@ use table::TableRef;
use self::req_convert::{ColumnToRow, RowToRegion};
use crate::error::{
CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu,
-Result,
+RequestDatanodeSnafu, Result,
};
use crate::expr_factory::CreateExprFactory;
-use crate::instance::region_handler::RegionRequestHandlerRef;
pub(crate) struct Inserter<'a> {
catalog_manager: &'a CatalogManagerRef,
@@ -94,7 +94,8 @@ impl<'a> Inserter<'a> {
let response = self
.region_request_handler
.handle(region_request, ctx)
-.await?;
+.await
+.context(RequestDatanodeSnafu)?;
Ok(Output::AffectedRows(response.affected_rows as _))
}
}


@@ -18,7 +18,6 @@ mod influxdb;
mod opentsdb;
mod otlp;
mod prom_store;
-pub mod region_handler;
mod script;
mod standalone;
@@ -33,6 +32,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use catalog::remote::CachedMetaKvBackend;
use catalog::CatalogManagerRef;
use client::client_manager::DatanodeClients;
use client::region_handler::RegionRequestHandlerRef;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -75,7 +75,6 @@ use sql::statements::statement::Statement;
use sqlparser::ast::ObjectName;
use self::distributed::DistRegionRequestHandler;
-use self::region_handler::RegionRequestHandlerRef;
use self::standalone::StandaloneRegionRequestHandler;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
@@ -166,12 +165,12 @@ impl Instance {
catalog_manager.set_dist_instance(dist_instance.clone());
let catalog_manager = Arc::new(catalog_manager);
+let dist_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
let query_engine = QueryEngineFactory::new_with_plugins(
catalog_manager.clone(),
+Some(dist_request_handler),
true,
-Some(partition_manager.clone()),
-Some(datanode_clients.clone()),
plugins.clone(),
)
.query_engine();


@@ -21,13 +21,17 @@ use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::Expr as DdlExpr;
use api::v1::greptime_request::Request;
-use api::v1::region::{region_request, RegionResponse};
+use api::v1::region::{region_request, QueryRequest, RegionResponse};
use api::v1::{
column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, TruncateTableExpr,
};
use arrow_flight::Ticket;
use async_trait::async_trait;
use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
use chrono::DateTime;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region::RegionRequester;
use client::region_handler::RegionRequestHandler;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::ext::BoxedError;
@@ -37,12 +41,14 @@ use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse
use common_meta::rpc::router::{Partition, Partition as MetaPartition};
use common_meta::table_name::TableName;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::info;
use datanode::instance::sql::table_idents_to_full_name;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use partition::manager::PartitionInfo;
use partition::partition::{PartitionBound, PartitionDef};
use prost::Message;
use query::error::QueryExecutionSnafu;
use query::query_engine::SqlStatementExecutor;
use servers::query_handler::grpc::GrpcQueryHandler;
@@ -52,17 +58,17 @@ use sql::ast::{Ident, Value as SqlValue};
use sql::statements::create::{PartitionEntry, Partitions};
use sql::statements::statement::Statement;
use sql::statements::{self, sql_value_to_value};
use store_api::storage::RegionId;
use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
use table::requests::{AlterTableRequest, TableOptions};
use table::TableRef;
-use super::region_handler::RegionRequestHandler;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
-DeserializePartitionSnafu, NotSupportedSnafu, ParseSqlSnafu, Result, SchemaExistsSnafu,
-TableAlreadyExistSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
-UnrecognizedTableOptionSnafu,
+DeserializePartitionSnafu, FindDatanodeSnafu, FindTableRouteSnafu, NotSupportedSnafu,
+ParseSqlSnafu, RequestDatanodeSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
+TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
};
use crate::expr_factory;
use crate::inserter::req_convert::StatementToRegion;
@@ -596,6 +602,26 @@ impl RegionRequestHandler for DistRegionRequestHandler {
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> ClientResult<RegionResponse> {
self.handle_inner(request, ctx)
.await
.map_err(BoxedError::new)
.context(HandleRequestSnafu)
}
async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
self.do_get_inner(request)
.await
.map_err(BoxedError::new)
.context(HandleRequestSnafu)
}
}
impl DistRegionRequestHandler {
async fn handle_inner(
&self,
request: region_request::Body,
ctx: QueryContextRef,
) -> Result<RegionResponse> {
match request {
region_request::Body::Inserts(inserts) => {
@@ -641,6 +667,38 @@ impl RegionRequestHandler for DistRegionRequestHandler {
.fail(),
}
}
async fn do_get_inner(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
let region_id = RegionId::from_u64(request.region_id);
let table_route = self
.catalog_manager
.partition_manager()
.find_table_route(region_id.table_id())
.await
.context(FindTableRouteSnafu {
table_id: region_id.table_id(),
})?;
let peer = table_route
.find_region_leader(region_id.region_number())
.context(FindDatanodeSnafu {
region: region_id.region_number(),
})?;
let client = self
.catalog_manager
.datanode_clients()
.get_client(peer)
.await;
let ticket = Ticket {
ticket: request.encode_to_vec().into(),
};
let region_requester = RegionRequester::new(client);
region_requester
.do_get(ticket)
.await
.context(RequestDatanodeSnafu)
}
}
fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
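`do_get_inner` routes reads purely by `RegionId`: recover the table id from the region id, look up the table route, pick the region's leader peer, and ship the encoded `QueryRequest` as a Flight ticket. A sketch of the `RegionId` round-trip it relies on (accessor names as used in this diff):

```rust
use store_api::storage::RegionId;

let region_id = RegionId::new(42, 7); // table id 42, region number 7
let raw: u64 = region_id.into();      // what QueryRequest::region_id carries
let parsed = RegionId::from_u64(raw); // what do_get_inner reconstructs
assert_eq!(parsed.table_id(), 42);
assert_eq!(parsed.region_number(), 7);
```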


@@ -15,9 +15,13 @@
use std::sync::Arc;
use api::v1::greptime_request::Request;
-use api::v1::region::{region_request, RegionResponse};
+use api::v1::region::{region_request, QueryRequest, RegionResponse};
use async_trait::async_trait;
use client::error::{HandleRequestSnafu, Result as ClientResult};
use client::region_handler::RegionRequestHandler;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use datanode::error::Error as DatanodeError;
use datanode::region_server::RegionServer;
use servers::grpc::region_server::RegionServerHandler;
@@ -25,7 +29,6 @@ use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
use session::context::QueryContextRef;
use snafu::ResultExt;
-use super::region_handler::RegionRequestHandler;
use crate::error::{Error, InvokeDatanodeSnafu, InvokeRegionServerSnafu, Result};
pub(crate) struct StandaloneGrpcQueryHandler(GrpcQueryHandlerRef<DatanodeError>);
@@ -64,10 +67,20 @@ impl RegionRequestHandler for StandaloneRegionRequestHandler {
&self,
request: region_request::Body,
_ctx: QueryContextRef,
-) -> Result<RegionResponse> {
+) -> ClientResult<RegionResponse> {
self.region_server
.handle(request)
.await
-.context(InvokeRegionServerSnafu)
+.map_err(BoxedError::new)
+.context(HandleRequestSnafu)
}
async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
self.region_server
.handle_read(request)
.await
.map_err(BoxedError::new)
.context(HandleRequestSnafu)
}
}


@@ -26,6 +26,7 @@ use std::sync::Arc;
use api::v1::region::region_request;
use catalog::CatalogManagerRef;
use client::region_handler::RegionRequestHandlerRef;
use common_error::ext::BoxedError;
use common_query::Output;
use common_time::range::TimestampRange;
@@ -49,11 +50,10 @@ use table::TableRef;
use crate::catalog::FrontendCatalogManager;
use crate::error::{
self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu,
-PlanStatementSnafu, Result, TableNotFoundSnafu,
+PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu,
};
use crate::inserter::req_convert::TableToRegion;
use crate::instance::distributed::deleter::DistDeleter;
-use crate::instance::region_handler::RegionRequestHandlerRef;
use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};
#[derive(Clone)]
@@ -188,7 +188,8 @@ impl StatementExecutor {
let region_response = self
.region_request_handler
.handle(region_request::Body::Inserts(request), query_ctx)
-.await?;
+.await
+.context(RequestDatanodeSnafu)?;
Ok(region_response.affected_rows as _)
}


@@ -151,7 +151,7 @@ impl EngineInner {
#[async_trait]
impl RegionEngine for MitoEngine {
fn name(&self) -> &str {
"MitoEngine"
"mito"
}
async fn handle_request(


@@ -48,6 +48,7 @@ serde_json = "1.0"
session.workspace = true
snafu = { version = "0.7", features = ["backtraces"] }
sql.workspace = true
store-api.workspace = true
substrait.workspace = true
table.workspace = true
tokio.workspace = true


@@ -517,7 +517,7 @@ mod tests {
};
let _ = catalog_manager.register_table(req).await.unwrap();
-QueryEngineFactory::new(catalog_manager, false).query_engine()
+QueryEngineFactory::new(catalog_manager, None, false).query_engine()
}
#[tokio::test]


@@ -17,14 +17,11 @@ use std::sync::Arc;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use async_stream::stream;
-use client::client_manager::DatanodeClients;
-use client::Database;
+use client::region_handler::RegionRequestHandlerRef;
use common_base::bytes::Bytes;
use common_error::ext::BoxedError;
-use common_meta::peer::Peer;
use common_meta::table_name::TableName;
use common_query::physical_plan::TaskContext;
-use common_query::Output;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
@@ -39,9 +36,11 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::QueryRequest;
use snafu::ResultExt;
use store_api::storage::RegionId;
-use crate::error::{ConvertSchemaSnafu, RemoteRequestSnafu, UnexpectedOutputKindSnafu};
+use crate::error::ConvertSchemaSnafu;
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
@@ -108,48 +107,52 @@ impl MergeScanLogicalPlan {
}
}
-#[derive(Debug)]
pub struct MergeScanExec {
table: TableName,
-peers: Vec<Peer>,
+regions: Vec<RegionId>,
substrait_plan: Bytes,
schema: SchemaRef,
arrow_schema: ArrowSchemaRef,
-clients: Arc<DatanodeClients>,
+request_handler: RegionRequestHandlerRef,
metric: ExecutionPlanMetricsSet,
}
+impl std::fmt::Debug for MergeScanExec {
+fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+f.debug_struct("MergeScanExec")
+.field("table", &self.table)
+.field("regions", &self.regions)
+.field("schema", &self.schema)
+.finish()
+}
+}
impl MergeScanExec {
pub fn new(
table: TableName,
-peers: Vec<Peer>,
+regions: Vec<RegionId>,
substrait_plan: Bytes,
arrow_schema: &ArrowSchema,
-clients: Arc<DatanodeClients>,
+request_handler: RegionRequestHandlerRef,
) -> Result<Self> {
let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
let schema_without_metadata =
Self::arrow_schema_to_schema(arrow_schema_without_metadata.clone())?;
Ok(Self {
table,
-peers,
+regions,
substrait_plan,
schema: schema_without_metadata,
arrow_schema: arrow_schema_without_metadata,
-clients,
+request_handler,
metric: ExecutionPlanMetricsSet::new(),
})
}
-pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
+pub fn to_stream(&self, _context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
let substrait_plan = self.substrait_plan.to_vec();
-let peers = self.peers.clone();
-let clients = self.clients.clone();
-let table = self.table.clone();
-let trace_id = context
-.task_id()
-.and_then(|id| id.parse().ok())
-.unwrap_or_default();
+let regions = self.regions.clone();
+let request_handler = self.request_handler.clone();
let metric = MergeScanMetric::new(&self.metric);
let stream = Box::pin(stream!({
@@ -157,27 +160,17 @@ impl MergeScanExec {
let mut ready_timer = metric.ready_time().timer();
let mut first_consume_timer = Some(metric.first_consume_time().timer());
-for peer in peers {
-let client = clients.get_client(&peer).await;
-let database = Database::new(&table.catalog_name, &table.schema_name, client);
-let output: Output = database
-.logical_plan(substrait_plan.clone(), trace_id)
+for region_id in regions {
+let request = QueryRequest {
+region_id: region_id.into(),
+plan: substrait_plan.clone(),
+};
+let mut stream = request_handler
+.do_get(request)
.await
-.context(RemoteRequestSnafu)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
-let Output::Stream(mut stream) = output else {
-yield UnexpectedOutputKindSnafu {
-expected: "Stream",
-got: "RecordBatches or AffectedRows",
-}
-.fail()
-.map_err(BoxedError::new)
-.context(ExternalSnafu);
-return;
-};
ready_timer.stop();
while let Some(batch) = stream.next().await {
@@ -279,8 +272,8 @@ impl ExecutionPlan for MergeScanExec {
impl DisplayAs for MergeScanExec {
fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "MergeScanExec: peers=[")?;
-for peer in self.peers.iter() {
-write!(f, "{}, ", peer)?;
+for region_id in self.regions.iter() {
+write!(f, "{}, ", region_id)?;
}
write!(f, "]")
}
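This is the heart of "read works in distributed mode": `MergeScanExec` no longer opens a `Database` connection per peer and ships the whole logical plan itself; it asks the injected `RegionRequestHandlerRef` for one record batch stream per region. Condensed from the hunk above (this runs inside the `stream!` block, error plumbing elided):

```rust
for region_id in regions {
    let request = QueryRequest {
        region_id: region_id.into(),
        plan: substrait_plan.clone(),
    };
    let mut stream = request_handler.do_get(request).await?;
    while let Some(batch) = stream.next().await {
        yield Ok(batch?); // `yield` is valid only inside async_stream::stream!
    }
}
```

In distributed mode the handler is `DistRegionRequestHandler` (Flight `do_get` against the region's leader datanode); in standalone mode it is `StandaloneRegionRequestHandler` (an in-process `RegionServer::handle_read`), so both deployments now share one scan path.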


@@ -18,9 +18,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
-use client::client_manager::DatanodeClients;
+use client::region_handler::RegionRequestHandlerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-use common_meta::peer::Peer;
use common_meta::table_name::TableName;
use datafusion::common::Result;
use datafusion::datasource::DefaultTableSource;
@@ -28,11 +27,11 @@ use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
-use datafusion_common::{DataFusionError, TableReference};
+use datafusion_common::TableReference;
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_optimizer::analyzer::Analyzer;
use partition::manager::PartitionRuleManager;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
pub use table::metadata::TableType;
use table::table::adapter::DfTableProviderAdapter;
@@ -42,21 +41,18 @@ use crate::error;
use crate::error::{CatalogSnafu, TableNotFoundSnafu};
pub struct DistExtensionPlanner {
partition_manager: Arc<PartitionRuleManager>,
-clients: Arc<DatanodeClients>,
catalog_manager: CatalogManagerRef,
+request_handler: RegionRequestHandlerRef,
}
impl DistExtensionPlanner {
pub fn new(
partition_manager: Arc<PartitionRuleManager>,
-clients: Arc<DatanodeClients>,
catalog_manager: CatalogManagerRef,
+request_handler: RegionRequestHandlerRef,
) -> Self {
Self {
partition_manager,
-clients,
catalog_manager,
+request_handler,
}
}
}
@@ -94,7 +90,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
return fallback(&optimized_plan).await;
};
-let Ok(peers) = self.get_peers(&table_name).await else {
+let Ok(regions) = self.get_regions(&table_name).await else {
// no peers found, going to execute them locally
return fallback(&optimized_plan).await;
};
@@ -109,10 +105,10 @@ impl ExtensionPlanner for DistExtensionPlanner {
.into();
let merge_scan_plan = MergeScanExec::new(
table_name,
-peers,
+regions,
substrait_plan,
&schema,
-self.clients.clone(),
+self.request_handler.clone(),
)?;
Ok(Some(Arc::new(merge_scan_plan) as _))
}
@@ -131,7 +127,7 @@ impl DistExtensionPlanner {
plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name))
}
-async fn get_peers(&self, table_name: &TableName) -> Result<Vec<Peer>> {
+async fn get_regions(&self, table_name: &TableName) -> Result<Vec<RegionId>> {
let table = self
.catalog_manager
.table(
@@ -144,15 +140,7 @@ impl DistExtensionPlanner {
.with_context(|| TableNotFoundSnafu {
table: table_name.to_string(),
})?;
-let table_id = table.table_info().table_id();
-self.partition_manager
-.find_table_region_leaders(table_id)
-.await
-.with_context(|_| error::RoutePartitionSnafu {
-table: table_name.clone(),
-})
-.map_err(|e| DataFusionError::External(Box::new(e)))
+Ok(table.table_info().region_ids())
}
// TODO(ruihang): find a more elegant way to optimize input logical plan


@@ -21,14 +21,13 @@ use std::sync::Arc;
use async_trait::async_trait;
use catalog::CatalogManagerRef;
-use client::client_manager::DatanodeClients;
+use client::region_handler::RegionRequestHandlerRef;
use common_base::Plugins;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY};
use common_query::prelude::ScalarUdf;
use common_query::Output;
use datatypes::schema::Schema;
-use partition::manager::PartitionRuleManager;
use session::context::QueryContextRef;
use sql::statements::statement::Statement;
use table::TableRef;
@@ -86,28 +85,29 @@ pub struct QueryEngineFactory {
}
impl QueryEngineFactory {
-pub fn new(catalog_manager: CatalogManagerRef, with_dist_planner: bool) -> Self {
+pub fn new(
+catalog_manager: CatalogManagerRef,
+request_handler: Option<RegionRequestHandlerRef>,
+with_dist_planner: bool,
+) -> Self {
Self::new_with_plugins(
catalog_manager,
+request_handler,
with_dist_planner,
-None,
-None,
Default::default(),
)
}
pub fn new_with_plugins(
catalog_manager: CatalogManagerRef,
+request_handler: Option<RegionRequestHandlerRef>,
with_dist_planner: bool,
-partition_manager: Option<Arc<PartitionRuleManager>>,
-clients: Option<Arc<DatanodeClients>>,
plugins: Arc<Plugins>,
) -> Self {
let state = Arc::new(QueryEngineState::new(
catalog_manager,
+request_handler,
with_dist_planner,
-partition_manager,
-clients,
plugins.clone(),
));
let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
@@ -139,7 +139,7 @@ mod tests {
#[test]
fn test_query_engine_factory() {
let catalog_list = catalog::local::new_memory_catalog_manager().unwrap();
-let factory = QueryEngineFactory::new(catalog_list, false);
+let factory = QueryEngineFactory::new(catalog_list, None, false);
let engine = factory.query_engine();
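The factory now threads an optional `RegionRequestHandlerRef` instead of a partition manager plus datanode clients. Call sites in this diff take one of two shapes (`dist_request_handler` and `plugins` are the bindings from the frontend hunk above):

```rust
// Engines with no distributed read path (datanode, tests) pass None:
let engine = QueryEngineFactory::new(catalog_manager.clone(), None, false).query_engine();

// The distributed frontend supplies a handler and enables the dist planner:
let engine = QueryEngineFactory::new_with_plugins(
    catalog_manager,
    Some(dist_request_handler),
    true, // with_dist_planner
    plugins,
)
.query_engine();
```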


@@ -18,7 +18,7 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
-use client::client_manager::DatanodeClients;
+use client::region_handler::RegionRequestHandlerRef;
use common_base::Plugins;
use common_function::scalars::aggregate::AggregateFunctionMetaRef;
use common_query::physical_plan::SessionContext;
@@ -37,7 +37,6 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, Phy
use datafusion_expr::LogicalPlan as DfLogicalPlan;
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
-use partition::manager::PartitionRuleManager;
use promql::extension_plan::PromExtensionPlanner;
use substrait::extension_serializer::ExtensionSerializer;
use table::table::adapter::DfTableProviderAdapter;
@@ -73,9 +72,8 @@ impl fmt::Debug for QueryEngineState {
impl QueryEngineState {
pub fn new(
catalog_list: CatalogManagerRef,
+request_handler: Option<RegionRequestHandlerRef>,
with_dist_planner: bool,
-partition_manager: Option<Arc<PartitionRuleManager>>,
-datanode_clients: Option<Arc<DatanodeClients>>,
plugins: Arc<Plugins>,
) -> Self {
let runtime_env = Arc::new(RuntimeEnv::default());
@@ -114,9 +112,8 @@ impl QueryEngineState {
.with_serializer_registry(Arc::new(ExtensionSerializer))
.with_analyzer_rules(analyzer.rules)
.with_query_planner(Arc::new(DfQueryPlanner::new(
-partition_manager,
-datanode_clients,
catalog_list.clone(),
+request_handler,
)))
.with_optimizer_rules(optimizer.rules)
.with_physical_optimizer_rules(physical_optimizers);
@@ -223,15 +220,16 @@ impl QueryPlanner for DfQueryPlanner {
impl DfQueryPlanner {
fn new(
-partition_manager: Option<Arc<PartitionRuleManager>>,
-datanode_clients: Option<Arc<DatanodeClients>>,
catalog_manager: CatalogManagerRef,
+request_handler: Option<RegionRequestHandlerRef>,
) -> Self {
let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
-if let Some(partition_manager) = partition_manager
-&& let Some(datanode_clients) = datanode_clients {
-planners.push(Arc::new(DistExtensionPlanner::new(partition_manager, datanode_clients, catalog_manager)));
+if let Some(request_handler) = request_handler {
+planners.push(Arc::new(DistExtensionPlanner::new(
+catalog_manager,
+request_handler,
+)));
}
Self {
physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),


@@ -389,7 +389,7 @@ mod test {
})
.await
.is_ok());
-QueryEngineFactory::new(catalog_list, false).query_engine()
+QueryEngineFactory::new(catalog_list, None, false).query_engine()
}
async fn query_plan_compare(sql: &str, expected: String) {


@@ -51,5 +51,5 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
let catalog_manager = MemoryCatalogManager::new_with_table(table);
-QueryEngineFactory::new(catalog_manager, false).query_engine()
+QueryEngineFactory::new(catalog_manager, None, false).query_engine()
}


@@ -47,7 +47,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
let catalog_list = catalog::local::new_memory_catalog_manager()
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
-let factory = QueryEngineFactory::new(catalog_list, false);
+let factory = QueryEngineFactory::new(catalog_list, None, false);
let engine = factory.query_engine();
let column_schemas = vec![ColumnSchema::new(
@@ -129,7 +129,7 @@ async fn test_query_validate() -> Result<()> {
});
let plugins = Arc::new(plugins);
-let factory = QueryEngineFactory::new_with_plugins(catalog_list, false, None, None, plugins);
+let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, false, plugins);
let engine = factory.query_engine();
let stmt = QueryLanguageParser::parse_sql("select number from public.numbers").unwrap();
@@ -153,7 +153,7 @@ async fn test_udf() -> Result<()> {
common_telemetry::init_default_ut_logging();
let catalog_list = catalog_manager()?;
-let factory = QueryEngineFactory::new(catalog_list, false);
+let factory = QueryEngineFactory::new(catalog_list, None, false);
let engine = factory.query_engine();
let pow = make_scalar_function(pow);


@@ -106,7 +106,7 @@ fn create_test_engine() -> TimeRangeTester {
};
let _ = catalog_manager.register_table_sync(req).unwrap();
-let engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
+let engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
TimeRangeTester { engine, filter }
}


@@ -52,7 +52,7 @@ where
pub(crate) fn sample_script_engine() -> PyEngine {
let catalog_manager =
MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
-let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
+let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
PyEngine::new(query_engine.clone())
}


@@ -41,7 +41,7 @@ impl ScriptManager {
Ok(Self {
compiled: RwLock::new(HashMap::default()),
py_engine: PyEngine::new(query_engine.clone()),
-table: ScriptsTable::new(catalog_manager, query_engine).await?,
+table: ScriptsTable::new_empty(catalog_manager, query_engine)?,
})
}
@@ -139,6 +139,7 @@ mod tests {
type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;
#[ignore = "script engine is temporary disabled"]
#[tokio::test]
async fn test_insert_find_compile_script() {
let wal_dir = create_temp_dir("test_insert_find_compile_script_wal");
@@ -168,7 +169,7 @@ mod tests {
.unwrap(),
);
-let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
+let factory = QueryEngineFactory::new(catalog_manager.clone(), None, false);
let query_engine = factory.query_engine();
let mgr = ScriptManager::new(catalog_manager.clone(), query_engine)
.await


@@ -372,7 +372,7 @@ mod tests {
pub(crate) fn sample_script_engine() -> PyEngine {
let catalog_manager =
MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
-let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
+let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
PyEngine::new(query_engine.clone())
}


@@ -131,6 +131,22 @@ impl ScriptsTable {
}
Ok(())
}
pub fn new_empty(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,
) -> Result<Self> {
Ok(Self {
catalog_manager,
query_engine,
name: format_full_table_name(
DEFAULT_CATALOG_NAME,
DEFAULT_SCHEMA_NAME,
SCRIPTS_TABLE_NAME,
),
})
}
pub async fn new(
catalog_manager: CatalogManagerRef,
query_engine: QueryEngineRef,


@@ -204,7 +204,7 @@ impl GrpcQueryHandler for DummyInstance {
fn create_testing_instance(table: TableRef) -> DummyInstance {
let catalog_manager = MemoryCatalogManager::new_with_table(table);
-let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
+let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
DummyInstance::new(query_engine)
}


@@ -24,7 +24,7 @@ use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRe
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
-use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
+use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
use crate::error::{self, Result};
use crate::requests::{AddColumnRequest, AlterKind, TableOptions};
@@ -469,6 +469,14 @@ impl TableInfo {
pub fn table_id(&self) -> TableId {
self.ident.table_id
}
pub fn region_ids(&self) -> Vec<RegionId> {
self.meta
.region_numbers
.iter()
.map(|id| RegionId::new(self.table_id(), *id))
.collect()
}
}
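This helper closes the loop for the planner: `DistExtensionPlanner::get_regions` above now just expands the table's region numbers into `RegionId`s at plan time, deferring leader resolution to execution. A standalone sketch of the same expansion (assuming `TableId`/`RegionNumber` are the `u32` aliases from `store_api`):

```rust
use store_api::storage::{RegionId, RegionNumber};

// Equivalent form of TableInfo::region_ids for a bare id + region numbers:
fn region_ids(table_id: u32, region_numbers: &[RegionNumber]) -> Vec<RegionId> {
    region_numbers
        .iter()
        .map(|n| RegionId::new(table_id, *n))
        .collect()
}

assert_eq!(
    region_ids(42, &[0, 1]),
    vec![RegionId::new(42, 0), RegionId::new(42, 1)]
);
```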
impl TableInfoBuilder {