feat: Only allow inserts and deletes operations to be executed in parallel (#3257)

* feat: Only allow inserts and deletes operations to be executed in parallel.

* feat: add comment
This commit is contained in:
JeremyHi
2024-01-29 19:27:06 +08:00
committed by GitHub
parent 691b649f67
commit e5a2b0463a

View File

@@ -217,29 +217,53 @@ impl RegionServer {
#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponse> {
let is_parallel = matches!(
request,
region_request::Body::Inserts(_) | region_request::Body::Deletes(_)
);
let requests = RegionRequest::try_from_request_body(request)
.context(BuildRegionRequestsSnafu)
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
let tracing_context = TracingContext::from_current_span();
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
async move {
self_to_move
let results = if is_parallel {
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
async move {
self_to_move
.handle_request(region_id, req)
.trace(span)
.await
}
});
try_join_all(join_tasks)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?
} else {
let mut results = Vec::with_capacity(requests.len());
// FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately
// modify this part to avoid inefficient serial loop calls.
for (region_id, req) in requests {
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
let result = self
.handle_request(region_id, req)
.trace(span)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
results.push(result);
}
});
let results = try_join_all(join_tasks)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?;
results
};
// merge results by simply sum up affected rows.
// only insert/delete will have multiple results.