refactor: replace FlightMessage with arrow RecordBatch and Schema (#6175)

* refactor/flight-codec:
 ### Refactor and Enhance Schema and RecordBatch Handling

 - **Add `datatypes` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include the `datatypes` dependency.
 - **Schema Conversion and Error Handling**:
   - Updated `src/client/src/database.rs` and `src/client/src/region.rs` to handle schema conversion using `Arc` and added error handling for schema conversion.
   - Enhanced error handling in `src/client/src/error.rs` and `src/common/grpc/src/error.rs` by adding `ConvertSchema` error and removing unused errors.
 - **FlightMessage and RecordBatch Refactoring**:
   - Refactored `FlightMessage` enum in `src/common/grpc/src/flight.rs` to use `RecordBatch` instead of `Recordbatch`.
   - Updated related functions and tests in `src/common/grpc/benches/bench_flight_decoder.rs`, `src/operator/src/bulk_insert.rs`, `src/servers/src/grpc/flight/stream.rs`, and `tests-integration/src/grpc/flight.rs` to align with the new `FlightMessage` structure.

* refactor/flight-codec:
 Remove `ConvertArrowSchema` Error Variant

 - Removed the `ConvertArrowSchema` error variant from `error.rs`.
 - Updated the `ErrorExt` implementation to exclude `ConvertArrowSchema`.
 - Affected file: `src/common/query/src/error.rs`.

* fix: cr
This commit is contained in:
Lei, HUANG
2025-05-26 18:06:50 +08:00
committed by GitHub
parent 77e2fee755
commit 493440a802
12 changed files with 124 additions and 144 deletions

View File

@@ -136,7 +136,7 @@ mod test {
async fn test_put_record_batches(client: &Database, record_batches: Vec<RecordBatch>) {
let requests_count = record_batches.len();
let schema = record_batches[0].schema.clone();
let schema = record_batches[0].schema.arrow_schema().clone();
let stream = futures::stream::once(async move {
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
@@ -155,7 +155,7 @@ mod test {
.enumerate()
.map(|(i, x)| {
let mut encoder = FlightEncoder::default();
let message = FlightMessage::Recordbatch(x);
let message = FlightMessage::RecordBatch(x.into_df_record_batch());
let mut data = encoder.encode(message);
let metadata = DoPutMetadata::new((i + 1) as i64);
data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();