From 6b8cf0bbf069e6fee9ead4a23b37ad5f50841c6a Mon Sep 17 00:00:00 2001
From: Ruihang Xia
Date: Mon, 28 Aug 2023 04:24:12 -0500
Subject: [PATCH] feat: impl region engine for mito (#2269)

* update proto

Signed-off-by: Ruihang Xia

* convert request

Signed-off-by: Ruihang Xia

* update proto

Signed-off-by: Ruihang Xia

* import result convertor

Signed-off-by: Ruihang Xia

* rename symbols

Signed-off-by: Ruihang Xia

---------

Signed-off-by: Ruihang Xia
---
 Cargo.lock                            |  2 +-
 Cargo.toml                            |  2 +-
 src/api/src/helper.rs                 | 24 ++++----
 src/datanode/src/error.rs             | 11 ++++
 src/datanode/src/region_server.rs     | 68 +++++++++++++++++----
 src/datanode/src/server.rs            |  2 +-
 src/mito2/src/engine.rs               | 49 +++++++++++++++
 src/servers/src/error.rs              |  7 +++
 src/servers/src/grpc.rs               |  4 +-
 src/servers/src/grpc/region_server.rs | 11 ++--
 src/store-api/src/metadata.rs         | 40 ++++++++++++-
 src/store-api/src/region_request.rs   | 85 ++++++++++++++++++++++++++-
 12 files changed, 269 insertions(+), 36 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 89a4729c58..1a3c89a186 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4153,7 +4153,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3489b4742150abe0a769faf1bb60fbb95b061fc8#3489b4742150abe0a769faf1bb60fbb95b061fc8"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=39b0ea8d086d0ab762046b0f473aa3ef8bd347f9#39b0ea8d086d0ab762046b0f473aa3ef8bd347f9"
 dependencies = [
  "prost",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 4c2eaa2e16..d81105b40f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
 derive_builder = "0.12"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3489b4742150abe0a769faf1bb60fbb95b061fc8" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "39b0ea8d086d0ab762046b0f473aa3ef8bd347f9" }
 itertools = "0.10"
 lazy_static = "1.4"
 once_cell = "1.18"
diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs
index bb049ecb26..20c27c82aa 100644
--- a/src/api/src/helper.rs
+++ b/src/api/src/helper.rs
@@ -56,6 +56,10 @@ impl ColumnDataTypeWrapper {
         Ok(Self(datatype))
     }
 
+    pub fn new(datatype: ColumnDataType) -> Self {
+        Self(datatype)
+    }
+
     pub fn datatype(&self) -> ColumnDataType {
         self.0
     }
@@ -330,17 +334,17 @@ fn query_request_type(request: &QueryRequest) -> &'static str {
 }
 
 /// Returns the type name of the [RegionRequest].
-pub fn region_request_type(request: &region_request::Request) -> &'static str {
+pub fn region_request_type(request: &region_request::Body) -> &'static str {
     match request {
-        region_request::Request::Inserts(_) => "region.inserts",
-        region_request::Request::Deletes(_) => "region.deletes",
-        region_request::Request::Create(_) => "region.create",
-        region_request::Request::Drop(_) => "region.drop ",
-        region_request::Request::Open(_) => "region.open",
-        region_request::Request::Close(_) => "region.close",
-        region_request::Request::Alter(_) => "region.alter",
-        region_request::Request::Flush(_) => "region.flush",
-        region_request::Request::Compact(_) => "region.compact",
+        region_request::Body::Inserts(_) => "region.inserts",
+        region_request::Body::Deletes(_) => "region.deletes",
+        region_request::Body::Create(_) => "region.create",
+        region_request::Body::Drop(_) => "region.drop",
+        region_request::Body::Open(_) => "region.open",
+        region_request::Body::Close(_) => "region.close",
+        region_request::Body::Alter(_) => "region.alter",
+        region_request::Body::Flush(_) => "region.flush",
+        region_request::Body::Compact(_) => "region.compact",
     }
 }
 
diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs
index 0b4186a3e9..099edcbcef 100644
--- a/src/datanode/src/error.rs
+++ b/src/datanode/src/error.rs
@@ -556,6 +556,16 @@ pub enum Error {
         location: Location,
         source: BoxedError,
     },
+
+    #[snafu(display(
+        "Failed to build region requests, location:{}, source: {}",
+        location,
+        source
+    ))]
+    BuildRegionRequests {
+        location: Location,
+        source: store_api::metadata::MetadataError,
+    },
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -569,6 +579,7 @@ impl ErrorExt for Error {
             | ExecuteStatement { source, .. }
             | ExecuteLogicalPlan { source, .. } => source.status_code(),
 
+            BuildRegionRequests { source, .. } => source.status_code(),
             HandleHeartbeatResponse { source, .. } => source.status_code(),
 
             DecodeLogicalPlan { source, .. } => source.status_code(),
diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs
index ae41a00f80..0719d9e336 100644
--- a/src/datanode/src/region_server.rs
+++ b/src/datanode/src/region_server.rs
@@ -16,15 +16,18 @@ use std::any::Any;
 use std::collections::HashMap;
 use std::sync::{Arc, Mutex, RwLock};
 
-use api::v1::region::region_request::Request as RequestBody;
-use api::v1::region::{QueryRequest, RegionResponse};
+use api::v1::region::{region_request, QueryRequest, RegionResponse};
+use api::v1::{ResponseHeader, Status};
 use arrow_flight::{FlightData, Ticket};
 use async_trait::async_trait;
 use bytes::Bytes;
+use common_error::ext::BoxedError;
+use common_error::status_code::StatusCode;
 use common_query::logical_plan::Expr;
 use common_query::physical_plan::DfPhysicalPlanAdapter;
 use common_query::{DfPhysicalPlan, Output};
 use common_recordbatch::SendableRecordBatchStream;
+use common_runtime::Runtime;
 use common_telemetry::info;
 use dashmap::DashMap;
 use datafusion::catalog::schema::SchemaProvider;
@@ -35,10 +38,10 @@ use datafusion::execution::context::SessionState;
 use datafusion_common::DataFusionError;
 use datafusion_expr::{Expr as DfExpr, TableType};
 use datatypes::arrow::datatypes::SchemaRef;
+use futures_util::future::try_join_all;
 use prost::Message;
 use query::QueryEngineRef;
-use servers::error as servers_error;
-use servers::error::Result as ServerResult;
+use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
 use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
 use servers::grpc::region_server::RegionServerHandler;
 use session::context::QueryContext;
@@ -52,9 +55,9 @@ use table::table::scan::StreamScanAdapter;
 use tonic::{Request, Response, Result as TonicResult};
 
 use crate::error::{
-    DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu,
-    HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result,
-    UnsupportedOutputSnafu,
+    BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
+    GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu,
+    RegionNotFoundSnafu, Result, UnsupportedOutputSnafu,
 };
 
 #[derive(Clone)]
@@ -63,9 +66,9 @@ pub struct RegionServer {
 }
 
 impl RegionServer {
-    pub fn new(query_engine: QueryEngineRef) -> Self {
+    pub fn new(query_engine: QueryEngineRef, runtime: Arc<Runtime>) -> Self {
         Self {
-            inner: Arc::new(RegionServerInner::new(query_engine)),
+            inner: Arc::new(RegionServerInner::new(query_engine, runtime)),
         }
     }
 
@@ -88,8 +91,47 @@ impl RegionServer {
 
 #[async_trait]
 impl RegionServerHandler for RegionServer {
-    async fn handle(&self, _request: RequestBody) -> ServerResult<RegionResponse> {
-        todo!()
+    async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponse> {
+        let requests = RegionRequest::try_from_request_body(request)
+            .context(BuildRegionRequestsSnafu)
+            .map_err(BoxedError::new)
+            .context(ExecuteGrpcRequestSnafu)?;
+        let join_tasks = requests.into_iter().map(|(region_id, req)| {
+            let self_to_move = self.clone();
+            self.inner
+                .runtime
+                .spawn(async move { self_to_move.handle_request(region_id, req).await })
+        });
+
+        let results = try_join_all(join_tasks)
+            .await
+            .context(servers_error::JoinTaskSnafu)?;
+
+        // merge results by simply summing up affected rows.
+        // only insert/delete will have multiple results.
+        let mut affected_rows = 0;
+        for result in results {
+            match result
+                .map_err(BoxedError::new)
+                .context(servers_error::ExecuteGrpcRequestSnafu)?
+            {
+                Output::AffectedRows(rows) => affected_rows += rows,
+                Output::Stream(_) | Output::RecordBatches(_) => {
+                    // TODO: change the output type to contain only `affected_rows`
+                    unreachable!()
+                }
+            }
+        }
+
+        Ok(RegionResponse {
+            header: Some(ResponseHeader {
+                status: Some(Status {
+                    status_code: StatusCode::Success as _,
+                    ..Default::default()
+                }),
+            }),
+            affected_rows: affected_rows as _,
+        })
     }
 }
 
@@ -114,14 +156,16 @@ struct RegionServerInner {
     engines: RwLock<HashMap<String, RegionEngineRef>>,
     region_map: DashMap<RegionId, RegionEngineRef>,
     query_engine: QueryEngineRef,
+    runtime: Arc<Runtime>,
 }
 
 impl RegionServerInner {
-    pub fn new(query_engine: QueryEngineRef) -> Self {
+    pub fn new(query_engine: QueryEngineRef, runtime: Arc<Runtime>) -> Self {
         Self {
             engines: RwLock::new(HashMap::new()),
             region_map: DashMap::new(),
             query_engine,
+            runtime,
         }
     }
 
diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs
index dbc2ececc9..6a00241095 100644
--- a/src/datanode/src/server.rs
+++ b/src/datanode/src/server.rs
@@ -54,7 +54,7 @@ impl Services {
                 .context(RuntimeResourceSnafu)?,
         );
 
-        let region_server = RegionServer::new(instance.query_engine());
+        let region_server = RegionServer::new(instance.query_engine(), grpc_runtime.clone());
         let flight_handler = if enable_region_server {
             Some(Arc::new(region_server.clone()) as _)
         } else {
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 1b9c938b72..736c2a41a2 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -19,10 +19,15 @@ mod tests;
 
 use std::sync::Arc;
 
+use async_trait::async_trait;
+use common_error::ext::BoxedError;
 use common_query::Output;
+use common_recordbatch::SendableRecordBatchStream;
 use object_store::ObjectStore;
 use snafu::{OptionExt, ResultExt};
 use store_api::logstore::LogStore;
+use store_api::metadata::RegionMetadataRef;
+use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::{RegionId, ScanRequest};
 
@@ -106,6 +111,15 @@ impl EngineInner {
         self.workers.stop().await
     }
 
+    fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
+        // Reading a region doesn't need to go through the region worker thread.
+        let region = self
+            .workers
+            .get_region(region_id)
+            .context(RegionNotFoundSnafu { region_id })?;
+        Ok(region.metadata())
+    }
+
     /// Handles [RequestBody] and return its executed result.
     async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<Output> {
         // We validate and then convert the `request` into an inner `RequestBody` for ease of handling.
@@ -134,3 +148,38 @@ impl EngineInner {
         scan_region.scanner()
     }
 }
+
+#[async_trait]
+impl RegionEngine for MitoEngine {
+    fn name(&self) -> &str {
+        "MitoEngine"
+    }
+
+    async fn handle_request(
+        &self,
+        region_id: RegionId,
+        request: RegionRequest,
+    ) -> std::result::Result<Output, BoxedError> {
+        self.inner
+            .handle_request(region_id, request)
+            .await
+            .map_err(BoxedError::new)
+    }
+
+    /// Handle substrait query and return a stream of record batches
+    async fn handle_query(
+        &self,
+        _region_id: RegionId,
+        _request: ScanRequest,
+    ) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
+        todo!()
+    }
+
+    /// Retrieve region's metadata.
+    async fn get_metadata(
+        &self,
+        region_id: RegionId,
+    ) -> std::result::Result<RegionMetadataRef, BoxedError> {
+        self.inner.get_metadata(region_id).map_err(BoxedError::new)
+    }
+}
diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs
index 1e583c2c2f..0cc6b9b1db 100644
--- a/src/servers/src/error.rs
+++ b/src/servers/src/error.rs
@@ -89,6 +89,12 @@ pub enum Error {
         source: BoxedError,
     },
 
+    #[snafu(display("{source}"))]
+    ExecuteGrpcRequest {
+        location: Location,
+        source: BoxedError,
+    },
+
     #[snafu(display("Failed to check database validity, source: {}", source))]
     CheckDatabaseValidity {
         location: Location,
@@ -374,6 +380,7 @@ impl ErrorExt for Error {
             | ExecuteQuery { source, .. }
             | ExecutePlan { source, .. }
             | ExecuteGrpcQuery { source, .. }
+            | ExecuteGrpcRequest { source, .. }
             | CheckDatabaseValidity { source, .. } => source.status_code(),
 
             NotSupported { .. }
diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs
index c3beda0f70..1e71b15b71 100644
--- a/src/servers/src/grpc.rs
+++ b/src/servers/src/grpc.rs
@@ -26,7 +26,7 @@ use api::v1::greptime_database_server::GreptimeDatabase;
 use api::v1::greptime_database_server::GreptimeDatabaseServer;
 use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
 use api::v1::prometheus_gateway_server::{PrometheusGateway, PrometheusGatewayServer};
-use api::v1::region::region_server_server::RegionServerServer;
+use api::v1::region::region_server::RegionServer;
 use api::v1::{HealthCheckRequest, HealthCheckResponse};
 #[cfg(feature = "testing")]
 use arrow_flight::flight_service_server::FlightService;
@@ -224,7 +224,7 @@ impl Server for GrpcServer {
             )))
         }
         if let Some(region_server_handler) = &self.region_server_handler {
-            builder = builder.add_service(RegionServerServer::new(region_server_handler.clone()))
+            builder = builder.add_service(RegionServer::new(region_server_handler.clone()))
         }
 
         let (serve_state_tx, serve_state_rx) = oneshot::channel();
diff --git a/src/servers/src/grpc/region_server.rs b/src/servers/src/grpc/region_server.rs
index e3a7c06673..1bcccc6c30 100644
--- a/src/servers/src/grpc/region_server.rs
+++ b/src/servers/src/grpc/region_server.rs
@@ -16,9 +16,8 @@ use std::sync::Arc;
 
 use api::helper::region_request_type;
 use api::v1::auth_header::AuthScheme;
-use api::v1::region::region_request::Request as RequestBody;
-use api::v1::region::region_server_server::RegionServer as RegionServerService;
-use api::v1::region::{RegionRequest, RegionResponse};
+use api::v1::region::region_server::Region as RegionServer;
+use api::v1::region::{region_request, RegionRequest, RegionResponse};
 use api::v1::{Basic, RequestHeader};
 use async_trait::async_trait;
 use auth::{Identity, Password, UserInfoRef, UserProviderRef};
@@ -42,7 +41,7 @@ use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_CODE_LABEL};
 
 #[async_trait]
 pub trait RegionServerHandler: Send + Sync {
-    async fn handle(&self, request: RequestBody) -> Result<RegionResponse>;
+    async fn handle(&self, request: region_request::Body) -> Result<RegionResponse>;
 }
 
 pub type RegionServerHandlerRef = Arc<dyn RegionServerHandler>;
@@ -68,7 +67,7 @@ impl RegionServerRequestHandler {
     }
 
     async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
-        let query = request.request.context(InvalidQuerySnafu {
+        let query = request.body.context(InvalidQuerySnafu {
             reason: "Expecting non-empty GreptimeRequest.",
         })?;
 
@@ -183,7 +182,7 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
 }
 
 #[async_trait]
-impl RegionServerService for RegionServerRequestHandler {
+impl RegionServer for RegionServerRequestHandler {
     async fn handle(
         &self,
         request: Request<RegionRequest>,
diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs
index 17aadef896..863d3411c3 100644
--- a/src/store-api/src/metadata.rs
+++ b/src/store-api/src/metadata.rs
@@ -20,12 +20,14 @@ use std::any::Any;
 use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 
+use api::helper::ColumnDataTypeWrapper;
+use api::v1::region::ColumnDef;
 use api::v1::SemanticType;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use datatypes::arrow::datatypes::FieldRef;
 use datatypes::prelude::DataType;
-use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
 use serde::de::Error;
 use serde::{Deserialize, Deserializer, Serialize};
 use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
@@ -45,6 +47,32 @@ pub struct ColumnMetadata {
     pub column_id: ColumnId,
 }
 
+impl ColumnMetadata {
+    /// Construct `Self` from protobuf struct [ColumnDef]
+    pub fn try_from_column_def(column_def: ColumnDef) -> Result<Self> {
+        let semantic_type = column_def.semantic_type();
+        let column_id = column_def.column_id;
+
+        let default_constraint = if column_def.default_constraint.is_empty() {
+            None
+        } else {
+            Some(
+                ColumnDefaultConstraint::try_from(column_def.default_constraint.as_slice())
+                    .context(ConvertDatatypesSnafu)?,
+            )
+        };
+        let data_type = ColumnDataTypeWrapper::new(column_def.datatype()).into();
+        let column_schema = ColumnSchema::new(column_def.name, data_type, column_def.is_nullable)
+            .with_default_constraint(default_constraint)
+            .context(ConvertDatatypesSnafu)?;
+        Ok(Self {
+            column_schema,
+            semantic_type,
+            column_id,
+        })
+    }
+}
+
 #[cfg_attr(doc, aquamarine::aquamarine)]
 /// General static metadata of a region.
 ///
@@ -460,6 +488,16 @@ pub enum MetadataError {
         location: Location,
         source: serde_json::Error,
     },
+
+    #[snafu(display(
+        "Failed to convert with struct from datatypes, location: {}, source: {}",
+        location,
+        source
+    ))]
+    ConvertDatatypes {
+        location: Location,
+        source: datatypes::error::Error,
+    },
 }
 
 impl ErrorExt for MetadataError {
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index 818bbe4f20..dfcca0a98d 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -14,13 +14,15 @@
 
 use std::collections::HashMap;
 
+use api::v1::region::region_request;
 use api::v1::Rows;
 
-use crate::metadata::ColumnMetadata;
-use crate::storage::{AlterRequest, ColumnId, ScanRequest};
+use crate::metadata::{ColumnMetadata, MetadataError};
+use crate::storage::{AlterRequest, ColumnId, RegionId, ScanRequest};
 
 #[derive(Debug)]
 pub enum RegionRequest {
+    // TODO: rename to InsertRequest
     Put(RegionPutRequest),
     Delete(RegionDeleteRequest),
     Create(RegionCreateRequest),
@@ -32,6 +34,85 @@ pub enum RegionRequest {
     Compact(RegionCompactRequest),
 }
 
+impl RegionRequest {
+    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
+    /// An Inserts/Deletes request might become multiple requests. Others are one-to-one.
+    // TODO: implement alter request
+    #[allow(unreachable_code)]
+    pub fn try_from_request_body(
+        body: region_request::Body,
+    ) -> Result<Vec<(RegionId, Self)>, MetadataError> {
+        match body {
+            region_request::Body::Inserts(inserts) => Ok(inserts
+                .requests
+                .into_iter()
+                .filter_map(|r| {
+                    let region_id = r.region_id.into();
+                    r.rows
+                        .map(|rows| (region_id, Self::Put(RegionPutRequest { rows })))
+                })
+                .collect()),
+            region_request::Body::Deletes(deletes) => Ok(deletes
+                .requests
+                .into_iter()
+                .filter_map(|r| {
+                    let region_id = r.region_id.into();
+                    r.rows
+                        .map(|rows| (region_id, Self::Delete(RegionDeleteRequest { rows })))
+                })
+                .collect()),
+            region_request::Body::Create(create) => {
+                let column_metadatas = create
+                    .column_defs
+                    .into_iter()
+                    .map(ColumnMetadata::try_from_column_def)
+                    .collect::<Result<Vec<_>, _>>()?;
+                Ok(vec![(
+                    create.region_id.into(),
+                    Self::Create(RegionCreateRequest {
+                        engine: create.engine,
+                        column_metadatas,
+                        primary_key: create.primary_key,
+                        create_if_not_exists: create.create_if_not_exists,
+                        options: create.options,
+                        region_dir: create.region_dir,
+                    }),
+                )])
+            }
+            region_request::Body::Drop(drop) => Ok(vec![(
+                drop.region_id.into(),
+                Self::Drop(RegionDropRequest {}),
+            )]),
+            region_request::Body::Open(open) => Ok(vec![(
+                open.region_id.into(),
+                Self::Open(RegionOpenRequest {
+                    engine: open.engine,
+                    region_dir: open.region_dir,
+                    options: open.options,
+                }),
+            )]),
+            region_request::Body::Close(close) => Ok(vec![(
+                close.region_id.into(),
+                Self::Close(RegionCloseRequest {}),
+            )]),
+            region_request::Body::Alter(alter) => Ok(vec![(
+                alter.region_id.into(),
+                Self::Alter(RegionAlterRequest {
+                    request: unimplemented!(),
+                }),
+            )]),
+            region_request::Body::Flush(flush) => Ok(vec![(
+                flush.region_id.into(),
+                Self::Flush(RegionFlushRequest {}),
+            )]),
+            region_request::Body::Compact(compact) => Ok(vec![(
+                compact.region_id.into(),
+                Self::Compact(RegionCompactRequest {}),
+            )]),
+        }
+    }
+}
+
 /// Request to put data into a region.
 #[derive(Debug)]
 pub struct RegionPutRequest {