diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 43baf32ec3..6bec1ebf13 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -59,7 +59,7 @@ use store_api::region_engine::{ SettableRegionRoleState, }; use store_api::region_request::{ - AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest, + AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, }; use store_api::storage::RegionId; use tokio::sync::{Semaphore, SemaphorePermit}; @@ -158,6 +158,11 @@ impl RegionServer { self.inner.handle_request(region_id, request).await } + #[tracing::instrument(skip_all, fields(request_type = "Put"))] + pub async fn handle_batch_body(&self, body: region_request::Body) -> Result { + self.inner.handle_batch_body(body).await + } + async fn table_provider(&self, region_id: RegionId) -> Result> { let status = self .inner @@ -786,6 +791,64 @@ impl RegionServerInner { } } + async fn handle_batch_body(&self, body: region_request::Body) -> Result { + let _timer = crate::metrics::HANDLE_REGION_REQUEST_ELAPSED + .with_label_values(&["Put"]) + .start_timer(); + + // Group requests by engine. + let mut engine_requests: HashMap< + String, + (RegionEngineRef, Vec<(RegionId, RegionPutRequest)>), + > = HashMap::with_capacity(1); + match body { + region_request::Body::Inserts(inserts) => { + let num_requests = inserts.requests.len(); + for request in inserts.requests { + let region_id = RegionId::from_u64(request.region_id); + let CurrentEngine::Engine(engine) = + self.get_engine(region_id, &RegionChange::None)? + else { + continue; + }; + let Some(rows) = request.rows else { + continue; + }; + + match engine_requests.get_mut(engine.name()) { + Some((_, requests)) => { + requests.push((region_id, RegionPutRequest { rows, hint: None })) + } + None => { + let mut requests = Vec::with_capacity(num_requests); + requests.push((region_id, RegionPutRequest { rows, hint: None })); + engine_requests.insert(engine.name().to_string(), (engine, requests)); + } + } + } + } + _ => unreachable!(), + } + + // match engine + // .handle_request(region_id, request) + // .await + // .with_context(|_| HandleRegionRequestSnafu { region_id }) + // { + // Ok(result) => { + // Ok(RegionResponse { + // affected_rows: result.affected_rows, + // extensions: result.extensions, + // }) + // } + // Err(err) => { + // Err(err) + // } + // } + + todo!() + } + fn set_region_status_not_ready( &self, region_id: RegionId, diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index b391441bd5..eaf79d6d27 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -89,6 +89,27 @@ impl RegionRequest { } } +/// Requests for multiple regions. +#[derive(Debug)] +pub enum BatchRegionRequest { + Put(Vec<(RegionId, RegionPutRequest)>), +} + +impl BatchRegionRequest { + /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id. + pub fn try_from_request_body(body: region_request::Body) -> Result { + match body { + region_request::Body::Inserts(inserts) => { + make_batch_region_puts(inserts).map(BatchRegionRequest::Put) + } + _ => InvalidRawRegionRequestSnafu { + err: "unsupported batch region request", + } + .fail(), + } + } +} + fn make_region_puts(inserts: InsertRequests) -> Result> { let requests = inserts .requests @@ -106,6 +127,19 @@ fn make_region_puts(inserts: InsertRequests) -> Result Result> { + let requests = inserts + .requests + .into_iter() + .filter_map(|r| { + let region_id = r.region_id.into(); + r.rows + .map(|rows| (region_id, RegionPutRequest { rows, hint: None })) + }) + .collect(); + Ok(requests) +} + fn make_region_deletes(deletes: DeleteRequests) -> Result> { let requests = deletes .requests