refactor: introduce RegionRequestBundle::Vector

This commit is contained in:
WenyXu
2025-02-10 12:08:40 +00:00
parent 95b388d819
commit 764a57b80a
3 changed files with 59 additions and 53 deletions

View File

@@ -38,6 +38,7 @@ use datafusion::datasource::{provider_as_source, TableProvider};
use datafusion::error::Result as DfResult;
use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRewriter};
use datafusion_expr::{LogicalPlan, TableSource};
use futures::future::try_join_all;
use metric_engine::engine::MetricEngine;
use mito2::engine::MITO_ENGINE_NAME;
use prost::Message;
@@ -367,6 +368,40 @@ impl RegionServer {
async fn handle_batch_request(&self, batch: BatchRegionRequest) -> Result<RegionResponse> {
self.inner.handle_batch_request(batch).await
}
async fn handle_vector_request(
&self,
requests: Vec<(RegionId, RegionRequest)>,
) -> Result<RegionResponse> {
let tracing_context = TracingContext::from_current_span();
let join_tasks = requests.into_iter().map(|(region_id, req)| {
let self_to_move = self.clone();
let span = tracing_context.attach(info_span!(
"RegionServer::handle_region_request",
region_id = region_id.to_string()
));
async move {
self_to_move
.handle_request(region_id, req)
.trace(span)
.await
}
});
let results = try_join_all(join_tasks).await?;
let mut affected_rows = 0;
let mut extensions = HashMap::new();
for result in results {
affected_rows += result.affected_rows;
extensions.extend(result.extensions);
}
Ok(RegionResponse {
affected_rows,
extensions,
})
}
}
#[async_trait]
@@ -383,6 +418,11 @@ impl RegionServerHandler for RegionServer {
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?,
RegionRequestBundle::Vector(requests) => self
.handle_vector_request(requests)
.await
.map_err(BoxedError::new)
.context(ExecuteGrpcRequestSnafu)?,
RegionRequestBundle::Batch(requests) => self
.handle_batch_request(requests)
.await
@@ -733,14 +773,6 @@ impl RegionServerInner {
.iter()
.map(|(region_id, _)| (*region_id, RegionChange::None))
.collect::<Vec<_>>(),
BatchRegionRequest::Delete(requests) => requests
.iter()
.map(|(region_id, _)| (*region_id, RegionChange::None))
.collect::<Vec<_>>(),
BatchRegionRequest::Put(requests) => requests
.iter()
.map(|(region_id, _)| (*region_id, RegionChange::None))
.collect::<Vec<_>>(),
};
let (first_region_id, first_region_change) = region_changes.first().unwrap();

View File

@@ -156,22 +156,6 @@ impl RegionEngine for MetricEngine {
)
.await
}
BatchRegionRequest::Delete(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Delete(req))),
)
.await
}
BatchRegionRequest::Put(requests) => {
self.handle_requests(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Put(req))),
)
.await
}
}
}

View File

@@ -85,15 +85,11 @@ pub fn convert_body_to_requests(body: region_request::Body) -> Result<RegionRequ
// Batch requests
region_request::Body::Inserts(inserts) => {
let requests = make_region_puts(inserts)?;
Ok(RegionRequestBundle::new_batch(BatchRegionRequest::Put(
requests,
)))
Ok(RegionRequestBundle::new_vector(requests))
}
region_request::Body::Deletes(deletes) => {
let requests = make_region_deletes(deletes)?;
Ok(RegionRequestBundle::new_batch(BatchRegionRequest::Delete(
requests,
)))
Ok(RegionRequestBundle::new_vector(requests))
}
region_request::Body::Creates(creates) => {
let requests = make_region_creates(creates.requests)?;
@@ -119,6 +115,7 @@ pub fn convert_body_to_requests(body: region_request::Body) -> Result<RegionRequ
/// A bundle of region requests.
pub enum RegionRequestBundle {
Batch(BatchRegionRequest),
Vector(Vec<(RegionId, RegionRequest)>),
Single((RegionId, RegionRequest)),
}
@@ -131,12 +128,8 @@ impl RegionRequestBundle {
Self::Single((region_id, request))
}
pub fn is_batch(&self) -> bool {
matches!(self, RegionRequestBundle::Batch(_))
}
pub fn is_single(&self) -> bool {
matches!(self, RegionRequestBundle::Single(_))
pub fn new_vector(requests: Vec<(RegionId, RegionRequest)>) -> Self {
Self::Vector(requests)
}
}
@@ -145,8 +138,6 @@ pub enum BatchRegionRequest {
Create(Vec<(RegionId, RegionCreateRequest)>),
Drop(Vec<(RegionId, RegionDropRequest)>),
Alter(Vec<(RegionId, RegionAlterRequest)>),
Put(Vec<(RegionId, RegionPutRequest)>),
Delete(Vec<(RegionId, RegionDeleteRequest)>),
}
impl BatchRegionRequest {
@@ -168,16 +159,6 @@ impl BatchRegionRequest {
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Alter(req))),
),
BatchRegionRequest::Put(requests) => Box::new(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Put(req))),
),
BatchRegionRequest::Delete(requests) => Box::new(
requests
.into_iter()
.map(|(region_id, req)| (region_id, RegionRequest::Delete(req))),
),
}
}
@@ -280,26 +261,35 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<(RegionId, RegionTr
Ok((region_id, RegionTruncateRequest {}))
}
fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionPutRequest)>> {
fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let requests = inserts
.requests
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows
.map(|rows| (region_id, RegionPutRequest { rows, hint: None }))
r.rows.map(|rows| {
(
region_id,
RegionRequest::Put(RegionPutRequest { rows, hint: None }),
)
})
})
.collect();
Ok(requests)
}
fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionDeleteRequest)>> {
fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let requests = deletes
.requests
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows.map(|rows| (region_id, RegionDeleteRequest { rows }))
r.rows.map(|rows| {
(
region_id,
RegionRequest::Delete(RegionDeleteRequest { rows }),
)
})
})
.collect();
Ok(requests)