feat: handle body

This commit is contained in:
evenyag
2025-02-08 17:59:38 +08:00
parent bec8245e75
commit 06ebe6b3fb
2 changed files with 98 additions and 1 deletions

View File

@@ -59,7 +59,7 @@ use store_api::region_engine::{
SettableRegionRoleState,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
@@ -158,6 +158,11 @@ impl RegionServer {
self.inner.handle_request(region_id, request).await
}
#[tracing::instrument(skip_all, fields(request_type = "Put"))]
pub async fn handle_batch_body(&self, body: region_request::Body) -> Result<RegionResponse> {
self.inner.handle_batch_body(body).await
}
async fn table_provider(&self, region_id: RegionId) -> Result<Arc<dyn TableProvider>> {
let status = self
.inner
@@ -786,6 +791,64 @@ impl RegionServerInner {
}
}
async fn handle_batch_body(&self, body: region_request::Body) -> Result<RegionResponse> {
let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED
.with_label_values(&["Put"])
.start_timer();
// Group requests by engine.
let mut engine_requests: HashMap<
String,
(RegionEngineRef, Vec<(RegionId, RegionPutRequest)>),
> = HashMap::with_capacity(1);
match body {
region_request::Body::Inserts(inserts) => {
let num_requests = inserts.requests.len();
for request in inserts.requests {
let region_id = RegionId::from_u64(request.region_id);
let CurrentEngine::Engine(engine) =
self.get_engine(region_id, &RegionChange::None)?
else {
continue;
};
let Some(rows) = request.rows else {
continue;
};
match engine_requests.get_mut(engine.name()) {
Some((_, requests)) => {
requests.push((region_id, RegionPutRequest { rows, hint: None }))
}
None => {
let mut requests = Vec::with_capacity(num_requests);
requests.push((region_id, RegionPutRequest { rows, hint: None }));
engine_requests.insert(engine.name().to_string(), (engine, requests));
}
}
}
}
_ => unreachable!(),
}
// match engine
// .handle_request(region_id, request)
// .await
// .with_context(|_| HandleRegionRequestSnafu { region_id })
// {
// Ok(result) => {
// Ok(RegionResponse {
// affected_rows: result.affected_rows,
// extensions: result.extensions,
// })
// }
// Err(err) => {
// Err(err)
// }
// }
todo!()
}
fn set_region_status_not_ready(
&self,
region_id: RegionId,

View File

@@ -89,6 +89,27 @@ impl RegionRequest {
}
}
/// Requests for multiple regions.
#[derive(Debug)]
pub enum BatchRegionRequest {
Put(Vec<(RegionId, RegionPutRequest)>),
}
impl BatchRegionRequest {
/// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
pub fn try_from_request_body(body: region_request::Body) -> Result<Self> {
match body {
region_request::Body::Inserts(inserts) => {
make_batch_region_puts(inserts).map(BatchRegionRequest::Put)
}
_ => InvalidRawRegionRequestSnafu {
err: "unsupported batch region request",
}
.fail(),
}
}
}
fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let requests = inserts
.requests
@@ -106,6 +127,19 @@ fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequ
Ok(requests)
}
fn make_batch_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionPutRequest)>> {
let requests = inserts
.requests
.into_iter()
.filter_map(|r| {
let region_id = r.region_id.into();
r.rows
.map(|rows| (region_id, RegionPutRequest { rows, hint: None }))
})
.collect();
Ok(requests)
}
fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
let requests = deletes
.requests