feat: batch put logical regions

This commit is contained in:
evenyag
2025-02-10 15:38:55 +08:00
parent 06ebe6b3fb
commit c2b556e321
4 changed files with 154 additions and 3 deletions

View File

@@ -59,7 +59,8 @@ use store_api::region_engine::{
SettableRegionRoleState,
};
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionPutRequest, RegionRequest,
AffectedRows, BatchRegionRequest, RegionCloseRequest, RegionOpenRequest, RegionPutRequest,
RegionRequest,
};
use store_api::storage::RegionId;
use tokio::sync::{Semaphore, SemaphorePermit};
@@ -830,6 +831,14 @@ impl RegionServerInner {
_ => unreachable!(),
}
for (_, (engine, request)) in engine_requests {
// TODO(yingwen): Error for batch request.
engine
.handle_batch_request(BatchRegionRequest::Put(request))
.await
.unwrap();
}
// match engine
// .handle_request(region_id, request)
// .await

View File

@@ -42,7 +42,7 @@ use store_api::region_engine::{
RegionEngine, RegionRole, RegionScannerRef, RegionStatistic, SetRegionRoleStateResponse,
SettableRegionRoleState,
};
use store_api::region_request::RegionRequest;
use store_api::region_request::{BatchRegionRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use self::state::MetricEngineState;
@@ -175,6 +175,16 @@ impl RegionEngine for MetricEngine {
})
}
async fn handle_batch_request(&self, request: BatchRegionRequest) -> Result<(), BoxedError> {
match request {
BatchRegionRequest::Put(put) => self
.inner
.batch_put_region(put)
.await
.map_err(BoxedError::new),
}
}
async fn handle_query(
&self,
region_id: RegionId,

View File

@@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use api::v1::{Rows, WriteHint};
use common_telemetry::{error, info};
use snafu::{ensure, OptionExt};
@@ -50,6 +52,27 @@ impl MetricEngineInner {
}
}
/// Dispatch batch region put request
pub async fn batch_put_region(
&self,
requests: Vec<(RegionId, RegionPutRequest)>,
) -> Result<()> {
{
let state = self.state.read().unwrap();
for region_id in requests.iter().map(|(region_id, _)| region_id) {
if state.physical_region_states().contains_key(region_id) {
info!("Metric region received put request on physical region {region_id:?}");
FORBIDDEN_OPERATION_COUNT.inc();
return ForbiddenPhysicalAlterSnafu.fail();
}
}
}
self.batch_put_logical_regions(requests).await?;
Ok(())
}
async fn put_logical_region(
&self,
logical_region_id: RegionId,
@@ -98,6 +121,110 @@ impl MetricEngineInner {
self.data_region.write_data(data_region_id, request).await
}
async fn batch_put_logical_regions(
&self,
requests: Vec<(RegionId, RegionPutRequest)>,
) -> Result<AffectedRows> {
let _timer = MITO_OPERATION_ELAPSED
.with_label_values(&["put"])
.start_timer();
let mut physical_requests = HashMap::with_capacity(1);
// Group requests by physical region, also verify put requests.
{
let state = self.state.read().unwrap();
for (logical_region_id, request) in requests {
let physical_region_id = *state
.logical_regions()
.get(&logical_region_id)
.with_context(|| LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
})?;
let data_region_id = to_data_region_id(physical_region_id);
// Check if a physical column exists.
let physical_columns = state
.physical_region_states()
.get(&data_region_id)
.context(PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?
.physical_columns();
for col in &request.rows.schema {
ensure!(
physical_columns.contains_key(&col.column_name),
ColumnNotFoundSnafu {
name: col.column_name.clone(),
region_id: logical_region_id,
}
);
}
physical_requests
.entry(physical_region_id)
.or_insert_with(Vec::new)
.push((logical_region_id, request));
}
}
let mut affected_rows = 0;
for (physical_region_id, mut requests) in physical_requests {
if requests.is_empty() {
continue;
}
let data_region_id = to_data_region_id(physical_region_id);
let primary_key_encoding = {
let state = self.state.read().unwrap();
state.get_primary_key_encoding(data_region_id).context(
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
},
)?
};
for (logical_region_id, request) in &mut requests {
self.modify_rows(
physical_region_id,
logical_region_id.table_id(),
&mut request.rows,
primary_key_encoding,
)?;
}
let total_rows = requests
.iter()
.map(|(_, request)| request.rows.rows.len())
.sum::<usize>();
if primary_key_encoding == PrimaryKeyEncoding::Sparse {
let mut merged_request = RegionPutRequest {
rows: Rows {
schema: requests[0].1.rows.schema.clone(),
rows: Vec::with_capacity(total_rows),
},
hint: None,
};
merged_request.hint = Some(WriteHint {
primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
});
for (_, mut request) in requests {
merged_request.rows.rows.append(&mut request.rows.rows);
}
self.data_region
.write_data(data_region_id, merged_request)
.await?;
} else {
for (_, request) in requests {
self.data_region.write_data(data_region_id, request).await?;
}
}
affected_rows += total_rows;
}
Ok(affected_rows)
}
/// Verifies a put request for a logical region against its corresponding metadata region.
///
/// Includes:

View File

@@ -33,7 +33,7 @@ use tokio::sync::Semaphore;
use crate::logstore::entry;
use crate::metadata::RegionMetadataRef;
use crate::region_request::{RegionOpenRequest, RegionRequest};
use crate::region_request::{BatchRegionRequest, RegionOpenRequest, RegionRequest};
use crate::storage::{RegionId, ScanRequest};
/// The settable region role state.
@@ -414,6 +414,11 @@ pub trait RegionEngine: Send + Sync {
request: RegionRequest,
) -> Result<RegionResponse, BoxedError>;
async fn handle_batch_request(&self, request: BatchRegionRequest) -> Result<(), BoxedError> {
let _ = request;
unimplemented!()
}
/// Handles query and return a scanner that can be used to scan the region concurrently.
async fn handle_query(
&self,