perf: optimize bulk encode decode (#6161)

* main:
 **Enhancements to Flight Data Handling and Error Management**

 - **Flight Data Handling:**
   - Added `bytes` dependency in `Cargo.lock` and `Cargo.toml`.
   - Introduced `try_from_schema_bytes` and `try_decode_record_batch` methods in `FlightDecoder` to handle schema and record batch decoding more efficiently in `src/common/grpc/src/flight.rs`.
   - Updated `Inserter` in `src/operator/src/bulk_insert.rs` to utilize schema bytes directly, improving bulk insert operations.

 - **Error Management:**
   - Added `ArrowError` handling in `src/common/grpc/src/error.rs` to manage errors related to Arrow operations.

 - **Region Request Processing:**
   - Modified `make_region_bulk_inserts` in `src/store-api/src/region_request.rs` to use the new `FlightDecoder` methods for decoding Arrow IPC data.

* - **Flight Data Handling:**
 - Added `bytes` dependency in `Cargo.lock` and `Cargo.toml`.
 - Introduced `try_from_schema_bytes` and `try_decode_record_batch` methods in `FlightDecoder` to handle schema and record batch decoding more efficiently in `src/common/grpc/src/flight.rs`.
 - Updated `Inserter` in `src/operator/src/bulk_insert.rs` to utilize schema bytes directly, improving bulk insert operations.
- **Error Management:**
 - Added `ArrowError` handling in `src/common/grpc/src/error.rs` to manage errors related to Arrow operations.
- **Region Request Processing:**
 - Modified `make_region_bulk_inserts` in `src/store-api/src/region_request.rs` to use the new `FlightDecoder` methods for decoding Arrow IPC data.

* perf/optimize-bulk-encode-decode:
 Update `greptime-proto` dependency and refactor error handling

 - **Dependency Update**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Error Handling Refactor**: Removed the `Prost` error variant from `MetadataError` in `src/store-api/src/metadata.rs`.
 - **Error Handling Improvement**: Replaced `unwrap` with `context(FlightCodecSnafu)` for error handling in `make_region_bulk_inserts` function in `src/store-api/src/region_request.rs`.

* fix: clippy

* fix: toml

* perf/optimize-bulk-encode-decode:
 ### Update `Cargo.toml` Dependencies

 - Updated the `bytes` dependency to use the workspace version in `Cargo.toml`.

* perf/optimize-bulk-encode-decode:
 **Fix payload assignment in `bulk_insert.rs`**

 - Corrected the assignment of the `payload` field in the `ArrowIpc` struct within the `Inserter` implementation in `bulk_insert.rs`.

* use main branch proto
This commit is contained in:
Lei, HUANG
2025-05-23 15:22:10 +08:00
committed by GitHub
parent cf2712e6f4
commit 27e339f628
10 changed files with 252 additions and 59 deletions

3
Cargo.lock generated
View File

@@ -2223,6 +2223,7 @@ version = "0.15.0"
dependencies = [
"api",
"arrow-flight",
"bytes",
"common-base",
"common-error",
"common-macro",
@@ -4856,7 +4857,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4d4136692fe7fbbd509ebc8c902f6afcc0ce61e4#4d4136692fe7fbbd509ebc8c902f6afcc0ce61e4"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=67ee5f94e5da72314cda7d0eb90106eb1c16a1ae#67ee5f94e5da72314cda7d0eb90106eb1c16a1ae"
dependencies = [
"prost 0.13.5",
"serde",

View File

@@ -132,7 +132,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4d4136692fe7fbbd509ebc8c902f6afcc0ce61e4" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "67ee5f94e5da72314cda7d0eb90106eb1c16a1ae" }
hex = "0.4"
http = "1"
humantime = "2.1"

View File

@@ -10,6 +10,7 @@ workspace = true
[dependencies]
api.workspace = true
arrow-flight.workspace = true
bytes.workspace = true
common-base.workspace = true
common-error.workspace = true
common-macro.workspace = true

View File

@@ -0,0 +1,143 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use arrow_flight::FlightData;
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::{DfRecordBatch, RecordBatch};
use criterion::{criterion_group, criterion_main, Criterion};
use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampMillisecondArray};
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::VectorRef;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use datatypes::vectors::Helper;
use prost::Message;
fn schema() -> SchemaRef {
let schema = Schema::new(vec![
ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), false),
ColumnSchema::new("v1", ConcreteDataType::int64_datatype(), false),
]);
Arc::new(schema)
}
/// Builds a [`RecordBatch`] with `num_rows` rows for the provided schema.
///
/// Column contents are chosen by the column's data type:
/// - string columns hold `"tag-<n>"`, with `<n>` picked at random from `0..10000`;
/// - timestamp columns hold consecutive milliseconds starting at the current time;
/// - int64 columns hold the row index.
///
/// # Panics
///
/// Panics if the schema contains any other column type, or if building a
/// vector or the record batch fails.
fn prepare_random_record_batch(schema: SchemaRef, num_rows: usize) -> RecordBatch {
    // Pre-rendered numeric suffixes shared by all string columns.
    let tag_candidates: Vec<String> = (0..10000).map(|i| i.to_string()).collect();

    let mut columns: Vec<VectorRef> = Vec::with_capacity(schema.column_schemas().len());
    for column_schema in schema.column_schemas() {
        let array = match &column_schema.data_type {
            ConcreteDataType::String(_) => {
                let values: Vec<String> = (0..num_rows)
                    .map(|_| {
                        let idx: usize = rand::random_range(0..10000);
                        format!("tag-{}", tag_candidates[idx])
                    })
                    .collect();
                Arc::new(StringArray::from(values)) as ArrayRef
            }
            ConcreteDataType::Timestamp(_) => {
                let now = common_time::util::current_time_millis();
                let values: Vec<i64> = (0..num_rows).map(|i| now + i as i64).collect();
                Arc::new(TimestampMillisecondArray::from(values)) as ArrayRef
            }
            ConcreteDataType::Int64(_) => {
                let values: Vec<i64> = (0..num_rows).map(|i| i as i64).collect();
                Arc::new(Int64Array::from(values)) as ArrayRef
            }
            _ => unreachable!(),
        };
        columns.push(Helper::try_into_vector(array).unwrap());
    }

    RecordBatch::new(schema, columns).unwrap()
}
/// Produces the two flight messages the benchmark decodes.
///
/// Returns `(schema message, record batch message)`, both encoded by the same
/// [`FlightEncoder`], with the record batch holding `num_rows` generated rows.
fn prepare_flight_data(num_rows: usize) -> (FlightData, FlightData) {
    let bench_schema = schema();
    let batch = prepare_random_record_batch(bench_schema.clone(), num_rows);

    // The schema message is encoded before the record batch message.
    let mut encoder = FlightEncoder::default();
    let schema_message = encoder.encode(FlightMessage::Schema(bench_schema));
    let batch_message = encoder.encode(FlightMessage::Recordbatch(batch));
    (schema_message, batch_message)
}
/// Decodes a record batch from protobuf-serialized [`FlightData`] messages.
///
/// Both `schema` and `payload` are first decoded as protobuf `FlightData`,
/// then passed in that order through a fresh [`FlightDecoder`].
///
/// # Panics
///
/// Panics if any decoding step fails or if the payload does not decode to a
/// record batch message.
fn decode_flight_data_from_protobuf(schema: &Bytes, payload: &Bytes) -> DfRecordBatch {
    let schema_message = FlightData::decode(&schema[..]).unwrap();
    let batch_message = FlightData::decode(&payload[..]).unwrap();

    let mut decoder = FlightDecoder::default();
    // Feed the schema message first; its decoded value is not needed here.
    let _decoded_schema = decoder.try_decode(&schema_message).unwrap();
    match decoder.try_decode(&batch_message).unwrap() {
        FlightMessage::Recordbatch(batch) => batch.into_df_record_batch(),
        _ => unreachable!("unexpected message"),
    }
}
/// Decodes a record batch directly from raw schema, header and body bytes,
/// without going through protobuf `FlightData` decoding.
///
/// # Panics
///
/// Panics if the decoder cannot be built from `schema` or if the record batch
/// cannot be decoded from `data_header` and `data_body`.
fn decode_flight_data_from_header_and_body(
    schema: &Bytes,
    data_header: &Bytes,
    data_body: &Bytes,
) -> DfRecordBatch {
    FlightDecoder::try_from_schema_bytes(schema)
        .and_then(|mut decoder| decoder.try_decode_record_batch(data_header, data_body))
        .unwrap()
}
/// Benchmarks the two record batch decoding paths at several batch sizes.
///
/// For each row count a benchmark group named `flight_decoder_<rows>_rows` is
/// created with two cases:
/// - `decode_from_protobuf`: decode from protobuf-serialized `FlightData`;
/// - `decode_from_header_and_body`: decode from the raw header and body bytes.
fn bench_decode_flight_data(c: &mut Criterion) {
    for row_count in [100000, 200000, 1000000] {
        let (schema_message, batch_message) = prepare_flight_data(row_count);

        // Inputs for the protobuf path, serialized once outside the measured closure.
        let schema_pb = Bytes::from(schema_message.encode_to_vec());
        let batch_pb = Bytes::from(batch_message.encode_to_vec());

        // Inputs for the header/body path, borrowed straight from the messages.
        let schema_header = &schema_message.data_header;
        let batch_header = &batch_message.data_header;
        let batch_body = &batch_message.data_body;

        let mut group = c.benchmark_group(format!("flight_decoder_{}_rows", row_count));
        group.bench_function("decode_from_protobuf", |bencher| {
            bencher.iter(|| decode_flight_data_from_protobuf(&schema_pb, &batch_pb));
        });
        group.bench_function("decode_from_header_and_body", |bencher| {
            bencher.iter(|| {
                decode_flight_data_from_header_and_body(schema_header, batch_header, batch_body)
            });
        });
        group.finish();
    }
}
// Register `bench_decode_flight_data` under the `benches` criterion group and
// emit the criterion entry point for that group.
criterion_group!(benches, bench_decode_flight_data);
criterion_main!(benches);

View File

@@ -14,8 +14,10 @@
use criterion::criterion_main;
mod bench_flight_decoder;
mod channel_manager;
criterion_main! {
channel_manager::benches
channel_manager::benches,
bench_flight_decoder::benches
}

View File

@@ -18,6 +18,7 @@ use std::io;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::arrow::error::ArrowError;
use snafu::{Location, Snafu};
pub type Result<T> = std::result::Result<T, Error>;
@@ -105,6 +106,14 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed arrow operation"))]
Arrow {
#[snafu(implicit)]
location: Location,
#[snafu(source)]
error: ArrowError,
},
}
impl ErrorExt for Error {
@@ -123,6 +132,7 @@ impl ErrorExt for Error {
Error::CreateRecordBatch { source, .. } => source.status_code(),
Error::ConvertArrowSchema { source, .. } => source.status_code(),
Error::Arrow { .. } => StatusCode::Internal,
}
}

View File

@@ -21,16 +21,19 @@ use api::v1::{AffectedRows, FlightMetadata, Metrics};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes;
use common_recordbatch::{RecordBatch, RecordBatches};
use common_recordbatch::{DfRecordBatch, RecordBatch, RecordBatches};
use datatypes::arrow;
use datatypes::arrow::buffer::Buffer;
use datatypes::arrow::datatypes::Schema as ArrowSchema;
use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader};
use datatypes::arrow::error::ArrowError;
use datatypes::arrow::ipc::{convert, reader, root_as_message, writer, MessageHeader};
use datatypes::schema::{Schema, SchemaRef};
use flatbuffers::FlatBufferBuilder;
use prost::bytes::Bytes as ProstBytes;
use prost::Message;
use snafu::{OptionExt, ResultExt};
use crate::error;
use crate::error::{
ConvertArrowSchemaSnafu, CreateRecordBatchSnafu, DecodeFlightDataSnafu, InvalidFlightDataSnafu,
Result,
@@ -124,9 +127,60 @@ impl FlightEncoder {
#[derive(Default)]
pub struct FlightDecoder {
/// Schema used when decoding record batches; `None` until a schema has been
/// decoded or supplied via `try_from_schema_bytes`.
schema: Option<SchemaRef>,
/// Raw bytes the current schema was obtained from, exposed through
/// `schema_bytes()`; `None` until a schema has been set.
schema_bytes: Option<bytes::Bytes>,
}
impl FlightDecoder {
/// Build a [FlightDecoder] instance from provided schema bytes.
pub fn try_from_schema_bytes(schema_bytes: &bytes::Bytes) -> Result<Self> {
let arrow_schema = convert::try_schema_from_flatbuffer_bytes(&schema_bytes[..])
.context(error::ArrowSnafu)?;
let schema = Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);
Ok(Self {
schema: Some(schema),
schema_bytes: Some(schema_bytes.clone()),
})
}
pub fn try_decode_record_batch(
&mut self,
data_header: &bytes::Bytes,
data_body: &bytes::Bytes,
) -> Result<DfRecordBatch> {
let schema = self
.schema
.as_ref()
.context(InvalidFlightDataSnafu {
reason: "Should have decoded schema first!",
})?
.clone();
let arrow_schema = schema.arrow_schema().clone();
let message = root_as_message(&data_header[..])
.map_err(|err| {
ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
})
.context(error::ArrowSnafu)?;
let result = message
.header_as_record_batch()
.ok_or_else(|| {
ArrowError::ParseError(
"Unable to convert flight data header to a record batch".to_string(),
)
})
.and_then(|batch| {
reader::read_record_batch(
&Buffer::from(data_body.as_ref()),
batch,
arrow_schema,
&HashMap::new(),
None,
&message.version(),
)
})
.context(error::ArrowSnafu)?;
Ok(result)
}
pub fn try_decode(&mut self, flight_data: &FlightData) -> Result<FlightMessage> {
let message = root_as_message(&flight_data.data_header).map_err(|e| {
InvalidFlightDataSnafu {
@@ -162,7 +216,7 @@ impl FlightDecoder {
Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);
self.schema = Some(schema.clone());
self.schema_bytes = Some(flight_data.data_header.clone());
Ok(FlightMessage::Schema(schema))
}
MessageHeader::RecordBatch => {
@@ -196,6 +250,10 @@ impl FlightDecoder {
/// Returns a reference to the schema currently held by the decoder, or
/// `None` if no schema has been set.
pub fn schema(&self) -> Option<&SchemaRef> {
self.schema.as_ref()
}
/// Returns a clone of the raw schema bytes held by the decoder, or `None`
/// if no schema has been set.
pub fn schema_bytes(&self) -> Option<bytes::Bytes> {
self.schema_bytes.clone()
}
}
pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {

View File

@@ -19,14 +19,12 @@ use api::v1::region::{
bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest,
RegionRequestHeader,
};
use bytes::Bytes;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_recordbatch::RecordBatch;
use common_telemetry::tracing_context::TracingContext;
use datatypes::schema::Schema;
use prost::Message;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::metadata::TableId;
@@ -60,13 +58,8 @@ impl Inserter {
.with_label_values(&["raw"])
.observe(record_batch.num_rows() as f64);
// todo(hl): find a way to embed raw FlightData messages in greptimedb proto files so we don't have to encode here.
// safety: when reach here schema must be present.
let schema_message = FlightEncoder::default()
.encode(FlightMessage::Schema(decoder.schema().unwrap().clone()));
let schema_bytes = Bytes::from(schema_message.encode_to_vec());
let schema_bytes = decoder.schema_bytes().unwrap();
let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["partition"])
.start_timer();
@@ -96,12 +89,6 @@ impl Inserter {
.find_region_leader(region_id)
.await
.context(error::FindRegionLeaderSnafu)?;
let payload = {
let _encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["encode"])
.start_timer();
Bytes::from(data.encode_to_vec())
};
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
@@ -111,7 +98,8 @@ impl Inserter {
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
region_id: region_id.as_u64(),
schema: schema_bytes,
payload,
data_header: data.data_header,
payload: data.data_body,
})),
})),
};
@@ -149,6 +137,7 @@ impl Inserter {
let record_batch_schema =
Arc::new(Schema::try_from(record_batch.schema()).context(error::ConvertSchemaSnafu)?);
// raw data header and payload bytes.
let mut raw_data_bytes = None;
for (peer, masks) in mask_per_datanode {
for (region_id, mask) in masks {
@@ -157,10 +146,12 @@ impl Inserter {
let record_batch_schema = record_batch_schema.clone();
let node_manager = self.node_manager.clone();
let peer = peer.clone();
let raw_data = if mask.select_all() {
let raw_header_and_data = if mask.select_all() {
Some(
raw_data_bytes
.get_or_insert_with(|| Bytes::from(data.encode_to_vec()))
.get_or_insert_with(|| {
(data.data_header.clone(), data.data_body.clone())
})
.clone(),
)
} else {
@@ -168,9 +159,9 @@ impl Inserter {
};
let handle: common_runtime::JoinHandle<error::Result<api::region::RegionResponse>> =
common_runtime::spawn_global(async move {
let payload = if mask.select_all() {
let (header, payload) = if mask.select_all() {
// SAFETY: raw data must be present, we can avoid re-encoding.
raw_data.unwrap()
raw_header_and_data.unwrap()
} else {
let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["filter"])
@@ -188,13 +179,10 @@ impl Inserter {
let batch =
RecordBatch::try_from_df_record_batch(record_batch_schema, rb)
.context(error::BuildRecordBatchSnafu)?;
let payload = Bytes::from(
FlightEncoder::default()
.encode(FlightMessage::Recordbatch(batch))
.encode_to_vec(),
);
let flight_data =
FlightEncoder::default().encode(FlightMessage::Recordbatch(batch));
encode_timer.observe_duration();
payload
(flight_data.data_header, flight_data.data_body)
};
let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["datanode_handle"])
@@ -208,6 +196,7 @@ impl Inserter {
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
region_id: region_id.as_u64(),
schema: schema_bytes,
data_header: header,
payload,
})),
})),
@@ -231,6 +220,7 @@ impl Inserter {
for res in region_responses {
rows_inserted += res?.affected_rows;
}
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
Ok(rows_inserted)
}
}

View File

@@ -1027,14 +1027,6 @@ pub enum MetadataError {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode prost message"))]
Prost {
#[snafu(source)]
error: prost::DecodeError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for MetadataError {

View File

@@ -22,22 +22,20 @@ use api::v1::column_def::{
};
use api::v1::region::bulk_insert_request::Body;
use api::v1::region::{
alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
alter_request, compact_request, region_request, AlterRequest, AlterRequests, ArrowIpc,
BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests,
DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{
self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
};
pub use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_grpc::FlightData;
use common_grpc::flight::FlightDecoder;
use common_recordbatch::DfRecordBatch;
use common_time::TimeToLive;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use strum::{AsRefStr, IntoStaticStr};
@@ -46,8 +44,7 @@ use crate::logstore::entry;
use crate::metadata::{
ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result,
UnexpectedSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
};
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::metrics;
@@ -332,22 +329,21 @@ fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId,
return Ok(vec![]);
};
let ArrowIpc {
region_id,
schema,
payload,
data_header,
} = request;
let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
.with_label_values(&["decode"])
.start_timer();
let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
let mut decoder = FlightDecoder::default();
let _ = decoder.try_decode(&schema_data).context(FlightCodecSnafu)?;
let FlightMessage::Recordbatch(rb) = decoder
.try_decode(&payload_data)
.context(FlightCodecSnafu)?
else {
unreachable!("Always expect record batch message after schema");
};
let mut decoder = FlightDecoder::try_from_schema_bytes(&schema).context(FlightCodecSnafu)?;
let payload = decoder
.try_decode_record_batch(&data_header, &payload)
.context(FlightCodecSnafu)?;
decoder_timer.observe_duration();
let payload = rb.into_df_record_batch();
let region_id: RegionId = request.region_id.into();
let region_id: RegionId = region_id.into();
Ok(vec![(
region_id,
RegionRequest::BulkInserts(RegionBulkInsertsRequest { region_id, payload }),