feat: impl region engine for mito (#2269)

* update proto

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* convert request

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update proto

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* import result converter

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename symbols

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-08-28 04:24:12 -05:00
committed by GitHub
parent e2522dff21
commit 6b8cf0bbf0
12 changed files with 269 additions and 36 deletions

2
Cargo.lock generated
View File

@@ -4153,7 +4153,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=3489b4742150abe0a769faf1bb60fbb95b061fc8#3489b4742150abe0a769faf1bb60fbb95b061fc8"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=39b0ea8d086d0ab762046b0f473aa3ef8bd347f9#39b0ea8d086d0ab762046b0f473aa3ef8bd347f9"
dependencies = [
"prost",
"serde",

View File

@@ -77,7 +77,7 @@ datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git
derive_builder = "0.12"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "3489b4742150abe0a769faf1bb60fbb95b061fc8" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "39b0ea8d086d0ab762046b0f473aa3ef8bd347f9" }
itertools = "0.10"
lazy_static = "1.4"
once_cell = "1.18"

View File

@@ -56,6 +56,10 @@ impl ColumnDataTypeWrapper {
Ok(Self(datatype))
}
pub fn new(datatype: ColumnDataType) -> Self {
Self(datatype)
}
pub fn datatype(&self) -> ColumnDataType {
self.0
}
@@ -330,17 +334,17 @@ fn query_request_type(request: &QueryRequest) -> &'static str {
}
/// Returns the type name of the [RegionRequest].
pub fn region_request_type(request: &region_request::Request) -> &'static str {
pub fn region_request_type(request: &region_request::Body) -> &'static str {
match request {
region_request::Request::Inserts(_) => "region.inserts",
region_request::Request::Deletes(_) => "region.deletes",
region_request::Request::Create(_) => "region.create",
region_request::Request::Drop(_) => "region.drop ",
region_request::Request::Open(_) => "region.open",
region_request::Request::Close(_) => "region.close",
region_request::Request::Alter(_) => "region.alter",
region_request::Request::Flush(_) => "region.flush",
region_request::Request::Compact(_) => "region.compact",
region_request::Body::Inserts(_) => "region.inserts",
region_request::Body::Deletes(_) => "region.deletes",
region_request::Body::Create(_) => "region.create",
region_request::Body::Drop(_) => "region.drop",
region_request::Body::Open(_) => "region.open",
region_request::Body::Close(_) => "region.close",
region_request::Body::Alter(_) => "region.alter",
region_request::Body::Flush(_) => "region.flush",
region_request::Body::Compact(_) => "region.compact",
}
}

View File

@@ -556,6 +556,16 @@ pub enum Error {
location: Location,
source: BoxedError,
},
#[snafu(display(
"Failed to build region requests, location:{}, source: {}",
location,
source
))]
BuildRegionRequests {
location: Location,
source: store_api::metadata::MetadataError,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -569,6 +579,7 @@ impl ErrorExt for Error {
| ExecuteStatement { source, .. }
| ExecuteLogicalPlan { source, .. } => source.status_code(),
BuildRegionRequests { source, .. } => source.status_code(),
HandleHeartbeatResponse { source, .. } => source.status_code(),
DecodeLogicalPlan { source, .. } => source.status_code(),

View File

@@ -16,15 +16,18 @@ use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use api::v1::region::region_request::Request as RequestBody;
use api::v1::region::{QueryRequest, RegionResponse};
use api::v1::region::{region_request, QueryRequest, RegionResponse};
use api::v1::{ResponseHeader, Status};
use arrow_flight::{FlightData, Ticket};
use async_trait::async_trait;
use bytes::Bytes;
use common_error::ext::BoxedError;
use common_error::status_code::StatusCode;
use common_query::logical_plan::Expr;
use common_query::physical_plan::DfPhysicalPlanAdapter;
use common_query::{DfPhysicalPlan, Output};
use common_recordbatch::SendableRecordBatchStream;
use common_runtime::Runtime;
use common_telemetry::info;
use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
@@ -35,10 +38,10 @@ use datafusion::execution::context::SessionState;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr as DfExpr, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use futures_util::future::try_join_all;
use prost::Message;
use query::QueryEngineRef;
use servers::error as servers_error;
use servers::error::Result as ServerResult;
use servers::error::{self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult};
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use session::context::QueryContext;
@@ -52,9 +55,9 @@ use table::table::scan::StreamScanAdapter;
use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu,
HandleRegionRequestSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu, Result,
UnsupportedOutputSnafu,
BuildRegionRequestsSnafu, DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu,
GetRegionMetadataSnafu, HandleRegionRequestSnafu, RegionEngineNotFoundSnafu,
RegionNotFoundSnafu, Result, UnsupportedOutputSnafu,
};
#[derive(Clone)]
@@ -63,9 +66,9 @@ pub struct RegionServer {
}
impl RegionServer {
pub fn new(query_engine: QueryEngineRef) -> Self {
pub fn new(query_engine: QueryEngineRef, runtime: Arc<Runtime>) -> Self {
Self {
inner: Arc::new(RegionServerInner::new(query_engine)),
inner: Arc::new(RegionServerInner::new(query_engine, runtime)),
}
}
@@ -88,8 +91,47 @@ impl RegionServer {
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, _request: RequestBody) -> ServerResult<RegionResponse> {
todo!()
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponse> {
let requests = RegionRequest::try_from_request_body(request)
.context(BuildRegionRequestsSnafu)
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
self.inner
.runtime
.spawn(async move { self_to_move.handle_request(region_id, req).await })
});
let results = try_join_all(join_tasks)
.await
.context(servers_error::JoinTaskSnafu)?;
// merge results by simply sum up affected rows.
// only insert/delete will have multiple results.
let mut affected_rows = 0;
for result in results {
match result
.map_err(BoxedError::new)
.context(servers_error::ExecuteGrpcRequestSnafu)?
{
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
// TODO: change the output type to only contains `affected_rows`
unreachable!()
}
}
}
Ok(RegionResponse {
header: Some(ResponseHeader {
status: Some(Status {
status_code: StatusCode::Success as _,
..Default::default()
}),
}),
affected_rows: affected_rows as _,
})
}
}
@@ -114,14 +156,16 @@ struct RegionServerInner {
engines: RwLock<HashMap<String, RegionEngineRef>>,
region_map: DashMap<RegionId, RegionEngineRef>,
query_engine: QueryEngineRef,
runtime: Arc<Runtime>,
}
impl RegionServerInner {
pub fn new(query_engine: QueryEngineRef) -> Self {
pub fn new(query_engine: QueryEngineRef, runtime: Arc<Runtime>) -> Self {
Self {
engines: RwLock::new(HashMap::new()),
region_map: DashMap::new(),
query_engine,
runtime,
}
}

View File

@@ -54,7 +54,7 @@ impl Services {
.context(RuntimeResourceSnafu)?,
);
let region_server = RegionServer::new(instance.query_engine());
let region_server = RegionServer::new(instance.query_engine(), grpc_runtime.clone());
let flight_handler = if enable_region_server {
Some(Arc::new(region_server.clone()) as _)
} else {

View File

@@ -19,10 +19,15 @@ mod tests;
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use object_store::ObjectStore;
use snafu::{OptionExt, ResultExt};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
@@ -106,6 +111,15 @@ impl EngineInner {
self.workers.stop().await
}
fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef> {
// Reading a region doesn't need to go through the region worker thread.
let region = self
.workers
.get_region(region_id)
.context(RegionNotFoundSnafu { region_id })?;
Ok(region.metadata())
}
/// Handles [RequestBody] and return its executed result.
async fn handle_request(&self, region_id: RegionId, request: RegionRequest) -> Result<Output> {
// We validate and then convert the `request` into an inner `RequestBody` for ease of handling.
@@ -134,3 +148,38 @@ impl EngineInner {
scan_region.scanner()
}
}
#[async_trait]
impl RegionEngine for MitoEngine {
fn name(&self) -> &str {
"MitoEngine"
}
async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> std::result::Result<Output, BoxedError> {
self.inner
.handle_request(region_id, request)
.await
.map_err(BoxedError::new)
}
/// Handle substrait query and return a stream of record batches
async fn handle_query(
&self,
_region_id: RegionId,
_request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
todo!()
}
/// Retrieve region's metadata.
async fn get_metadata(
&self,
region_id: RegionId,
) -> std::result::Result<RegionMetadataRef, BoxedError> {
self.inner.get_metadata(region_id).map_err(BoxedError::new)
}
}

View File

@@ -89,6 +89,12 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("{source}"))]
ExecuteGrpcRequest {
location: Location,
source: BoxedError,
},
#[snafu(display("Failed to check database validity, source: {}", source))]
CheckDatabaseValidity {
location: Location,
@@ -374,6 +380,7 @@ impl ErrorExt for Error {
| ExecuteQuery { source, .. }
| ExecutePlan { source, .. }
| ExecuteGrpcQuery { source, .. }
| ExecuteGrpcRequest { source, .. }
| CheckDatabaseValidity { source, .. } => source.status_code(),
NotSupported { .. }

View File

@@ -26,7 +26,7 @@ use api::v1::greptime_database_server::GreptimeDatabase;
use api::v1::greptime_database_server::GreptimeDatabaseServer;
use api::v1::health_check_server::{HealthCheck, HealthCheckServer};
use api::v1::prometheus_gateway_server::{PrometheusGateway, PrometheusGatewayServer};
use api::v1::region::region_server_server::RegionServerServer;
use api::v1::region::region_server::RegionServer;
use api::v1::{HealthCheckRequest, HealthCheckResponse};
#[cfg(feature = "testing")]
use arrow_flight::flight_service_server::FlightService;
@@ -224,7 +224,7 @@ impl Server for GrpcServer {
)))
}
if let Some(region_server_handler) = &self.region_server_handler {
builder = builder.add_service(RegionServerServer::new(region_server_handler.clone()))
builder = builder.add_service(RegionServer::new(region_server_handler.clone()))
}
let (serve_state_tx, serve_state_rx) = oneshot::channel();

View File

@@ -16,9 +16,8 @@ use std::sync::Arc;
use api::helper::region_request_type;
use api::v1::auth_header::AuthScheme;
use api::v1::region::region_request::Request as RequestBody;
use api::v1::region::region_server_server::RegionServer as RegionServerService;
use api::v1::region::{RegionRequest, RegionResponse};
use api::v1::region::region_server::Region as RegionServer;
use api::v1::region::{region_request, RegionRequest, RegionResponse};
use api::v1::{Basic, RequestHeader};
use async_trait::async_trait;
use auth::{Identity, Password, UserInfoRef, UserProviderRef};
@@ -42,7 +41,7 @@ use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_CODE_LABEL};
#[async_trait]
pub trait RegionServerHandler: Send + Sync {
async fn handle(&self, request: RequestBody) -> Result<RegionResponse>;
async fn handle(&self, request: region_request::Body) -> Result<RegionResponse>;
}
pub type RegionServerHandlerRef = Arc<dyn RegionServerHandler>;
@@ -68,7 +67,7 @@ impl RegionServerRequestHandler {
}
async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
let query = request.request.context(InvalidQuerySnafu {
let query = request.body.context(InvalidQuerySnafu {
reason: "Expecting non-empty GreptimeRequest.",
})?;
@@ -183,7 +182,7 @@ pub(crate) fn create_query_context(header: Option<&RequestHeader>) -> QueryConte
}
#[async_trait]
impl RegionServerService for RegionServerRequestHandler {
impl RegionServer for RegionServerRequestHandler {
async fn handle(
&self,
request: Request<RegionRequest>,

View File

@@ -20,12 +20,14 @@ use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::helper::ColumnDataTypeWrapper;
use api::v1::region::ColumnDef;
use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use datatypes::arrow::datatypes::FieldRef;
use datatypes::prelude::DataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema, SchemaRef};
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ensure, Location, OptionExt, ResultExt, Snafu};
@@ -45,6 +47,32 @@ pub struct ColumnMetadata {
pub column_id: ColumnId,
}
impl ColumnMetadata {
/// Construct `Self` from protobuf struct [ColumnDef]
pub fn try_from_column_def(column_def: ColumnDef) -> Result<Self> {
let semantic_type = column_def.semantic_type();
let column_id = column_def.column_id;
let default_constrain = if column_def.default_constraint.is_empty() {
None
} else {
Some(
ColumnDefaultConstraint::try_from(column_def.default_constraint.as_slice())
.context(ConvertDatatypesSnafu)?,
)
};
let data_type = ColumnDataTypeWrapper::new(column_def.datatype()).into();
let column_schema = ColumnSchema::new(column_def.name, data_type, column_def.is_nullable)
.with_default_constraint(default_constrain)
.context(ConvertDatatypesSnafu)?;
Ok(Self {
column_schema,
semantic_type,
column_id,
})
}
}
#[cfg_attr(doc, aquamarine::aquamarine)]
/// General static metadata of a region.
///
@@ -460,6 +488,16 @@ pub enum MetadataError {
location: Location,
source: serde_json::Error,
},
#[snafu(display(
"Failed to convert with struct from datatypes, location: {}, source: {}",
location,
source
))]
ConvertDatatypes {
location: Location,
source: datatypes::error::Error,
},
}
impl ErrorExt for MetadataError {

View File

@@ -14,13 +14,15 @@
use std::collections::HashMap;
use api::v1::region::region_request;
use api::v1::Rows;
use crate::metadata::ColumnMetadata;
use crate::storage::{AlterRequest, ColumnId, ScanRequest};
use crate::metadata::{ColumnMetadata, MetadataError};
use crate::storage::{AlterRequest, ColumnId, RegionId, ScanRequest};
#[derive(Debug)]
pub enum RegionRequest {
// TODO: rename to InsertRequest
Put(RegionPutRequest),
Delete(RegionDeleteRequest),
Create(RegionCreateRequest),
@@ -32,6 +34,85 @@ pub enum RegionRequest {
Compact(RegionCompactRequest),
}
impl RegionRequest {
/// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
/// Inserts/Deletes request might become multiple requests. Others are one-to-one.
// TODO: implement alter request
#[allow(unreachable_code)]
pub fn try_from_request_body(
body: region_request::Body,
) -> Result<Vec<(RegionId, Self)>, MetadataError> {
match body {
region_request::Body::Inserts(inserts) => Ok(inserts
.requests
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows
.map(|rows| (region_id, Self::Put(RegionPutRequest { rows })))
})
.collect()),
region_request::Body::Deletes(deletes) => Ok(deletes
.requests
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows
.map(|rows| (region_id, Self::Delete(RegionDeleteRequest { rows })))
})
.collect()),
region_request::Body::Create(create) => {
let column_metadatas = create
.column_defs
.into_iter()
.map(ColumnMetadata::try_from_column_def)
.collect::<Result<Vec<_>, _>>()?;
Ok(vec![(
create.region_id.into(),
Self::Create(RegionCreateRequest {
engine: create.engine,
column_metadatas,
primary_key: create.primary_key,
create_if_not_exists: create.create_if_not_exists,
options: create.options,
region_dir: create.region_dir,
}),
)])
}
region_request::Body::Drop(drop) => Ok(vec![(
drop.region_id.into(),
Self::Drop(RegionDropRequest {}),
)]),
region_request::Body::Open(open) => Ok(vec![(
open.region_id.into(),
Self::Open(RegionOpenRequest {
engine: open.engine,
region_dir: open.region_dir,
options: open.options,
}),
)]),
region_request::Body::Close(close) => Ok(vec![(
close.region_id.into(),
Self::Close(RegionCloseRequest {}),
)]),
region_request::Body::Alter(alter) => Ok(vec![(
alter.region_id.into(),
Self::Alter(RegionAlterRequest {
request: unimplemented!(),
}),
)]),
region_request::Body::Flush(flush) => Ok(vec![(
flush.region_id.into(),
Self::Flush(RegionFlushRequest {}),
)]),
region_request::Body::Compact(compact) => Ok(vec![(
compact.region_id.into(),
Self::Compact(RegionCompactRequest {}),
)]),
}
}
}
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {