feat: implement Flight and gRPC services for RegionServer (#2226)

* extract FlightCraft trait

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* split service handler in GrpcServer

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* left grpc server implement

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* start region server if configured

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-08-22 08:30:09 -05:00
committed by GitHub
parent 18fa0e01ed
commit 18250c4803
18 changed files with 453 additions and 71 deletions

View File

@@ -9,6 +9,7 @@ testing = ["meta-srv/mock"]
[dependencies]
api = { workspace = true }
arrow-flight.workspace = true
async-compat = "0.2"
async-stream.workspace = true
async-trait.workspace = true

View File

@@ -14,9 +14,11 @@
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, RwLock};
use api::v1::region::QueryRequest;
use api::v1::region::region_request::Request as RequestBody;
use api::v1::region::{QueryRequest, RegionResponse};
use arrow_flight::{FlightData, Ticket};
use async_trait::async_trait;
use bytes::Bytes;
use common_query::logical_plan::Expr;
@@ -33,7 +35,12 @@ use datafusion::execution::context::SessionState;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr as DfExpr, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use prost::Message;
use query::QueryEngineRef;
use servers::error as servers_error;
use servers::error::Result as ServerResult;
use servers::grpc::flight::{FlightCraft, FlightRecordBatchStream, TonicStream};
use servers::grpc::region_server::RegionServerHandler;
use session::context::QueryContext;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
@@ -42,6 +49,7 @@ use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan};
use table::table::scan::StreamScanAdapter;
use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
DecodeLogicalPlanSnafu, ExecuteLogicalPlanSnafu, GetRegionMetadataSnafu,
@@ -49,24 +57,80 @@ use crate::error::{
UnsupportedOutputSnafu,
};
#[derive(Clone)]
pub struct RegionServer {
engines: HashMap<String, RegionEngineRef>,
region_map: DashMap<RegionId, RegionEngineRef>,
query_engine: QueryEngineRef,
inner: Arc<RegionServerInner>,
}
impl RegionServer {
pub fn new(query_engine: QueryEngineRef) -> Self {
Self {
engines: HashMap::new(),
inner: Arc::new(RegionServerInner::new(query_engine)),
}
}
pub fn register_engine(&mut self, engine: RegionEngineRef) {
self.inner.register_engine(engine);
}
pub async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> Result<Output> {
self.inner.handle_request(region_id, request).await
}
pub async fn handle_read(&self, request: QueryRequest) -> Result<SendableRecordBatchStream> {
self.inner.handle_read(request).await
}
}
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, _request: RequestBody) -> ServerResult<RegionResponse> {
todo!()
}
}
#[async_trait]
impl FlightCraft for RegionServer {
async fn do_get(
&self,
request: Request<Ticket>,
) -> TonicResult<Response<TonicStream<FlightData>>> {
let ticket = request.into_inner().ticket;
let request = QueryRequest::decode(ticket.as_ref())
.context(servers_error::InvalidFlightTicketSnafu)?;
let result = self.handle_read(request).await?;
let stream = Box::pin(FlightRecordBatchStream::new(result));
Ok(Response::new(stream))
}
}
struct RegionServerInner {
engines: RwLock<HashMap<String, RegionEngineRef>>,
region_map: DashMap<RegionId, RegionEngineRef>,
query_engine: QueryEngineRef,
}
impl RegionServerInner {
pub fn new(query_engine: QueryEngineRef) -> Self {
Self {
engines: RwLock::new(HashMap::new()),
region_map: DashMap::new(),
query_engine,
}
}
pub fn register_engine(&mut self, engine: RegionEngineRef) {
pub fn register_engine(&self, engine: RegionEngineRef) {
let engine_name = engine.name();
self.engines.insert(engine_name.to_string(), engine);
self.engines
.write()
.unwrap()
.insert(engine_name.to_string(), engine);
}
pub async fn handle_request(
@@ -90,6 +154,8 @@ impl RegionServer {
let engine = match &region_change {
RegionChange::Register(engine_type) => self
.engines
.read()
.unwrap()
.get(engine_type)
.with_context(|| RegionEngineNotFoundSnafu { name: engine_type })?
.clone(),

View File

@@ -31,6 +31,7 @@ use crate::error::{
WaitForGrpcServingSnafu,
};
use crate::instance::InstanceRef;
use crate::region_server::RegionServer;
pub mod grpc;
@@ -42,6 +43,9 @@ pub struct Services {
impl Services {
pub async fn try_new(instance: InstanceRef, opts: &DatanodeOptions) -> Result<Self> {
// TODO(ruihang): remove database service once region server is ready.
let enable_region_server = option_env!("ENABLE_REGION_SERVER").is_some();
let grpc_runtime = Arc::new(
RuntimeBuilder::default()
.worker_threads(opts.rpc_runtime_size)
@@ -50,10 +54,24 @@ impl Services {
.context(RuntimeResourceSnafu)?,
);
let region_server = RegionServer::new(instance.query_engine());
let flight_handler = if enable_region_server {
Some(Arc::new(region_server.clone()) as _)
} else {
None
};
let region_server_handler = if enable_region_server {
Some(Arc::new(region_server.clone()) as _)
} else {
None
};
Ok(Self {
grpc_server: GrpcServer::new(
ServerGrpcQueryHandlerAdaptor::arc(instance),
None,
flight_handler,
region_server_handler,
None,
grpc_runtime,
),