From eb7116ab56b0550f55fa67e0ad4ee7ff3865cd62 Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Tue, 5 Sep 2023 07:39:14 -0500
Subject: [PATCH] =?UTF-8?q?feat:=20read/write=20works=20in=20distributed?=
 =?UTF-8?q?=20mode=20=F0=9F=8E=89=20(#2327)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* add do_get method to RegionRequestHandler

Signed-off-by: Ruihang Xia

* move RegionRequestHandler to client crate

Signed-off-by: Ruihang Xia

* use RegionRequestHandler in MergeScan

Signed-off-by: Ruihang Xia

* minor fix

Signed-off-by: Ruihang Xia

* ignore tests

Signed-off-by: Ruihang Xia

* fix format

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 Cargo.lock                                    |  2 +
 src/api/src/helper.rs                         |  8 +-
 src/client/Cargo.toml                         |  1 +
 src/client/src/error.rs                       | 14 ++-
 src/client/src/lib.rs                         |  1 +
 src/client/src/region.rs                      | 90 +++++++++++++++++--
 .../instance => client/src}/region_handler.rs |  6 +-
 src/cmd/src/cli/repl.rs                       |  3 +-
 src/datanode/src/datanode.rs                  |  3 +-
 src/datanode/src/instance.rs                  |  9 +-
 src/datanode/src/region_server.rs             |  1 +
 src/frontend/Cargo.toml                       |  1 +
 src/frontend/src/inserter.rs                  |  7 +-
 src/frontend/src/instance.rs                  |  7 +-
 src/frontend/src/instance/distributed.rs      | 68 ++++++++++++--
 src/frontend/src/instance/standalone.rs       | 19 +++-
 src/frontend/src/statement.rs                 |  7 +-
 src/mito2/src/engine.rs                       |  2 +-
 src/query/Cargo.toml                          |  1 +
 src/query/src/datafusion.rs                   |  2 +-
 src/query/src/dist_plan/merge_scan.rs         | 71 +++++++--------
 src/query/src/dist_plan/planner.rs            | 34 +++----
 src/query/src/query_engine.rs                 | 20 ++---
 src/query/src/query_engine/state.rs           | 20 ++---
 src/query/src/range_select/plan_rewrite.rs    |  2 +-
 src/query/src/tests.rs                        |  2 +-
 src/query/src/tests/query_engine_test.rs      |  6 +-
 src/query/src/tests/time_range_filter_test.rs |  2 +-
 src/script/benches/py_benchmark.rs            |  2 +-
 src/script/src/manager.rs                     |  5 +-
 src/script/src/python/engine.rs               |  2 +-
 src/script/src/table.rs                       | 16 ++++
 src/servers/tests/mod.rs                      |  2 +-
 src/table/src/metadata.rs                     | 10 ++-
 34 files changed, 309 insertions(+), 137 deletions(-)
 rename src/{frontend/src/instance => client/src}/region_handler.rs (78%)

diff --git a/Cargo.lock b/Cargo.lock
index f334ab921b..75d33fc098 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1539,6 +1539,7 @@ dependencies = [
  "parking_lot",
  "prost",
  "rand",
+ "session",
  "snafu",
  "substrait 0.4.0-nightly",
  "substrait 0.7.5",
@@ -3231,6 +3232,7 @@ name = "frontend"
 version = "0.4.0-nightly"
 dependencies = [
  "api",
+ "arrow-flight",
  "async-compat",
  "async-stream",
  "async-trait",
diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs
index c2510295cc..17aa41d0ec 100644
--- a/src/api/src/helper.rs
+++ b/src/api/src/helper.rs
@@ -837,10 +837,10 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
         Value::Date(v) => Some(ValueData::DateValue(v.val())),
         Value::DateTime(v) => Some(ValueData::DatetimeValue(v.val())),
         Value::Timestamp(v) => Some(match v.unit() {
-            TimeUnit::Second => ValueData::TimeSecondValue(v.value()),
-            TimeUnit::Millisecond => ValueData::TimeMillisecondValue(v.value()),
-            TimeUnit::Microsecond => ValueData::TimeMicrosecondValue(v.value()),
-            TimeUnit::Nanosecond => ValueData::TimeNanosecondValue(v.value()),
+            TimeUnit::Second => ValueData::TsSecondValue(v.value()),
+            TimeUnit::Millisecond => ValueData::TsMillisecondValue(v.value()),
+            TimeUnit::Microsecond => ValueData::TsMicrosecondValue(v.value()),
+            TimeUnit::Nanosecond => ValueData::TsNanosecondValue(v.value()),
         }),
         Value::Time(v) => Some(match v.unit() {
             TimeUnit::Second => ValueData::TimeSecondValue(v.value()),
diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml
index b77f515b97..dde80a0194 100644
--- a/src/client/Cargo.toml
+++ b/src/client/Cargo.toml
@@ -30,6 +30,7 @@ moka = { version = "0.9", features = ["future"] }
 parking_lot = "0.12"
 prost.workspace = true
 rand.workspace = true
+session = { workspace = true }
 snafu.workspace = true
 tokio-stream = { version = "0.1", features = ["net"] }
 tokio.workspace = true
diff --git a/src/client/src/error.rs b/src/client/src/error.rs
index bb68dad64e..7ea8261e81 100644
--- a/src/client/src/error.rs
+++ b/src/client/src/error.rs
@@ -33,6 +33,16 @@ pub enum Error {
         source: BoxedError,
     },

+    #[snafu(display(
+        "Failure occurs during handling request, location: {}, source: {}",
+        location,
+        source
+    ))]
+    HandleRequest {
+        location: Location,
+        source: BoxedError,
+    },
+
     #[snafu(display("Failed to convert FlightData, source: {}", source))]
     ConvertFlightData {
         location: Location,
@@ -85,7 +95,9 @@ impl ErrorExt for Error {
             | Error::ClientStreaming { .. } => StatusCode::Internal,

             Error::Server { code, .. } => *code,
-            Error::FlightGet { source, .. } => source.status_code(),
+            Error::FlightGet { source, .. } | Error::HandleRequest { source, .. } => {
+                source.status_code()
+            }
             Error::CreateChannel { source, .. } | Error::ConvertFlightData { source, .. } => {
                 source.status_code()
             }
diff --git a/src/client/src/lib.rs b/src/client/src/lib.rs
index 23a67ebae1..e43a6fd69d 100644
--- a/src/client/src/lib.rs
+++ b/src/client/src/lib.rs
@@ -19,6 +19,7 @@ pub mod error;
 pub mod load_balance;
 mod metrics;
 pub mod region;
+pub mod region_handler;
 mod stream_insert;

 pub use api;
diff --git a/src/client/src/region.rs b/src/client/src/region.rs
index 4c325a0a56..f8dc4f599f 100644
--- a/src/client/src/region.rs
+++ b/src/client/src/region.rs
@@ -14,17 +14,26 @@

 use api::v1::region::{RegionRequest, RegionResponse};
 use api::v1::ResponseHeader;
+use arrow_flight::Ticket;
+use async_stream::stream;
 use async_trait::async_trait;
-use common_error::ext::BoxedError;
+use common_error::ext::{BoxedError, ErrorExt};
 use common_error::status_code::StatusCode;
+use common_grpc::flight::{FlightDecoder, FlightMessage};
 use common_meta::datanode_manager::{AffectedRows, Datanode};
 use common_meta::error::{self as meta_error, Result as MetaResult};
-use common_telemetry::timer;
-use snafu::{location, Location, OptionExt};
+use common_recordbatch::error::ExternalSnafu;
+use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
+use common_telemetry::{error, timer};
+use snafu::{location, Location, OptionExt, ResultExt};
+use tokio_stream::StreamExt;

 use crate::error::Error::FlightGet;
-use crate::error::{IllegalDatabaseResponseSnafu, MissingFieldSnafu, Result, ServerSnafu};
-use crate::{metrics, Client};
+use crate::error::{
+    self, ConvertFlightDataSnafu, IllegalDatabaseResponseSnafu, IllegalFlightMessagesSnafu,
+    MissingFieldSnafu, Result, ServerSnafu,
+};
+use crate::{metrics, Client, Error};

 #[derive(Debug)]
 pub struct RegionRequester {
@@ -54,6 +63,77 @@ impl RegionRequester {
         Self { client }
     }

+    pub async fn do_get(&self, ticket: Ticket) -> Result<SendableRecordBatchStream> {
+        let mut flight_client = self.client.make_flight_client()?;
+        let response = flight_client
+            .mut_inner()
+            .do_get(ticket)
+            .await
+            .map_err(|e| {
+                let tonic_code = e.code();
+                let e: error::Error = e.into();
+                let code = e.status_code();
+                let msg = e.to_string();
+                let error = Error::FlightGet {
+                    tonic_code,
+                    addr: flight_client.addr().to_string(),
+                    source: BoxedError::new(ServerSnafu { code, msg }.build()),
+                };
+                error!(
+                    e; "Failed to do Flight get, addr: {}, code: {}",
+                    flight_client.addr(),
+                    tonic_code
+                );
+                error
+            })?;

+        let flight_data_stream = response.into_inner();
+        let mut decoder = FlightDecoder::default();
+
+        let mut flight_message_stream = flight_data_stream.map(move |flight_data| {
+            flight_data
+                .map_err(Error::from)
+                .and_then(|data| decoder.try_decode(data).context(ConvertFlightDataSnafu))
+        });
+
+        let Some(first_flight_message) = flight_message_stream.next().await else {
+            return IllegalFlightMessagesSnafu {
+                reason: "Expect the response not to be empty",
+            }
+            .fail();
+        };
+        let FlightMessage::Schema(schema) = first_flight_message? else {
+            return IllegalFlightMessagesSnafu {
+                reason: "Expect schema to be the first flight message",
+            }
+            .fail();
+        };
+
+        let stream = Box::pin(stream!({
+            while let Some(flight_message) = flight_message_stream.next().await {
+                let flight_message = flight_message
+                    .map_err(BoxedError::new)
+                    .context(ExternalSnafu)?;
+                let FlightMessage::Recordbatch(record_batch) = flight_message else {
+                    yield IllegalFlightMessagesSnafu {
+                        reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"
+                    }
+                    .fail()
+                    .map_err(BoxedError::new)
+                    .context(ExternalSnafu);
+                    break;
+                };
+                yield Ok(record_batch);
+            }
+        }));
+        let record_batch_stream = RecordBatchStreamAdaptor {
+            schema,
+            stream,
+            output_ordering: None,
+        };
+        Ok(Box::pin(record_batch_stream))
+    }
+
     async fn handle_inner(&self, request: RegionRequest) -> Result<AffectedRows> {
         let request_type = request
             .body
diff --git a/src/frontend/src/instance/region_handler.rs b/src/client/src/region_handler.rs
similarity index 78%
rename from src/frontend/src/instance/region_handler.rs
rename to src/client/src/region_handler.rs
index 5d0c9d5b63..dc3ad1df93 100644
--- a/src/frontend/src/instance/region_handler.rs
+++ b/src/client/src/region_handler.rs
@@ -14,8 +14,9 @@

 use std::sync::Arc;

-use api::v1::region::{region_request, RegionResponse};
+use api::v1::region::{region_request, QueryRequest, RegionResponse};
 use async_trait::async_trait;
+use common_recordbatch::SendableRecordBatchStream;
 use session::context::QueryContextRef;

 use crate::error::Result;
@@ -27,6 +28,9 @@ pub trait RegionRequestHandler: Send + Sync {
         request: region_request::Body,
         ctx: QueryContextRef,
     ) -> Result<RegionResponse>;
+
+    // TODO(ruihang): add trace id and span id in the request.
+    async fn do_get(&self, request: QueryRequest) -> Result<SendableRecordBatchStream>;
 }

 pub type RegionRequestHandlerRef = Arc<dyn RegionRequestHandler>;
diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs
index d963944076..f8d6832415 100644
--- a/src/cmd/src/cli/repl.rs
+++ b/src/cmd/src/cli/repl.rs
@@ -269,9 +269,8 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
     let plugins: Arc<Plugins> = Default::default();

     let state = Arc::new(QueryEngineState::new(
         catalog_list,
+        None,
         false,
-        None,
-        None,
         plugins.clone(),
     ));
diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs
index ca25f45d10..5bbed0a148 100644
--- a/src/datanode/src/datanode.rs
+++ b/src/datanode/src/datanode.rs
@@ -406,9 +406,8 @@ impl Datanode {
         let query_engine_factory = QueryEngineFactory::new_with_plugins(
             // query engine in datanode only executes plan with resolved table source.
             MemoryCatalogManager::with_default_setup(),
+            None,
             false,
-            None,
-            None,
             plugins,
         );
         let query_engine = query_engine_factory.query_engine();
diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs
index 4985c2bb17..0693f7ca48 100644
--- a/src/datanode/src/instance.rs
+++ b/src/datanode/src/instance.rs
@@ -241,13 +241,8 @@ impl Instance {
             }
         };

-        let factory = QueryEngineFactory::new_with_plugins(
-            catalog_manager.clone(),
-            false,
-            None,
-            None,
-            plugins,
-        );
+        let factory =
+            QueryEngineFactory::new_with_plugins(catalog_manager.clone(), None, false, plugins);
         let query_engine = factory.query_engine();
         let procedure_manager = create_procedure_manager(
             opts.node_id.unwrap_or(0),
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index 5be6ed752e..0aea68307d 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -179,6 +179,7 @@ impl RegionServerInner {

     pub fn register_engine(&self, engine: RegionEngineRef) {
         let engine_name = engine.name();
+        info!("Region Engine {engine_name} is registered");
         self.engines
             .write()
             .unwrap()
diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml
index 0ea5c683e5..8b2edc804c 100644
--- a/src/frontend/Cargo.toml
+++ b/src/frontend/Cargo.toml
@@ -11,6 +11,7 @@ testing = []

 [dependencies]
 api = { workspace = true }
+arrow-flight.workspace = true
 async-compat = "0.2"
 async-stream.workspace = true
 async-trait = "0.1"
diff --git a/src/frontend/src/inserter.rs b/src/frontend/src/inserter.rs
index d78412606a..31d66235b0 100644
--- a/src/frontend/src/inserter.rs
+++ b/src/frontend/src/inserter.rs
@@ -22,6 +22,7 @@ use api::v1::{
     AlterExpr, ColumnSchema, DdlRequest, InsertRequests, RowInsertRequest, RowInsertRequests,
 };
 use catalog::CatalogManagerRef;
+use client::region_handler::RegionRequestHandlerRef;
 use common_catalog::consts::default_engine;
 use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
 use common_query::Output;
@@ -36,10 +37,9 @@ use table::TableRef;
 use self::req_convert::{ColumnToRow, RowToRegion};
 use crate::error::{
     CatalogSnafu, EmptyDataSnafu, Error, FindNewColumnsOnInsertionSnafu, InvalidInsertRequestSnafu,
-    Result,
+    RequestDatanodeSnafu, Result,
 };
 use crate::expr_factory::CreateExprFactory;
-use crate::instance::region_handler::RegionRequestHandlerRef;

 pub(crate) struct Inserter<'a> {
     catalog_manager: &'a CatalogManagerRef,
@@ -94,7 +94,8 @@ impl<'a> Inserter<'a> {
         let response = self
             .region_request_handler
             .handle(region_request, ctx)
-            .await?;
+            .await
+            .context(RequestDatanodeSnafu)?;
         Ok(Output::AffectedRows(response.affected_rows as _))
     }
 }
diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs
index 020b674f89..5f6fc6c0ba 100644
--- a/src/frontend/src/instance.rs
+++ b/src/frontend/src/instance.rs
@@ -18,7 +18,6 @@ mod influxdb;
 mod opentsdb;
 mod otlp;
 mod prom_store;
-pub mod region_handler;
 mod script;
 mod standalone;

@@ -33,6 +32,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use catalog::remote::CachedMetaKvBackend;
 use catalog::CatalogManagerRef;
 use client::client_manager::DatanodeClients;
+use client::region_handler::RegionRequestHandlerRef;
 use common_base::Plugins;
 use common_error::ext::BoxedError;
 use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
@@ -75,7 +75,6 @@ use sql::statements::statement::Statement;
 use sqlparser::ast::ObjectName;

 use self::distributed::DistRegionRequestHandler;
-use self::region_handler::RegionRequestHandlerRef;
 use self::standalone::StandaloneRegionRequestHandler;
 use crate::catalog::FrontendCatalogManager;
 use crate::error::{
@@ -166,12 +165,12 @@ impl Instance {
         catalog_manager.set_dist_instance(dist_instance.clone());
         let catalog_manager = Arc::new(catalog_manager);

+        let dist_request_handler = DistRegionRequestHandler::arc(catalog_manager.clone());
         let query_engine = QueryEngineFactory::new_with_plugins(
             catalog_manager.clone(),
+            Some(dist_request_handler),
             true,
-            Some(partition_manager.clone()),
-            Some(datanode_clients.clone()),
             plugins.clone(),
         )
         .query_engine();

diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs
index a9d98a4537..45d7ca1221 100644
--- a/src/frontend/src/instance/distributed.rs
+++ b/src/frontend/src/instance/distributed.rs
@@ -21,13 +21,17 @@ use std::sync::Arc;
 use api::helper::ColumnDataTypeWrapper;
 use api::v1::ddl_request::Expr as DdlExpr;
 use api::v1::greptime_request::Request;
-use api::v1::region::{region_request, RegionResponse};
+use api::v1::region::{region_request, QueryRequest, RegionResponse};
 use api::v1::{
     column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequests, TruncateTableExpr,
 };
+use arrow_flight::Ticket;
 use async_trait::async_trait;
 use catalog::{CatalogManager, DeregisterTableRequest, RegisterTableRequest};
 use chrono::DateTime;
+use client::error::{HandleRequestSnafu, Result as ClientResult};
+use client::region::RegionRequester;
+use client::region_handler::RegionRequestHandler;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_catalog::format_full_table_name;
 use common_error::ext::BoxedError;
@@ -37,12 +41,14 @@ use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse
 use common_meta::rpc::router::{Partition, Partition as MetaPartition};
 use common_meta::table_name::TableName;
 use common_query::Output;
+use common_recordbatch::SendableRecordBatchStream;
 use common_telemetry::info;
 use datanode::instance::sql::table_idents_to_full_name;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::RawSchema;
 use partition::manager::PartitionInfo;
 use partition::partition::{PartitionBound, PartitionDef};
+use prost::Message;
 use query::error::QueryExecutionSnafu;
 use query::query_engine::SqlStatementExecutor;
 use servers::query_handler::grpc::GrpcQueryHandler;
@@ -52,17 +58,17 @@ use sql::ast::{Ident, Value as SqlValue};
 use sql::statements::create::{PartitionEntry, Partitions};
 use sql::statements::statement::Statement;
 use sql::statements::{self, sql_value_to_value};
+use store_api::storage::RegionId;
 use table::metadata::{RawTableInfo, RawTableMeta, TableId, TableIdent, TableInfo, TableType};
 use table::requests::{AlterTableRequest, TableOptions};
 use table::TableRef;

-use super::region_handler::RegionRequestHandler;
 use crate::catalog::FrontendCatalogManager;
 use crate::error::{
     self, AlterExprToRequestSnafu, CatalogSnafu, ColumnDataTypeSnafu, ColumnNotFoundSnafu,
-    DeserializePartitionSnafu, NotSupportedSnafu, ParseSqlSnafu, Result, SchemaExistsSnafu,
-    TableAlreadyExistSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu,
-    UnrecognizedTableOptionSnafu,
+    DeserializePartitionSnafu, FindDatanodeSnafu, FindTableRouteSnafu, NotSupportedSnafu,
+    ParseSqlSnafu, RequestDatanodeSnafu, Result, SchemaExistsSnafu, TableAlreadyExistSnafu,
+    TableMetadataManagerSnafu, TableNotFoundSnafu, UnrecognizedTableOptionSnafu,
 };
 use crate::expr_factory;
 use crate::inserter::req_convert::StatementToRegion;
@@ -596,6 +602,26 @@ impl RegionRequestHandler for DistRegionRequestHandler {
         &self,
         request: region_request::Body,
         ctx: QueryContextRef,
+    ) -> ClientResult<RegionResponse> {
+        self.handle_inner(request, ctx)
+            .await
+            .map_err(BoxedError::new)
+            .context(HandleRequestSnafu)
+    }
+
+    async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
+        self.do_get_inner(request)
+            .await
+            .map_err(BoxedError::new)
+            .context(HandleRequestSnafu)
+    }
+}
+
+impl DistRegionRequestHandler {
+    async fn handle_inner(
+        &self,
+        request: region_request::Body,
+        ctx: QueryContextRef,
     ) -> Result<RegionResponse> {
         match request {
             region_request::Body::Inserts(inserts) => {
@@ -641,6 +667,38 @@ impl RegionRequestHandler for DistRegionRequestHandler {
             .fail(),
         }
     }
+
+    async fn do_get_inner(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
+        let region_id = RegionId::from_u64(request.region_id);
+
+        let table_route = self
+            .catalog_manager
+            .partition_manager()
+            .find_table_route(region_id.table_id())
+            .await
+            .context(FindTableRouteSnafu {
+                table_id: region_id.table_id(),
+            })?;
+        let peer = table_route
+            .find_region_leader(region_id.region_number())
+            .context(FindDatanodeSnafu {
+                region: region_id.region_number(),
+            })?;
+        let client = self
+            .catalog_manager
+            .datanode_clients()
+            .get_client(peer)
+            .await;
+
+        let ticket = Ticket {
+            ticket: request.encode_to_vec().into(),
+        };
+        let region_requester = RegionRequester::new(client);
+        region_requester
+            .do_get(ticket)
+            .await
+            .context(RequestDatanodeSnafu)
+    }
 }

 fn create_partitions_stmt(partitions: Vec<PartitionInfo>) -> Result<Option<Partitions>> {
diff --git a/src/frontend/src/instance/standalone.rs b/src/frontend/src/instance/standalone.rs
index 2607c3fb98..f63c198170 100644
--- a/src/frontend/src/instance/standalone.rs
+++ b/src/frontend/src/instance/standalone.rs
@@ -15,9 +15,13 @@
 use std::sync::Arc;

 use api::v1::greptime_request::Request;
-use api::v1::region::{region_request, RegionResponse};
+use api::v1::region::{region_request, QueryRequest, RegionResponse};
 use async_trait::async_trait;
+use client::error::{HandleRequestSnafu, Result as ClientResult};
+use client::region_handler::RegionRequestHandler;
+use common_error::ext::BoxedError;
 use common_query::Output;
+use common_recordbatch::SendableRecordBatchStream;
 use datanode::error::Error as DatanodeError;
 use datanode::region_server::RegionServer;
@@ -25,7 +29,6 @@ use servers::query_handler::grpc::{GrpcQueryHandler, GrpcQueryHandlerRef};
 use session::context::QueryContextRef;
 use snafu::ResultExt;

-use super::region_handler::RegionRequestHandler;
 use crate::error::{Error, InvokeDatanodeSnafu, InvokeRegionServerSnafu, Result};

 pub(crate) struct StandaloneGrpcQueryHandler(GrpcQueryHandlerRef<Error>);
@@ -64,10 +67,20 @@ impl RegionRequestHandler for StandaloneRegionRequestHandler {
         &self,
         request: region_request::Body,
         _ctx: QueryContextRef,
-    ) -> Result<RegionResponse> {
+    ) -> ClientResult<RegionResponse> {
         self.region_server
             .handle(request)
             .await
             .context(InvokeRegionServerSnafu)
+            .map_err(BoxedError::new)
+            .context(HandleRequestSnafu)
+    }
+
+    async fn do_get(&self, request: QueryRequest) -> ClientResult<SendableRecordBatchStream> {
+        self.region_server
+            .handle_read(request)
+            .await
+            .map_err(BoxedError::new)
+            .context(HandleRequestSnafu)
     }
 }
diff --git a/src/frontend/src/statement.rs b/src/frontend/src/statement.rs
index e42d107a3f..262a9b09bd 100644
--- a/src/frontend/src/statement.rs
+++ b/src/frontend/src/statement.rs
@@ -26,6 +26,7 @@ use std::sync::Arc;

 use api::v1::region::region_request;
 use catalog::CatalogManagerRef;
+use client::region_handler::RegionRequestHandlerRef;
 use common_error::ext::BoxedError;
 use common_query::Output;
 use common_time::range::TimestampRange;
@@ -49,11 +50,10 @@ use table::TableRef;
 use crate::catalog::FrontendCatalogManager;
 use crate::error::{
     self, CatalogSnafu, ExecLogicalPlanSnafu, ExecuteStatementSnafu, ExternalSnafu, InsertSnafu,
-    PlanStatementSnafu, Result, TableNotFoundSnafu,
+    PlanStatementSnafu, RequestDatanodeSnafu, Result, TableNotFoundSnafu,
 };
 use crate::inserter::req_convert::TableToRegion;
 use crate::instance::distributed::deleter::DistDeleter;
-use crate::instance::region_handler::RegionRequestHandlerRef;
 use crate::statement::backup::{COPY_DATABASE_TIME_END_KEY, COPY_DATABASE_TIME_START_KEY};

 #[derive(Clone)]
@@ -188,7 +188,8 @@ impl StatementExecutor {
         let region_response = self
             .region_request_handler
             .handle(region_request::Body::Inserts(request), query_ctx)
-            .await?;
+            .await
+            .context(RequestDatanodeSnafu)?;

         Ok(region_response.affected_rows as _)
     }
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 606c0522ab..e803000521 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -151,7 +151,7 @@ impl EngineInner {
 #[async_trait]
 impl RegionEngine for MitoEngine {
     fn name(&self) -> &str {
-        "MitoEngine"
+        "mito"
     }

     async fn handle_request(
diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml
index 3e59d1b44c..33e4564167 100644
--- a/src/query/Cargo.toml
+++ b/src/query/Cargo.toml
@@ -48,6 +48,7 @@ serde_json = "1.0"
 session.workspace = true
 snafu = { version = "0.7", features = ["backtraces"] }
 sql.workspace = true
+store-api.workspace = true
 substrait.workspace = true
 table.workspace = true
 tokio.workspace = true
diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs
index e317cb9d5c..237f4e750d 100644
--- a/src/query/src/datafusion.rs
+++ b/src/query/src/datafusion.rs
@@ -517,7 +517,7 @@ mod tests {
         };
         let _ = catalog_manager.register_table(req).await.unwrap();

-        QueryEngineFactory::new(catalog_manager, false).query_engine()
+        QueryEngineFactory::new(catalog_manager, None, false).query_engine()
     }

     #[tokio::test]
diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs
index 4edb9253b8..042ac5f7f8 100644
--- a/src/query/src/dist_plan/merge_scan.rs
+++ b/src/query/src/dist_plan/merge_scan.rs
@@ -17,14 +17,11 @@ use std::sync::Arc;

 use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
 use async_stream::stream;
-use client::client_manager::DatanodeClients;
-use client::Database;
+use client::region_handler::RegionRequestHandlerRef;
 use common_base::bytes::Bytes;
 use common_error::ext::BoxedError;
-use common_meta::peer::Peer;
 use common_meta::table_name::TableName;
 use common_query::physical_plan::TaskContext;
-use common_query::Output;
 use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
 use common_recordbatch::error::ExternalSnafu;
 use common_recordbatch::{
@@ -39,9 +36,11 @@ use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNodeCore};
 use datafusion_physical_expr::PhysicalSortExpr;
 use datatypes::schema::{Schema, SchemaRef};
 use futures_util::StreamExt;
+use greptime_proto::v1::region::QueryRequest;
 use snafu::ResultExt;
+use store_api::storage::RegionId;

-use crate::error::{ConvertSchemaSnafu, RemoteRequestSnafu, UnexpectedOutputKindSnafu};
+use crate::error::ConvertSchemaSnafu;

 #[derive(Debug, Hash, PartialEq, Eq, Clone)]
 pub struct MergeScanLogicalPlan {
@@ -108,48 +107,52 @@ impl MergeScanLogicalPlan {
     }
 }

-#[derive(Debug)]
 pub struct MergeScanExec {
     table: TableName,
-    peers: Vec<Peer>,
+    regions: Vec<RegionId>,
     substrait_plan: Bytes,
     schema: SchemaRef,
     arrow_schema: ArrowSchemaRef,
-    clients: Arc<DatanodeClients>,
+    request_handler: RegionRequestHandlerRef,
     metric: ExecutionPlanMetricsSet,
 }

+impl std::fmt::Debug for MergeScanExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("MergeScanExec")
+            .field("table", &self.table)
+            .field("regions", &self.regions)
+            .field("schema", &self.schema)
+            .finish()
+    }
+}
+
 impl MergeScanExec {
     pub fn new(
         table: TableName,
-        peers: Vec<Peer>,
+        regions: Vec<RegionId>,
         substrait_plan: Bytes,
         arrow_schema: &ArrowSchema,
-        clients: Arc<DatanodeClients>,
+        request_handler: RegionRequestHandlerRef,
     ) -> Result<Self> {
         let arrow_schema_without_metadata = Self::arrow_schema_without_metadata(arrow_schema);
         let schema_without_metadata =
             Self::arrow_schema_to_schema(arrow_schema_without_metadata.clone())?;
         Ok(Self {
             table,
-            peers,
+            regions,
             substrait_plan,
             schema: schema_without_metadata,
             arrow_schema: arrow_schema_without_metadata,
-            clients,
+            request_handler,
             metric: ExecutionPlanMetricsSet::new(),
         })
     }

-    pub fn to_stream(&self, context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
+    pub fn to_stream(&self, _context: Arc<TaskContext>) -> Result<SendableRecordBatchStream> {
         let substrait_plan = self.substrait_plan.to_vec();
-        let peers = self.peers.clone();
-        let clients = self.clients.clone();
-        let table = self.table.clone();
-        let trace_id = context
-            .task_id()
-            .and_then(|id| id.parse().ok())
-            .unwrap_or_default();
+        let regions = self.regions.clone();
+        let request_handler = self.request_handler.clone();
         let metric = MergeScanMetric::new(&self.metric);

         let stream = Box::pin(stream!({
@@ -157,27 +160,17 @@ impl MergeScanExec {
             let mut ready_timer = metric.ready_time().timer();
             let mut first_consume_timer = Some(metric.first_consume_time().timer());

-            for peer in peers {
-                let client = clients.get_client(&peer).await;
-                let database = Database::new(&table.catalog_name, &table.schema_name, client);
-                let output: Output = database
-                    .logical_plan(substrait_plan.clone(), trace_id)
+            for region_id in regions {
+                let request = QueryRequest {
+                    region_id: region_id.into(),
+                    plan: substrait_plan.clone(),
+                };
+                let mut stream = request_handler
+                    .do_get(request)
                     .await
-                    .context(RemoteRequestSnafu)
                     .map_err(BoxedError::new)
                     .context(ExternalSnafu)?;

-                let Output::Stream(mut stream) = output else {
-                    yield UnexpectedOutputKindSnafu {
-                        expected: "Stream",
-                        got: "RecordBatches or AffectedRows",
-                    }
-                    .fail()
-                    .map_err(BoxedError::new)
-                    .context(ExternalSnafu);
-                    return;
-                };
-
                 ready_timer.stop();

                 while let Some(batch) = stream.next().await {
@@ -279,8 +272,8 @@ impl ExecutionPlan for MergeScanExec {
 impl DisplayAs for MergeScanExec {
     fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "MergeScanExec: peers=[")?;
-        for peer in self.peers.iter() {
-            write!(f, "{}, ", peer)?;
+        for region_id in self.regions.iter() {
+            write!(f, "{}, ", region_id)?;
         }
         write!(f, "]")
     }
diff --git a/src/query/src/dist_plan/planner.rs b/src/query/src/dist_plan/planner.rs
index a5261e1757..2886dd9c9d 100644
--- a/src/query/src/dist_plan/planner.rs
+++ b/src/query/src/dist_plan/planner.rs
@@ -18,9 +18,8 @@ use std::sync::Arc;

 use async_trait::async_trait;
 use catalog::CatalogManagerRef;
-use client::client_manager::DatanodeClients;
+use client::region_handler::RegionRequestHandlerRef;
 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
-use common_meta::peer::Peer;
 use common_meta::table_name::TableName;
 use datafusion::common::Result;
 use datafusion::datasource::DefaultTableSource;
@@ -28,11 +27,11 @@ use datafusion::execution::context::SessionState;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};
 use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeVisitor, VisitRecursion};
-use datafusion_common::{DataFusionError, TableReference};
+use datafusion_common::TableReference;
 use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
 use datafusion_optimizer::analyzer::Analyzer;
-use partition::manager::PartitionRuleManager;
 use snafu::{OptionExt, ResultExt};
+use store_api::storage::RegionId;
 use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
 pub use table::metadata::TableType;
 use table::table::adapter::DfTableProviderAdapter;
@@ -42,21 +41,18 @@ use crate::error;
 use crate::error::{CatalogSnafu, TableNotFoundSnafu};

 pub struct DistExtensionPlanner {
-    partition_manager: Arc<PartitionRuleManager>,
-    clients: Arc<DatanodeClients>,
     catalog_manager: CatalogManagerRef,
+    request_handler: RegionRequestHandlerRef,
 }

 impl DistExtensionPlanner {
     pub fn new(
-        partition_manager: Arc<PartitionRuleManager>,
-        clients: Arc<DatanodeClients>,
         catalog_manager: CatalogManagerRef,
+        request_handler: RegionRequestHandlerRef,
     ) -> Self {
         Self {
-            partition_manager,
-            clients,
             catalog_manager,
+            request_handler,
         }
     }
 }
@@ -94,7 +90,7 @@ impl ExtensionPlanner for DistExtensionPlanner {
             return fallback(&optimized_plan).await;
         };

-        let Ok(peers) = self.get_peers(&table_name).await else {
+        let Ok(regions) = self.get_regions(&table_name).await else {
             // no peers found, going to execute them locally
             return fallback(&optimized_plan).await;
         };
@@ -109,10 +105,10 @@ impl ExtensionPlanner for DistExtensionPlanner {
                 .into();
             let merge_scan_plan = MergeScanExec::new(
                 table_name,
-                peers,
+                regions,
                 substrait_plan,
                 &schema,
-                self.clients.clone(),
+                self.request_handler.clone(),
             )?;
             Ok(Some(Arc::new(merge_scan_plan) as _))
         }
@@ -131,7 +127,7 @@ impl DistExtensionPlanner {
         plan.transform(&|plan| TableNameRewriter::rewrite_table_name(plan, name))
     }

-    async fn get_peers(&self, table_name: &TableName) -> Result<Vec<Peer>> {
+    async fn get_regions(&self, table_name: &TableName) -> Result<Vec<RegionId>> {
         let table = self
             .catalog_manager
             .table(
@@ -144,15 +140,7 @@ impl DistExtensionPlanner {
             .with_context(|| TableNotFoundSnafu {
                 table: table_name.to_string(),
             })?;
-        let table_id = table.table_info().table_id();
-
-        self.partition_manager
-            .find_table_region_leaders(table_id)
-            .await
-            .with_context(|_| error::RoutePartitionSnafu {
-                table: table_name.clone(),
-            })
-            .map_err(|e| DataFusionError::External(Box::new(e)))
+        Ok(table.table_info().region_ids())
     }

     // TODO(ruihang): find a more elegant way to optimize input logical plan
diff --git a/src/query/src/query_engine.rs b/src/query/src/query_engine.rs
index 3cfbf70ff7..1d137706c0 100644
--- a/src/query/src/query_engine.rs
+++ b/src/query/src/query_engine.rs
@@ -21,14 +21,13 @@ use std::sync::Arc;

 use async_trait::async_trait;
 use catalog::CatalogManagerRef;
-use client::client_manager::DatanodeClients;
+use client::region_handler::RegionRequestHandlerRef;
 use common_base::Plugins;
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_function::scalars::{FunctionRef, FUNCTION_REGISTRY};
 use common_query::prelude::ScalarUdf;
 use common_query::Output;
 use datatypes::schema::Schema;
-use partition::manager::PartitionRuleManager;
 use session::context::QueryContextRef;
 use sql::statements::statement::Statement;
 use table::TableRef;
@@ -86,28 +85,29 @@ pub struct QueryEngineFactory {
 }

 impl QueryEngineFactory {
-    pub fn new(catalog_manager: CatalogManagerRef, with_dist_planner: bool) -> Self {
+    pub fn new(
+        catalog_manager: CatalogManagerRef,
+        request_handler: Option<RegionRequestHandlerRef>,
+        with_dist_planner: bool,
+    ) -> Self {
         Self::new_with_plugins(
             catalog_manager,
+            request_handler,
             with_dist_planner,
-            None,
-            None,
             Default::default(),
         )
     }

     pub fn new_with_plugins(
         catalog_manager: CatalogManagerRef,
+        request_handler: Option<RegionRequestHandlerRef>,
         with_dist_planner: bool,
-        partition_manager: Option<Arc<PartitionRuleManager>>,
-        clients: Option<Arc<DatanodeClients>>,
         plugins: Arc<Plugins>,
     ) -> Self {
         let state = Arc::new(QueryEngineState::new(
             catalog_manager,
+            request_handler,
             with_dist_planner,
-            partition_manager,
-            clients,
             plugins.clone(),
         ));
         let query_engine = Arc::new(DatafusionQueryEngine::new(state, plugins));
@@ -139,7 +139,7 @@ mod tests {
     #[test]
     fn test_query_engine_factory() {
         let catalog_list = catalog::local::new_memory_catalog_manager().unwrap();
-        let factory = QueryEngineFactory::new(catalog_list, false);
+        let factory = QueryEngineFactory::new(catalog_list, None, false);

         let engine = factory.query_engine();

diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index e7343815cb..f4d6b51631 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -18,7 +18,7 @@ use std::sync::{Arc, RwLock};

 use async_trait::async_trait;
 use catalog::CatalogManagerRef;
-use client::client_manager::DatanodeClients;
+use client::region_handler::RegionRequestHandlerRef;
 use common_base::Plugins;
 use common_function::scalars::aggregate::AggregateFunctionMetaRef;
 use common_query::physical_plan::SessionContext;
@@ -37,7 +37,6 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
 use datafusion_expr::LogicalPlan as DfLogicalPlan;
 use datafusion_optimizer::analyzer::Analyzer;
 use datafusion_optimizer::optimizer::Optimizer;
-use partition::manager::PartitionRuleManager;
 use promql::extension_plan::PromExtensionPlanner;
 use substrait::extension_serializer::ExtensionSerializer;
 use table::table::adapter::DfTableProviderAdapter;
@@ -73,9 +72,8 @@ impl fmt::Debug for QueryEngineState {
 impl QueryEngineState {
     pub fn new(
         catalog_list: CatalogManagerRef,
+        request_handler: Option<RegionRequestHandlerRef>,
         with_dist_planner: bool,
-        partition_manager: Option<Arc<PartitionRuleManager>>,
-        datanode_clients: Option<Arc<DatanodeClients>>,
         plugins: Arc<Plugins>,
     ) -> Self {
         let runtime_env = Arc::new(RuntimeEnv::default());
@@ -114,9 +112,8 @@ impl QueryEngineState {
             .with_serializer_registry(Arc::new(ExtensionSerializer))
             .with_analyzer_rules(analyzer.rules)
             .with_query_planner(Arc::new(DfQueryPlanner::new(
-                partition_manager,
-                datanode_clients,
                 catalog_list.clone(),
+                request_handler,
             )))
             .with_optimizer_rules(optimizer.rules)
             .with_physical_optimizer_rules(physical_optimizers);
@@ -223,15 +220,16 @@ impl QueryPlanner for DfQueryPlanner {

 impl DfQueryPlanner {
     fn new(
-        partition_manager: Option<Arc<PartitionRuleManager>>,
-        datanode_clients: Option<Arc<DatanodeClients>>,
         catalog_manager: CatalogManagerRef,
+        request_handler: Option<RegionRequestHandlerRef>,
     ) -> Self {
         let mut planners: Vec<Arc<dyn ExtensionPlanner + Send + Sync>> =
             vec![Arc::new(PromExtensionPlanner), Arc::new(RangeSelectPlanner)];
-        if let Some(partition_manager) = partition_manager
-            && let Some(datanode_clients) = datanode_clients {
-            planners.push(Arc::new(DistExtensionPlanner::new(partition_manager, datanode_clients, catalog_manager)));
+        if let Some(request_handler) = request_handler {
+            planners.push(Arc::new(DistExtensionPlanner::new(
+                catalog_manager,
+                request_handler,
+            )));
         }
         Self {
             physical_planner: DefaultPhysicalPlanner::with_extension_planners(planners),
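
[Reviewer note, not part of the applied diff] The factory now threads an optional RegionRequestHandlerRef where it previously took the (partition_manager, datanode_clients) pair. A minimal sketch of the two call shapes after this change, in the style of the call sites above; the helper function names and the `query::` re-export paths are assumptions:

    use std::sync::Arc;

    use catalog::CatalogManagerRef;
    use client::region_handler::RegionRequestHandlerRef;
    use common_base::Plugins;
    use query::{QueryEngineFactory, QueryEngineRef};

    // Local-only execution (datanode, standalone, tests): no handler,
    // so DfQueryPlanner registers no DistExtensionPlanner.
    fn local_engine(catalog_manager: CatalogManagerRef) -> QueryEngineRef {
        QueryEngineFactory::new(catalog_manager, None, false).query_engine()
    }

    // Distributed frontend: pass a handler and enable the dist planner,
    // as frontend/src/instance.rs does with DistRegionRequestHandler::arc.
    fn dist_engine(
        catalog_manager: CatalogManagerRef,
        request_handler: RegionRequestHandlerRef,
        plugins: Arc<Plugins>,
    ) -> QueryEngineRef {
        QueryEngineFactory::new_with_plugins(catalog_manager, Some(request_handler), true, plugins)
            .query_engine()
    }
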
diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs
index aa6de6514c..eff319f456 100644
--- a/src/query/src/range_select/plan_rewrite.rs
+++ b/src/query/src/range_select/plan_rewrite.rs
@@ -389,7 +389,7 @@ mod test {
         })
         .await
         .is_ok());
-        QueryEngineFactory::new(catalog_list, false).query_engine()
+        QueryEngineFactory::new(catalog_list, None, false).query_engine()
     }

     async fn query_plan_compare(sql: &str, expected: String) {
diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs
index 312bd4ea49..f36083d1f8 100644
--- a/src/query/src/tests.rs
+++ b/src/query/src/tests.rs
@@ -51,5 +51,5 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {

 pub fn new_query_engine_with_table(table: TableRef) -> QueryEngineRef {
     let catalog_manager = MemoryCatalogManager::new_with_table(table);
-    QueryEngineFactory::new(catalog_manager, false).query_engine()
+    QueryEngineFactory::new(catalog_manager, None, false).query_engine()
 }
diff --git a/src/query/src/tests/query_engine_test.rs b/src/query/src/tests/query_engine_test.rs
index 39197fdb6a..83ec1e52bc 100644
--- a/src/query/src/tests/query_engine_test.rs
+++ b/src/query/src/tests/query_engine_test.rs
@@ -47,7 +47,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
     let catalog_list = catalog::local::new_memory_catalog_manager()
         .map_err(BoxedError::new)
         .context(QueryExecutionSnafu)?;
-    let factory = QueryEngineFactory::new(catalog_list, false);
+    let factory = QueryEngineFactory::new(catalog_list, None, false);
     let engine = factory.query_engine();

     let column_schemas = vec![ColumnSchema::new(
@@ -129,7 +129,7 @@ async fn test_query_validate() -> Result<()> {
     });
     let plugins = Arc::new(plugins);

-    let factory = QueryEngineFactory::new_with_plugins(catalog_list, false, None, None, plugins);
+    let factory = QueryEngineFactory::new_with_plugins(catalog_list, None, false, plugins);
     let engine = factory.query_engine();

     let stmt = QueryLanguageParser::parse_sql("select number from public.numbers").unwrap();
@@ -153,7 +153,7 @@ async fn test_udf() -> Result<()> {
     common_telemetry::init_default_ut_logging();
     let catalog_list = catalog_manager()?;

-    let factory = QueryEngineFactory::new(catalog_list, false);
+    let factory = QueryEngineFactory::new(catalog_list, None, false);
     let engine = factory.query_engine();

     let pow = make_scalar_function(pow);
diff --git a/src/query/src/tests/time_range_filter_test.rs b/src/query/src/tests/time_range_filter_test.rs
index de412c1e44..383ac2ea80 100644
--- a/src/query/src/tests/time_range_filter_test.rs
+++ b/src/query/src/tests/time_range_filter_test.rs
@@ -106,7 +106,7 @@ fn create_test_engine() -> TimeRangeTester {
     };
     let _ = catalog_manager.register_table_sync(req).unwrap();

-    let engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
+    let engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
     TimeRangeTester { engine, filter }
 }
diff --git a/src/script/benches/py_benchmark.rs b/src/script/benches/py_benchmark.rs
index d1e415afd1..c5fbe17072 100644
--- a/src/script/benches/py_benchmark.rs
+++ b/src/script/benches/py_benchmark.rs
@@ -52,7 +52,7 @@ where
 pub(crate) fn sample_script_engine() -> PyEngine {
     let catalog_manager =
         MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
-    let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
+    let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();

     PyEngine::new(query_engine.clone())
 }
diff --git a/src/script/src/manager.rs b/src/script/src/manager.rs
index 9a7b4130e9..ac18774251 100644
--- a/src/script/src/manager.rs
+++ b/src/script/src/manager.rs
@@ -41,7 +41,7 @@ impl ScriptManager {
         Ok(Self {
             compiled: RwLock::new(HashMap::default()),
             py_engine: PyEngine::new(query_engine.clone()),
-            table: ScriptsTable::new(catalog_manager, query_engine).await?,
+            table: ScriptsTable::new_empty(catalog_manager, query_engine)?,
         })
     }

@@ -139,6 +139,7 @@ mod tests {

     type DefaultEngine = MitoEngine<EngineImpl<RaftEngineLogStore>>;

+    #[ignore = "script engine is temporary disabled"]
     #[tokio::test]
     async fn test_insert_find_compile_script() {
         let wal_dir = create_temp_dir("test_insert_find_compile_script_wal");
@@ -168,7 +169,7 @@ mod tests {
             .unwrap(),
         );
-        let factory = QueryEngineFactory::new(catalog_manager.clone(), false);
+        let factory = QueryEngineFactory::new(catalog_manager.clone(), None, false);
         let query_engine = factory.query_engine();

         let mgr = ScriptManager::new(catalog_manager.clone(), query_engine)
             .await
diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs
index a1947b529a..f60b2914d7 100644
--- a/src/script/src/python/engine.rs
+++ b/src/script/src/python/engine.rs
@@ -372,7 +372,7 @@ mod tests {
     pub(crate) fn sample_script_engine() -> PyEngine {
         let catalog_manager =
             MemoryCatalogManager::new_with_table(NumbersTable::table(NUMBERS_TABLE_ID));
-        let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
+        let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();

         PyEngine::new(query_engine.clone())
     }
diff --git a/src/script/src/table.rs b/src/script/src/table.rs
index e3890517ce..993a8c5395 100644
--- a/src/script/src/table.rs
+++ b/src/script/src/table.rs
@@ -131,6 +131,22 @@ impl ScriptsTable {
         }
         Ok(())
     }
+
+    pub fn new_empty(
+        catalog_manager: CatalogManagerRef,
+        query_engine: QueryEngineRef,
+    ) -> Result<Self> {
+        Ok(Self {
+            catalog_manager,
+            query_engine,
+            name: format_full_table_name(
+                DEFAULT_CATALOG_NAME,
+                DEFAULT_SCHEMA_NAME,
+                SCRIPTS_TABLE_NAME,
+            ),
+        })
+    }
+
     pub async fn new(
         catalog_manager: CatalogManagerRef,
         query_engine: QueryEngineRef,
diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs
index bbd38ac27c..cb71ad4f29 100644
--- a/src/servers/tests/mod.rs
+++ b/src/servers/tests/mod.rs
@@ -204,7 +204,7 @@ impl GrpcQueryHandler for DummyInstance {

 fn create_testing_instance(table: TableRef) -> DummyInstance {
     let catalog_manager = MemoryCatalogManager::new_with_table(table);
-    let query_engine = QueryEngineFactory::new(catalog_manager, false).query_engine();
+    let query_engine = QueryEngineFactory::new(catalog_manager, None, false).query_engine();
     DummyInstance::new(query_engine)
 }

diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs
index 2892338802..f384f86331 100644
--- a/src/table/src/metadata.rs
+++ b/src/table/src/metadata.rs
@@ -24,7 +24,7 @@ use datatypes::schema::{ColumnSchema, RawSchema, Schema, SchemaBuilder, SchemaRef};
 use derive_builder::Builder;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, ResultExt};
-use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
+use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};

 use crate::error::{self, Result};
 use crate::requests::{AddColumnRequest, AlterKind, TableOptions};
@@ -469,6 +469,14 @@ impl TableInfo {
     pub fn table_id(&self) -> TableId {
         self.ident.table_id
     }
+
+    pub fn region_ids(&self) -> Vec<RegionId> {
+        self.meta
+            .region_numbers
+            .iter()
+            .map(|id| RegionId::new(self.table_id(), *id))
+            .collect()
+    }
 }

 impl TableInfoBuilder {
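
[Reviewer note, not part of the applied diff] End to end, a read now encodes a per-region substrait plan into a QueryRequest and pulls a record-batch stream through RegionRequestHandler::do_get, exactly as the MergeScan loop above does for each RegionId it holds. A hedged sketch of a caller, using only items introduced in this patch; `query_one_region` and the plan bytes are hypothetical placeholders:

    use api::v1::region::QueryRequest;
    use client::error::Result;
    use client::region_handler::RegionRequestHandlerRef;
    use futures_util::StreamExt;
    use store_api::storage::RegionId;

    // Hypothetical helper mirroring the body of MergeScanExec::to_stream.
    async fn query_one_region(
        handler: &RegionRequestHandlerRef,
        region_id: RegionId,
        substrait_plan: Vec<u8>, // encoded substrait plan bytes (placeholder)
    ) -> Result<()> {
        let request = QueryRequest {
            region_id: region_id.into(), // RegionId converts to its u64 form
            plan: substrait_plan,
        };
        // The standalone handler routes this to RegionServer::handle_read;
        // the distributed handler finds the region leader and goes over Flight.
        let mut stream = handler.do_get(request).await?;
        while let Some(batch) = stream.next().await {
            // Each item is a RecordBatch already decoded from Flight messages.
            let batch = batch.expect("recordbatch stream error");
            println!("read {} rows", batch.num_rows());
        }
        Ok(())
    }

This is also why the trait moved into the client crate: both MergeScanExec (in query) and the frontend implementations can now depend on the same RegionRequestHandler abstraction without a circular frontend/query dependency.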