mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-24 00:40:40 +00:00
fix: fast path for single region bulk insert (#6104)
* fix/fast-path-for-single-region-bulk-insert: ### Commit Summary - **Refactor `try_decode` Method**: Updated the `try_decode` method in `FlightDecoder` to accept a reference to `FlightData` instead of consuming it. This change affects multiple files including `database.rs`, `region.rs`, `flight.rs`, `bulk_insert.rs`, `stream.rs`, and `region_request.rs`. - **Optimize Bulk Insert Handling**: Added a fast path for handling bulk inserts when only one region is involved in `bulk_insert.rs`. * fix/fast-path-for-single-region-bulk-insert: Improve `FlightDecoder` usage in tests - Updated `try_decode` method calls in `flight.rs` to remove unnecessary references for `d1`, `d2`, and `d3`. - Ensured consistency in handling `FlightMessage` variants within test cases. * fix/fast-path-for-single-region-bulk-insert: **Enhancement: Skip Empty Regions in Bulk Insert** - Updated `bulk_insert.rs` to improve efficiency by skipping regions without data during the bulk insert process. This change ensures that regions with a `true_count` of zero are not processed, optimizing resource usage and performance. * fix/fast-path-for-single-region-bulk-insert: ### Commit Summary - **Refactor `RegionMask` Handling**: - Introduced `RegionMask` struct to encapsulate boolean array and selected rows count. - Updated methods to use `RegionMask` instead of `BooleanArray` for region selection. - Affected files: `bulk_insert.rs`, `multi_dim.rs`, `partition.rs`, `splitter.rs`. - **Optimize Region Selection**: - Removed unnecessary checks for empty regions in `bulk_insert.rs`. - Improved logic for handling default regions in `multi_dim.rs`. - **Update Tests**: - Modified test cases to accommodate `RegionMask` changes. - Affected files: `multi_dim.rs`, `splitter.rs`. * fix/fast-path-for-single-region-bulk-insert: **Enhancements to MultiDimPartitionRule Logic and Tests** - **`multi_dim.rs`**: Improved the logic for selecting rows in `MultiDimPartitionRule` by optimizing the selection process when only one region is present. - **Tests**: Added new test cases to verify the behavior of default regions with unselected rows, existing default regions, and scenarios where all rows are selected. These tests ensure robust handling of partition rules and validate the correct assignment of rows to regions.
This commit is contained in:
@@ -48,7 +48,7 @@ impl Inserter {
|
||||
let body_size = data.data_body.len();
|
||||
// Build region server requests
|
||||
let message = decoder
|
||||
.try_decode(data)
|
||||
.try_decode(&data)
|
||||
.context(error::DecodeFlightDataSnafu)?;
|
||||
let FlightMessage::Recordbatch(rb) = message else {
|
||||
return Ok(0);
|
||||
@@ -82,6 +82,51 @@ impl Inserter {
|
||||
.context(error::SplitInsertSnafu)?;
|
||||
partition_timer.observe_duration();
|
||||
|
||||
// fast path: only one region.
|
||||
if region_masks.len() == 1 {
|
||||
metrics::BULK_REQUEST_ROWS
|
||||
.with_label_values(&["rows_per_region"])
|
||||
.observe(record_batch.num_rows() as f64);
|
||||
|
||||
// SAFETY: region masks length checked
|
||||
let (region_number, _) = region_masks.into_iter().next().unwrap();
|
||||
let region_id = RegionId::new(table_id, region_number);
|
||||
let datanode = self
|
||||
.partition_manager
|
||||
.find_region_leader(region_id)
|
||||
.await
|
||||
.context(error::FindRegionLeaderSnafu)?;
|
||||
let payload = {
|
||||
let _encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["encode"])
|
||||
.start_timer();
|
||||
Bytes::from(data.encode_to_vec())
|
||||
};
|
||||
let request = RegionRequest {
|
||||
header: Some(RegionRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
..Default::default()
|
||||
}),
|
||||
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
|
||||
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
|
||||
region_id: region_id.as_u64(),
|
||||
schema: schema_bytes,
|
||||
payload,
|
||||
})),
|
||||
})),
|
||||
};
|
||||
|
||||
let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["datanode_handle"])
|
||||
.start_timer();
|
||||
let datanode = self.node_manager.datanode(&datanode).await;
|
||||
return datanode
|
||||
.handle(request)
|
||||
.await
|
||||
.context(error::RequestRegionSnafu)
|
||||
.map(|r| r.affected_rows);
|
||||
}
|
||||
|
||||
let mut mask_per_datanode = HashMap::with_capacity(region_masks.len());
|
||||
for (region_number, mask) in region_masks {
|
||||
let region_id = RegionId::new(table_id, region_number);
|
||||
@@ -104,6 +149,7 @@ impl Inserter {
|
||||
let record_batch_schema =
|
||||
Arc::new(Schema::try_from(record_batch.schema()).context(error::ConvertSchemaSnafu)?);
|
||||
|
||||
let mut raw_data_bytes = None;
|
||||
for (peer, masks) in mask_per_datanode {
|
||||
for (region_id, mask) in masks {
|
||||
let rb = record_batch.clone();
|
||||
@@ -111,30 +157,45 @@ impl Inserter {
|
||||
let record_batch_schema = record_batch_schema.clone();
|
||||
let node_manager = self.node_manager.clone();
|
||||
let peer = peer.clone();
|
||||
let raw_data = if mask.select_all() {
|
||||
Some(
|
||||
raw_data_bytes
|
||||
.get_or_insert_with(|| Bytes::from(data.encode_to_vec()))
|
||||
.clone(),
|
||||
)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let handle: common_runtime::JoinHandle<error::Result<api::region::RegionResponse>> =
|
||||
common_runtime::spawn_global(async move {
|
||||
let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["filter"])
|
||||
.start_timer();
|
||||
let rb = arrow::compute::filter_record_batch(&rb, &mask)
|
||||
.context(error::ComputeArrowSnafu)?;
|
||||
filter_timer.observe_duration();
|
||||
metrics::BULK_REQUEST_ROWS
|
||||
.with_label_values(&["rows_per_region"])
|
||||
.observe(rb.num_rows() as f64);
|
||||
|
||||
let encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["encode"])
|
||||
.start_timer();
|
||||
let batch = RecordBatch::try_from_df_record_batch(record_batch_schema, rb)
|
||||
.context(error::BuildRecordBatchSnafu)?;
|
||||
let payload = Bytes::from(
|
||||
FlightEncoder::default()
|
||||
.encode(FlightMessage::Recordbatch(batch))
|
||||
.encode_to_vec(),
|
||||
);
|
||||
encode_timer.observe_duration();
|
||||
let payload = if mask.select_all() {
|
||||
// SAFETY: raw data must be present, we can avoid re-encoding.
|
||||
raw_data.unwrap()
|
||||
} else {
|
||||
let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["filter"])
|
||||
.start_timer();
|
||||
let rb = arrow::compute::filter_record_batch(&rb, mask.array())
|
||||
.context(error::ComputeArrowSnafu)?;
|
||||
filter_timer.observe_duration();
|
||||
metrics::BULK_REQUEST_ROWS
|
||||
.with_label_values(&["rows_per_region"])
|
||||
.observe(rb.num_rows() as f64);
|
||||
|
||||
let encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["encode"])
|
||||
.start_timer();
|
||||
let batch =
|
||||
RecordBatch::try_from_df_record_batch(record_batch_schema, rb)
|
||||
.context(error::BuildRecordBatchSnafu)?;
|
||||
let payload = Bytes::from(
|
||||
FlightEncoder::default()
|
||||
.encode(FlightMessage::Recordbatch(batch))
|
||||
.encode_to_vec(),
|
||||
);
|
||||
encode_timer.observe_duration();
|
||||
payload
|
||||
};
|
||||
let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
|
||||
.with_label_values(&["datanode_handle"])
|
||||
.start_timer();
|
||||
|
||||
Reference in New Issue
Block a user