refactor: refactor region server request handling (#5504)

* refactor: refactor region server requests handling

* chore: apply suggestions from CR
This commit is contained in:
Weny Xu
2025-02-12 17:34:42 +09:00
committed by GitHub
parent 8026b1d72c
commit 44fffdec8b
4 changed files with 274 additions and 67 deletions

View File

@@ -260,6 +260,14 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to handle batch ddl request, ddl_type: {}", ddl_type))]
HandleBatchDdlRequest {
#[snafu(implicit)]
location: Location,
source: BoxedError,
ddl_type: String,
},
#[snafu(display("RegionId {} not found", region_id))]
RegionNotFound {
region_id: RegionId,
@@ -438,7 +446,8 @@ impl ErrorExt for Error {
UnsupportedOutput { .. } => StatusCode::Unsupported,
HandleRegionRequest { source, .. }
| GetRegionMetadata { source, .. }
| HandleBatchOpenRequest { source, .. } => source.status_code(),
| HandleBatchOpenRequest { source, .. }
| HandleBatchDdlRequest { source, .. } => source.status_code(),
StopRegionEngine { source, .. } => source.status_code(),
FindLogicalRegions { source, .. } => source.status_code(),

View File

@@ -59,7 +59,7 @@ use store_api::region_engine::{
SettableRegionRoleState,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
AffectedRows, BatchRegionDdlRequest, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
@@ -69,9 +69,10 @@ use tonic::{Request, Response, Result as TonicResult};
use crate::error::{
self, BuildRegionRequestsSnafu, ConcurrentQueryLimiterClosedSnafu,
ConcurrentQueryLimiterTimeoutSnafu, DataFusionSnafu, DecodeLogicalPlanSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchOpenRequestSnafu,
HandleRegionRequestSnafu, NewPlanDecoderSnafu, RegionEngineNotFoundSnafu, RegionNotFoundSnafu,
RegionNotReadySnafu, Result, StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
ExecuteLogicalPlanSnafu, FindLogicalRegionsSnafu, HandleBatchDdlRequestSnafu,
HandleBatchOpenRequestSnafu, HandleRegionRequestSnafu, NewPlanDecoderSnafu,
RegionEngineNotFoundSnafu, RegionNotFoundSnafu, RegionNotReadySnafu, Result,
StopRegionEngineSnafu, UnexpectedSnafu, UnsupportedOutputSnafu,
};
use crate::event_listener::RegionServerEventListenerRef;
@@ -344,62 +345,47 @@ impl RegionServer {
.region_map
.insert(region_id, RegionEngineWithStatus::Ready(engine));
}
}
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
let is_parallel = matches!(
request,
region_request::Body::Inserts(_) | region_request::Body::Deletes(_)
);
let requests = RegionRequest::try_from_request_body(request)
.context(BuildRegionRequestsSnafu)
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
async fn handle_batch_ddl_requests(
&self,
request: region_request::Body,
) -> Result<RegionResponse> {
// Safety: we have already checked the request type in `RegionServer::handle()`.
let batch_request = BatchRegionDdlRequest::try_from_request_body(request)
.context(BuildRegionRequestsSnafu)?
.unwrap();
let tracing_context = TracingContext::from_current_span();
let results = if is_parallel {
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
async move {
self_to_move
.handle_request(region_id, req)
.trace(span)
.await
}
});
let span = tracing_context.attach(info_span!("RegionServer::handle_batch_ddl_requests"));
self.inner
.handle_batch_request(batch_request)
.trace(span)
.await
}
try_join_all(join_tasks)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?
} else {
let mut results = Vec::with_capacity(requests.len());
// FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
// modify this part to avoid inefficient serial loop calls.
for (region_id, req) in requests {
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
let result = self
async fn handle_requests_in_parallel(
&self,
request: region_request::Body,
) -> Result<RegionResponse> {
let requests =
RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
let tracing_context = TracingContext::from_current_span();
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self;
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
async move {
self_to_move
.handle_request(region_id, req)
.trace(span)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
results.push(result);
}
results
};
});
// merge results by sum up affected rows and merge extensions.
let results = try_join_all(join_tasks).await?;
let mut affected_rows = 0;
let mut extensions = HashMap::new();
for result in results {
@@ -407,6 +393,57 @@ impl RegionServerHandler for RegionServer {
extensions.extend(result.extensions);
}
Ok(RegionResponse {
affected_rows,
extensions,
})
}
/// Handles every region request contained in `request` one after another.
///
/// The body is first split into per-region requests; each one is then passed to
/// `handle_request` in order, stopping at the first error. The returned response
/// carries the sum of all affected rows and the union of all extensions.
async fn handle_requests_in_serial(
    &self,
    request: region_request::Body,
) -> Result<RegionResponse> {
    let region_requests =
        RegionRequest::try_from_request_body(request).context(BuildRegionRequestsSnafu)?;
    let tracing_context = TracingContext::from_current_span();

    let mut response = RegionResponse {
        affected_rows: 0,
        extensions: HashMap::new(),
    };

    // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
    // modify this part to avoid inefficient serial loop calls.
    for (region_id, region_request) in region_requests {
        let span = tracing_context.attach(info_span!(
            "RegionServer::handle_region_request",
            region_id = region_id.to_string()
        ));
        let partial = self
            .handle_request(region_id, region_request)
            .trace(span)
            .await?;
        response.affected_rows += partial.affected_rows;
        response.extensions.extend(partial.extensions);
    }

    Ok(response)
}
}
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
let response = match &request {
region_request::Body::Creates(_)
| region_request::Body::Drops(_)
| region_request::Body::Alters(_) => self.handle_batch_ddl_requests(request).await,
region_request::Body::Inserts(_) | region_request::Body::Deletes(_) => {
self.handle_requests_in_parallel(request).await
}
_ => self.handle_requests_in_serial(request).await,
}
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
Ok(RegionResponseV1 {
header: Some(ResponseHeader {
status: Some(Status {
@@ -414,8 +451,8 @@ impl RegionServerHandler for RegionServer {
..Default::default()
}),
}),
affected_rows: affected_rows as _,
extensions,
affected_rows: response.affected_rows as _,
extensions: response.extensions,
})
}
}
@@ -727,6 +764,71 @@ impl RegionServerInner {
.collect::<Vec<_>>())
}
/// Handles a batch of DDL requests (create, drop or alter) against a single engine.
///
/// Limitation: all requests of the batch must target the same engine. The engine is
/// resolved from the first request only and then used for the whole batch.
///
/// An empty batch is answered with zero affected rows without touching any engine.
///
/// # Errors
///
/// Returns an error if a region attribute cannot be parsed, if the engine cannot be
/// resolved, if the engine fails to handle the batch (wrapped as
/// `HandleBatchDdlRequest`), or if marking a region as ready fails afterwards.
pub async fn handle_batch_request(
    &self,
    batch_request: BatchRegionDdlRequest,
) -> Result<RegionResponse> {
    // Derive the status change each region undergoes from the kind of DDL request.
    let region_changes = match &batch_request {
        BatchRegionDdlRequest::Create(requests) => requests
            .iter()
            .map(|(region_id, create)| {
                let attribute = parse_region_attribute(&create.engine, &create.options)?;
                Ok((*region_id, RegionChange::Register(attribute)))
            })
            .collect::<Result<Vec<_>>>()?,
        BatchRegionDdlRequest::Drop(requests) => requests
            .iter()
            .map(|(region_id, _)| (*region_id, RegionChange::Deregisters))
            .collect::<Vec<_>>(),
        BatchRegionDdlRequest::Alter(requests) => requests
            .iter()
            .map(|(region_id, _)| (*region_id, RegionChange::None))
            .collect::<Vec<_>>(),
    };

    // The ddl procedure will ensure all requests are in the same engine.
    // Therefore, we can get the engine from the first request.
    //
    // A batch without any request has no first element; report zero affected rows
    // instead of panicking, which is also what handling the requests one by one yields.
    let (first_region_id, first_region_change) = match region_changes.first() {
        Some(first) => first,
        None => return Ok(RegionResponse::new(0)),
    };
    let engine = match self.get_engine(*first_region_id, first_region_change)? {
        CurrentEngine::Engine(engine) => engine,
        CurrentEngine::EarlyReturn(rows) => return Ok(RegionResponse::new(rows)),
    };

    for (region_id, region_change) in region_changes.iter() {
        self.set_region_status_not_ready(*region_id, &engine, region_change);
    }

    // Capture the request type before the batch is moved into the engine call.
    let ddl_type = batch_request.request_type();
    let result = engine
        .handle_batch_ddl_requests(batch_request)
        .await
        .context(HandleBatchDdlRequestSnafu { ddl_type });

    match result {
        Ok(result) => {
            // NOTE(review): a failure here returns immediately, so the remaining regions
            // are not passed to `set_region_status_ready` — confirm this is intended.
            for (region_id, region_change) in region_changes {
                self.set_region_status_ready(region_id, engine.clone(), region_change)
                    .await?;
            }

            Ok(RegionResponse {
                affected_rows: result.affected_rows,
                extensions: result.extensions,
            })
        }
        Err(err) => {
            // The engine call failed: undo the status change of every region in the batch.
            for (region_id, region_change) in region_changes {
                self.unset_region_status(region_id, &engine, region_change);
            }

            Err(err)
        }
    }
}
pub async fn handle_request(
&self,
region_id: RegionId,

View File

@@ -15,6 +15,7 @@
//! Region Engine's definition
use std::any::Any;
use std::collections::HashMap;
use std::fmt::{Debug, Display};
use std::sync::{Arc, Mutex};
@@ -33,7 +34,7 @@ use tokio::sync::Semaphore;
use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::{RegionOpenRequest, RegionRequest};
use crate::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
use crate::storage::{RegionId, ScanRequest};
/// The settable region role state.
@@ -407,6 +408,27 @@ pub trait RegionEngine: Send + Sync {
Ok(join_all(tasks).await)
}
/// Handles a batch of DDL requests.
///
/// This default implementation converts the batch into individual region requests and
/// passes them to `handle_request` one at a time, in order, stopping at the first
/// error. Affected rows are summed up and extensions are merged into one map.
async fn handle_batch_ddl_requests(
    &self,
    request: BatchRegionDdlRequest,
) -> Result<RegionResponse, BoxedError> {
    let mut response = RegionResponse {
        affected_rows: 0,
        extensions: HashMap::new(),
    };

    for (region_id, region_request) in request.into_region_requests() {
        let partial = self.handle_request(region_id, region_request).await?;
        response.affected_rows += partial.affected_rows;
        response.extensions.extend(partial.extensions);
    }

    Ok(response)
}
/// Handles non-query request to the region. Returns the count of affected rows.
async fn handle_request(
&self,

View File

@@ -47,6 +47,67 @@ use crate::mito_engine_options::{
use crate::path_utils::region_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};
/// A batch of region DDL requests of one kind.
///
/// Every entry pairs the id of the target region with the request for that region.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// A batch of region create requests.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// A batch of region drop requests.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// A batch of region alter requests.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
impl BatchRegionDdlRequest {
    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
    ///
    /// Returns `Ok(None)` if the body is not a `Creates`, `Drops` or `Alters` request,
    /// and an error as soon as one of the contained requests fails to parse.
    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
        let batch = match body {
            region_request::Body::Creates(creates) => Self::Create(
                creates
                    .requests
                    .into_iter()
                    .map(parse_region_create)
                    .collect::<Result<Vec<_>>>()?,
            ),
            region_request::Body::Drops(drops) => Self::Drop(
                drops
                    .requests
                    .into_iter()
                    .map(parse_region_drop)
                    .collect::<Result<Vec<_>>>()?,
            ),
            region_request::Body::Alters(alters) => Self::Alter(
                alters
                    .requests
                    .into_iter()
                    .map(parse_region_alter)
                    .collect::<Result<Vec<_>>>()?,
            ),
            _ => return Ok(None),
        };

        Ok(Some(batch))
    }

    /// Returns the `&'static str` form of this request, as provided by the
    /// `IntoStaticStr` derive on the enum.
    pub fn request_type(&self) -> &'static str {
        self.into()
    }

    /// Consumes the batch and wraps every entry into the matching [`RegionRequest`]
    /// variant, keeping the region ids and the original order.
    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
        match self {
            Self::Create(creates) => creates
                .into_iter()
                .map(|(id, create)| (id, RegionRequest::Create(create)))
                .collect(),
            Self::Drop(drops) => drops
                .into_iter()
                .map(|(id, drop)| (id, RegionRequest::Drop(drop)))
                .collect(),
            Self::Alter(alters) => alters
                .into_iter()
                .map(|(id, alter)| (id, RegionRequest::Alter(alter)))
                .collect(),
        }
    }
}
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
Put(RegionPutRequest),
@@ -123,7 +184,7 @@ fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionR
Ok(requests)
}
fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
let column_metadatas = create
.column_defs
.into_iter()
@@ -131,16 +192,21 @@ fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequ
.collect::<Result<Vec<_>>>()?;
let region_id = create.region_id.into();
let region_dir = region_dir(&create.path, region_id);
Ok(vec![(
Ok((
region_id,
RegionRequest::Create(RegionCreateRequest {
RegionCreateRequest {
engine: create.engine,
column_metadatas,
primary_key: create.primary_key,
options: create.options,
region_dir,
}),
)])
},
))
}
/// Parses a single create request and wraps it into a one-element list of
/// [`RegionRequest::Create`].
fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
    parse_region_create(create)
        .map(|(region_id, request)| vec![(region_id, RegionRequest::Create(request))])
}
fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
@@ -151,9 +217,14 @@ fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionR
Ok(requests)
}
fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
let region_id = drop.region_id.into();
Ok(vec![(region_id, RegionRequest::Drop(RegionDropRequest {}))])
Ok((region_id, RegionDropRequest {}))
}
/// Parses a single drop request and wraps it into a one-element list of
/// [`RegionRequest::Drop`].
fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
    parse_region_drop(drop)
        .map(|(region_id, request)| vec![(region_id, RegionRequest::Drop(request))])
}
fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
@@ -186,12 +257,15 @@ fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest
)])
}
fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
let region_id = alter.region_id.into();
Ok(vec![(
region_id,
RegionRequest::Alter(RegionAlterRequest::try_from(alter)?),
)])
let request = RegionAlterRequest::try_from(alter)?;
Ok((region_id, request))
}
/// Parses a single alter request and wraps it into a one-element list of
/// [`RegionRequest::Alter`].
fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
    parse_region_alter(alter)
        .map(|(region_id, request)| vec![(region_id, RegionRequest::Alter(request))])
}
fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {