From 27e339f628bc598716ff81a48e79a4b266a58839 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Fri, 23 May 2025 15:22:10 +0800
Subject: [PATCH] perf: optimize bulk encode decode (#6161)

* main: **Enhancements to Flight Data Handling and Error Management**

  - **Flight Data Handling:**
    - Added the `bytes` dependency in `Cargo.lock` and `Cargo.toml`.
    - Introduced the `try_from_schema_bytes` and `try_decode_record_batch` methods on `FlightDecoder` in `src/common/grpc/src/flight.rs` to decode schemas and record batches more efficiently.
    - Updated `Inserter` in `src/operator/src/bulk_insert.rs` to use schema bytes directly, improving bulk insert operations.
  - **Error Management:**
    - Added an `ArrowError` variant in `src/common/grpc/src/error.rs` to handle errors from Arrow operations.
  - **Region Request Processing:**
    - Modified `make_region_bulk_inserts` in `src/store-api/src/region_request.rs` to use the new `FlightDecoder` methods for decoding Arrow IPC data.

* perf/optimize-bulk-encode-decode: update `greptime-proto` dependency and refactor error handling

  - **Dependency Update**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
  - **Error Handling Refactor**: Removed the `Prost` error variant from `MetadataError` in `src/store-api/src/metadata.rs`.
  - **Error Handling Improvement**: Replaced `unwrap` with `context(FlightCodecSnafu)` in the `make_region_bulk_inserts` function in `src/store-api/src/region_request.rs`.

* fix: clippy

* fix: toml

* perf/optimize-bulk-encode-decode: update `Cargo.toml` dependencies

  - Updated the `bytes` dependency to use the workspace version in `Cargo.toml`.

* perf/optimize-bulk-encode-decode: fix payload assignment in `bulk_insert.rs`

  - Corrected the assignment of the `payload` field in the `ArrowIpc` struct within the `Inserter` implementation in `bulk_insert.rs`.

* use main branch proto
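As a review aid, here is a minimal sketch of the decode fast path this change enables. The `FlightDecoder` methods and the `ArrowIpc` field names are the ones added by this diff; `decode_bulk_payload` itself is a hypothetical helper for illustration, not part of the patch:

```rust
use bytes::Bytes;
use common_grpc::flight::FlightDecoder;
use common_recordbatch::DfRecordBatch;

/// Hypothetical helper: decodes one bulk-insert payload without the
/// protobuf round-trip the old path required.
fn decode_bulk_payload(
    schema: &Bytes,      // ArrowIpc::schema - raw flatbuffer schema bytes
    data_header: &Bytes, // ArrowIpc::data_header - raw IPC message header
    payload: &Bytes,     // ArrowIpc::payload - raw IPC message body
) -> common_grpc::error::Result<DfRecordBatch> {
    // Rebuild the decoder directly from the cached schema bytes instead of
    // re-encoding a FlightData protobuf message and decoding it again.
    let mut decoder = FlightDecoder::try_from_schema_bytes(schema)?;
    // Read the record batch straight from the raw IPC header/body pair.
    decoder.try_decode_record_batch(data_header, payload)
}
```

On the encode side, the new `FlightDecoder::schema_bytes()` accessor lets the frontend forward the schema bytes it already received, so `Inserter` no longer re-encodes the schema for every bulk request. The included benchmark compares both paths.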
---
 Cargo.lock                                    |   3 +-
 Cargo.toml                                    |   2 +-
 src/common/grpc/Cargo.toml                    |   1 +
 .../grpc/benches/bench_flight_decoder.rs      | 143 ++++++++++++++++++
 src/common/grpc/benches/bench_main.rs         |   4 +-
 src/common/grpc/src/error.rs                  |  10 ++
 src/common/grpc/src/flight.rs                 |  64 +++++++-
 src/operator/src/bulk_insert.rs               |  40 ++---
 src/store-api/src/metadata.rs                 |   8 -
 src/store-api/src/region_request.rs           |  36 ++---
 10 files changed, 252 insertions(+), 59 deletions(-)
 create mode 100644 src/common/grpc/benches/bench_flight_decoder.rs

diff --git a/Cargo.lock b/Cargo.lock
index b38eaa9873..3b437dc7a4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2223,6 +2223,7 @@ version = "0.15.0"
 dependencies = [
  "api",
  "arrow-flight",
+ "bytes",
  "common-base",
  "common-error",
  "common-macro",
@@ -4856,7 +4857,7 @@ dependencies = [
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4d4136692fe7fbbd509ebc8c902f6afcc0ce61e4#4d4136692fe7fbbd509ebc8c902f6afcc0ce61e4"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=67ee5f94e5da72314cda7d0eb90106eb1c16a1ae#67ee5f94e5da72314cda7d0eb90106eb1c16a1ae"
 dependencies = [
  "prost 0.13.5",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index 8f4bcf82f1..36b964cf13 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -132,7 +132,7 @@ etcd-client = "0.14"
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4d4136692fe7fbbd509ebc8c902f6afcc0ce61e4" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "67ee5f94e5da72314cda7d0eb90106eb1c16a1ae" }
 hex = "0.4"
 http = "1"
 humantime = "2.1"
diff --git a/src/common/grpc/Cargo.toml b/src/common/grpc/Cargo.toml
index f15d0761d1..b56167ee15 100644
--- a/src/common/grpc/Cargo.toml
+++ b/src/common/grpc/Cargo.toml
@@ -10,6 +10,7 @@ workspace = true
 [dependencies]
 api.workspace = true
 arrow-flight.workspace = true
+bytes.workspace = true
 common-base.workspace = true
 common-error.workspace = true
 common-macro.workspace = true
diff --git a/src/common/grpc/benches/bench_flight_decoder.rs b/src/common/grpc/benches/bench_flight_decoder.rs
new file mode 100644
index 0000000000..ee014dff60
--- /dev/null
+++ b/src/common/grpc/benches/bench_flight_decoder.rs
@@ -0,0 +1,143 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow_flight::FlightData;
+use bytes::Bytes;
+use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
+use common_recordbatch::{DfRecordBatch, RecordBatch};
+use criterion::{criterion_group, criterion_main, Criterion};
+use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampMillisecondArray};
+use datatypes::data_type::ConcreteDataType;
+use datatypes::prelude::VectorRef;
+use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
+use datatypes::vectors::Helper;
+use prost::Message;
+
+fn schema() -> SchemaRef {
+    let schema = Schema::new(vec![
+        ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
+        ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
+        ColumnSchema::new(
+            "ts",
+            ConcreteDataType::timestamp_millisecond_datatype(),
+            false,
+        ),
+        ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), false),
+        ColumnSchema::new("v1", ConcreteDataType::int64_datatype(), false),
+    ]);
+    Arc::new(schema)
+}
+
+/// Generates a record batch according to the provided schema and row count.
+fn prepare_random_record_batch(schema: SchemaRef, num_rows: usize) -> RecordBatch {
+    let tag_candidates = (0..10000).map(|i| i.to_string()).collect::<Vec<_>>();
+
+    let columns: Vec<VectorRef> = schema
+        .column_schemas()
+        .iter()
+        .map(|col| match &col.data_type {
+            ConcreteDataType::String(_) => {
+                let array = StringArray::from(
+                    (0..num_rows)
+                        .map(|_| {
+                            let idx: usize = rand::random_range(0..10000);
+                            format!("tag-{}", tag_candidates[idx])
+                        })
+                        .collect::<Vec<_>>(),
+                );
+                Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap()
+            }
+            ConcreteDataType::Timestamp(_) => {
+                let now = common_time::util::current_time_millis();
+                let array = TimestampMillisecondArray::from(
+                    (0..num_rows).map(|i| now + i as i64).collect::<Vec<_>>(),
+                );
+                Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap()
+            }
+            ConcreteDataType::Int64(_) => {
+                let array = Int64Array::from((0..num_rows).map(|i| i as i64).collect::<Vec<_>>());
+                Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap()
+            }
+            _ => unreachable!(),
+        })
+        .collect();
+
+    RecordBatch::new(schema, columns).unwrap()
+}
+
+fn prepare_flight_data(num_rows: usize) -> (FlightData, FlightData) {
+    let schema = schema();
+    let mut encoder = FlightEncoder::default();
+    let schema_data = encoder.encode(FlightMessage::Schema(schema.clone()));
+    let rb = prepare_random_record_batch(schema, num_rows);
+    let rb_data = encoder.encode(FlightMessage::Recordbatch(rb));
+    (schema_data, rb_data)
+}
+
+fn decode_flight_data_from_protobuf(schema: &Bytes, payload: &Bytes) -> DfRecordBatch {
+    let schema = FlightData::decode(&schema[..]).unwrap();
+    let payload = FlightData::decode(&payload[..]).unwrap();
+    let mut decoder = FlightDecoder::default();
+    let _schema = decoder.try_decode(&schema).unwrap();
+    let message = decoder.try_decode(&payload).unwrap();
+    let FlightMessage::Recordbatch(batch) = message else {
+        unreachable!("unexpected message");
+    };
+    batch.into_df_record_batch()
+}
+
+fn decode_flight_data_from_header_and_body(
+    schema: &Bytes,
+    data_header: &Bytes,
+    data_body: &Bytes,
+) -> DfRecordBatch {
+    let mut decoder = FlightDecoder::try_from_schema_bytes(schema).unwrap();
+    decoder
+        .try_decode_record_batch(data_header, data_body)
+        .unwrap()
+}
+
+fn bench_decode_flight_data(c: &mut Criterion) {
+    let row_counts = [100000, 200000, 1000000];
+
+    for row_count in row_counts {
+        let (schema, payload) = prepare_flight_data(row_count);
+
+        // arguments for decode_flight_data_from_protobuf
+        let schema_bytes = Bytes::from(schema.encode_to_vec());
+        let payload_bytes = Bytes::from(payload.encode_to_vec());
+
+        let mut group = c.benchmark_group(format!("flight_decoder_{}_rows", row_count));
+        group.bench_function("decode_from_protobuf", |b| {
+            b.iter(|| decode_flight_data_from_protobuf(&schema_bytes, &payload_bytes));
+        });
+
+        group.bench_function("decode_from_header_and_body", |b| {
+            b.iter(|| {
+                decode_flight_data_from_header_and_body(
+                    &schema.data_header,
+                    &payload.data_header,
+                    &payload.data_body,
+                )
+            });
+        });
+
+        group.finish();
+    }
+}
+
+criterion_group!(benches, bench_decode_flight_data);
+criterion_main!(benches);
diff --git a/src/common/grpc/benches/bench_main.rs b/src/common/grpc/benches/bench_main.rs
index b67e1817a5..ce1f274ace 100644
--- a/src/common/grpc/benches/bench_main.rs
+++ b/src/common/grpc/benches/bench_main.rs
@@ -14,8 +14,10 @@
 
 use criterion::criterion_main;
 
+mod bench_flight_decoder;
 mod channel_manager;
 
 criterion_main! {
-    channel_manager::benches
+    channel_manager::benches,
+    bench_flight_decoder::benches
 }
diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs
index af194f2501..7c134122eb 100644
--- a/src/common/grpc/src/error.rs
+++ b/src/common/grpc/src/error.rs
@@ -18,6 +18,7 @@ use std::io;
 use common_error::ext::ErrorExt;
 use common_error::status_code::StatusCode;
 use common_macro::stack_trace_debug;
+use datatypes::arrow::error::ArrowError;
 use snafu::{Location, Snafu};
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -105,6 +106,14 @@ pub enum Error {
         #[snafu(implicit)]
         location: Location,
     },
+
+    #[snafu(display("Failed arrow operation"))]
+    Arrow {
+        #[snafu(implicit)]
+        location: Location,
+        #[snafu(source)]
+        error: ArrowError,
+    },
 }
 
 impl ErrorExt for Error {
@@ -123,6 +132,7 @@ impl ErrorExt for Error {
 
             Error::CreateRecordBatch { source, .. } => source.status_code(),
             Error::ConvertArrowSchema { source, .. } => source.status_code(),
+            Error::Arrow { .. } => StatusCode::Internal,
         }
     }
 
diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs
index 63f4d05289..7248a71a8a 100644
--- a/src/common/grpc/src/flight.rs
+++ b/src/common/grpc/src/flight.rs
@@ -21,16 +21,19 @@ use api::v1::{AffectedRows, FlightMetadata, Metrics};
 use arrow_flight::utils::flight_data_to_arrow_batch;
 use arrow_flight::{FlightData, SchemaAsIpc};
 use common_base::bytes::Bytes;
-use common_recordbatch::{RecordBatch, RecordBatches};
+use common_recordbatch::{DfRecordBatch, RecordBatch, RecordBatches};
 use datatypes::arrow;
+use datatypes::arrow::buffer::Buffer;
 use datatypes::arrow::datatypes::Schema as ArrowSchema;
-use datatypes::arrow::ipc::{root_as_message, writer, MessageHeader};
+use datatypes::arrow::error::ArrowError;
+use datatypes::arrow::ipc::{convert, reader, root_as_message, writer, MessageHeader};
 use datatypes::schema::{Schema, SchemaRef};
 use flatbuffers::FlatBufferBuilder;
 use prost::bytes::Bytes as ProstBytes;
 use prost::Message;
 use snafu::{OptionExt, ResultExt};
 
+use crate::error;
 use crate::error::{
     ConvertArrowSchemaSnafu, CreateRecordBatchSnafu, DecodeFlightDataSnafu, InvalidFlightDataSnafu,
     Result,
@@ -124,9 +127,60 @@ impl FlightEncoder {
 #[derive(Default)]
 pub struct FlightDecoder {
     schema: Option<SchemaRef>,
+    schema_bytes: Option<bytes::Bytes>,
 }
 
 impl FlightDecoder {
+    /// Builds a [FlightDecoder] instance from the provided schema bytes.
+    pub fn try_from_schema_bytes(schema_bytes: &bytes::Bytes) -> Result<Self> {
+        let arrow_schema = convert::try_schema_from_flatbuffer_bytes(&schema_bytes[..])
+            .context(error::ArrowSnafu)?;
+        let schema = Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);
+        Ok(Self {
+            schema: Some(schema),
+            schema_bytes: Some(schema_bytes.clone()),
+        })
+    }
+
+    pub fn try_decode_record_batch(
+        &mut self,
+        data_header: &bytes::Bytes,
+        data_body: &bytes::Bytes,
+    ) -> Result<DfRecordBatch> {
+        let schema = self
+            .schema
+            .as_ref()
+            .context(InvalidFlightDataSnafu {
+                reason: "Should have decoded schema first!",
+            })?
+            .clone();
+        let arrow_schema = schema.arrow_schema().clone();
+        let message = root_as_message(&data_header[..])
+            .map_err(|err| {
+                ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
+            })
+            .context(error::ArrowSnafu)?;
+        let result = message
+            .header_as_record_batch()
+            .ok_or_else(|| {
+                ArrowError::ParseError(
+                    "Unable to convert flight data header to a record batch".to_string(),
+                )
+            })
+            .and_then(|batch| {
+                reader::read_record_batch(
+                    &Buffer::from(data_body.as_ref()),
+                    batch,
+                    arrow_schema,
+                    &HashMap::new(),
+                    None,
+                    &message.version(),
+                )
+            })
+            .context(error::ArrowSnafu)?;
+        Ok(result)
+    }
+
     pub fn try_decode(&mut self, flight_data: &FlightData) -> Result<FlightMessage> {
         let message = root_as_message(&flight_data.data_header).map_err(|e| {
             InvalidFlightDataSnafu {
@@ -162,7 +216,7 @@ impl FlightDecoder {
                     Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?);
 
                 self.schema = Some(schema.clone());
-
+                self.schema_bytes = Some(flight_data.data_header.clone());
                 Ok(FlightMessage::Schema(schema))
             }
             MessageHeader::RecordBatch => {
@@ -196,6 +250,10 @@ impl FlightDecoder {
     pub fn schema(&self) -> Option<&SchemaRef> {
         self.schema.as_ref()
     }
+
+    pub fn schema_bytes(&self) -> Option<bytes::Bytes> {
+        self.schema_bytes.clone()
+    }
 }
 
 pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {
diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs
index 7a51bd4904..e0c8f28cef 100644
--- a/src/operator/src/bulk_insert.rs
+++ b/src/operator/src/bulk_insert.rs
@@ -19,14 +19,12 @@ use api::v1::region::{
     bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest,
     RegionRequestHeader,
 };
-use bytes::Bytes;
 use common_base::AffectedRows;
 use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
 use common_grpc::FlightData;
 use common_recordbatch::RecordBatch;
 use common_telemetry::tracing_context::TracingContext;
 use datatypes::schema::Schema;
-use prost::Message;
 use snafu::ResultExt;
 use store_api::storage::RegionId;
 use table::metadata::TableId;
@@ -60,13 +58,8 @@ impl Inserter {
             .with_label_values(&["raw"])
             .observe(record_batch.num_rows() as f64);
 
-        // todo(hl): find a way to embed raw FlightData messages in greptimedb proto files so we don't have to encode here.
-        // safety: when reach here schema must be present.
-        let schema_message = FlightEncoder::default()
-            .encode(FlightMessage::Schema(decoder.schema().unwrap().clone()));
-        let schema_bytes = Bytes::from(schema_message.encode_to_vec());
-
+        let schema_bytes = decoder.schema_bytes().unwrap();
+
         let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
             .with_label_values(&["partition"])
             .start_timer();
@@ -96,12 +89,6 @@ impl Inserter {
                 .find_region_leader(region_id)
                 .await
                 .context(error::FindRegionLeaderSnafu)?;
-            let payload = {
-                let _encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
-                    .with_label_values(&["encode"])
-                    .start_timer();
-                Bytes::from(data.encode_to_vec())
-            };
             let request = RegionRequest {
                 header: Some(RegionRequestHeader {
                     tracing_context: TracingContext::from_current_span().to_w3c(),
@@ -111,7 +98,8 @@ impl Inserter {
                     body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
                         region_id: region_id.as_u64(),
                         schema: schema_bytes,
-                        payload,
+                        data_header: data.data_header,
+                        payload: data.data_body,
                     })),
                 })),
             };
@@ -149,6 +137,7 @@ impl Inserter {
         let record_batch_schema =
             Arc::new(Schema::try_from(record_batch.schema()).context(error::ConvertSchemaSnafu)?);
 
+        // raw data header and payload bytes.
         let mut raw_data_bytes = None;
         for (peer, masks) in mask_per_datanode {
             for (region_id, mask) in masks {
@@ -157,10 +146,12 @@ impl Inserter {
                 let record_batch_schema = record_batch_schema.clone();
                 let node_manager = self.node_manager.clone();
                 let peer = peer.clone();
-                let raw_data = if mask.select_all() {
+                let raw_header_and_data = if mask.select_all() {
                     Some(
                         raw_data_bytes
-                            .get_or_insert_with(|| Bytes::from(data.encode_to_vec()))
+                            .get_or_insert_with(|| {
+                                (data.data_header.clone(), data.data_body.clone())
+                            })
                            .clone(),
                     )
                 } else {
@@ -168,9 +159,9 @@ impl Inserter {
                 };
                 let handle: common_runtime::JoinHandle<error::Result<RegionResponse>> =
                     common_runtime::spawn_global(async move {
-                        let payload = if mask.select_all() {
+                        let (header, payload) = if mask.select_all() {
                             // SAFETY: raw data must be present, we can avoid re-encoding.
-                            raw_data.unwrap()
+                            raw_header_and_data.unwrap()
                         } else {
                             let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
                                 .with_label_values(&["filter"])
                                 .start_timer();
@@ -188,13 +179,10 @@ impl Inserter {
                             let batch =
                                 RecordBatch::try_from_df_record_batch(record_batch_schema, rb)
                                     .context(error::BuildRecordBatchSnafu)?;
-                            let payload = Bytes::from(
-                                FlightEncoder::default()
-                                    .encode(FlightMessage::Recordbatch(batch))
-                                    .encode_to_vec(),
-                            );
+                            let flight_data =
+                                FlightEncoder::default().encode(FlightMessage::Recordbatch(batch));
                             encode_timer.observe_duration();
-                            payload
+                            (flight_data.data_header, flight_data.data_body)
                         };
                         let _datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
                             .with_label_values(&["datanode_handle"])
@@ -208,6 +196,7 @@ impl Inserter {
                                 body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
                                     region_id: region_id.as_u64(),
                                     schema: schema_bytes,
+                                    data_header: header,
                                     payload,
                                 })),
                             })),
@@ -231,6 +220,7 @@ impl Inserter {
         for res in region_responses {
             rows_inserted += res?.affected_rows;
         }
+        crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(rows_inserted as u64);
         Ok(rows_inserted)
     }
 }
diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs
index c3b1025876..453e0a1383 100644
--- a/src/store-api/src/metadata.rs
+++ b/src/store-api/src/metadata.rs
@@ -1027,14 +1027,6 @@ pub enum MetadataError {
         #[snafu(implicit)]
         location: Location,
     },
-
-    #[snafu(display("Failed to decode prost message"))]
-    Prost {
-        #[snafu(source)]
-        error: prost::DecodeError,
-        #[snafu(implicit)]
-        location: Location,
-    },
 }
 
 impl ErrorExt for MetadataError {
diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs
index 6d7f2af57e..bdfbff362e 100644
--- a/src/store-api/src/region_request.rs
+++ b/src/store-api/src/region_request.rs
@@ -22,22 +22,20 @@ use api::v1::column_def::{
 };
 use api::v1::region::bulk_insert_request::Body;
 use api::v1::region::{
-    alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
-    CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
-    DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
+    alter_request, compact_request, region_request, AlterRequest, AlterRequests, ArrowIpc,
+    BulkInsertRequest, CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests,
+    DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
 };
 use api::v1::{
     self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
     SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
 };
 pub use common_base::AffectedRows;
-use common_grpc::flight::{FlightDecoder, FlightMessage};
-use common_grpc::FlightData;
+use common_grpc::flight::FlightDecoder;
 use common_recordbatch::DfRecordBatch;
 use common_time::TimeToLive;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
-use prost::Message;
 use serde::{Deserialize, Serialize};
 use snafu::{ensure, OptionExt, ResultExt};
 use strum::{AsRefStr, IntoStaticStr};
@@ -46,8 +44,7 @@ use crate::logstore::entry;
 use crate::metadata::{
     ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
     InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
-    InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result,
-    UnexpectedSnafu,
+    InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
 };
 use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
 use crate::metrics;
@@ -332,22 +329,21 @@ fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result