diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index 6bec1ebf13..13a39ef7ce 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -59,7 +59,8 @@ use store_api::region_engine::{ SettableRegionRoleState, }; use store_api::region_request::{ - AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest, + AffectedRows, BatchRegionRequest, RegionCloseRequest, RegionOpenRequest, RegionPutRequest, + RegionRequest, }; use store_api::storage::RegionId; use tokio::sync::{Semaphore, SemaphorePermit}; @@ -830,6 +831,14 @@ impl RegionServerInner { _ => unreachable!(), } + for (_, (engine, request)) in engine_requests { + // TODO(yingwen): Error for batch request. + engine + .handle_batch_request(BatchRegionRequest::Put(request)) + .await + .unwrap(); + } + // match engine // .handle_request(region_id, request) // .await diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 3ada60f824..9e47094fc4 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -42,7 +42,7 @@ use store_api::region_engine::{ RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse, SettableRegionRoleState, }; -use store_api::region_request::RegionRequest; +use store_api::region_request::{BatchRegionRequest, RegionRequest}; use store_api::storage::{RegionId, ScanRequest}; use self::state::MetricEngineState; @@ -175,6 +175,16 @@ impl RegionEngine for MetricEngine { }) } + async fn handle_batch_request(&self, request: BatchRegionRequest) -> Result<(), BoxedError> { + match request { + BatchRegionRequest::Put(put) => self + .inner + .batch_put_region(put) + .await + .map_err(BoxedError::new), + } + } + async fn handle_query( &self, region_id: RegionId, diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index c7dc44be1e..0154adb84e 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use api::v1::{Rows, WriteHint}; use common_telemetry::{error, info}; use snafu::{ensure, OptionExt}; @@ -50,6 +52,27 @@ impl MetricEngineInner { } } + /// Dispatch batch region put request + pub async fn batch_put_region( + &self, + requests: Vec<(RegionId, RegionPutRequest)>, + ) -> Result<()> { + { + let state = self.state.read().unwrap(); + for region_id in requests.iter().map(|(region_id, _)| region_id) { + if state.physical_region_states().contains_key(region_id) { + info!("Metric region received put request on physical region {region_id:?}"); + FORBIDDEN_OPERATION_COUNT.inc(); + + return ForbiddenPhysicalAlterSnafu.fail(); + } + } + } + + self.batch_put_logical_regions(requests).await?; + Ok(()) + } + async fn put_logical_region( &self, logical_region_id: RegionId, @@ -98,6 +121,110 @@ impl MetricEngineInner { self.data_region.write_data(data_region_id, request).await } + async fn batch_put_logical_regions( + &self, + requests: Vec<(RegionId, RegionPutRequest)>, + ) -> Result { + let _timer = MITO_OPERATION_ELAPSED + .with_label_values(&["put"]) + .start_timer(); + + let mut physical_requests = HashMap::with_capacity(1); + // Group requests by physical region, also verify put requests. + { + let state = self.state.read().unwrap(); + for (logical_region_id, request) in requests { + let physical_region_id = *state + .logical_regions() + .get(&logical_region_id) + .with_context(|| LogicalRegionNotFoundSnafu { + region_id: logical_region_id, + })?; + let data_region_id = to_data_region_id(physical_region_id); + // Check if a physical column exists. + let physical_columns = state + .physical_region_states() + .get(&data_region_id) + .context(PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + })? + .physical_columns(); + for col in &request.rows.schema { + ensure!( + physical_columns.contains_key(&col.column_name), + ColumnNotFoundSnafu { + name: col.column_name.clone(), + region_id: logical_region_id, + } + ); + } + + physical_requests + .entry(physical_region_id) + .or_insert_with(Vec::new) + .push((logical_region_id, request)); + } + } + + let mut affected_rows = 0; + for (physical_region_id, mut requests) in physical_requests { + if requests.is_empty() { + continue; + } + + let data_region_id = to_data_region_id(physical_region_id); + let primary_key_encoding = { + let state = self.state.read().unwrap(); + state.get_primary_key_encoding(data_region_id).context( + PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + }, + )? + }; + + for (logical_region_id, request) in &mut requests { + self.modify_rows( + physical_region_id, + logical_region_id.table_id(), + &mut request.rows, + primary_key_encoding, + )?; + } + + let total_rows = requests + .iter() + .map(|(_, request)| request.rows.rows.len()) + .sum::(); + if primary_key_encoding == PrimaryKeyEncoding::Sparse { + let mut merged_request = RegionPutRequest { + rows: Rows { + schema: requests[0].1.rows.schema.clone(), + rows: Vec::with_capacity(total_rows), + }, + hint: None, + }; + merged_request.hint = Some(WriteHint { + primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(), + }); + for (_, mut request) in requests { + merged_request.rows.rows.append(&mut request.rows.rows); + } + + self.data_region + .write_data(data_region_id, merged_request) + .await?; + } else { + for (_, request) in requests { + self.data_region.write_data(data_region_id, request).await?; + } + } + + affected_rows += total_rows; + } + + Ok(affected_rows) + } + /// Verifies a put request for a logical region against its corresponding metadata region. /// /// Includes: diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index c9b0ac53db..1b1ef45a00 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -33,7 +33,7 @@ use tokio::sync::Semaphore; use crate::logstore::entry; use crate::metadata::RegionMetadataRef; -use crate::region_request::{RegionOpenRequest, RegionRequest}; +use crate::region_request::{BatchRegionRequest, RegionOpenRequest, RegionRequest}; use crate::storage::{RegionId, ScanRequest}; /// The settable region role state. @@ -414,6 +414,11 @@ pub trait RegionEngine: Send + Sync { request: RegionRequest, ) -> Result; + async fn handle_batch_request(&self, request: BatchRegionRequest) -> Result<(), BoxedError> { + let _ = request; + unimplemented!() + } + /// Handles query and return a scanner that can be used to scan the region concurrently. async fn handle_query( &self,