Mirror of https://github.com/GreptimeTeam/greptimedb.git, synced 2026-01-04 04:12:55 +00:00
* feat/bulk-wal: **Refactor: Simplify Data Handling in LogStore Implementations**
  - **`kafka/log_store.rs`, `raft_engine/log_store.rs`, `wal.rs`, `raw_entry_reader.rs`, `logstore.rs`:**
    - Refactored the `entry` and `build_entry` functions to accept `Vec<u8>` directly instead of `&mut Vec<u8>` (see the sketch after this list).
    - Removed usage of `std::mem::take` for data handling, simplifying the code and improving readability.
    - Updated test cases to align with the new function signatures.
* feat/bulk-wal: **Add Support for Bulk WAL Entries and Flight Data Encoding**
  - **Add `raw_data` field to `BulkPart` and related structs**: Updated `BulkPart` and related structures in `src/mito2/src/memtable/bulk/part.rs`, `src/mito2/src/memtable/simple_bulk_memtable.rs`, `src/mito2/src/memtable/time_partition.rs`, `src/mito2/src/region_write_ctx.rs`, `src/mito2/src/worker/handle_bulk_insert.rs`, and `src/store-api/src/region_request.rs` to include a new `raw_data` field for handling Arrow IPC data.
  - **Implement Flight Data Encoding**: Added a new `flight` module in `src/common/test-util/src/flight.rs` to encode record batches to the Flight data format.
  - **Update `greptime-proto` dependency**: Changed the revision of the `greptime-proto` dependency in `Cargo.lock` and `Cargo.toml`.
  - **Enhance WAL Writer and Tests**: Modified `src/mito2/src/wal.rs` and related test files to support bulk WAL entries, and added tests for encoding and handling bulk data.
* feat/bulk-wal:
  - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
  - **Add `common-grpc` Dependency**: Added `common-grpc` as a dependency in `Cargo.lock` and `src/mito2/Cargo.toml`.
  - **Refactor `BulkPart` Structure**: Removed the `num_rows` field and added a `num_rows()` method in `src/mito2/src/memtable/bulk/part.rs`; updated related usages in `src/mito2/src/memtable/simple_bulk_memtable.rs`, `src/mito2/src/memtable/time_partition.rs`, `src/mito2/src/memtable/time_series.rs`, `src/mito2/src/region_write_ctx.rs`, and `src/mito2/src/worker/handle_bulk_insert.rs`.
  - **Implement `TryFrom` and `From` for `BulkWalEntry`**: Added implementations for converting between `BulkPart` and `BulkWalEntry` in `src/mito2/src/memtable/bulk/part.rs`.
  - **Handle Bulk Entries in Region Opener**: Added logic to process bulk entries in `src/mito2/src/region/opener.rs`.
  - **Fix `BulkInsertRequest` Handling**: Corrected `region_id` handling in `src/operator/src/bulk_insert.rs` and `src/store-api/src/region_request.rs`.
  - **Add Error Variant for `ConvertBulkWalEntry`**: Added a new error variant in `src/mito2/src/error.rs` for handling bulk WAL entry conversion errors.
* fix: ci
* feat/bulk-wal: Add bulk write operation in `opener.rs`
  - Enhanced the region write context by adding a call to `write_bulk()` after `write_memtable()` in `opener.rs`. This change aims to improve write efficiency by enabling bulk writes.
* feat/bulk-wal: Enhance error handling and metrics in `bulk_insert.rs`
  - Updated `Inserter` to improve error handling by capturing the result of `datanode.handle(request)` and incrementing the `DIST_INGEST_ROW_COUNT` metric by the number of affected rows.
* feat/bulk-wal: **Remove Encode Error Handling for WAL Entries**
  - **`error.rs`**: Removed the `EncodeWal` error variant and its associated handling.
  - **`wal.rs`**: Eliminated the `entry_encode_buf` buffer used for encoding WAL entries; entries are now encoded directly to a vector via `encode_to_vec()`.
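The signature change in the first commit is easy to picture with a minimal before/after sketch. The names and signatures below are simplified and hypothetical; the real `LogStore` trait methods also carry entry and region identifiers:

```rust
// Before: callers passed a mutable borrow, so implementations had to
// steal the bytes with `std::mem::take`, leaving an empty Vec behind.
fn build_entry_old(data: &mut Vec<u8>) -> Vec<u8> {
    std::mem::take(data)
}

// After: ownership moves into the function; no take/swap is needed.
fn build_entry_new(data: Vec<u8>) -> Vec<u8> {
    data
}
```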
220 lines · 9.3 KiB · Rust
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use ahash::{HashMap, HashMapExt};
use api::v1::region::{
    bulk_insert_request, region_request, BulkInsertRequest, RegionRequest, RegionRequestHeader,
};
use api::v1::ArrowIpc;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_telemetry::tracing_context::TracingContext;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::metadata::TableId;

use crate::insert::Inserter;
use crate::{error, metrics};

impl Inserter {
    /// Handles a bulk insert request: decodes the Flight data into a record
    /// batch, splits it by the table's partition rule, and dispatches the
    /// resulting per-region requests to the datanodes.
    pub async fn handle_bulk_insert(
        &self,
        table_id: TableId,
        decoder: &mut FlightDecoder,
        data: FlightData,
    ) -> error::Result<AffectedRows> {
        let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
            .with_label_values(&["decode_request"])
            .start_timer();
        let body_size = data.data_body.len();
        // Build region server requests
        let message = decoder
            .try_decode(&data)
            .context(error::DecodeFlightDataSnafu)?;
        let FlightMessage::RecordBatch(record_batch) = message else {
            return Ok(0);
        };
        decode_timer.observe_duration();
        metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
        metrics::BULK_REQUEST_ROWS
            .with_label_values(&["raw"])
            .observe(record_batch.num_rows() as f64);

        // SAFETY: the schema must have been decoded by the time we reach here.
        let schema_bytes = decoder.schema_bytes().unwrap();
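
        // Partition phase: look up the table's partition rule and split the
        // batch into a boolean row mask per target region.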
        let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
            .with_label_values(&["partition"])
            .start_timer();
        let partition_rule = self
            .partition_manager
            .find_table_partition_rule(table_id)
            .await
            .context(error::InvalidPartitionSnafu)?;

        // Find the partition for each row in the record batch.
        let region_masks = partition_rule
            .split_record_batch(&record_batch)
            .context(error::SplitInsertSnafu)?;
        partition_timer.observe_duration();

        // Fast path: only one region.
        if region_masks.len() == 1 {
            metrics::BULK_REQUEST_ROWS
                .with_label_values(&["rows_per_region"])
                .observe(record_batch.num_rows() as f64);

            // SAFETY: region masks length checked above.
            let (region_number, _) = region_masks.into_iter().next().unwrap();
            let region_id = RegionId::new(table_id, region_number);
            let datanode = self
                .partition_manager
                .find_region_leader(region_id)
                .await
                .context(error::FindRegionLeaderSnafu)?;
            let request = RegionRequest {
                header: Some(RegionRequestHeader {
                    tracing_context: TracingContext::from_current_span().to_w3c(),
                    ..Default::default()
                }),
                body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
                    region_id: region_id.as_u64(),
                    body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
                        schema: schema_bytes,
                        data_header: data.data_header,
                        payload: data.data_body,
                    })),
                })),
            };

            let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
                .with_label_values(&["datanode_handle"])
                .start_timer();
            let datanode = self.node_manager.datanode(&datanode).await;
            let result = datanode
                .handle(request)
                .await
                .context(error::RequestRegionSnafu)
                .map(|r| r.affected_rows);
            if let Ok(rows) = result {
                crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows as u64);
            }
            return result;
        }
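
        // Slow path: rows span multiple regions. Group the region masks by
        // their leader datanode so each region gets its own request.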
        let mut mask_per_datanode = HashMap::with_capacity(region_masks.len());
        for (region_number, mask) in region_masks {
            let region_id = RegionId::new(table_id, region_number);
            let datanode = self
                .partition_manager
                .find_region_leader(region_id)
                .await
                .context(error::FindRegionLeaderSnafu)?;
            mask_per_datanode
                .entry(datanode)
                .or_insert_with(Vec::new)
                .push((region_id, mask));
        }

        let wait_all_datanode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
            .with_label_values(&["wait_all_datanode"])
            .start_timer();

        let mut handles = Vec::with_capacity(mask_per_datanode.len());

        // Raw data header and payload bytes.
        let mut raw_data_bytes = None;
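        // Spawn one task per (datanode, region): regions that keep every row
        // reuse the original encoded bytes; the rest are filtered and re-encoded.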
        for (peer, masks) in mask_per_datanode {
            for (region_id, mask) in masks {
                let rb = record_batch.clone();
                let schema_bytes = schema_bytes.clone();
                let node_manager = self.node_manager.clone();
                let peer = peer.clone();
                let raw_header_and_data = if mask.select_all() {
                    Some(
                        raw_data_bytes
                            .get_or_insert_with(|| {
                                (data.data_header.clone(), data.data_body.clone())
                            })
                            .clone(),
                    )
                } else {
                    None
                };
                let handle: common_runtime::JoinHandle<error::Result<api::region::RegionResponse>> =
                    common_runtime::spawn_global(async move {
                        let (header, payload) = if mask.select_all() {
                            // SAFETY: the raw data must be present here, so we
                            // can avoid re-encoding.
                            raw_header_and_data.unwrap()
                        } else {
                            let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
                                .with_label_values(&["filter"])
                                .start_timer();
                            let batch = arrow::compute::filter_record_batch(&rb, mask.array())
                                .context(error::ComputeArrowSnafu)?;
                            filter_timer.observe_duration();
                            metrics::BULK_REQUEST_ROWS
                                .with_label_values(&["rows_per_region"])
                                .observe(batch.num_rows() as f64);

                            let encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
                                .with_label_values(&["encode"])
                                .start_timer();
                            let flight_data =
                                FlightEncoder::default().encode(FlightMessage::RecordBatch(batch));
                            encode_timer.observe_duration();
                            (flight_data.data_header, flight_data.data_body)
                        };
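
                        // Build and send the per-region request; the timer
                        // records the datanode round trip.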
                        let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
                            .with_label_values(&["datanode_handle"])
                            .start_timer();
                        let request = RegionRequest {
                            header: Some(RegionRequestHeader {
                                tracing_context: TracingContext::from_current_span().to_w3c(),
                                ..Default::default()
                            }),
                            body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
                                region_id: region_id.as_u64(),
                                body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
                                    schema: schema_bytes,
                                    data_header: header,
                                    payload,
                                })),
                            })),
                        };

                        let datanode = node_manager.datanode(&peer).await;
                        datanode
                            .handle(request)
                            .await
                            .context(error::RequestRegionSnafu)
                    });
                handles.push(handle);
            }
        }
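
        // Wait for all datanode tasks and sum their affected row counts.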
        let region_responses = futures::future::try_join_all(handles)
            .await
            .context(error::JoinTaskSnafu)?;
        wait_all_datanode_timer.observe_duration();
        let mut rows_inserted: usize = 0;
        for res in region_responses {
            rows_inserted += res?.affected_rows;
        }
        crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
        Ok(rows_inserted)
    }
}
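One pattern worth noting in the slow path: `raw_data_bytes` with `Option::get_or_insert_with` lazily caches the clone of the original header and payload, so regions whose mask selects every row skip the filter-and-re-encode work, and the clone happens at most once no matter how many regions hit that branch. A minimal, self-contained sketch of that caching pattern (names are illustrative):

```rust
fn main() {
    let original: Vec<u8> = vec![1, 2, 3];
    let mut cached: Option<Vec<u8>> = None;
    for _region in 0..3 {
        // The closure runs only on the first iteration; later
        // iterations reuse the cached value.
        let bytes = cached.get_or_insert_with(|| original.clone()).clone();
        assert_eq!(bytes, original);
    }
}
```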