diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index adde0e8d07..e798f273e1 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -217,29 +217,53 @@ impl RegionServer { #[async_trait] impl RegionServerHandler for RegionServer { async fn handle(&self, request: region_request::Body) -> ServerResult { + let is_parallel = matches!( + request, + region_request::Body::Inserts(_) | region_request::Body::Deletes(_) + ); let requests = RegionRequest::try_from_request_body(request) .context(BuildRegionRequestsSnafu) .map_err(BoxedError::new) .context(ExecuteGrpcRequestSnafu)?; let tracing_context = TracingContext::from_current_span(); - let join_tasks = requests.into_iter().map(|(region_id, req)| { - let self_to_move = self.clone(); - let span = tracing_context.attach(info_span!( - "RegionServer::handle_region_request", - region_id = region_id.to_string() - )); - async move { - self_to_move + let results = if is_parallel { + let join_tasks = requests.into_iter().map(|(region_id, req)| { + let self_to_move = self.clone(); + let span = tracing_context.attach(info_span!( + "RegionServer::handle_region_request", + region_id = region_id.to_string() + )); + async move { + self_to_move + .handle_request(region_id, req) + .trace(span) + .await + } + }); + + try_join_all(join_tasks) + .await + .map_err(BoxedError::new) + .context(ExecuteGrpcRequestSnafu)? + } else { + let mut results = Vec::with_capacity(requests.len()); + // FIXME(jeremy, ruihang): Once the engine supports merged calls, we should immediately + // modify this part to avoid inefficient serial loop calls. + for (region_id, req) in requests { + let span = tracing_context.attach(info_span!( + "RegionServer::handle_region_request", + region_id = region_id.to_string() + )); + let result = self .handle_request(region_id, req) .trace(span) .await + .map_err(BoxedError::new) + .context(ExecuteGrpcRequestSnafu)?; + results.push(result); } - }); - - let results = try_join_all(join_tasks) - .await - .map_err(BoxedError::new) - .context(ExecuteGrpcRequestSnafu)?; + results + }; // merge results by simply sum up affected rows. // only insert/delete will have multiple results.