diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index d54138c3e3..f54d0219c0 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -38,6 +38,7 @@ use datafusion::datasource::{provider_as_source, TableProvider}; use datafusion::error::Result as DfResult; use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter}; use datafusion_expr::{LogicalPlan, TableSource}; +use futures::future::try_join_all; use metric_engine::engine::MetricEngine; use mito2::engine::MITO_ENGINE_NAME; use prost::Message; @@ -367,6 +368,40 @@ impl RegionServer { async fn handle_batch_request(&self, batch: BatchRegionRequest) -> Result { self.inner.handle_batch_request(batch).await } + + async fn handle_vector_request( + &self, + requests: Vec<(RegionId, RegionRequest)>, + ) -> Result { + let tracing_context = TracingContext::from_current_span(); + + let join_tasks = requests.into_iter().map(|(region_id, req)| { + let self_to_move = self.clone(); + let span = tracing_context.attach(info_span!( + "RegionServer::handle_region_request", + region_id = region_id.to_string() + )); + async move { + self_to_move + .handle_request(region_id, req) + .trace(span) + .await + } + }); + + let results = try_join_all(join_tasks).await?; + let mut affected_rows = 0; + let mut extensions = HashMap::new(); + for result in results { + affected_rows += result.affected_rows; + extensions.extend(result.extensions); + } + + Ok(RegionResponse { + affected_rows, + extensions, + }) + } } #[async_trait] @@ -383,6 +418,11 @@ impl RegionServerHandler for RegionServer { .await .map_err(BoxedError::new) .context(ExecuteGrpcRequestSnafu)?, + RegionRequestBundle::Vector(requests) => self + .handle_vector_request(requests) + .await + .map_err(BoxedError::new) + .context(ExecuteGrpcRequestSnafu)?, RegionRequestBundle::Batch(requests) => self .handle_batch_request(requests) .await @@ -733,14 +773,6 @@ impl RegionServerInner { .iter() .map(|(region_id, _)| (*region_id, RegionChange::None)) .collect::>(), - BatchRegionRequest::Delete(requests) => requests - .iter() - .map(|(region_id, _)| (*region_id, RegionChange::None)) - .collect::>(), - BatchRegionRequest::Put(requests) => requests - .iter() - .map(|(region_id, _)| (*region_id, RegionChange::None)) - .collect::>(), }; let (first_region_id, first_region_change) = region_changes.first().unwrap(); diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 8fcc5052f6..39af8fe253 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -156,22 +156,6 @@ impl RegionEngine for MetricEngine { ) .await } - BatchRegionRequest::Delete(requests) => { - self.handle_requests( - requests - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Delete(req))), - ) - .await - } - BatchRegionRequest::Put(requests) => { - self.handle_requests( - requests - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Put(req))), - ) - .await - } } } diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index ea61bb66d6..f24475298d 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -85,15 +85,11 @@ pub fn convert_body_to_requests(body: region_request::Body) -> Result { let requests = make_region_puts(inserts)?; - Ok(RegionRequestBundle::new_batch(BatchRegionRequest::Put( - requests, - ))) + Ok(RegionRequestBundle::new_vector(requests)) } region_request::Body::Deletes(deletes) => { let requests = make_region_deletes(deletes)?; - Ok(RegionRequestBundle::new_batch(BatchRegionRequest::Delete( - requests, - ))) + Ok(RegionRequestBundle::new_vector(requests)) } region_request::Body::Creates(creates) => { let requests = make_region_creates(creates.requests)?; @@ -119,6 +115,7 @@ pub fn convert_body_to_requests(body: region_request::Body) -> Result), Single((RegionId, RegionRequest)), } @@ -131,12 +128,8 @@ impl RegionRequestBundle { Self::Single((region_id, request)) } - pub fn is_batch(&self) -> bool { - matches!(self, RegionRequestBundle::Batch(_)) - } - - pub fn is_single(&self) -> bool { - matches!(self, RegionRequestBundle::Single(_)) + pub fn new_vector(requests: Vec<(RegionId, RegionRequest)>) -> Self { + Self::Vector(requests) } } @@ -145,8 +138,6 @@ pub enum BatchRegionRequest { Create(Vec<(RegionId, RegionCreateRequest)>), Drop(Vec<(RegionId, RegionDropRequest)>), Alter(Vec<(RegionId, RegionAlterRequest)>), - Put(Vec<(RegionId, RegionPutRequest)>), - Delete(Vec<(RegionId, RegionDeleteRequest)>), } impl BatchRegionRequest { @@ -168,16 +159,6 @@ impl BatchRegionRequest { .into_iter() .map(|(region_id, req)| (region_id, RegionRequest::Alter(req))), ), - BatchRegionRequest::Put(requests) => Box::new( - requests - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Put(req))), - ), - BatchRegionRequest::Delete(requests) => Box::new( - requests - .into_iter() - .map(|(region_id, req)| (region_id, RegionRequest::Delete(req))), - ), } } @@ -280,26 +261,35 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<(RegionId, RegionTr Ok((region_id, RegionTruncateRequest {})) } -fn make_region_puts(inserts: InsertRequests) -> Result> { +fn make_region_puts(inserts: InsertRequests) -> Result> { let requests = inserts .requests .into_iter() .filter_map(|r| { let region_id = r.region_id.into(); - r.rows - .map(|rows| (region_id, RegionPutRequest { rows, hint: None })) + r.rows.map(|rows| { + ( + region_id, + RegionRequest::Put(RegionPutRequest { rows, hint: None }), + ) + }) }) .collect(); Ok(requests) } -fn make_region_deletes(deletes: DeleteRequests) -> Result> { +fn make_region_deletes(deletes: DeleteRequests) -> Result> { let requests = deletes .requests .into_iter() .filter_map(|r| { let region_id = r.region_id.into(); - r.rows.map(|rows| (region_id, RegionDeleteRequest { rows })) + r.rows.map(|rows| { + ( + region_id, + RegionRequest::Delete(RegionDeleteRequest { rows }), + ) + }) }) .collect(); Ok(requests)