diff --git a/Cargo.lock b/Cargo.lock index fede4a0e06..6f0f9205ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4819,7 +4819,7 @@ dependencies = [ [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e82b0158cd38d4021edb4e4c0ae77f999051e62f#e82b0158cd38d4021edb4e4c0ae77f999051e62f" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ce65659d95a4a11c5d668d27edb4f1c1eed36824#ce65659d95a4a11c5d668d27edb4f1c1eed36824" dependencies = [ "prost 0.13.5", "serde", @@ -7849,8 +7849,11 @@ version = "0.15.0" dependencies = [ "ahash 0.8.11", "api", + "arrow 54.2.1", + "arrow-ipc 54.2.1", "async-stream", "async-trait", + "bytes", "catalog", "chrono", "client", @@ -7859,6 +7862,7 @@ dependencies = [ "common-datasource", "common-error", "common-function", + "common-grpc", "common-grpc-expr", "common-macro", "common-meta", @@ -7886,6 +7890,7 @@ dependencies = [ "partition", "path-slash", "prometheus", + "prost 0.13.5", "query", "regex", "serde_json", @@ -11276,6 +11281,7 @@ dependencies = [ "async-trait", "common-base", "common-error", + "common-grpc", "common-macro", "common-meta", "common-recordbatch", @@ -11287,6 +11293,8 @@ dependencies = [ "derive_builder 0.20.1", "futures", "humantime", + "lazy_static", + "prometheus", "prost 0.13.5", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index b567a36a49..f427f5e699 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -130,7 +130,7 @@ etcd-client = "0.14" fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e82b0158cd38d4021edb4e4c0ae77f999051e62f" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ce65659d95a4a11c5d668d27edb4f1c1eed36824" } hex = "0.4" http = "1" humantime = "2.1" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index 5c4a02f335..98539e8f05 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -1050,7 +1050,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue { Value::Int64(v) => Some(ValueData::I64Value(v)), Value::Float32(v) => Some(ValueData::F32Value(*v)), Value::Float64(v) => Some(ValueData::F64Value(*v)), - Value::String(v) => Some(ValueData::StringValue(v.as_utf8().to_string())), + Value::String(v) => Some(ValueData::StringValue(v.into_string())), Value::Binary(v) => Some(ValueData::BinaryValue(v.to_vec())), Value::Date(v) => Some(ValueData::DateValue(v.val())), Value::Timestamp(v) => Some(match v.unit() { diff --git a/src/common/grpc/src/flight.rs b/src/common/grpc/src/flight.rs index 872897ccbf..37a725e0cc 100644 --- a/src/common/grpc/src/flight.rs +++ b/src/common/grpc/src/flight.rs @@ -192,6 +192,10 @@ impl FlightDecoder { } } } + + pub fn schema(&self) -> Option<&SchemaRef> { + self.schema.as_ref() + } } pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> { diff --git a/src/common/grpc/src/lib.rs b/src/common/grpc/src/lib.rs index 2480c74fc1..287644b529 100644 --- a/src/common/grpc/src/lib.rs +++ b/src/common/grpc/src/lib.rs @@ -18,4 +18,5 @@ pub mod flight; pub mod precision; pub mod select; +pub use arrow_flight::FlightData; pub use error::Error; diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 4a83a99d12..7432519ecd 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -24,15 +24,18 @@ use api::v1::{ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows; +use common_grpc::flight::FlightDecoder; +use common_grpc::FlightData; use common_query::logical_plan::add_insert_to_logical_plan; use common_query::Output; use common_telemetry::tracing::{self}; use query::parser::PromQuery; use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef}; -use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch}; +use servers::query_handler::grpc::GrpcQueryHandler; use servers::query_handler::sql::SqlQueryHandler; use session::context::QueryContextRef; use snafu::{ensure, OptionExt, ResultExt}; +use table::metadata::TableId; use table::table_name::TableName; use crate::error::{ @@ -230,29 +233,35 @@ impl GrpcQueryHandler for Instance { async fn put_record_batch( &self, table: &TableName, - record_batch: RawRecordBatch, + table_id: &mut Option<TableId>, + decoder: &mut FlightDecoder, + data: FlightData, ) -> Result<AffectedRows> { - let _table = self - .catalog_manager() - .table( - &table.catalog_name, - &table.schema_name, - &table.table_name, - None, - ) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: table.to_string(), - })?; + let table_id = if let Some(table_id) = table_id { + *table_id + } else { + let table = self + .catalog_manager() + .table( + &table.catalog_name, + &table.schema_name, + &table.table_name, + None, + ) + .await + .context(CatalogSnafu)? + .with_context(|| TableNotFoundSnafu { + table_name: table.to_string(), + })?; + let id = table.table_info().table_id(); + *table_id = Some(id); + id + }; - // TODO(LFC): Implement it. - common_telemetry::debug!( - "calling put_record_batch with table: {:?} and record_batch size: {}", - table, - record_batch.len() - ); - Ok(record_batch.len()) + self.inserter + .handle_bulk_insert(table_id, decoder, data) + .await + .context(TableOperationSnafu) } } diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 2c64ea38b1..3b4dbd81b8 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -18,6 +18,7 @@ #![feature(let_chains)] #![feature(assert_matches)] +#![feature(result_flattening)] #[cfg(any(test, feature = "test"))] #[cfg_attr(feature = "test", allow(unused))] diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 5331ba6fdc..9332deb8fc 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -62,9 +62,9 @@ pub struct WriteRequest { /// Rows to write. pub rows: Rows, /// Map column name to column index in `rows`. - name_to_index: HashMap<String, usize>, + pub name_to_index: HashMap<String, usize>, /// Whether each column has null. - has_null: Vec<bool>, + pub has_null: Vec<bool>, /// Write hint. pub hint: Option<WriteHint>, /// Region metadata on the time of this request is created. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 33edb186d4..6614bcc749 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -835,7 +835,7 @@ impl RegionWorkerLoop { sender, } => { if let Some(region_metadata) = metadata { - self.handle_bulk_inserts(request, region_metadata, write_requests, sender) + self.handle_bulk_insert(request, region_metadata, write_requests, sender) .await; } else { error!("Cannot find region metadata for {}", request.region_id); diff --git a/src/mito2/src/worker/handle_bulk_insert.rs b/src/mito2/src/worker/handle_bulk_insert.rs index 9f2a745835..d54ba6b7ed 100644 --- a/src/mito2/src/worker/handle_bulk_insert.rs +++ b/src/mito2/src/worker/handle_bulk_insert.rs @@ -14,8 +14,11 @@ //! Handles bulk insert requests.
+use std::collections::HashMap; + use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper}; use api::v1::{ColumnSchema, OpType, Row, Rows}; +use common_base::AffectedRows; use common_recordbatch::DfRecordBatch; use datatypes::prelude::VectorRef; use datatypes::vectors::Helper; @@ -23,62 +26,64 @@ use snafu::ResultExt; use store_api::logstore::LogStore; use store_api::metadata::RegionMetadataRef; use store_api::region_request::{BulkInsertPayload, RegionBulkInsertsRequest}; +use tokio::sync::oneshot::Receiver; use crate::error; use crate::request::{OptionOutputTx, SenderWriteRequest, WriteRequest}; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { - pub(crate) async fn handle_bulk_inserts( + pub(crate) async fn handle_bulk_insert( &mut self, - request: RegionBulkInsertsRequest, + mut request: RegionBulkInsertsRequest, region_metadata: RegionMetadataRef, pending_write_requests: &mut Vec<SenderWriteRequest>, sender: OptionOutputTx, ) { - let schema = match region_metadata_to_column_schema(&region_metadata) { - Ok(schema) => schema, - Err(e) => { - sender.send(Err(e)); - return; - } - }; + let (column_schemas, name_to_index) = + match region_metadata_to_column_schema(&region_metadata) { + Ok(schema) => schema, + Err(e) => { + sender.send(Err(e)); + return; + } + }; + + // fast path: only one payload. + if request.payloads.len() == 1 { + match Self::handle_payload( + &region_metadata, + request.payloads.swap_remove(0), + pending_write_requests, + column_schemas, + name_to_index, + ) { + Ok(task_future) => common_runtime::spawn_global(async move { + sender.send(task_future.await.context(error::RecvSnafu).flatten()); + }), + Err(e) => { + sender.send(Err(e)); + return; + } + }; + return; + } + let mut pending_tasks = Vec::with_capacity(request.payloads.len()); for req in request.payloads { - match req { - BulkInsertPayload::ArrowIpc(df_record_batch) => { - let rows = match record_batch_to_rows(&region_metadata, &df_record_batch) { - Ok(rows) => rows, - Err(e) => { - sender.send(Err(e)); - return; - } - }; - - let write_request = match WriteRequest::new( - region_metadata.region_id, - OpType::Put, - Rows { - schema: schema.clone(), - rows, - }, - Some(region_metadata.clone()), - ) { - Ok(write_request) => write_request, - Err(e) => { - sender.send(Err(e)); - return; - } - }; - - let (tx, rx) = tokio::sync::oneshot::channel(); - let sender = OptionOutputTx::from(tx); - let req = SenderWriteRequest { - sender, - request: write_request, - }; - pending_tasks.push(rx); - pending_write_requests.push(req); + match Self::handle_payload( + &region_metadata, + req, + pending_write_requests, + column_schemas.clone(), + name_to_index.clone(), + ) { + Ok(task_future) => { + pending_tasks.push(task_future); + } + Err(e) => { + sender.send(Err(e)); + return; } } } @@ -91,35 +96,135 @@ impl RegionWorkerLoop { return; } }; - let result1 = match results.into_iter().collect::<error::Result<Vec<_>>>() { - Ok(results) => Ok(results.into_iter().sum()), - Err(e) => Err(e), - }; - sender.send(result1); + sender.send( + match results.into_iter().collect::<error::Result<Vec<_>>>() { + Ok(results) => Ok(results.into_iter().sum()), + Err(e) => Err(e), + }, + ); }); } + + fn handle_payload( + region_metadata: &RegionMetadataRef, + payload: BulkInsertPayload, + pending_write_requests: &mut Vec<SenderWriteRequest>, + column_schemas: Vec<ColumnSchema>, + name_to_index: HashMap<String, usize>, + ) -> error::Result<Receiver<error::Result<AffectedRows>>> { + let rx = match payload { + BulkInsertPayload::ArrowIpc(rb) => Self::handle_arrow_ipc( + region_metadata, + rb, + pending_write_requests, + column_schemas, + name_to_index, + ), + BulkInsertPayload::Rows { data, has_null }
=> Self::handle_rows( + region_metadata, + data, + column_schemas, + has_null, + pending_write_requests, + name_to_index, + ), + }?; + + Ok(rx) + } + + fn handle_arrow_ipc( + region_metadata: &RegionMetadataRef, + df_record_batch: DfRecordBatch, + pending_write_requests: &mut Vec<SenderWriteRequest>, + column_schemas: Vec<ColumnSchema>, + name_to_index: HashMap<String, usize>, + ) -> error::Result<Receiver<error::Result<AffectedRows>>> { + let has_null: Vec<_> = df_record_batch + .columns() + .iter() + .map(|c| c.null_count() > 0) + .collect(); + + let rows = record_batch_to_rows(region_metadata, &df_record_batch)?; + + let write_request = WriteRequest { + region_id: region_metadata.region_id, + op_type: OpType::Put, + rows: Rows { + schema: column_schemas, + rows, + }, + name_to_index, + has_null, + hint: None, + region_metadata: Some(region_metadata.clone()), + }; + + let (tx, rx) = tokio::sync::oneshot::channel(); + let sender = OptionOutputTx::from(tx); + let req = SenderWriteRequest { + sender, + request: write_request, + }; + pending_write_requests.push(req); + Ok(rx) + } + + fn handle_rows( + region_metadata: &RegionMetadataRef, + rows: Vec<Row>, + column_schemas: Vec<ColumnSchema>, + has_null: Vec<bool>, + pending_write_requests: &mut Vec<SenderWriteRequest>, + name_to_index: HashMap<String, usize>, + ) -> error::Result<Receiver<error::Result<AffectedRows>>> { + let write_request = WriteRequest { + region_id: region_metadata.region_id, + op_type: OpType::Put, + rows: Rows { + schema: column_schemas, + rows, + }, + name_to_index, + has_null, + hint: None, + region_metadata: Some(region_metadata.clone()), + }; + + let (tx, rx) = tokio::sync::oneshot::channel(); + let sender = OptionOutputTx::from(tx); + let req = SenderWriteRequest { + sender, + request: write_request, + }; + pending_write_requests.push(req); + Ok(rx) + } } fn region_metadata_to_column_schema( region_meta: &RegionMetadataRef, -) -> error::Result<Vec<ColumnSchema>> { - region_meta - .column_metadatas - .iter() - .map(|c| { - let wrapper = ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone()) - .with_context(|_| error::ConvertDataTypeSnafu { - data_type: c.column_schema.data_type.clone(), - })?; +) -> error::Result<(Vec<ColumnSchema>, HashMap<String, usize>)> { + let mut column_schemas = Vec::with_capacity(region_meta.column_metadatas.len()); + let mut name_to_index = HashMap::with_capacity(region_meta.column_metadatas.len()); - Ok(ColumnSchema { - column_name: c.column_schema.name.clone(), - datatype: wrapper.datatype() as i32, - semantic_type: c.semantic_type as i32, - ..Default::default() - }) - }) - .collect::<error::Result<Vec<_>>>() + for (idx, c) in region_meta.column_metadatas.iter().enumerate() { + let wrapper = ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone()) + .with_context(|_| error::ConvertDataTypeSnafu { + data_type: c.column_schema.data_type.clone(), + })?; + column_schemas.push(ColumnSchema { + column_name: c.column_schema.name.clone(), + datatype: wrapper.datatype() as i32, + semantic_type: c.semantic_type as i32, + ..Default::default() + }); + + name_to_index.insert(c.column_schema.name.clone(), idx); + } + + Ok((column_schemas, name_to_index)) } /// Convert [DfRecordBatch] to gRPC rows.
@@ -187,7 +292,7 @@ mod tests { #[test] fn test_region_metadata_to_column_schema() { let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build()); - let result = region_metadata_to_column_schema(&region_metadata).unwrap(); + let (result, _) = region_metadata_to_column_schema(&region_metadata).unwrap(); assert_eq!(result.len(), 3); assert_eq!(result[0].column_name, "ts"); diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index f4ee399b42..34623b0409 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -13,8 +13,11 @@ workspace = true [dependencies] ahash.workspace = true api.workspace = true +arrow.workspace = true +arrow-ipc.workspace = true async-stream.workspace = true async-trait.workspace = true +bytes.workspace = true catalog.workspace = true chrono.workspace = true client.workspace = true @@ -23,6 +26,7 @@ common-catalog.workspace = true common-datasource.workspace = true common-error.workspace = true common-function.workspace = true +common-grpc.workspace = true common-grpc-expr.workspace = true common-macro.workspace = true common-meta.workspace = true @@ -48,6 +52,7 @@ object-store.workspace = true object_store_opendal.workspace = true partition.workspace = true prometheus.workspace = true +prost.workspace = true query.workspace = true regex.workspace = true serde_json.workspace = true diff --git a/src/operator/src/bulk_insert.rs b/src/operator/src/bulk_insert.rs new file mode 100644 index 0000000000..24cd81f358 --- /dev/null +++ b/src/operator/src/bulk_insert.rs @@ -0,0 +1,170 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ahash::{HashMap, HashMapExt}; +use api::v1::region::{ + bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest, + RegionRequestHeader, RegionSelection, +}; +use bytes::Bytes; +use common_base::AffectedRows; +use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage}; +use common_grpc::FlightData; +use common_telemetry::tracing_context::TracingContext; +use prost::Message; +use snafu::ResultExt; +use store_api::storage::RegionId; +use table::metadata::TableId; + +use crate::insert::Inserter; +use crate::{error, metrics}; + +impl Inserter { + /// Handle bulk insert request. + pub async fn handle_bulk_insert( + &self, + table_id: TableId, + decoder: &mut FlightDecoder, + data: FlightData, + ) -> error::Result<AffectedRows> { + let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED + .with_label_values(&["decode_request"]) + .start_timer(); + let raw_flight_data = Bytes::from(data.encode_to_vec()); + let body_size = data.data_body.len(); + // Build region server requests + let message = decoder + .try_decode(data) + .context(error::DecodeFlightDataSnafu)?; + let FlightMessage::Recordbatch(rb) = message else { + return Ok(0); + }; + metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64); + decode_timer.observe_duration(); + + // todo(hl): find a way to embed raw FlightData messages in greptimedb proto files so we don't have to encode here.
+ // safety: when we reach here, the schema must be present. + let schema_message = FlightEncoder::default() + .encode(FlightMessage::Schema(decoder.schema().unwrap().clone())); + let schema_data = Bytes::from(schema_message.encode_to_vec()); + + let record_batch = rb.df_record_batch(); + + let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED + .with_label_values(&["partition"]) + .start_timer(); + let partition_rule = self + .partition_manager + .find_table_partition_rule(table_id) + .await + .context(error::InvalidPartitionSnafu)?; + + // find partitions for each row in the record batch + let region_masks = partition_rule + .split_record_batch(record_batch) + .context(error::SplitInsertSnafu)?; + partition_timer.observe_duration(); + + let group_request_timer = metrics::HANDLE_BULK_INSERT_ELAPSED + .with_label_values(&["group_request"]) + .start_timer(); + + let mut mask_per_datanode = HashMap::with_capacity(region_masks.len()); + for (region_number, mask) in region_masks { + let region_id = RegionId::new(table_id, region_number); + let datanode = self + .partition_manager + .find_region_leader(region_id) + .await + .context(error::FindRegionLeaderSnafu)?; + let selection = RegionSelection { + region_id: region_id.as_u64(), + selection: mask.values().inner().as_slice().to_vec(), + }; + mask_per_datanode + .entry(datanode) + .or_insert_with(Vec::new) + .push(selection); + } + group_request_timer.observe_duration(); + + let datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED + .with_label_values(&["datanode_handle"]) + .start_timer(); + // fast path: only one datanode + if mask_per_datanode.len() == 1 { + let (peer, requests) = mask_per_datanode.into_iter().next().unwrap(); + let datanode = self.node_manager.datanode(&peer).await; + let request = RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::BulkInsert(BulkInsertRequest { + body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc { + schema: schema_data, + payload: raw_flight_data, + region_selection: requests, + })), + })), + }; + let response = datanode + .handle(request) + .await + .context(error::RequestRegionSnafu)?; + return Ok(response.affected_rows); + } + + let mut handles = Vec::with_capacity(mask_per_datanode.len()); + for (peer, masks) in mask_per_datanode { + let node_manager = self.node_manager.clone(); + let schema = schema_data.clone(); + let payload = raw_flight_data.clone(); + + let handle: common_runtime::JoinHandle<error::Result<RegionResponse>> = + common_runtime::spawn_global(async move { + let request = RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::BulkInsert(BulkInsertRequest { + body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc { + schema, + payload, + region_selection: masks, + })), + })), + }; + + let datanode = node_manager.datanode(&peer).await; + datanode + .handle(request) + .await + .context(error::RequestRegionSnafu) + }); + handles.push(handle); + } + + let region_responses = futures::future::try_join_all(handles) + .await + .context(error::JoinTaskSnafu)?; + datanode_handle_timer.observe_duration(); + let mut rows_inserted: usize = 0; + for res in region_responses { + rows_inserted += res?.affected_rows; + } + Ok(rows_inserted) + } +} diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 900a3b4310..488005f5d5 100644 ---
a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -807,6 +807,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to decode arrow flight data"))] + DecodeFlightData { + source: common_grpc::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -850,116 +857,85 @@ impl ErrorExt for Error { | Error::CursorNotFound { .. } | Error::CursorExists { .. } | Error::CreatePartitionRules { .. } => StatusCode::InvalidArguments, - Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. } => { StatusCode::TableAlreadyExists } - Error::NotSupported { .. } | Error::ShowCreateTableBaseOnly { .. } | Error::SchemaReadOnly { .. } => StatusCode::Unsupported, - Error::TableMetadataManager { source, .. } => source.status_code(), - Error::ParseSql { source, .. } => source.status_code(), - Error::InvalidateTableCache { source, .. } => source.status_code(), - Error::ParseFileFormat { source, .. } | Error::InferSchema { source, .. } => { source.status_code() } - Error::Table { source, .. } | Error::Insert { source, .. } => source.status_code(), - Error::ConvertColumnDefaultConstraint { source, .. } | Error::CreateTableInfo { source, .. } | Error::IntoVectors { source, .. } => source.status_code(), - Error::RequestInserts { source, .. } | Error::FindViewInfo { source, .. } => { source.status_code() } Error::RequestRegion { source, .. } => source.status_code(), Error::RequestDeletes { source, .. } => source.status_code(), Error::SubstraitCodec { source, .. } => source.status_code(), - Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => { source.status_code() } - Error::MissingTimeIndexColumn { source, .. } => source.status_code(), - Error::BuildDfLogicalPlan { .. } | Error::BuildTableMeta { .. } | Error::MissingInsertBody { .. } => StatusCode::Internal, - Error::EncodeJson { .. } => StatusCode::Unexpected, - Error::ViewNotFound { .. } | Error::ViewInfoNotFound { .. } | Error::TableNotFound { .. } => StatusCode::TableNotFound, - Error::FlowNotFound { .. } => StatusCode::FlowNotFound, - Error::JoinTask { .. } => StatusCode::Internal, - Error::BuildParquetRecordBatchStream { .. } | Error::BuildFileStream { .. } | Error::WriteStreamToFile { .. } | Error::ReadDfRecordBatch { .. } | Error::Unexpected { .. } => StatusCode::Unexpected, - Error::Catalog { source, .. } => source.status_code(), - Error::BuildCreateExprOnInsertion { source, .. } | Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(), - Error::ExecuteStatement { source, .. } | Error::ExtractTableNames { source, .. } | Error::PlanStatement { source, .. } | Error::ParseQuery { source, .. } | Error::ExecLogicalPlan { source, .. } | Error::DescribeStatement { source, .. } => source.status_code(), - Error::AlterExprToRequest { source, .. } => source.status_code(), - Error::External { source, .. } => source.status_code(), Error::DeserializePartition { source, .. } | Error::FindTablePartitionRule { source, .. } | Error::SplitInsert { source, .. } | Error::SplitDelete { source, .. } | Error::FindRegionLeader { source, .. } => source.status_code(), - Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, - Error::ReadObject { .. } | Error::ReadParquetMetadata { .. } | Error::ReadOrc { .. } => StatusCode::StorageUnavailable, - Error::ListObjects { source, .. } | Error::ParseUrl { source, .. } | Error::BuildBackend { source, .. } => source.status_code(), - Error::ExecuteDdl { source, ..
} => source.status_code(), Error::InvalidCopyParameter { .. } | Error::InvalidCopyDatabasePath { .. } => { StatusCode::InvalidArguments } - Error::ColumnDefaultValue { source, .. } => source.status_code(), - Error::EmptyDdlExpr { .. } | Error::InvalidPartitionRule { .. } | Error::ParseSqlValue { .. } | Error::InvalidTimestampRange { .. } => StatusCode::InvalidArguments, - Error::CreateLogicalTables { .. } => StatusCode::Unexpected, - Error::ExecuteAdminFunction { source, .. } => source.status_code(), Error::BuildRecordBatch { source, .. } => source.status_code(), - Error::UpgradeCatalogManagerRef { .. } => StatusCode::Internal, Error::StatementTimeout { .. } => StatusCode::Cancelled, - Error::ColumnOptions { source, .. } => source.status_code(), + Error::DecodeFlightData { source, .. } => source.status_code(), } } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 2f7c30ccd0..b12bd91989 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -76,8 +76,8 @@ use crate::statement::StatementExecutor; pub struct Inserter { catalog_manager: CatalogManagerRef, - partition_manager: PartitionRuleManagerRef, - node_manager: NodeManagerRef, + pub(crate) partition_manager: PartitionRuleManagerRef, + pub(crate) node_manager: NodeManagerRef, table_flownode_set_cache: TableFlownodeSetCacheRef, } diff --git a/src/operator/src/lib.rs b/src/operator/src/lib.rs index ae00ef40dd..783501c6f0 100644 --- a/src/operator/src/lib.rs +++ b/src/operator/src/lib.rs @@ -15,6 +15,7 @@ #![feature(assert_matches)] #![feature(if_let_guard)] +mod bulk_insert; pub mod delete; pub mod error; pub mod expr_helper; diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs index e6a4827e48..07f6897e37 100644 --- a/src/operator/src/metrics.rs +++ b/src/operator/src/metrics.rs @@ -62,4 +62,35 @@ lazy_static! 
{ &["table_type"] ) .unwrap(); + pub static ref HANDLE_BULK_INSERT_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_table_operator_handle_bulk_insert", + "table operator duration to handle bulk inserts", + &["stage"], + vec![ + 0.001, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.10, 0.15, 0.2, 0.3, 0.4, 0.5, 1.0, 1.5, + 2.0, 2.5, 3.0, 4.0, 5.0 + ] + ) + .unwrap(); + pub static ref BULK_REQUEST_MESSAGE_SIZE: Histogram = register_histogram!( + "greptime_table_operator_bulk_insert_message_size", + "table operator bulk inserts message encoded size", + vec![ + 32768.0, + 65536.0, + 131072.0, + 262144.0, + 524288.0, + 1048576.0, + 2097152.0, + 4194304.0, + 8388608.0, + 16777216.0, + 33554432.0, + 67108864.0, + 134217728.0, + 268435456.0 + ] + ) + .unwrap(); } diff --git a/src/partition/src/multi_dim.rs b/src/partition/src/multi_dim.rs index 551fb6a8de..9bbbf8f015 100644 --- a/src/partition/src/multi_dim.rs +++ b/src/partition/src/multi_dim.rs @@ -211,6 +211,13 @@ impl MultiDimPartitionRule { record_batch: &RecordBatch, ) -> Result> { let num_rows = record_batch.num_rows(); + if self.regions.len() == 1 { + return Ok( + [(self.regions[0], BooleanArray::from(vec![true; num_rows]))] + .into_iter() + .collect(), + ); + } let physical_exprs = { let cache_read_guard = self.physical_expr_cache.read().unwrap(); if let Some((cached_exprs, schema)) = cache_read_guard.as_ref() diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index 648cfff377..4e1ced8662 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -49,7 +49,6 @@ use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler}; use crate::grpc::TonicResult; use crate::http::header::constants::GREPTIME_DB_HEADER_NAME; use crate::http::AUTHORIZATION_HEADER; -use crate::query_handler::grpc::RawRecordBatch; pub type TonicStream = Pin> + Send + 'static>>; @@ -257,17 +256,22 @@ impl FlightCraft for GreptimeRequestHandler { pub(crate) struct PutRecordBatchRequest { pub(crate) table_name: TableName, pub(crate) request_id: i64, - pub(crate) record_batch: RawRecordBatch, + pub(crate) data: FlightData, } impl PutRecordBatchRequest { fn try_new(table_name: TableName, flight_data: FlightData) -> Result { - let metadata: DoPutMetadata = - serde_json::from_slice(&flight_data.app_metadata).context(ParseJsonSnafu)?; + let request_id = if !flight_data.app_metadata.is_empty() { + let metadata: DoPutMetadata = + serde_json::from_slice(&flight_data.app_metadata).context(ParseJsonSnafu)?; + metadata.request_id() + } else { + 0 + }; Ok(Self { table_name, - request_id: metadata.request_id(), - record_batch: flight_data.data_body, + request_id, + data: flight_data, }) } } diff --git a/src/servers/src/grpc/greptime_handler.rs b/src/servers/src/grpc/greptime_handler.rs index 73e1a768c8..f29c3f8e56 100644 --- a/src/servers/src/grpc/greptime_handler.rs +++ b/src/servers/src/grpc/greptime_handler.rs @@ -27,6 +27,7 @@ use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_grpc::flight::do_put::DoPutResponse; +use common_grpc::flight::FlightDecoder; use common_query::Output; use common_runtime::runtime::RuntimeTrait; use common_runtime::Runtime; @@ -36,6 +37,7 @@ use common_time::timezone::parse_timezone; use futures_util::StreamExt; use session::context::{QueryContext, QueryContextBuilder, QueryContextRef}; use snafu::{OptionExt, ResultExt}; +use table::metadata::TableId; use tokio::sync::mpsc; use 
crate::error::Error::UnsupportedAuthScheme; @@ -45,6 +47,7 @@ use crate::error::{ }; use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream}; use crate::grpc::TonicResult; +use crate::metrics; use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER}; use crate::query_handler::grpc::ServerGrpcQueryHandlerRef; @@ -140,6 +143,10 @@ impl GreptimeRequestHandler { .clone() .unwrap_or_else(common_runtime::global_runtime); runtime.spawn(async move { + // Cached table id + let mut table_id: Option<TableId> = None; + + let mut decoder = FlightDecoder::default(); while let Some(request) = stream.next().await { let request = match request { Ok(request) => request, @@ -148,13 +155,17 @@ break; } }; - let PutRecordBatchRequest { table_name, request_id, - record_batch, + data, } = request; - let result = handler.put_record_batch(&table_name, record_batch).await; + + let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer(); + let result = handler + .put_record_batch(&table_name, &mut table_id, &mut decoder, data) + .await; + timer.observe_duration(); let result = result .map(|x| DoPutResponse::new(request_id, x)) .map_err(Into::into); diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index da465f5707..cc10b38708 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -23,8 +23,8 @@ use axum::middleware::Next; use axum::response::IntoResponse; use lazy_static::lazy_static; use prometheus::{ - register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge, - Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, + register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec, + register_int_gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, }; use tonic::body::BoxBody; use tower::{Layer, Service}; @@ -276,8 +276,12 @@ lazy_static!
{ "greptime_servers_jaeger_query_elapsed", "servers jaeger query elapsed", &[METRIC_DB_LABEL, METRIC_PATH_LABEL] - ) -.unwrap(); + ).unwrap(); + + pub static ref GRPC_BULK_INSERT_ELAPSED: Histogram = register_histogram!( + "greptime_servers_bulk_insert_elapsed", + "servers handle bulk insert elapsed", + ).unwrap(); } // Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs diff --git a/src/servers/src/query_handler/grpc.rs b/src/servers/src/query_handler/grpc.rs index 7af5c9935a..c7055092f6 100644 --- a/src/servers/src/query_handler/grpc.rs +++ b/src/servers/src/query_handler/grpc.rs @@ -15,12 +15,15 @@ use std::sync::Arc; use api::v1::greptime_request::Request; +use arrow_flight::FlightData; use async_trait::async_trait; use common_base::AffectedRows; use common_error::ext::{BoxedError, ErrorExt}; +use common_grpc::flight::FlightDecoder; use common_query::Output; use session::context::QueryContextRef; use snafu::ResultExt; +use table::metadata::TableId; use table::table_name::TableName; use crate::error::{self, Result}; @@ -43,7 +46,9 @@ pub trait GrpcQueryHandler { async fn put_record_batch( &self, table: &TableName, - record_batch: RawRecordBatch, + table_id: &mut Option, + decoder: &mut FlightDecoder, + flight_data: FlightData, ) -> std::result::Result; } @@ -73,10 +78,12 @@ where async fn put_record_batch( &self, table: &TableName, - record_batch: RawRecordBatch, + table_id: &mut Option, + decoder: &mut FlightDecoder, + data: FlightData, ) -> Result { self.0 - .put_record_batch(table, record_batch) + .put_record_batch(table, table_id, decoder, data) .await .map_err(BoxedError::new) .context(error::ExecuteGrpcRequestSnafu) diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 7bd8a696be..752145b7cb 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -16,10 +16,12 @@ use std::sync::Arc; use api::v1::greptime_request::Request; use api::v1::query_request::Query; +use arrow_flight::FlightData; use async_trait::async_trait; use catalog::memory::MemoryCatalogManager; use common_base::AffectedRows; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; +use common_grpc::flight::FlightDecoder; use common_query::Output; use datafusion_expr::LogicalPlan; use query::options::QueryOptions; @@ -27,11 +29,12 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::query_engine::DescribeResult; use query::{QueryEngineFactory, QueryEngineRef}; use servers::error::{Error, NotSupportedSnafu, Result}; -use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch, ServerGrpcQueryHandlerRef}; +use servers::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerRef}; use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler}; use session::context::QueryContextRef; use snafu::ensure; use sql::statements::statement::Statement; +use table::metadata::TableId; use table::table_name::TableName; use table::TableRef; @@ -161,10 +164,14 @@ impl GrpcQueryHandler for DummyInstance { async fn put_record_batch( &self, table: &TableName, - record_batch: RawRecordBatch, + table_id: &mut Option, + decoder: &mut FlightDecoder, + data: FlightData, ) -> std::result::Result { let _ = table; - let _ = record_batch; + let _ = data; + let _ = table_id; + let _ = decoder; unimplemented!() } } diff --git a/src/store-api/Cargo.toml b/src/store-api/Cargo.toml index 7c974661e3..4cc2cf1232 100644 --- a/src/store-api/Cargo.toml +++ b/src/store-api/Cargo.toml @@ -13,6 +13,7 @@ aquamarine.workspace 
= true async-trait.workspace = true common-base.workspace = true common-error.workspace = true +common-grpc.workspace = true common-macro.workspace = true common-recordbatch.workspace = true common-time.workspace = true @@ -23,6 +24,8 @@ datatypes.workspace = true derive_builder.workspace = true futures.workspace = true humantime.workspace = true +lazy_static.workspace = true +prometheus.workspace = true prost.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs index c6ee28f5d3..09e8634e43 100644 --- a/src/store-api/src/lib.rs +++ b/src/store-api/src/lib.rs @@ -23,6 +23,7 @@ pub mod logstore; pub mod manifest; pub mod metadata; pub mod metric_engine_consts; +mod metrics; pub mod mito_engine_options; pub mod path_utils; pub mod region_engine; diff --git a/src/store-api/src/metadata.rs b/src/store-api/src/metadata.rs index e8372a8df2..f8df4745d4 100644 --- a/src/store-api/src/metadata.rs +++ b/src/store-api/src/metadata.rs @@ -973,6 +973,21 @@ pub enum MetadataError { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to encode/decode flight message"))] + FlightCodec { + source: common_grpc::Error, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Failed to decode prost message"))] + Prost { + #[snafu(source)] + error: prost::DecodeError, + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for MetadataError { diff --git a/src/store-api/src/metrics.rs b/src/store-api/src/metrics.rs new file mode 100644 index 0000000000..7989e0f295 --- /dev/null +++ b/src/store-api/src/metrics.rs @@ -0,0 +1,29 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use lazy_static::lazy_static; +use prometheus::{register_histogram_vec, HistogramVec}; + +lazy_static! { + pub static ref CONVERT_REGION_BULK_REQUEST: HistogramVec = register_histogram_vec!( + "greptime_datanode_convert_region_request", + "datanode duration to convert region request", + &["stage"], + vec![ + 0.001, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.10, 0.15, 0.2, 0.3, 0.4, 0.5, 1.0, 1.5, + 2.0, 2.5, 3.0, 4.0, 5.0 + ] + ) + .unwrap(); +} diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 5a6e1289c6..e083ddbf95 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -12,43 +12,50 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use std::collections::hash_map::Entry; use std::collections::HashMap; use std::fmt::{self, Display}; -use std::io::Cursor; +use std::time::{Duration, Instant}; -use api::helper::ColumnDataTypeWrapper; +use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper}; use api::v1::add_column_location::LocationType; use api::v1::column_def::{ as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type, }; +use api::v1::region::bulk_insert_request::Body; use api::v1::region::{ - alter_request, compact_request, region_request, AlterRequest, AlterRequests, - BulkInsertRequests, CloseRequest, CompactRequest, CreateRequest, CreateRequests, - DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest, - TruncateRequest, + alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest, + CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, + DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest, }; use api::v1::{ - self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows, + self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Row, Rows, SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint, }; pub use common_base::AffectedRows; +use common_grpc::flight::{FlightDecoder, FlightMessage}; +use common_grpc::FlightData; use common_recordbatch::DfRecordBatch; use common_time::TimeToLive; -use datatypes::arrow::ipc::reader::FileReader; -use datatypes::prelude::ConcreteDataType; +use datatypes::arrow; +use datatypes::arrow::array::{Array, BooleanArray}; +use datatypes::arrow::buffer::{BooleanBuffer, Buffer}; +use datatypes::prelude::{ConcreteDataType, VectorRef}; use datatypes::schema::{FulltextOptions, SkippingIndexOptions}; +use datatypes::vectors::Helper; +use prost::Message; use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use strum::{AsRefStr, IntoStaticStr}; use crate::logstore::entry; use crate::metadata::{ - ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, InvalidRawRegionRequestSnafu, - InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu, - InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu, + ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, FlightCodecSnafu, + InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu, + InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result, + UnexpectedSnafu, }; use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; +use crate::metrics; use crate::mito_engine_options::{ TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS, TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE, @@ -152,7 +159,7 @@ impl RegionRequest { region_request::Body::Creates(creates) => make_region_creates(creates), region_request::Body::Drops(drops) => make_region_drops(drops), region_request::Body::Alters(alters) => make_region_alters(alters), - region_request::Body::BulkInserts(bulk) => make_region_bulk_inserts(bulk), + region_request::Body::BulkInsert(bulk) => make_region_rows_bulk_inserts(bulk), region_request::Body::Sync(_) => UnexpectedSnafu { reason: "Sync request should be handled separately by RegionServer", } @@ -326,44 +333,51 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<RegionRequest> { -fn make_region_bulk_inserts( - requests: BulkInsertRequests, -) -> Result<Vec<(RegionId, RegionRequest)>> { - let mut region_requests: HashMap<u64, Vec<BulkInsertPayload>> =
HashMap::with_capacity(requests.requests.len()); +/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId]. +#[allow(unused)] +fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> { + let Some(Body::ArrowIpc(request)) = request.body else { + return Ok(vec![]); + }; - for req in requests.requests { - let region_id = req.region_id; - match req.payload_type() { - api::v1::region::BulkInsertType::ArrowIpc => { - // todo(hl): use StreamReader instead - let reader = FileReader::try_new(Cursor::new(req.payload), None) - .context(DecodeArrowIpcSnafu)?; - let record_batches = reader - .map(|b| b.map(BulkInsertPayload::ArrowIpc)) - .try_collect::<Vec<_>>() - .context(DecodeArrowIpcSnafu)?; - match region_requests.entry(region_id) { - Entry::Occupied(mut e) => { - e.get_mut().extend(record_batches); - } - Entry::Vacant(e) => { - e.insert(record_batches); - } - } - } - } + let mut region_requests: HashMap<u64, BulkInsertPayload> = + HashMap::with_capacity(request.region_selection.len()); + + let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?; + let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?; + let mut decoder = FlightDecoder::default(); + let _schema_message = decoder.try_decode(schema_data).context(FlightCodecSnafu)?; + let FlightMessage::Recordbatch(rb) = + decoder.try_decode(payload_data).context(FlightCodecSnafu)? + else { + unreachable!("Always expect record batch message after schema"); + }; + + for region_selection in request.region_selection { + let region_id = region_selection.region_id; + let region_mask = BooleanArray::new( + BooleanBuffer::new(Buffer::from(region_selection.selection), 0, rb.num_rows()), + None, + ); + + let region_batch = if region_mask.true_count() == rb.num_rows() { + rb.df_record_batch().clone() + } else { + arrow::compute::filter_record_batch(rb.df_record_batch(), &region_mask) + .context(DecodeArrowIpcSnafu)? + }; + + region_requests.insert(region_id, BulkInsertPayload::ArrowIpc(region_batch)); } let result = region_requests .into_iter() - .map(|(region_id, payloads)| { + .map(|(region_id, payload)| { ( region_id.into(), RegionRequest::BulkInserts(RegionBulkInsertsRequest { region_id: region_id.into(), - payloads, + payloads: vec![payload], }), ) }) @@ -371,6 +385,116 @@ Ok(result) } +/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId]. +fn make_region_rows_bulk_inserts( + request: BulkInsertRequest, +) -> Result<Vec<(RegionId, RegionRequest)>> { + let Some(Body::ArrowIpc(request)) = request.body else { + return Ok(vec![]); + }; + + let mut region_requests: HashMap<u64, BulkInsertPayload> = + HashMap::with_capacity(request.region_selection.len()); + + let decode_timer = metrics::CONVERT_REGION_BULK_REQUEST + .with_label_values(&["decode"]) + .start_timer(); + let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?; + let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?; + let mut decoder = FlightDecoder::default(); + let _schema_message = decoder.try_decode(schema_data).context(FlightCodecSnafu)?; + let FlightMessage::Recordbatch(rb) = + decoder.try_decode(payload_data).context(FlightCodecSnafu)?
+ else { + unreachable!("Always expect record batch message after schema"); + }; + decode_timer.observe_duration(); + + let filter_timer = metrics::CONVERT_REGION_BULK_REQUEST.with_label_values(&["filter_batch"]); + let convert_to_rows_timer = + metrics::CONVERT_REGION_BULK_REQUEST.with_label_values(&["convert_to_rows"]); + + let mut filter_time = Duration::default(); + let mut convert_to_rows_time = Duration::default(); + for region_selection in request.region_selection { + let region_id = region_selection.region_id; + let start = Instant::now(); + let region_mask = BooleanArray::new( + BooleanBuffer::new(Buffer::from(region_selection.selection), 0, rb.num_rows()), + None, + ); + + let region_batch = if region_mask.true_count() == rb.num_rows() { + rb.df_record_batch().clone() + } else { + arrow::compute::filter_record_batch(rb.df_record_batch(), &region_mask) + .context(DecodeArrowIpcSnafu)? + }; + filter_time += start.elapsed(); + + let start = Instant::now(); + let (rows, has_null) = record_batch_to_rows(&region_batch); + convert_to_rows_time += start.elapsed(); + + region_requests.insert( + region_id, + BulkInsertPayload::Rows { + data: rows, + has_null, + }, + ); + } + filter_timer.observe(filter_time.as_secs_f64()); + convert_to_rows_timer.observe(convert_to_rows_time.as_secs_f64()); + + let result = region_requests + .into_iter() + .map(|(region_id, payload)| { + ( + region_id.into(), + RegionRequest::BulkInserts(RegionBulkInsertsRequest { + region_id: region_id.into(), + payloads: vec![payload], + }), + ) + }) + .collect::<Vec<_>>(); + + Ok(result) +} + +/// Convert [DfRecordBatch] to gRPC rows. +fn record_batch_to_rows(rb: &DfRecordBatch) -> (Vec<Row>, Vec<bool>) { + let num_rows = rb.num_rows(); + let mut rows = Vec::with_capacity(num_rows); + if num_rows == 0 { + return (rows, vec![false; rb.num_columns()]); + } + + let mut vectors = Vec::with_capacity(rb.num_columns()); + let mut has_null = Vec::with_capacity(rb.num_columns()); + for c in rb.columns() { + vectors.push(Helper::try_into_vector(c).unwrap()); + has_null.push(c.null_count() > 0); + } + + for row_idx in 0..num_rows { + let row = Row { + values: row_at(&vectors, row_idx), + }; + rows.push(row); + } + (rows, has_null) +} + +fn row_at(vectors: &[VectorRef], row_idx: usize) -> Vec<v1::Value> { + let mut row = Vec::with_capacity(vectors.len()); + for a in vectors { + row.push(value_to_grpc_value(a.get(row_idx))) + } + row +} + /// Request to put data into a region.
#[derive(Debug)] pub struct RegionPutRequest { @@ -1184,6 +1308,7 @@ pub struct RegionBulkInsertsRequest { #[derive(Debug, Clone)] pub enum BulkInsertPayload { ArrowIpc(DfRecordBatch), + Rows { data: Vec<Row>, has_null: Vec<bool> }, } impl fmt::Display for RegionRequest { diff --git a/tests-integration/src/grpc/flight.rs b/tests-integration/src/grpc/flight.rs index e97165f16c..5e079bb037 100644 --- a/tests-integration/src/grpc/flight.rs +++ b/tests-integration/src/grpc/flight.rs @@ -61,8 +61,19 @@ mod test { let sql = "select ts, a, b from foo order by ts"; let expected = "\ -++ -++"; ++-------------------------+----+----+ +| ts | a | b | ++-------------------------+----+----+ +| 1970-01-01T00:00:00.001 | -1 | s1 | +| 1970-01-01T00:00:00.002 | -2 | s2 | +| 1970-01-01T00:00:00.003 | -3 | s3 | +| 1970-01-01T00:00:00.004 | -4 | s4 | +| 1970-01-01T00:00:00.005 | -5 | s5 | +| 1970-01-01T00:00:00.006 | -6 | s6 | +| 1970-01-01T00:00:00.007 | -7 | s7 | +| 1970-01-01T00:00:00.008 | -8 | s8 | +| 1970-01-01T00:00:00.009 | -9 | s9 | ++-------------------------+----+----+"; query_and_expect(db.frontend().as_ref(), sql, expected).await; } @@ -110,35 +121,52 @@ let sql = "select ts, a, b from foo order by ts"; let expected = "\ -++ -++"; ++-------------------------+----+----+ +| ts | a | b | ++-------------------------+----+----+ +| 1970-01-01T00:00:00.001 | -1 | s1 | +| 1970-01-01T00:00:00.002 | -2 | s2 | +| 1970-01-01T00:00:00.003 | -3 | s3 | +| 1970-01-01T00:00:00.004 | -4 | s4 | +| 1970-01-01T00:00:00.005 | -5 | s5 | +| 1970-01-01T00:00:00.006 | -6 | s6 | +| 1970-01-01T00:00:00.007 | -7 | s7 | +| 1970-01-01T00:00:00.008 | -8 | s8 | +| 1970-01-01T00:00:00.009 | -9 | s9 | ++-------------------------+----+----+"; query_and_expect(db.fe_instance().as_ref(), sql, expected).await; } async fn test_put_record_batches(client: &Database, record_batches: Vec<RecordBatch>) { let requests_count = record_batches.len(); + let schema = record_batches[0].schema.clone(); - let stream = tokio_stream::iter(record_batches) - .enumerate() - .map(|(i, x)| { - let mut encoder = FlightEncoder::default(); - let message = FlightMessage::Recordbatch(x); - let mut data = encoder.encode(message); - - let metadata = DoPutMetadata::new(i as i64); - data.app_metadata = serde_json::to_vec(&metadata).unwrap().into(); - - // first message in "DoPut" stream should carry table name in flight descriptor - if i == 0 { - data.flight_descriptor = Some(FlightDescriptor { - r#type: arrow_flight::flight_descriptor::DescriptorType::Path as i32, - path: vec!["foo".to_string()], - ..Default::default() - }); - } - data - }) - .boxed(); + let stream = futures::stream::once(async move { + let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema)); + let metadata = DoPutMetadata::new(0); + schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into(); + // first message in "DoPut" stream should carry table name in flight descriptor + schema_data.flight_descriptor = Some(FlightDescriptor { + r#type: arrow_flight::flight_descriptor::DescriptorType::Path as i32, + path: vec!["foo".to_string()], + ..Default::default() + }); + schema_data + }) + .chain( + tokio_stream::iter(record_batches) + .enumerate() + .map(|(i, x)| { + let mut encoder = FlightEncoder::default(); + let message = FlightMessage::Recordbatch(x); + let mut data = encoder.encode(message); + let metadata = DoPutMetadata::new((i + 1) as i64); + data.app_metadata = serde_json::to_vec(&metadata).unwrap().into(); + data + }) + .boxed(), + ) + .boxed(); let response_stream
= client.do_put(stream).await.unwrap(); @@ -148,9 +176,14 @@ mod test { assert!(response.is_ok(), "{}", response.err().unwrap()); let response = response.unwrap(); assert_eq!(response.request_id(), i as i64); - assert_eq!(response.affected_rows(), 448); + if i == 0 { + // the first is schema + assert_eq!(response.affected_rows(), 0); + } else { + assert_eq!(response.affected_rows(), 3); + } } - assert_eq!(requests_count, responses_count); + assert_eq!(requests_count + 1, responses_count); } fn create_record_batches(start: i64) -> Vec<RecordBatch> {
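For reviewers: a minimal client-side sketch of the DoPut bulk-insert protocol that the integration test above exercises. This is an illustration only; `Database` (the client from the `client` crate), `RecordBatch` (from `common_recordbatch`), and the table name "foo" are assumptions taken from the test, not new API surface. The first `FlightData` message carries the schema plus the target table in its flight descriptor and is acknowledged with 0 affected rows; every following message carries one record batch, and the `DoPutMetadata` request id in `app_metadata` is echoed back in the matching `DoPutResponse`.

```rust
// Sketch only: mirrors tests-integration/src/grpc/flight.rs; `client`,
// `batches`, and the table name "foo" are assumptions for illustration.
use arrow_flight::flight_descriptor::DescriptorType;
use arrow_flight::FlightDescriptor;
use common_grpc::flight::do_put::DoPutMetadata;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use futures::StreamExt;

async fn bulk_insert(client: &Database, batches: Vec<RecordBatch>) {
    let schema = batches[0].schema.clone();

    // Message 0: schema, with the target table named in the flight descriptor.
    let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
    schema_data.app_metadata = serde_json::to_vec(&DoPutMetadata::new(0)).unwrap().into();
    schema_data.flight_descriptor = Some(FlightDescriptor {
        r#type: DescriptorType::Path as i32,
        path: vec!["foo".to_string()],
        ..Default::default()
    });

    // Messages 1..=n: one record batch each, tagged with a request id so
    // responses can be correlated with requests.
    let data_stream = futures::stream::iter(batches).enumerate().map(|(i, rb)| {
        let mut data = FlightEncoder::default().encode(FlightMessage::Recordbatch(rb));
        data.app_metadata = serde_json::to_vec(&DoPutMetadata::new((i + 1) as i64))
            .unwrap()
            .into();
        data
    });

    let stream = futures::stream::once(async move { schema_data })
        .chain(data_stream)
        .boxed();

    // The schema message is acknowledged with 0 affected rows; each data
    // message reports the number of rows it wrote.
    let mut responses = client.do_put(stream).await.unwrap();
    while let Some(response) = responses.next().await {
        let response = response.unwrap();
        println!("request {}: {} rows", response.request_id(), response.affected_rows());
    }
}
```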
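One design note worth calling out: the frontend encodes the Flight payload once and fans it out to datanodes together with per-region boolean bitmaps (`RegionSelection`), so rows are never re-serialized per region on the write path. Below is a standalone sketch of the bitmap filtering performed in `make_region_rows_bulk_inserts`, written against plain arrow-rs types for clarity (the patch itself goes through the `datatypes::arrow` re-exports):

```rust
use arrow::array::BooleanArray;
use arrow::buffer::{BooleanBuffer, Buffer};
use arrow::record_batch::RecordBatch;

/// Keep only the rows of `rb` that belong to one region, given the packed
/// one-bit-per-row selection bitmap the frontend computed for that region.
fn rows_for_region(rb: &RecordBatch, packed_mask: Vec<u8>) -> RecordBatch {
    let mask = BooleanArray::new(
        BooleanBuffer::new(Buffer::from(packed_mask), 0, rb.num_rows()),
        None,
    );
    if mask.true_count() == rb.num_rows() {
        // All rows fall into this region (cf. the single-region fast path
        // added in multi_dim.rs): skip the filter copy entirely.
        rb.clone()
    } else {
        arrow::compute::filter_record_batch(rb, &mask).expect("filter record batch")
    }
}
```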