From 493440a8021a1507720221e7a22995d6caa6f498 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Mon, 26 May 2025 18:06:50 +0800 Subject: [PATCH] refactor: replace FlightMessage with arrow `RecordBatch` and `Schema` (#6175) * refactor/flight-codec: ### Refactor and Enhance Schema and RecordBatch Handling - **Add `datatypes` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include the `datatypes` dependency. - **Schema Conversion and Error Handling**: - Updated `src/client/src/database.rs` and `src/client/src/region.rs` to handle schema conversion using `Arc` and added error handling for schema conversion. - Enhanced error handling in `src/client/src/error.rs` and `src/common/grpc/src/error.rs` by adding `ConvertSchema` error and removing unused errors. - **FlightMessage and RecordBatch Refactoring**: - Refactored `FlightMessage` enum in `src/common/grpc/src/flight.rs` to use `RecordBatch` instead of `Recordbatch`. - Updated related functions and tests in `src/common/grpc/benches/bench_flight_decoder.rs`, `src/operator/src/bulk_insert.rs`, `src/servers/src/grpc/flight/stream.rs`, and `tests-integration/src/grpc/flight.rs` to align with the new `FlightMessage` structure. * refactor/flight-codec: Remove `ConvertArrowSchema` Error Variant - Removed the `ConvertArrowSchema` error variant from `error.rs`. - Updated the `ErrorExt` implementation to exclude `ConvertArrowSchema`. - Affected file: `src/common/query/src/error.rs`. * fix: cr --- Cargo.lock | 1 + src/client/Cargo.toml | 1 + src/client/src/database.rs | 19 ++- src/client/src/error.rs | 8 ++ src/client/src/region.rs | 13 +- .../grpc/benches/bench_flight_decoder.rs | 43 ++++--- src/common/grpc/src/error.rs | 16 --- src/common/grpc/src/flight.rs | 119 +++++++----------- src/common/query/src/error.rs | 8 -- src/operator/src/bulk_insert.rs | 21 +--- src/servers/src/grpc/flight/stream.rs | 15 ++- tests-integration/src/grpc/flight.rs | 4 +- 12 files changed, 124 insertions(+), 144 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3b437dc7a4..3c8afffc28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1889,6 +1889,7 @@ dependencies = [ "common-query", "common-recordbatch", "common-telemetry", + "datatypes", "enum_dispatch", "futures", "futures-util", diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index 99d0c97806..27327bbe56 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -25,6 +25,7 @@ common-meta.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-telemetry.workspace = true +datatypes.workspace = true enum_dispatch = "0.3" futures.workspace = true futures-util.workspace = true diff --git a/src/client/src/database.rs b/src/client/src/database.rs index f786186388..bd54f89809 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -14,6 +14,7 @@ use std::pin::Pin; use std::str::FromStr; +use std::sync::Arc; use api::v1::auth_header::AuthScheme; use api::v1::ddl_request::Expr as DdlExpr; @@ -35,7 +36,7 @@ use common_grpc::flight::do_put::DoPutResponse; use common_grpc::flight::{FlightDecoder, FlightMessage}; use common_query::Output; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::RecordBatchStreamWrapper; +use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper}; use common_telemetry::tracing_context::W3cTrace; use common_telemetry::{error, warn}; use futures::future; @@ -49,7 +50,7 @@ use crate::error::{ ConvertFlightDataSnafu, Error, FlightGetSnafu, IllegalFlightMessagesSnafu, InvalidTonicMetadataValueSnafu, ServerSnafu, }; -use crate::{from_grpc_response, Client, Result}; +use crate::{error, from_grpc_response, Client, Result}; type FlightDataStream = Pin + Send>>; @@ -337,20 +338,30 @@ impl Database { ); Ok(Output::new_with_affected_rows(rows)) } - FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => { + FlightMessage::RecordBatch(_) | FlightMessage::Metrics(_) => { IllegalFlightMessagesSnafu { reason: "The first flight message cannot be a RecordBatch or Metrics message", } .fail() } FlightMessage::Schema(schema) => { + let schema = Arc::new( + datatypes::schema::Schema::try_from(schema) + .context(error::ConvertSchemaSnafu)?, + ); + let schema_cloned = schema.clone(); let stream = Box::pin(stream!({ while let Some(flight_message) = flight_message_stream.next().await { let flight_message = flight_message .map_err(BoxedError::new) .context(ExternalSnafu)?; match flight_message { - FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch), + FlightMessage::RecordBatch(arrow_batch) => { + yield RecordBatch::try_from_df_record_batch( + schema_cloned.clone(), + arrow_batch, + ) + } FlightMessage::Metrics(_) => {} FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => { yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)} diff --git a/src/client/src/error.rs b/src/client/src/error.rs index 858318c736..fa2ce1ea41 100644 --- a/src/client/src/error.rs +++ b/src/client/src/error.rs @@ -117,6 +117,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to convert Schema"))] + ConvertSchema { + #[snafu(implicit)] + location: Location, + source: datatypes::error::Error, + }, } pub type Result = std::result::Result; @@ -137,6 +144,7 @@ impl ErrorExt for Error { | Error::CreateTlsChannel { source, .. } => source.status_code(), Error::IllegalGrpcClientState { .. } => StatusCode::Unexpected, Error::InvalidTonicMetadataValue { .. } => StatusCode::InvalidArguments, + Error::ConvertSchema { source, .. } => source.status_code(), } } diff --git a/src/client/src/region.rs b/src/client/src/region.rs index c31d3a1e17..1c50c08625 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -28,7 +28,7 @@ use common_meta::error::{self as meta_error, Result as MetaResult}; use common_meta::node_manager::Datanode; use common_query::request::QueryRequest; use common_recordbatch::error::ExternalSnafu; -use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream}; +use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream}; use common_telemetry::error; use common_telemetry::tracing_context::TracingContext; use prost::Message; @@ -146,6 +146,10 @@ impl RegionRequester { let tracing_context = TracingContext::from_current_span(); + let schema = Arc::new( + datatypes::schema::Schema::try_from(schema).context(error::ConvertSchemaSnafu)?, + ); + let schema_cloned = schema.clone(); let stream = Box::pin(stream!({ let _span = tracing_context.attach(common_telemetry::tracing::info_span!( "poll_flight_data_stream" @@ -156,7 +160,12 @@ impl RegionRequester { .context(ExternalSnafu)?; match flight_message { - FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch), + FlightMessage::RecordBatch(record_batch) => { + yield RecordBatch::try_from_df_record_batch( + schema_cloned.clone(), + record_batch, + ) + } FlightMessage::Metrics(s) => { let m = serde_json::from_str(&s).ok().map(Arc::new); metrics_ref.swap(m); diff --git a/src/common/grpc/benches/bench_flight_decoder.rs b/src/common/grpc/benches/bench_flight_decoder.rs index ee014dff60..9e4de20982 100644 --- a/src/common/grpc/benches/bench_flight_decoder.rs +++ b/src/common/grpc/benches/bench_flight_decoder.rs @@ -17,16 +17,16 @@ use std::sync::Arc; use arrow_flight::FlightData; use bytes::Bytes; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; -use common_recordbatch::{DfRecordBatch, RecordBatch}; +use common_recordbatch::DfRecordBatch; use criterion::{criterion_group, criterion_main, Criterion}; +use datatypes::arrow; use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, TimestampMillisecondArray}; +use datatypes::arrow::datatypes::DataType; use datatypes::data_type::ConcreteDataType; -use datatypes::prelude::VectorRef; -use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; -use datatypes::vectors::Helper; +use datatypes::schema::{ColumnSchema, Schema}; use prost::Message; -fn schema() -> SchemaRef { +fn schema() -> arrow::datatypes::SchemaRef { let schema = Schema::new(vec![ ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false), ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false), @@ -38,18 +38,21 @@ fn schema() -> SchemaRef { ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), false), ColumnSchema::new("v1", ConcreteDataType::int64_datatype(), false), ]); - Arc::new(schema) + schema.arrow_schema().clone() } /// Generate record batch according to provided schema and num rows. -fn prepare_random_record_batch(schema: SchemaRef, num_rows: usize) -> RecordBatch { +fn prepare_random_record_batch( + schema: arrow::datatypes::SchemaRef, + num_rows: usize, +) -> DfRecordBatch { let tag_candidates = (0..10000).map(|i| i.to_string()).collect::>(); - let columns: Vec = schema - .column_schemas() + let columns: Vec = schema + .fields .iter() - .map(|col| match &col.data_type { - ConcreteDataType::String(_) => { + .map(|col| match col.data_type() { + DataType::Utf8 => { let array = StringArray::from( (0..num_rows) .map(|_| { @@ -58,24 +61,24 @@ fn prepare_random_record_batch(schema: SchemaRef, num_rows: usize) -> RecordBatc }) .collect::>(), ); - Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap() + Arc::new(array) as ArrayRef } - ConcreteDataType::Timestamp(_) => { + DataType::Timestamp(_, _) => { let now = common_time::util::current_time_millis(); let array = TimestampMillisecondArray::from( (0..num_rows).map(|i| now + i as i64).collect::>(), ); - Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap() + Arc::new(array) as ArrayRef } - ConcreteDataType::Int64(_) => { + DataType::Int64 => { let array = Int64Array::from((0..num_rows).map(|i| i as i64).collect::>()); - Helper::try_into_vector(Arc::new(array) as ArrayRef).unwrap() + Arc::new(array) as ArrayRef } _ => unreachable!(), }) .collect(); - RecordBatch::new(schema, columns).unwrap() + DfRecordBatch::try_new(schema, columns).unwrap() } fn prepare_flight_data(num_rows: usize) -> (FlightData, FlightData) { @@ -83,7 +86,7 @@ fn prepare_flight_data(num_rows: usize) -> (FlightData, FlightData) { let mut encoder = FlightEncoder::default(); let schema_data = encoder.encode(FlightMessage::Schema(schema.clone())); let rb = prepare_random_record_batch(schema, num_rows); - let rb_data = encoder.encode(FlightMessage::Recordbatch(rb)); + let rb_data = encoder.encode(FlightMessage::RecordBatch(rb)); (schema_data, rb_data) } @@ -93,10 +96,10 @@ fn decode_flight_data_from_protobuf(schema: &Bytes, payload: &Bytes) -> DfRecord let mut decoder = FlightDecoder::default(); let _schema = decoder.try_decode(&schema).unwrap(); let message = decoder.try_decode(&payload).unwrap(); - let FlightMessage::Recordbatch(batch) = message else { + let FlightMessage::RecordBatch(batch) = message else { unreachable!("unexpected message"); }; - batch.into_df_record_batch() + batch } fn decode_flight_data_from_header_and_body( diff --git a/src/common/grpc/src/error.rs b/src/common/grpc/src/error.rs index 7c134122eb..af051e0516 100644 --- a/src/common/grpc/src/error.rs +++ b/src/common/grpc/src/error.rs @@ -60,13 +60,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to create RecordBatch"))] - CreateRecordBatch { - #[snafu(implicit)] - location: Location, - source: common_recordbatch::error::Error, - }, - #[snafu(display("Failed to convert Arrow type: {}", from))] Conversion { from: String, @@ -89,13 +82,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to convert Arrow Schema"))] - ConvertArrowSchema { - #[snafu(implicit)] - location: Location, - source: datatypes::error::Error, - }, - #[snafu(display("Not supported: {}", feat))] NotSupported { feat: String }, @@ -130,8 +116,6 @@ impl ErrorExt for Error { | Error::DecodeFlightData { .. } | Error::SerdeJson { .. } => StatusCode::Internal, - Error::CreateRecordBatch { source, .. } => source.status_code(), - Error::ConvertArrowSchema { source, .. } => source.status_code(), Error::Arrow { .. } => StatusCode::Internal, } } diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index 7248a71a8a..3d4d94dc60 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -21,28 +21,24 @@ use api::v1::{AffectedRows, FlightMetadata, Metrics}; use arrow_flight::utils::flight_data_to_arrow_batch; use arrow_flight::{FlightData, SchemaAsIpc}; use common_base::bytes::Bytes; -use common_recordbatch::{DfRecordBatch, RecordBatch, RecordBatches}; +use common_recordbatch::DfRecordBatch; use datatypes::arrow; use datatypes::arrow::buffer::Buffer; -use datatypes::arrow::datatypes::Schema as ArrowSchema; +use datatypes::arrow::datatypes::{Schema as ArrowSchema, SchemaRef}; use datatypes::arrow::error::ArrowError; use datatypes::arrow::ipc::{convert, reader, root_as_message, writer, MessageHeader}; -use datatypes::schema::{Schema, SchemaRef}; use flatbuffers::FlatBufferBuilder; use prost::bytes::Bytes as ProstBytes; use prost::Message; use snafu::{OptionExt, ResultExt}; use crate::error; -use crate::error::{ - ConvertArrowSchemaSnafu, CreateRecordBatchSnafu, DecodeFlightDataSnafu, InvalidFlightDataSnafu, - Result, -}; +use crate::error::{DecodeFlightDataSnafu, InvalidFlightDataSnafu, Result}; #[derive(Debug, Clone)] pub enum FlightMessage { Schema(SchemaRef), - Recordbatch(RecordBatch), + RecordBatch(DfRecordBatch), AffectedRows(usize), Metrics(String), } @@ -70,14 +66,12 @@ impl Default for FlightEncoder { impl FlightEncoder { pub fn encode(&mut self, flight_message: FlightMessage) -> FlightData { match flight_message { - FlightMessage::Schema(schema) => { - SchemaAsIpc::new(schema.arrow_schema(), &self.write_options).into() - } - FlightMessage::Recordbatch(recordbatch) => { + FlightMessage::Schema(schema) => SchemaAsIpc::new(&schema, &self.write_options).into(), + FlightMessage::RecordBatch(record_batch) => { let (encoded_dictionaries, encoded_batch) = self .data_gen .encoded_batch( - recordbatch.df_record_batch(), + &record_batch, &mut self.dictionary_tracker, &self.write_options, ) @@ -135,9 +129,8 @@ impl FlightDecoder { pub fn try_from_schema_bytes(schema_bytes: &bytes::Bytes) -> Result { let arrow_schema = convert::try_schema_from_flatbuffer_bytes(&schema_bytes[..]) .context(error::ArrowSnafu)?; - let schema = Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?); Ok(Self { - schema: Some(schema), + schema: Some(Arc::new(arrow_schema)), schema_bytes: Some(schema_bytes.clone()), }) } @@ -154,7 +147,6 @@ impl FlightDecoder { reason: "Should have decoded schema first!", })? .clone(); - let arrow_schema = schema.arrow_schema().clone(); let message = root_as_message(&data_header[..]) .map_err(|err| { ArrowError::ParseError(format!("Unable to get root as message: {err:?}")) @@ -171,7 +163,7 @@ impl FlightDecoder { reader::read_record_batch( &Buffer::from(data_body.as_ref()), batch, - arrow_schema, + schema, &HashMap::new(), None, &message.version(), @@ -206,36 +198,29 @@ impl FlightDecoder { .fail() } MessageHeader::Schema => { - let arrow_schema = ArrowSchema::try_from(flight_data).map_err(|e| { + let arrow_schema = Arc::new(ArrowSchema::try_from(flight_data).map_err(|e| { InvalidFlightDataSnafu { reason: e.to_string(), } .build() - })?; - let schema = - Arc::new(Schema::try_from(arrow_schema).context(ConvertArrowSchemaSnafu)?); - - self.schema = Some(schema.clone()); + })?); + self.schema = Some(arrow_schema.clone()); self.schema_bytes = Some(flight_data.data_header.clone()); - Ok(FlightMessage::Schema(schema)) + Ok(FlightMessage::Schema(arrow_schema)) } MessageHeader::RecordBatch => { let schema = self.schema.clone().context(InvalidFlightDataSnafu { reason: "Should have decoded schema first!", })?; - let arrow_schema = schema.arrow_schema().clone(); - let arrow_batch = - flight_data_to_arrow_batch(flight_data, arrow_schema, &HashMap::new()) + flight_data_to_arrow_batch(flight_data, schema.clone(), &HashMap::new()) .map_err(|e| { InvalidFlightDataSnafu { reason: e.to_string(), } .build() })?; - let recordbatch = RecordBatch::try_from_df_record_batch(schema, arrow_batch) - .context(CreateRecordBatchSnafu)?; - Ok(FlightMessage::Recordbatch(recordbatch)) + Ok(FlightMessage::RecordBatch(arrow_batch)) } other => { let name = other.variant_name().unwrap_or("UNKNOWN"); @@ -256,14 +241,16 @@ impl FlightDecoder { } } -pub fn flight_messages_to_recordbatches(messages: Vec) -> Result { +pub fn flight_messages_to_recordbatches( + messages: Vec, +) -> Result> { if messages.is_empty() { - Ok(RecordBatches::empty()) + Ok(vec![]) } else { let mut recordbatches = Vec::with_capacity(messages.len() - 1); - let schema = match &messages[0] { - FlightMessage::Schema(schema) => schema.clone(), + match &messages[0] { + FlightMessage::Schema(_schema) => {} _ => { return InvalidFlightDataSnafu { reason: "First Flight Message must be schema!", @@ -274,7 +261,7 @@ pub fn flight_messages_to_recordbatches(messages: Vec) -> Result< for message in messages.into_iter().skip(1) { match message { - FlightMessage::Recordbatch(recordbatch) => recordbatches.push(recordbatch), + FlightMessage::RecordBatch(recordbatch) => recordbatches.push(recordbatch), _ => { return InvalidFlightDataSnafu { reason: "Expect the following Flight Messages are all Recordbatches!", @@ -284,7 +271,7 @@ pub fn flight_messages_to_recordbatches(messages: Vec) -> Result< } } - RecordBatches::try_new(schema, recordbatches).context(CreateRecordBatchSnafu) + Ok(recordbatches) } } @@ -305,38 +292,33 @@ fn build_none_flight_msg() -> Bytes { #[cfg(test)] mod test { use arrow_flight::utils::batches_to_flight_data; - use datatypes::arrow::datatypes::{DataType, Field}; - use datatypes::prelude::ConcreteDataType; - use datatypes::schema::ColumnSchema; - use datatypes::vectors::Int32Vector; + use datatypes::arrow::array::Int32Array; + use datatypes::arrow::datatypes::{DataType, Field, Schema}; use super::*; use crate::Error; #[test] fn test_try_decode() { - let arrow_schema = ArrowSchema::new(vec![Field::new("n", DataType::Int32, true)]); - let schema = Arc::new(Schema::try_from(arrow_schema.clone()).unwrap()); + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "n", + DataType::Int32, + true, + )])); - let batch1 = RecordBatch::new( + let batch1 = DfRecordBatch::try_new( schema.clone(), - vec![Arc::new(Int32Vector::from(vec![Some(1), None, Some(3)])) as _], + vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(3)])) as _], ) .unwrap(); - let batch2 = RecordBatch::new( + let batch2 = DfRecordBatch::try_new( schema.clone(), - vec![Arc::new(Int32Vector::from(vec![None, Some(5)])) as _], + vec![Arc::new(Int32Array::from(vec![None, Some(5)])) as _], ) .unwrap(); - let flight_data = batches_to_flight_data( - &arrow_schema, - vec![ - batch1.clone().into_df_record_batch(), - batch2.clone().into_df_record_batch(), - ], - ) - .unwrap(); + let flight_data = + batches_to_flight_data(&schema, vec![batch1.clone(), batch2.clone()]).unwrap(); assert_eq!(flight_data.len(), 3); let [d1, d2, d3] = flight_data.as_slice() else { unreachable!() @@ -362,15 +344,15 @@ mod test { let _ = decoder.schema.as_ref().unwrap(); let message = decoder.try_decode(d2).unwrap(); - assert!(matches!(message, FlightMessage::Recordbatch(_))); - let FlightMessage::Recordbatch(actual_batch) = message else { + assert!(matches!(message, FlightMessage::RecordBatch(_))); + let FlightMessage::RecordBatch(actual_batch) = message else { unreachable!() }; assert_eq!(actual_batch, batch1); let message = decoder.try_decode(d3).unwrap(); - assert!(matches!(message, FlightMessage::Recordbatch(_))); - let FlightMessage::Recordbatch(actual_batch) = message else { + assert!(matches!(message, FlightMessage::RecordBatch(_))); + let FlightMessage::RecordBatch(actual_batch) = message else { unreachable!() }; assert_eq!(actual_batch, batch2); @@ -378,27 +360,22 @@ mod test { #[test] fn test_flight_messages_to_recordbatches() { - let schema = Arc::new(Schema::new(vec![ColumnSchema::new( - "m", - ConcreteDataType::int32_datatype(), - true, - )])); - let batch1 = RecordBatch::new( + let schema = Arc::new(Schema::new(vec![Field::new("m", DataType::Int32, true)])); + let batch1 = DfRecordBatch::try_new( schema.clone(), - vec![Arc::new(Int32Vector::from(vec![Some(2), None, Some(4)])) as _], + vec![Arc::new(Int32Array::from(vec![Some(2), None, Some(4)])) as _], ) .unwrap(); - let batch2 = RecordBatch::new( + let batch2 = DfRecordBatch::try_new( schema.clone(), - vec![Arc::new(Int32Vector::from(vec![None, Some(6)])) as _], + vec![Arc::new(Int32Array::from(vec![None, Some(6)])) as _], ) .unwrap(); - let recordbatches = - RecordBatches::try_new(schema.clone(), vec![batch1.clone(), batch2.clone()]).unwrap(); + let recordbatches = vec![batch1.clone(), batch2.clone()]; let m1 = FlightMessage::Schema(schema); - let m2 = FlightMessage::Recordbatch(batch1); - let m3 = FlightMessage::Recordbatch(batch2); + let m2 = FlightMessage::RecordBatch(batch1); + let m3 = FlightMessage::RecordBatch(batch2); let result = flight_messages_to_recordbatches(vec![m2.clone(), m1.clone(), m3.clone()]); assert!(matches!(result, Err(Error::InvalidFlightData { .. }))); diff --git a/src/common/query/src/error.rs b/src/common/query/src/error.rs index a6d2997e95..a84dceecc6 100644 --- a/src/common/query/src/error.rs +++ b/src/common/query/src/error.rs @@ -103,13 +103,6 @@ pub enum Error { source: common_recordbatch::error::Error, }, - #[snafu(display("Failed to convert arrow schema"))] - ConvertArrowSchema { - #[snafu(implicit)] - location: Location, - source: DataTypeError, - }, - #[snafu(display("Failed to cast array to {:?}", typ))] TypeCast { #[snafu(source)] @@ -244,7 +237,6 @@ impl ErrorExt for Error { Error::InvalidInputType { source, .. } | Error::IntoVector { source, .. } | Error::FromScalarValue { source, .. } - | Error::ConvertArrowSchema { source, .. } | Error::FromArrowArray { source, .. } | Error::InvalidVectorString { source, .. } => source.status_code(), diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs index e0c8f28cef..815c4514e6 100644 --- a/src/operator/src/bulk_insert.rs +++ b/src/operator/src/bulk_insert.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::Arc; - use ahash::{HashMap, HashMapExt}; use api::v1::region::{ bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest, @@ -22,9 +20,7 @@ use api::v1::region::{ use common_base::AffectedRows; use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; use common_grpc::FlightData; -use common_recordbatch::RecordBatch; use common_telemetry::tracing_context::TracingContext; -use datatypes::schema::Schema; use snafu::ResultExt; use store_api::storage::RegionId; use table::metadata::TableId; @@ -48,10 +44,9 @@ impl Inserter { let message = decoder .try_decode(&data) .context(error::DecodeFlightDataSnafu)?; - let FlightMessage::Recordbatch(rb) = message else { + let FlightMessage::RecordBatch(record_batch) = message else { return Ok(0); }; - let record_batch = rb.df_record_batch(); decode_timer.observe_duration(); metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64); metrics::BULK_REQUEST_ROWS @@ -71,7 +66,7 @@ impl Inserter { // find partitions for each row in the record batch let region_masks = partition_rule - .split_record_batch(record_batch) + .split_record_batch(&record_batch) .context(error::SplitInsertSnafu)?; partition_timer.observe_duration(); @@ -134,8 +129,6 @@ impl Inserter { .start_timer(); let mut handles = Vec::with_capacity(mask_per_datanode.len()); - let record_batch_schema = - Arc::new(Schema::try_from(record_batch.schema()).context(error::ConvertSchemaSnafu)?); // raw daya header and payload bytes. let mut raw_data_bytes = None; @@ -143,7 +136,6 @@ impl Inserter { for (region_id, mask) in masks { let rb = record_batch.clone(); let schema_bytes = schema_bytes.clone(); - let record_batch_schema = record_batch_schema.clone(); let node_manager = self.node_manager.clone(); let peer = peer.clone(); let raw_header_and_data = if mask.select_all() { @@ -166,21 +158,18 @@ impl Inserter { let filter_timer = metrics::HANDLE_BULK_INSERT_ELAPSED .with_label_values(&["filter"]) .start_timer(); - let rb = arrow::compute::filter_record_batch(&rb, mask.array()) + let batch = arrow::compute::filter_record_batch(&rb, mask.array()) .context(error::ComputeArrowSnafu)?; filter_timer.observe_duration(); metrics::BULK_REQUEST_ROWS .with_label_values(&["rows_per_region"]) - .observe(rb.num_rows() as f64); + .observe(batch.num_rows() as f64); let encode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED .with_label_values(&["encode"]) .start_timer(); - let batch = - RecordBatch::try_from_df_record_batch(record_batch_schema, rb) - .context(error::BuildRecordBatchSnafu)?; let flight_data = - FlightEncoder::default().encode(FlightMessage::Recordbatch(batch)); + FlightEncoder::default().encode(FlightMessage::RecordBatch(batch)); encode_timer.observe_duration(); (flight_data.data_header, flight_data.data_body) }; diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index dea9f40af2..bd9ebe1762 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -60,7 +60,7 @@ impl FlightRecordBatchStream { mut recordbatches: SendableRecordBatchStream, mut tx: Sender>, ) { - let schema = recordbatches.schema(); + let schema = recordbatches.schema().arrow_schema().clone(); if let Err(e) = tx.send(Ok(FlightMessage::Schema(schema))).await { warn!(e; "stop sending Flight data"); return; @@ -69,7 +69,12 @@ impl FlightRecordBatchStream { while let Some(batch_or_err) = recordbatches.next().in_current_span().await { match batch_or_err { Ok(recordbatch) => { - if let Err(e) = tx.send(Ok(FlightMessage::Recordbatch(recordbatch))).await { + if let Err(e) = tx + .send(Ok(FlightMessage::RecordBatch( + recordbatch.into_df_record_batch(), + ))) + .await + { warn!(e; "stop sending Flight data"); return; } @@ -173,14 +178,14 @@ mod test { match flight_messages.remove(0) { FlightMessage::Schema(actual_schema) => { - assert_eq!(actual_schema, schema); + assert_eq!(&actual_schema, schema.arrow_schema()); } _ => unreachable!(), } match flight_messages.remove(0) { - FlightMessage::Recordbatch(actual_recordbatch) => { - assert_eq!(actual_recordbatch, recordbatch); + FlightMessage::RecordBatch(actual_recordbatch) => { + assert_eq!(&actual_recordbatch, recordbatch.df_record_batch()); } _ => unreachable!(), } diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index cebe59af5b..3d16466f28 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -136,7 +136,7 @@ mod test { async fn test_put_record_batches(client: &Database, record_batches: Vec) { let requests_count = record_batches.len(); - let schema = record_batches[0].schema.clone(); + let schema = record_batches[0].schema.arrow_schema().clone(); let stream = futures::stream::once(async move { let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema)); @@ -155,7 +155,7 @@ mod test { .enumerate() .map(|(i, x)| { let mut encoder = FlightEncoder::default(); - let message = FlightMessage::Recordbatch(x); + let message = FlightMessage::RecordBatch(x.into_df_record_batch()); let mut data = encoder.encode(message); let metadata = DoPutMetadata::new((i + 1) as i64); data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();