feat: bridge bulk insert (#5927)

* feat/bridge-bulk-insert:
 ## Implement Bulk Insert and Update Dependencies

 - **Bulk Insert Implementation**: Added a `handle_bulk_inserts` method in `src/operator/src/bulk_insert.rs` that manages bulk insert requests using `FlightDecoder` and `FlightData` (decode step sketched after this list).
 - **Dependency Updates**: Updated `Cargo.lock` and `Cargo.toml` to use the latest revision of `greptime-proto` and added new dependencies such as `arrow`, `arrow-ipc`, `bytes`, and `prost`.
 - **gRPC Enhancements**: Modified `put_record_batch` method in `src/frontend/src/instance/grpc.rs` and `src/servers/src/grpc/flight.rs` to handle `FlightData` instead of `RawRecordBatch`.
 - **Error Handling**: Added new error types in `src/operator/src/error.rs` for handling Arrow operations and decoding flight data.
 - **Miscellaneous**: Updated `src/operator/src/insert.rs` to expose `partition_manager` and `node_manager` as public fields.
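
A minimal, hedged sketch of the decode step described above, assuming the `FlightDecoder`/`FlightMessage` API that appears later in this diff, where `try_decode` yields a schema message first and `FlightMessage::Recordbatch` wraps a `common_recordbatch::RecordBatch`:

```rust
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_grpc::FlightData;

/// Hypothetical helper: turn one incoming `FlightData` frame into a record batch.
/// The decoder is reused across frames so the schema decoded from the first
/// message stays available for the record-batch messages that follow.
fn decode_bulk_frame(
    decoder: &mut FlightDecoder,
    data: FlightData,
) -> Result<Option<common_recordbatch::RecordBatch>, common_grpc::Error> {
    match decoder.try_decode(data)? {
        // Schema frames carry no rows; record-batch frames carry the payload.
        FlightMessage::Recordbatch(rb) => Ok(Some(rb)),
        _ => Ok(None),
    }
}
```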

* feat/bridge-bulk-insert:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in `Cargo.lock` and `Cargo.toml`.
 - **Refactor gRPC Query Handling**: Removed `RawRecordBatch` usage from `grpc.rs`, `flight.rs`, `greptime_handler.rs`, and test files, simplifying the gRPC query handling.
 - **Enhance Bulk Insert Logic**: Improved bulk insert logic in `bulk_insert.rs` and `region_request.rs` by decoding payloads with `FlightDecoder` and selecting per-region rows with `BooleanArray` masks (filtering sketched after this list).
 - **Add `common-grpc` Dependency**: Added `common-grpc` as a workspace dependency in `store-api/Cargo.toml` to support gRPC functionality.
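
For the `BooleanArray`-based splitting mentioned above, the core operation is filtering one shared record batch with a per-region boolean mask. A self-contained sketch using plain `arrow` APIs; the schema and mask values here are made up for illustration:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, BooleanArray, Int64Array};
use arrow::compute::filter_record_batch;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

fn main() -> Result<(), ArrowError> {
    // A tiny batch standing in for the decoded bulk-insert payload.
    let schema = Arc::new(Schema::new(vec![Field::new("ts", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int64Array::from(vec![1, 2, 3, 4])) as ArrayRef],
    )?;

    // One boolean mask per region: `true` means the row belongs to that region.
    let region_mask = BooleanArray::from(vec![true, false, true, false]);

    // Fast path: if the mask selects every row, reuse the batch as-is;
    // otherwise materialize only the selected rows for that region.
    let region_batch = if region_mask.true_count() == batch.num_rows() {
        batch.clone()
    } else {
        filter_record_batch(&batch, &region_mask)?
    };
    assert_eq!(region_batch.num_rows(), 2);
    Ok(())
}
```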

* fix: clippy

* fix schema serialization

* feat/bridge-bulk-insert:
 Add error handling for encoding/decoding in `metadata.rs` and `region_request.rs`

 - Introduced new error variants `FlightCodec` and `Prost` in `MetadataError` to handle encoding/decoding failures in `metadata.rs`.
 - Updated the `make_region_bulk_inserts` function in `region_request.rs` to attach error context via `ProstSnafu` and `FlightCodecSnafu` (the `context` pattern is sketched after this list).
 - Enhanced error handling for `FlightData` decoding and `filter_record_batch` operations.
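
The `context` wiring mentioned above follows the usual snafu pattern. A reduced, standalone sketch: the error enum is trimmed to the two new variants, and `std::io::Error` stands in for `common_grpc::Error` so the example compiles outside the workspace:

```rust
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
pub enum MetadataError {
    #[snafu(display("Failed to decode prost message"))]
    Prost { source: prost::DecodeError },

    // Stand-in source type; the real variant wraps `common_grpc::Error`.
    #[snafu(display("Failed to encode/decode flight message"))]
    FlightCodec { source: std::io::Error },
}

fn decode_len(bytes: &[u8]) -> Result<usize, MetadataError> {
    // `.context(ProstSnafu)` converts the prost error into the
    // `MetadataError::Prost` variant, mirroring how `make_region_bulk_inserts`
    // wraps `FlightData::decode` failures.
    prost::decode_length_delimiter(bytes).context(ProstSnafu)
}

fn main() {
    // 0x05 is a varint length prefix of 5.
    assert_eq!(decode_len(&[0x05]).unwrap(), 5);
}
```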

* fix: test

* refactor: rename

* allow empty app_metadata in FlightData

* feat/bridge-bulk-insert:
 - **Remove Logging**: Removed unnecessary logging of affected rows in `region_server.rs`.
 - **Error Handling Enhancement**: Improved error handling in `bulk_insert.rs` by adding context to `split_record_batch` and handling the single-datanode fast path.
 - **Error Enum Cleanup**: Removed unused `Arrow` error variant from `error.rs`.

* fix: standalone test

* feat/bridge-bulk-insert:
 ### Enhance Bulk Insert Handling and Metadata Management

 - **`lib.rs`**: Enabled the `result_flattening` feature for improved error handling.
 - **`request.rs`**: Made `name_to_index` and `has_null` fields public in `WriteRequest` for better accessibility.
 - **`handle_bulk_insert.rs`**:
   - Added `handle_record_batch` function to streamline processing of bulk insert payloads.
   - Improved error handling and task management for bulk insert operations.
   - Updated `region_metadata_to_column_schema` to return both column schemas and a name-to-index map for efficient data access.

* feat/bridge-bulk-insert:
 - **Refactor `handle_bulk_insert.rs`:**
   - Replaced `handle_record_batch` with `handle_payload` for handling payloads.
   - Modified the fast path to use `common_runtime::spawn_global` for asynchronous task execution.

 - **Optimize `multi_dim.rs`:**
   - Added a fast path for single-region scenarios in `MultiDimPartitionRule::partition_record_batch` (shape sketched after this list).
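
The single-region fast path amounts to skipping predicate evaluation and assigning every row to the only region. A hedged sketch of that shape, with the region type simplified to a plain `u32`:

```rust
use std::collections::HashMap;

use arrow::array::BooleanArray;

type RegionNumber = u32;

/// Sketch: with a single region there is nothing to partition, so every row
/// gets an all-`true` mask for that region and expression evaluation is skipped.
fn single_region_fast_path(
    regions: &[RegionNumber],
    num_rows: usize,
) -> Option<HashMap<RegionNumber, BooleanArray>> {
    if regions.len() == 1 {
        Some(
            [(regions[0], BooleanArray::from(vec![true; num_rows]))]
                .into_iter()
                .collect(),
        )
    } else {
        None
    }
}

fn main() {
    let masks = single_region_fast_path(&[42], 3).unwrap();
    assert_eq!(masks[&42].true_count(), 3);
}
```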

* feat/bridge-bulk-insert:
 - **Update `greptime-proto` Dependency**: Updated the `greptime-proto` dependency to a new revision in both `Cargo.lock` and `Cargo.toml`.
 - **Optimize Memory Allocation**: Increased initial and builder capacities in `time_series.rs` to improve performance.
 - **Enhance Data Handling**: Modified `bulk_insert.rs` to pass encoded payloads around as `Bytes`, so per-datanode clones are cheap reference-count bumps (illustrated after this list).
 - **Improve Bulk Insert Logic**: Refined the bulk insert logic in `region_request.rs` to handle schema and payload data more effectively and optimize record batch filtering.
 - **String Handling Improvement**: Updated string conversion in `helper.rs` for better performance.
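
On the `Bytes` point above: the same encoded Flight payload is fanned out to every target datanode, and `bytes::Bytes` makes each of those clones a reference-count bump instead of a deep copy. A tiny illustration:

```rust
use bytes::Bytes;

fn main() {
    // Encoded FlightData payload; the contents are irrelevant here.
    let payload = Bytes::from(vec![0u8; 1024 * 1024]);

    // Cloning `Bytes` only bumps a reference count; both handles view the
    // same underlying buffer, so fanning the payload out to several
    // datanodes does not copy a megabyte per datanode.
    let for_datanode_a = payload.clone();
    let for_datanode_b = payload.clone();

    assert_eq!(for_datanode_a.as_ptr(), for_datanode_b.as_ptr());
}
```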

* fix: clippy warnings

* feat/bridge-bulk-insert:
 **Add Metrics and Improve Error Handling**

 - **Metrics Enhancements**: Introduced new metrics for bulk insert operations in `metrics.rs`, `bulk_insert.rs`, `greptime_handler.rs`, and `region_request.rs`. Added `HANDLE_BULK_INSERT_ELAPSED`, `BULK_REQUEST_MESSAGE_SIZE`, and `GRPC_BULK_INSERT_ELAPSED` histograms to monitor performance (registration pattern sketched after this list).
 - **Error Handling Improvements**: Removed unnecessary error handling in `handle_bulk_insert.rs` by eliminating redundant `let _ =` patterns.
 - **Dependency Updates**: Added `lazy_static` and `prometheus` to `Cargo.lock` and `Cargo.toml` for metrics support.
 - **Code Refactoring**: Simplified function calls in `region_server.rs` and `handle_bulk_insert.rs` for better readability.
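
A condensed sketch of how a stage-labelled histogram like the ones added here is registered and used with `lazy_static` and `prometheus` (bucket list shortened; the real definitions appear in the `metrics.rs` diffs below):

```rust
use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};

lazy_static! {
    // Shortened stand-in for HANDLE_BULK_INSERT_ELAPSED: one series per stage.
    static ref HANDLE_BULK_INSERT_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_table_operator_handle_bulk_insert",
        "table operator duration to handle bulk inserts",
        &["stage"],
        vec![0.001, 0.01, 0.1, 1.0, 5.0]
    )
    .unwrap();
}

fn main() {
    // Each stage starts its own timer; `observe_duration` records the elapsed
    // seconds into the bucketed histogram for that stage label.
    let timer = HANDLE_BULK_INSERT_ELAPSED
        .with_label_values(&["decode_request"])
        .start_timer();
    // ... decode work would happen here ...
    timer.observe_duration();
}
```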

* chore: rebase main

* chore: merge main

Author: Lei, HUANG
Date: 2025-05-06 17:53:25 +08:00
Committed by: GitHub
Parent: 6a5936468e
Commit: f298a110f9
28 changed files with 774 additions and 217 deletions

Cargo.lock (generated)

@@ -4819,7 +4819,7 @@ dependencies = [
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=e82b0158cd38d4021edb4e4c0ae77f999051e62f#e82b0158cd38d4021edb4e4c0ae77f999051e62f"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ce65659d95a4a11c5d668d27edb4f1c1eed36824#ce65659d95a4a11c5d668d27edb4f1c1eed36824"
dependencies = [
"prost 0.13.5",
"serde",
@@ -7849,8 +7849,11 @@ version = "0.15.0"
dependencies = [
"ahash 0.8.11",
"api",
"arrow 54.2.1",
"arrow-ipc 54.2.1",
"async-stream",
"async-trait",
"bytes",
"catalog",
"chrono",
"client",
@@ -7859,6 +7862,7 @@ dependencies = [
"common-datasource",
"common-error",
"common-function",
"common-grpc",
"common-grpc-expr",
"common-macro",
"common-meta",
@@ -7886,6 +7890,7 @@ dependencies = [
"partition",
"path-slash",
"prometheus",
"prost 0.13.5",
"query",
"regex",
"serde_json",
@@ -11276,6 +11281,7 @@ dependencies = [
"async-trait",
"common-base",
"common-error",
"common-grpc",
"common-macro",
"common-meta",
"common-recordbatch",
@@ -11287,6 +11293,8 @@ dependencies = [
"derive_builder 0.20.1",
"futures",
"humantime",
"lazy_static",
"prometheus",
"prost 0.13.5",
"serde",
"serde_json",


@@ -130,7 +130,7 @@ etcd-client = "0.14"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "e82b0158cd38d4021edb4e4c0ae77f999051e62f" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ce65659d95a4a11c5d668d27edb4f1c1eed36824" }
hex = "0.4"
http = "1"
humantime = "2.1"


@@ -1050,7 +1050,7 @@ pub fn value_to_grpc_value(value: Value) -> GrpcValue {
Value::Int64(v) => Some(ValueData::I64Value(v)),
Value::Float32(v) => Some(ValueData::F32Value(*v)),
Value::Float64(v) => Some(ValueData::F64Value(*v)),
Value::String(v) => Some(ValueData::StringValue(v.as_utf8().to_string())),
Value::String(v) => Some(ValueData::StringValue(v.into_string())),
Value::Binary(v) => Some(ValueData::BinaryValue(v.to_vec())),
Value::Date(v) => Some(ValueData::DateValue(v.val())),
Value::Timestamp(v) => Some(match v.unit() {


@@ -192,6 +192,10 @@ impl FlightDecoder {
}
}
}
pub fn schema(&self) -> Option<&SchemaRef> {
self.schema.as_ref()
}
}
pub fn flight_messages_to_recordbatches(messages: Vec<FlightMessage>) -> Result<RecordBatches> {


@@ -18,4 +18,5 @@ pub mod flight;
pub mod precision;
pub mod select;
pub use arrow_flight::FlightData;
pub use error::Error;


@@ -24,15 +24,18 @@ use api::v1::{
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_base::AffectedRows;
use common_grpc::flight::FlightDecoder;
use common_grpc::FlightData;
use common_query::logical_plan::add_insert_to_logical_plan;
use common_query::Output;
use common_telemetry::tracing::{self};
use query::parser::PromQuery;
use servers::interceptor::{GrpcQueryInterceptor, GrpcQueryInterceptorRef};
use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch};
use servers::query_handler::grpc::GrpcQueryHandler;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::QueryContextRef;
use snafu::{ensure, OptionExt, ResultExt};
use table::metadata::TableId;
use table::table_name::TableName;
use crate::error::{
@@ -230,29 +233,35 @@ impl GrpcQueryHandler for Instance {
async fn put_record_batch(
&self,
table: &TableName,
record_batch: RawRecordBatch,
table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder,
data: FlightData,
) -> Result<AffectedRows> {
let _table = self
.catalog_manager()
.table(
&table.catalog_name,
&table.schema_name,
&table.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table.to_string(),
})?;
let table_id = if let Some(table_id) = table_id {
*table_id
} else {
let table = self
.catalog_manager()
.table(
&table.catalog_name,
&table.schema_name,
&table.table_name,
None,
)
.await
.context(CatalogSnafu)?
.with_context(|| TableNotFoundSnafu {
table_name: table.to_string(),
})?;
let id = table.table_info().table_id();
*table_id = Some(id);
id
};
// TODO(LFC): Implement it.
common_telemetry::debug!(
"calling put_record_batch with table: {:?} and record_batch size: {}",
table,
record_batch.len()
);
Ok(record_batch.len())
self.inserter
.handle_bulk_insert(table_id, decoder, data)
.await
.context(TableOperationSnafu)
}
}


@@ -18,6 +18,7 @@
#![feature(let_chains)]
#![feature(assert_matches)]
#![feature(result_flattening)]
#[cfg(any(test, feature = "test"))]
#[cfg_attr(feature = "test", allow(unused))]


@@ -62,9 +62,9 @@ pub struct WriteRequest {
/// Rows to write.
pub rows: Rows,
/// Map column name to column index in `rows`.
name_to_index: HashMap<String, usize>,
pub name_to_index: HashMap<String, usize>,
/// Whether each column has null.
has_null: Vec<bool>,
pub has_null: Vec<bool>,
/// Write hint.
pub hint: Option<WriteHint>,
/// Region metadata on the time of this request is created.


@@ -835,7 +835,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
sender,
} => {
if let Some(region_metadata) = metadata {
self.handle_bulk_inserts(request, region_metadata, write_requests, sender)
self.handle_bulk_insert(request, region_metadata, write_requests, sender)
.await;
} else {
error!("Cannot find region metadata for {}", request.region_id);


@@ -14,8 +14,11 @@
//! Handles bulk insert requests.
use std::collections::HashMap;
use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::{ColumnSchema, OpType, Row, Rows};
use common_base::AffectedRows;
use common_recordbatch::DfRecordBatch;
use datatypes::prelude::VectorRef;
use datatypes::vectors::Helper;
@@ -23,62 +26,64 @@ use snafu::ResultExt;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::{BulkInsertPayload, RegionBulkInsertsRequest};
use tokio::sync::oneshot::Receiver;
use crate::error;
use crate::request::{OptionOutputTx, SenderWriteRequest, WriteRequest};
use crate::worker::RegionWorkerLoop;
impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_bulk_inserts(
pub(crate) async fn handle_bulk_insert(
&mut self,
request: RegionBulkInsertsRequest,
mut request: RegionBulkInsertsRequest,
region_metadata: RegionMetadataRef,
pending_write_requests: &mut Vec<SenderWriteRequest>,
sender: OptionOutputTx,
) {
let schema = match region_metadata_to_column_schema(&region_metadata) {
Ok(schema) => schema,
Err(e) => {
sender.send(Err(e));
return;
}
};
let (column_schemas, name_to_index) =
match region_metadata_to_column_schema(&region_metadata) {
Ok(schema) => schema,
Err(e) => {
sender.send(Err(e));
return;
}
};
// fast path: only one payload.
if request.payloads.len() == 1 {
match Self::handle_payload(
&region_metadata,
request.payloads.swap_remove(0),
pending_write_requests,
column_schemas,
name_to_index,
) {
Ok(task_future) => common_runtime::spawn_global(async move {
sender.send(task_future.await.context(error::RecvSnafu).flatten());
}),
Err(e) => {
sender.send(Err(e));
return;
}
};
return;
}
let mut pending_tasks = Vec::with_capacity(request.payloads.len());
for req in request.payloads {
match req {
BulkInsertPayload::ArrowIpc(df_record_batch) => {
let rows = match record_batch_to_rows(&region_metadata, &df_record_batch) {
Ok(rows) => rows,
Err(e) => {
sender.send(Err(e));
return;
}
};
let write_request = match WriteRequest::new(
region_metadata.region_id,
OpType::Put,
Rows {
schema: schema.clone(),
rows,
},
Some(region_metadata.clone()),
) {
Ok(write_request) => write_request,
Err(e) => {
sender.send(Err(e));
return;
}
};
let (tx, rx) = tokio::sync::oneshot::channel();
let sender = OptionOutputTx::from(tx);
let req = SenderWriteRequest {
sender,
request: write_request,
};
pending_tasks.push(rx);
pending_write_requests.push(req);
match Self::handle_payload(
&region_metadata,
req,
pending_write_requests,
column_schemas.clone(),
name_to_index.clone(),
) {
Ok(task_future) => {
pending_tasks.push(task_future);
}
Err(e) => {
sender.send(Err(e));
return;
}
}
}
@@ -91,35 +96,135 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
};
let result1 = match results.into_iter().collect::<error::Result<Vec<_>>>() {
Ok(results) => Ok(results.into_iter().sum()),
Err(e) => Err(e),
};
sender.send(result1);
sender.send(
match results.into_iter().collect::<error::Result<Vec<_>>>() {
Ok(results) => Ok(results.into_iter().sum()),
Err(e) => Err(e),
},
);
});
}
fn handle_payload(
region_metadata: &RegionMetadataRef,
payload: BulkInsertPayload,
pending_write_requests: &mut Vec<SenderWriteRequest>,
column_schemas: Vec<ColumnSchema>,
name_to_index: HashMap<String, usize>,
) -> error::Result<Receiver<error::Result<AffectedRows>>> {
let rx = match payload {
BulkInsertPayload::ArrowIpc(rb) => Self::handle_arrow_ipc(
region_metadata,
rb,
pending_write_requests,
column_schemas,
name_to_index,
),
BulkInsertPayload::Rows { data, has_null } => Self::handle_rows(
region_metadata,
data,
column_schemas,
has_null,
pending_write_requests,
name_to_index,
),
}?;
Ok(rx)
}
fn handle_arrow_ipc(
region_metadata: &RegionMetadataRef,
df_record_batch: DfRecordBatch,
pending_write_requests: &mut Vec<SenderWriteRequest>,
column_schemas: Vec<ColumnSchema>,
name_to_index: HashMap<String, usize>,
) -> error::Result<Receiver<error::Result<AffectedRows>>> {
let has_null: Vec<_> = df_record_batch
.columns()
.iter()
.map(|c| c.null_count() > 0)
.collect();
let rows = record_batch_to_rows(region_metadata, &df_record_batch)?;
let write_request = WriteRequest {
region_id: region_metadata.region_id,
op_type: OpType::Put,
rows: Rows {
schema: column_schemas,
rows,
},
name_to_index,
has_null,
hint: None,
region_metadata: Some(region_metadata.clone()),
};
let (tx, rx) = tokio::sync::oneshot::channel();
let sender = OptionOutputTx::from(tx);
let req = SenderWriteRequest {
sender,
request: write_request,
};
pending_write_requests.push(req);
Ok(rx)
}
fn handle_rows(
region_metadata: &RegionMetadataRef,
rows: Vec<Row>,
column_schemas: Vec<ColumnSchema>,
has_null: Vec<bool>,
pending_write_requests: &mut Vec<SenderWriteRequest>,
name_to_index: HashMap<String, usize>,
) -> error::Result<Receiver<error::Result<AffectedRows>>> {
let write_request = WriteRequest {
region_id: region_metadata.region_id,
op_type: OpType::Put,
rows: Rows {
schema: column_schemas,
rows,
},
name_to_index,
has_null,
hint: None,
region_metadata: Some(region_metadata.clone()),
};
let (tx, rx) = tokio::sync::oneshot::channel();
let sender = OptionOutputTx::from(tx);
let req = SenderWriteRequest {
sender,
request: write_request,
};
pending_write_requests.push(req);
Ok(rx)
}
}
fn region_metadata_to_column_schema(
region_meta: &RegionMetadataRef,
) -> error::Result<Vec<ColumnSchema>> {
region_meta
.column_metadatas
.iter()
.map(|c| {
let wrapper = ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
.with_context(|_| error::ConvertDataTypeSnafu {
data_type: c.column_schema.data_type.clone(),
})?;
) -> error::Result<(Vec<ColumnSchema>, HashMap<String, usize>)> {
let mut column_schemas = Vec::with_capacity(region_meta.column_metadatas.len());
let mut name_to_index = HashMap::with_capacity(region_meta.column_metadatas.len());
Ok(ColumnSchema {
column_name: c.column_schema.name.clone(),
datatype: wrapper.datatype() as i32,
semantic_type: c.semantic_type as i32,
..Default::default()
})
})
.collect::<error::Result<_>>()
for (idx, c) in region_meta.column_metadatas.iter().enumerate() {
let wrapper = ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())
.with_context(|_| error::ConvertDataTypeSnafu {
data_type: c.column_schema.data_type.clone(),
})?;
column_schemas.push(ColumnSchema {
column_name: c.column_schema.name.clone(),
datatype: wrapper.datatype() as i32,
semantic_type: c.semantic_type as i32,
..Default::default()
});
name_to_index.insert(c.column_schema.name.clone(), idx);
}
Ok((column_schemas, name_to_index))
}
/// Convert [DfRecordBatch] to gRPC rows.
@@ -187,7 +292,7 @@ mod tests {
#[test]
fn test_region_metadata_to_column_schema() {
let region_metadata = Arc::new(TestRegionMetadataBuilder::default().build());
let result = region_metadata_to_column_schema(&region_metadata).unwrap();
let (result, _) = region_metadata_to_column_schema(&region_metadata).unwrap();
assert_eq!(result.len(), 3);
assert_eq!(result[0].column_name, "ts");


@@ -13,8 +13,11 @@ workspace = true
[dependencies]
ahash.workspace = true
api.workspace = true
arrow.workspace = true
arrow-ipc.workspace = true
async-stream.workspace = true
async-trait.workspace = true
bytes.workspace = true
catalog.workspace = true
chrono.workspace = true
client.workspace = true
@@ -23,6 +26,7 @@ common-catalog.workspace = true
common-datasource.workspace = true
common-error.workspace = true
common-function.workspace = true
common-grpc.workspace = true
common-grpc-expr.workspace = true
common-macro.workspace = true
common-meta.workspace = true
@@ -48,6 +52,7 @@ object-store.workspace = true
object_store_opendal.workspace = true
partition.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
regex.workspace = true
serde_json.workspace = true


@@ -0,0 +1,170 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use ahash::{HashMap, HashMapExt};
use api::v1::region::{
bulk_insert_request, region_request, ArrowIpc, BulkInsertRequest, RegionRequest,
RegionRequestHeader, RegionSelection,
};
use bytes::Bytes;
use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_grpc::FlightData;
use common_telemetry::tracing_context::TracingContext;
use prost::Message;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::metadata::TableId;
use crate::insert::Inserter;
use crate::{error, metrics};
impl Inserter {
/// Handle bulk insert request.
pub async fn handle_bulk_insert(
&self,
table_id: TableId,
decoder: &mut FlightDecoder,
data: FlightData,
) -> error::Result<AffectedRows> {
let decode_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["decode_request"])
.start_timer();
let raw_flight_data = Bytes::from(data.encode_to_vec());
let body_size = data.data_body.len();
// Build region server requests
let message = decoder
.try_decode(data)
.context(error::DecodeFlightDataSnafu)?;
let FlightMessage::Recordbatch(rb) = message else {
return Ok(0);
};
metrics::BULK_REQUEST_MESSAGE_SIZE.observe(body_size as f64);
decode_timer.observe_duration();
// todo(hl): find a way to embed raw FlightData messages in greptimedb proto files so we don't have to encode here.
// safety: when reach here schema must be present.
let schema_message = FlightEncoder::default()
.encode(FlightMessage::Schema(decoder.schema().unwrap().clone()));
let schema_data = Bytes::from(schema_message.encode_to_vec());
let record_batch = rb.df_record_batch();
let partition_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["partition"])
.start_timer();
let partition_rule = self
.partition_manager
.find_table_partition_rule(table_id)
.await
.context(error::InvalidPartitionSnafu)?;
// find partitions for each row in the record batch
let region_masks = partition_rule
.split_record_batch(record_batch)
.context(error::SplitInsertSnafu)?;
partition_timer.observe_duration();
let group_request_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["group_request"])
.start_timer();
let mut mask_per_datanode = HashMap::with_capacity(region_masks.len());
for (region_number, mask) in region_masks {
let region_id = RegionId::new(table_id, region_number);
let datanode = self
.partition_manager
.find_region_leader(region_id)
.await
.context(error::FindRegionLeaderSnafu)?;
let selection = RegionSelection {
region_id: region_id.as_u64(),
selection: mask.values().inner().as_slice().to_vec(),
};
mask_per_datanode
.entry(datanode)
.or_insert_with(Vec::new)
.push(selection);
}
group_request_timer.observe_duration();
let datanode_handle_timer = metrics::HANDLE_BULK_INSERT_ELAPSED
.with_label_values(&["datanode_handle"])
.start_timer();
// fast path: only one datanode
if mask_per_datanode.len() == 1 {
let (peer, requests) = mask_per_datanode.into_iter().next().unwrap();
let datanode = self.node_manager.datanode(&peer).await;
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
schema: schema_data,
payload: raw_flight_data,
region_selection: requests,
})),
})),
};
let response = datanode
.handle(request)
.await
.context(error::RequestRegionSnafu)?;
return Ok(response.affected_rows);
}
let mut handles = Vec::with_capacity(mask_per_datanode.len());
for (peer, masks) in mask_per_datanode {
let node_manager = self.node_manager.clone();
let schema = schema_data.clone();
let payload = raw_flight_data.clone();
let handle: common_runtime::JoinHandle<error::Result<api::region::RegionResponse>> =
common_runtime::spawn_global(async move {
let request = RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::BulkInsert(BulkInsertRequest {
body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc {
schema,
payload,
region_selection: masks,
})),
})),
};
let datanode = node_manager.datanode(&peer).await;
datanode
.handle(request)
.await
.context(error::RequestRegionSnafu)
});
handles.push(handle);
}
let region_responses = futures::future::try_join_all(handles)
.await
.context(error::JoinTaskSnafu)?;
datanode_handle_timer.observe_duration();
let mut rows_inserted: usize = 0;
for res in region_responses {
rows_inserted += res?.affected_rows;
}
Ok(rows_inserted)
}
}


@@ -807,6 +807,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode arrow flight data"))]
DecodeFlightData {
source: common_grpc::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -850,116 +857,85 @@ impl ErrorExt for Error {
| Error::CursorNotFound { .. }
| Error::CursorExists { .. }
| Error::CreatePartitionRules { .. } => StatusCode::InvalidArguments,
Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. } => {
StatusCode::TableAlreadyExists
}
Error::NotSupported { .. }
| Error::ShowCreateTableBaseOnly { .. }
| Error::SchemaReadOnly { .. } => StatusCode::Unsupported,
Error::TableMetadataManager { source, .. } => source.status_code(),
Error::ParseSql { source, .. } => source.status_code(),
Error::InvalidateTableCache { source, .. } => source.status_code(),
Error::ParseFileFormat { source, .. } | Error::InferSchema { source, .. } => {
source.status_code()
}
Error::Table { source, .. } | Error::Insert { source, .. } => source.status_code(),
Error::ConvertColumnDefaultConstraint { source, .. }
| Error::CreateTableInfo { source, .. }
| Error::IntoVectors { source, .. } => source.status_code(),
Error::RequestInserts { source, .. } | Error::FindViewInfo { source, .. } => {
source.status_code()
}
Error::RequestRegion { source, .. } => source.status_code(),
Error::RequestDeletes { source, .. } => source.status_code(),
Error::SubstraitCodec { source, .. } => source.status_code(),
Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. } => {
source.status_code()
}
Error::MissingTimeIndexColumn { source, .. } => source.status_code(),
Error::BuildDfLogicalPlan { .. }
| Error::BuildTableMeta { .. }
| Error::MissingInsertBody { .. } => StatusCode::Internal,
Error::EncodeJson { .. } => StatusCode::Unexpected,
Error::ViewNotFound { .. }
| Error::ViewInfoNotFound { .. }
| Error::TableNotFound { .. } => StatusCode::TableNotFound,
Error::FlowNotFound { .. } => StatusCode::FlowNotFound,
Error::JoinTask { .. } => StatusCode::Internal,
Error::BuildParquetRecordBatchStream { .. }
| Error::BuildFileStream { .. }
| Error::WriteStreamToFile { .. }
| Error::ReadDfRecordBatch { .. }
| Error::Unexpected { .. } => StatusCode::Unexpected,
Error::Catalog { source, .. } => source.status_code(),
Error::BuildCreateExprOnInsertion { source, .. }
| Error::FindNewColumnsOnInsertion { source, .. } => source.status_code(),
Error::ExecuteStatement { source, .. }
| Error::ExtractTableNames { source, .. }
| Error::PlanStatement { source, .. }
| Error::ParseQuery { source, .. }
| Error::ExecLogicalPlan { source, .. }
| Error::DescribeStatement { source, .. } => source.status_code(),
Error::AlterExprToRequest { source, .. } => source.status_code(),
Error::External { source, .. } => source.status_code(),
Error::DeserializePartition { source, .. }
| Error::FindTablePartitionRule { source, .. }
| Error::SplitInsert { source, .. }
| Error::SplitDelete { source, .. }
| Error::FindRegionLeader { source, .. } => source.status_code(),
Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments,
Error::ReadObject { .. }
| Error::ReadParquetMetadata { .. }
| Error::ReadOrc { .. } => StatusCode::StorageUnavailable,
Error::ListObjects { source, .. }
| Error::ParseUrl { source, .. }
| Error::BuildBackend { source, .. } => source.status_code(),
Error::ExecuteDdl { source, .. } => source.status_code(),
Error::InvalidCopyParameter { .. } | Error::InvalidCopyDatabasePath { .. } => {
StatusCode::InvalidArguments
}
Error::ColumnDefaultValue { source, .. } => source.status_code(),
Error::EmptyDdlExpr { .. }
| Error::InvalidPartitionRule { .. }
| Error::ParseSqlValue { .. }
| Error::InvalidTimestampRange { .. } => StatusCode::InvalidArguments,
Error::CreateLogicalTables { .. } => StatusCode::Unexpected,
Error::ExecuteAdminFunction { source, .. } => source.status_code(),
Error::BuildRecordBatch { source, .. } => source.status_code(),
Error::UpgradeCatalogManagerRef { .. } => StatusCode::Internal,
Error::StatementTimeout { .. } => StatusCode::Cancelled,
Error::ColumnOptions { source, .. } => source.status_code(),
Error::DecodeFlightData { source, .. } => source.status_code(),
}
}


@@ -76,8 +76,8 @@ use crate::statement::StatementExecutor;
pub struct Inserter {
catalog_manager: CatalogManagerRef,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
pub(crate) partition_manager: PartitionRuleManagerRef,
pub(crate) node_manager: NodeManagerRef,
table_flownode_set_cache: TableFlownodeSetCacheRef,
}


@@ -15,6 +15,7 @@
#![feature(assert_matches)]
#![feature(if_let_guard)]
mod bulk_insert;
pub mod delete;
pub mod error;
pub mod expr_helper;


@@ -62,4 +62,35 @@ lazy_static! {
&["table_type"]
)
.unwrap();
pub static ref HANDLE_BULK_INSERT_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_table_operator_handle_bulk_insert",
"table operator duration to handle bulk inserts",
&["stage"],
vec![
0.001, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.10, 0.15, 0.2, 0.3, 0.4, 0.5, 1.0, 1.5,
2.0, 2.5, 3.0, 4.0, 5.0
]
)
.unwrap();
pub static ref BULK_REQUEST_MESSAGE_SIZE: Histogram = register_histogram!(
"greptime_table_operator_bulk_insert_message_size",
"table operator bulk inserts message encoded size",
vec![
32768.0,
65536.0,
131072.0,
262144.0,
524288.0,
1048576.0,
2097152.0,
4194304.0,
8388608.0,
16777216.0,
33554432.0,
67108864.0,
134217728.0,
268435456.0
]
)
.unwrap();
}


@@ -211,6 +211,13 @@ impl MultiDimPartitionRule {
record_batch: &RecordBatch,
) -> Result<HashMap<RegionNumber, BooleanArray>> {
let num_rows = record_batch.num_rows();
if self.regions.len() == 1 {
return Ok(
[(self.regions[0], BooleanArray::from(vec![true; num_rows]))]
.into_iter()
.collect(),
);
}
let physical_exprs = {
let cache_read_guard = self.physical_expr_cache.read().unwrap();
if let Some((cached_exprs, schema)) = cache_read_guard.as_ref()


@@ -49,7 +49,6 @@ use crate::grpc::greptime_handler::{get_request_type, GreptimeRequestHandler};
use crate::grpc::TonicResult;
use crate::http::header::constants::GREPTIME_DB_HEADER_NAME;
use crate::http::AUTHORIZATION_HEADER;
use crate::query_handler::grpc::RawRecordBatch;
pub type TonicStream<T> = Pin<Box<dyn Stream<Item = TonicResult<T>> + Send + 'static>>;
@@ -257,17 +256,22 @@ impl FlightCraft for GreptimeRequestHandler {
pub(crate) struct PutRecordBatchRequest {
pub(crate) table_name: TableName,
pub(crate) request_id: i64,
pub(crate) record_batch: RawRecordBatch,
pub(crate) data: FlightData,
}
impl PutRecordBatchRequest {
fn try_new(table_name: TableName, flight_data: FlightData) -> Result<Self> {
let metadata: DoPutMetadata =
serde_json::from_slice(&flight_data.app_metadata).context(ParseJsonSnafu)?;
let request_id = if !flight_data.app_metadata.is_empty() {
let metadata: DoPutMetadata =
serde_json::from_slice(&flight_data.app_metadata).context(ParseJsonSnafu)?;
metadata.request_id()
} else {
0
};
Ok(Self {
table_name,
request_id: metadata.request_id(),
record_batch: flight_data.data_body,
request_id,
data: flight_data,
})
}
}


@@ -27,6 +27,7 @@ use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc::flight::do_put::DoPutResponse;
use common_grpc::flight::FlightDecoder;
use common_query::Output;
use common_runtime::runtime::RuntimeTrait;
use common_runtime::Runtime;
@@ -36,6 +37,7 @@ use common_time::timezone::parse_timezone;
use futures_util::StreamExt;
use session::context::{QueryContext, QueryContextBuilder, QueryContextRef};
use snafu::{OptionExt, ResultExt};
use table::metadata::TableId;
use tokio::sync::mpsc;
use crate::error::Error::UnsupportedAuthScheme;
@@ -45,6 +47,7 @@ use crate::error::{
};
use crate::grpc::flight::{PutRecordBatchRequest, PutRecordBatchRequestStream};
use crate::grpc::TonicResult;
use crate::metrics;
use crate::metrics::{METRIC_AUTH_FAILURE, METRIC_SERVER_GRPC_DB_REQUEST_TIMER};
use crate::query_handler::grpc::ServerGrpcQueryHandlerRef;
@@ -140,6 +143,10 @@ impl GreptimeRequestHandler {
.clone()
.unwrap_or_else(common_runtime::global_runtime);
runtime.spawn(async move {
// Cached table id
let mut table_id: Option<TableId> = None;
let mut decoder = FlightDecoder::default();
while let Some(request) = stream.next().await {
let request = match request {
Ok(request) => request,
@@ -148,13 +155,17 @@ impl GreptimeRequestHandler {
break;
}
};
let PutRecordBatchRequest {
table_name,
request_id,
record_batch,
data,
} = request;
let result = handler.put_record_batch(&table_name, record_batch).await;
let timer = metrics::GRPC_BULK_INSERT_ELAPSED.start_timer();
let result = handler
.put_record_batch(&table_name, &mut table_id, &mut decoder, data)
.await;
timer.observe_duration();
let result = result
.map(|x| DoPutResponse::new(request_id, x))
.map_err(Into::into);


@@ -23,8 +23,8 @@ use axum::middleware::Next;
use axum::response::IntoResponse;
use lazy_static::lazy_static;
use prometheus::{
register_histogram_vec, register_int_counter, register_int_counter_vec, register_int_gauge,
Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
register_histogram, register_histogram_vec, register_int_counter, register_int_counter_vec,
register_int_gauge, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
};
use tonic::body::BoxBody;
use tower::{Layer, Service};
@@ -276,8 +276,12 @@ lazy_static! {
"greptime_servers_jaeger_query_elapsed",
"servers jaeger query elapsed",
&[METRIC_DB_LABEL, METRIC_PATH_LABEL]
)
.unwrap();
).unwrap();
pub static ref GRPC_BULK_INSERT_ELAPSED: Histogram = register_histogram!(
"greptime_servers_bulk_insert_elapsed",
"servers handle bulk insert elapsed",
).unwrap();
}
// Based on https://github.com/hyperium/tonic/blob/master/examples/src/tower/server.rs


@@ -15,12 +15,15 @@
use std::sync::Arc;
use api::v1::greptime_request::Request;
use arrow_flight::FlightData;
use async_trait::async_trait;
use common_base::AffectedRows;
use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::flight::FlightDecoder;
use common_query::Output;
use session::context::QueryContextRef;
use snafu::ResultExt;
use table::metadata::TableId;
use table::table_name::TableName;
use crate::error::{self, Result};
@@ -43,7 +46,9 @@ pub trait GrpcQueryHandler {
async fn put_record_batch(
&self,
table: &TableName,
record_batch: RawRecordBatch,
table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder,
flight_data: FlightData,
) -> std::result::Result<AffectedRows, Self::Error>;
}
@@ -73,10 +78,12 @@ where
async fn put_record_batch(
&self,
table: &TableName,
record_batch: RawRecordBatch,
table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder,
data: FlightData,
) -> Result<AffectedRows> {
self.0
.put_record_batch(table, record_batch)
.put_record_batch(table, table_id, decoder, data)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcRequestSnafu)


@@ -16,10 +16,12 @@ use std::sync::Arc;
use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use arrow_flight::FlightData;
use async_trait::async_trait;
use catalog::memory::MemoryCatalogManager;
use common_base::AffectedRows;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_grpc::flight::FlightDecoder;
use common_query::Output;
use datafusion_expr::LogicalPlan;
use query::options::QueryOptions;
@@ -27,11 +29,12 @@ use query::parser::{PromQuery, QueryLanguageParser, QueryStatement};
use query::query_engine::DescribeResult;
use query::{QueryEngineFactory, QueryEngineRef};
use servers::error::{Error, NotSupportedSnafu, Result};
use servers::query_handler::grpc::{GrpcQueryHandler, RawRecordBatch, ServerGrpcQueryHandlerRef};
use servers::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerRef};
use servers::query_handler::sql::{ServerSqlQueryHandlerRef, SqlQueryHandler};
use session::context::QueryContextRef;
use snafu::ensure;
use sql::statements::statement::Statement;
use table::metadata::TableId;
use table::table_name::TableName;
use table::TableRef;
@@ -161,10 +164,14 @@ impl GrpcQueryHandler for DummyInstance {
async fn put_record_batch(
&self,
table: &TableName,
record_batch: RawRecordBatch,
table_id: &mut Option<TableId>,
decoder: &mut FlightDecoder,
data: FlightData,
) -> std::result::Result<AffectedRows, Self::Error> {
let _ = table;
let _ = record_batch;
let _ = data;
let _ = table_id;
let _ = decoder;
unimplemented!()
}
}


@@ -13,6 +13,7 @@ aquamarine.workspace = true
async-trait.workspace = true
common-base.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-recordbatch.workspace = true
common-time.workspace = true
@@ -23,6 +24,8 @@ datatypes.workspace = true
derive_builder.workspace = true
futures.workspace = true
humantime.workspace = true
lazy_static.workspace = true
prometheus.workspace = true
prost.workspace = true
serde.workspace = true
serde_json.workspace = true


@@ -23,6 +23,7 @@ pub mod logstore;
pub mod manifest;
pub mod metadata;
pub mod metric_engine_consts;
mod metrics;
pub mod mito_engine_options;
pub mod path_utils;
pub mod region_engine;


@@ -973,6 +973,21 @@ pub enum MetadataError {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to encode/decode flight message"))]
FlightCodec {
source: common_grpc::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to decode prost message"))]
Prost {
#[snafu(source)]
error: prost::DecodeError,
#[snafu(implicit)]
location: Location,
},
}
impl ErrorExt for MetadataError {


@@ -0,0 +1,29 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use lazy_static::lazy_static;
use prometheus::{register_histogram_vec, HistogramVec};
lazy_static! {
pub static ref CONVERT_REGION_BULK_REQUEST: HistogramVec = register_histogram_vec!(
"greptime_datanode_convert_region_request",
"datanode duration to convert region request",
&["stage"],
vec![
0.001, 0.005, 0.01, 0.02, 0.03, 0.04, 0.05, 0.10, 0.15, 0.2, 0.3, 0.4, 0.5, 1.0, 1.5,
2.0, 2.5, 3.0, 4.0, 5.0
]
)
.unwrap();
}


@@ -12,43 +12,50 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{self, Display};
use std::io::Cursor;
use std::time::{Duration, Instant};
use api::helper::ColumnDataTypeWrapper;
use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::add_column_location::LocationType;
use api::v1::column_def::{
as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
};
use api::v1::region::bulk_insert_request::Body;
use api::v1::region::{
alter_request, compact_request, region_request, AlterRequest, AlterRequests,
BulkInsertRequests, CloseRequest, CompactRequest, CreateRequest, CreateRequests,
DeleteRequests, DropRequest, DropRequests, FlushRequest, InsertRequests, OpenRequest,
TruncateRequest,
alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
};
use api::v1::{
self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Row, Rows,
SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
};
pub use common_base::AffectedRows;
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_grpc::FlightData;
use common_recordbatch::DfRecordBatch;
use common_time::TimeToLive;
use datatypes::arrow::ipc::reader::FileReader;
use datatypes::prelude::ConcreteDataType;
use datatypes::arrow;
use datatypes::arrow::array::{Array, BooleanArray};
use datatypes::arrow::buffer::{BooleanBuffer, Buffer};
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
use datatypes::vectors::Helper;
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use strum::{AsRefStr, IntoStaticStr};
use crate::logstore::entry;
use crate::metadata::{
ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, InvalidRawRegionRequestSnafu,
InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
ColumnMetadata, DecodeArrowIpcSnafu, DecodeProtoSnafu, FlightCodecSnafu,
InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result,
UnexpectedSnafu,
};
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::metrics;
use crate::mito_engine_options::{
TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
@@ -152,7 +159,7 @@ impl RegionRequest {
region_request::Body::Creates(creates) => make_region_creates(creates),
region_request::Body::Drops(drops) => make_region_drops(drops),
region_request::Body::Alters(alters) => make_region_alters(alters),
region_request::Body::BulkInserts(bulk) => make_region_bulk_inserts(bulk),
region_request::Body::BulkInsert(bulk) => make_region_rows_bulk_inserts(bulk),
region_request::Body::Sync(_) => UnexpectedSnafu {
reason: "Sync request should be handled separately by RegionServer",
}
@@ -326,44 +333,51 @@ fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, Regi
)])
}
/// Convert [BulkInsertRequests] to [RegionRequest] and group by [RegionId].
fn make_region_bulk_inserts(
requests: BulkInsertRequests,
) -> Result<Vec<(RegionId, RegionRequest)>> {
let mut region_requests: HashMap<u64, Vec<BulkInsertPayload>> =
HashMap::with_capacity(requests.requests.len());
/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
#[allow(unused)]
fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
let Some(Body::ArrowIpc(request)) = request.body else {
return Ok(vec![]);
};
for req in requests.requests {
let region_id = req.region_id;
match req.payload_type() {
api::v1::region::BulkInsertType::ArrowIpc => {
// todo(hl): use StreamReader instead
let reader = FileReader::try_new(Cursor::new(req.payload), None)
.context(DecodeArrowIpcSnafu)?;
let record_batches = reader
.map(|b| b.map(BulkInsertPayload::ArrowIpc))
.try_collect::<Vec<_>>()
.context(DecodeArrowIpcSnafu)?;
match region_requests.entry(region_id) {
Entry::Occupied(mut e) => {
e.get_mut().extend(record_batches);
}
Entry::Vacant(e) => {
e.insert(record_batches);
}
}
}
}
let mut region_requests: HashMap<u64, BulkInsertPayload> =
HashMap::with_capacity(request.region_selection.len());
let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
let mut decoder = FlightDecoder::default();
let _schema_message = decoder.try_decode(schema_data).context(FlightCodecSnafu)?;
let FlightMessage::Recordbatch(rb) =
decoder.try_decode(payload_data).context(FlightCodecSnafu)?
else {
unreachable!("Always expect record batch message after schema");
};
for region_selection in request.region_selection {
let region_id = region_selection.region_id;
let region_mask = BooleanArray::new(
BooleanBuffer::new(Buffer::from(region_selection.selection), 0, rb.num_rows()),
None,
);
let region_batch = if region_mask.true_count() == rb.num_rows() {
rb.df_record_batch().clone()
} else {
arrow::compute::filter_record_batch(rb.df_record_batch(), &region_mask)
.context(DecodeArrowIpcSnafu)?
};
region_requests.insert(region_id, BulkInsertPayload::ArrowIpc(region_batch));
}
let result = region_requests
.into_iter()
.map(|(region_id, payloads)| {
.map(|(region_id, payload)| {
(
region_id.into(),
RegionRequest::BulkInserts(RegionBulkInsertsRequest {
region_id: region_id.into(),
payloads,
payloads: vec![payload],
}),
)
})
@@ -371,6 +385,116 @@ fn make_region_bulk_inserts(
Ok(result)
}
/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
fn make_region_rows_bulk_inserts(
request: BulkInsertRequest,
) -> Result<Vec<(RegionId, RegionRequest)>> {
let Some(Body::ArrowIpc(request)) = request.body else {
return Ok(vec![]);
};
let mut region_requests: HashMap<u64, BulkInsertPayload> =
HashMap::with_capacity(request.region_selection.len());
let decode_timer = metrics::CONVERT_REGION_BULK_REQUEST
.with_label_values(&["decode"])
.start_timer();
let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
let mut decoder = FlightDecoder::default();
let _schema_message = decoder.try_decode(schema_data).context(FlightCodecSnafu)?;
let FlightMessage::Recordbatch(rb) =
decoder.try_decode(payload_data).context(FlightCodecSnafu)?
else {
unreachable!("Always expect record batch message after schema");
};
decode_timer.observe_duration();
let filter_timer = metrics::CONVERT_REGION_BULK_REQUEST.with_label_values(&["filter_batch"]);
let convert_to_rows_timer =
metrics::CONVERT_REGION_BULK_REQUEST.with_label_values(&["convert_to_rows"]);
let mut filter_time = Duration::default();
let mut convert_to_rows_time = Duration::default();
for region_selection in request.region_selection {
let region_id = region_selection.region_id;
let start = Instant::now();
let region_mask = BooleanArray::new(
BooleanBuffer::new(Buffer::from(region_selection.selection), 0, rb.num_rows()),
None,
);
let region_batch = if region_mask.true_count() == rb.num_rows() {
rb.df_record_batch().clone()
} else {
arrow::compute::filter_record_batch(rb.df_record_batch(), &region_mask)
.context(DecodeArrowIpcSnafu)?
};
filter_time += start.elapsed();
let start = Instant::now();
let (rows, has_null) = record_batch_to_rows(&region_batch);
convert_to_rows_time += start.elapsed();
region_requests.insert(
region_id,
BulkInsertPayload::Rows {
data: rows,
has_null,
},
);
}
filter_timer.observe(filter_time.as_secs_f64());
convert_to_rows_timer.observe(convert_to_rows_time.as_secs_f64());
let result = region_requests
.into_iter()
.map(|(region_id, payload)| {
(
region_id.into(),
RegionRequest::BulkInserts(RegionBulkInsertsRequest {
region_id: region_id.into(),
payloads: vec![payload],
}),
)
})
.collect::<Vec<_>>();
Ok(result)
}
/// Convert [DfRecordBatch] to gRPC rows.
fn record_batch_to_rows(rb: &DfRecordBatch) -> (Vec<Row>, Vec<bool>) {
let num_rows = rb.num_rows();
let mut rows = Vec::with_capacity(num_rows);
if num_rows == 0 {
return (rows, vec![false; rb.num_columns()]);
}
let mut vectors = Vec::with_capacity(rb.num_columns());
let mut has_null = Vec::with_capacity(rb.num_columns());
for c in rb.columns() {
vectors.push(Helper::try_into_vector(c).unwrap());
has_null.push(c.null_count() > 0);
}
for row_idx in 0..num_rows {
let row = Row {
values: row_at(&vectors, row_idx),
};
rows.push(row);
}
(rows, has_null)
}
fn row_at(vectors: &[VectorRef], row_idx: usize) -> Vec<api::v1::Value> {
let mut row = Vec::with_capacity(vectors.len());
for a in vectors {
row.push(value_to_grpc_value(a.get(row_idx)))
}
row
}
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
@@ -1184,6 +1308,7 @@ pub struct RegionBulkInsertsRequest {
#[derive(Debug, Clone)]
pub enum BulkInsertPayload {
ArrowIpc(DfRecordBatch),
Rows { data: Vec<Row>, has_null: Vec<bool> },
}
impl fmt::Display for RegionRequest {


@@ -61,8 +61,19 @@ mod test {
let sql = "select ts, a, b from foo order by ts";
let expected = "\
++
++";
+-------------------------+----+----+
| ts | a | b |
+-------------------------+----+----+
| 1970-01-01T00:00:00.001 | -1 | s1 |
| 1970-01-01T00:00:00.002 | -2 | s2 |
| 1970-01-01T00:00:00.003 | -3 | s3 |
| 1970-01-01T00:00:00.004 | -4 | s4 |
| 1970-01-01T00:00:00.005 | -5 | s5 |
| 1970-01-01T00:00:00.006 | -6 | s6 |
| 1970-01-01T00:00:00.007 | -7 | s7 |
| 1970-01-01T00:00:00.008 | -8 | s8 |
| 1970-01-01T00:00:00.009 | -9 | s9 |
+-------------------------+----+----+";
query_and_expect(db.frontend().as_ref(), sql, expected).await;
}
@@ -110,35 +121,52 @@ mod test {
let sql = "select ts, a, b from foo order by ts";
let expected = "\
++
++";
+-------------------------+----+----+
| ts | a | b |
+-------------------------+----+----+
| 1970-01-01T00:00:00.001 | -1 | s1 |
| 1970-01-01T00:00:00.002 | -2 | s2 |
| 1970-01-01T00:00:00.003 | -3 | s3 |
| 1970-01-01T00:00:00.004 | -4 | s4 |
| 1970-01-01T00:00:00.005 | -5 | s5 |
| 1970-01-01T00:00:00.006 | -6 | s6 |
| 1970-01-01T00:00:00.007 | -7 | s7 |
| 1970-01-01T00:00:00.008 | -8 | s8 |
| 1970-01-01T00:00:00.009 | -9 | s9 |
+-------------------------+----+----+";
query_and_expect(db.fe_instance().as_ref(), sql, expected).await;
}
async fn test_put_record_batches(client: &Database, record_batches: Vec<RecordBatch>) {
let requests_count = record_batches.len();
let schema = record_batches[0].schema.clone();
let stream = tokio_stream::iter(record_batches)
.enumerate()
.map(|(i, x)| {
let mut encoder = FlightEncoder::default();
let message = FlightMessage::Recordbatch(x);
let mut data = encoder.encode(message);
let metadata = DoPutMetadata::new(i as i64);
data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
// first message in "DoPut" stream should carry table name in flight descriptor
if i == 0 {
data.flight_descriptor = Some(FlightDescriptor {
r#type: arrow_flight::flight_descriptor::DescriptorType::Path as i32,
path: vec!["foo".to_string()],
..Default::default()
});
}
data
})
.boxed();
let stream = futures::stream::once(async move {
let mut schema_data = FlightEncoder::default().encode(FlightMessage::Schema(schema));
let metadata = DoPutMetadata::new(0);
schema_data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
// first message in "DoPut" stream should carry table name in flight descriptor
schema_data.flight_descriptor = Some(FlightDescriptor {
r#type: arrow_flight::flight_descriptor::DescriptorType::Path as i32,
path: vec!["foo".to_string()],
..Default::default()
});
schema_data
})
.chain(
tokio_stream::iter(record_batches)
.enumerate()
.map(|(i, x)| {
let mut encoder = FlightEncoder::default();
let message = FlightMessage::Recordbatch(x);
let mut data = encoder.encode(message);
let metadata = DoPutMetadata::new((i + 1) as i64);
data.app_metadata = serde_json::to_vec(&metadata).unwrap().into();
data
})
.boxed(),
)
.boxed();
let response_stream = client.do_put(stream).await.unwrap();
@@ -148,9 +176,14 @@ mod test {
assert!(response.is_ok(), "{}", response.err().unwrap());
let response = response.unwrap();
assert_eq!(response.request_id(), i as i64);
assert_eq!(response.affected_rows(), 448);
if i == 0 {
// the first is schema
assert_eq!(response.affected_rows(), 0);
} else {
assert_eq!(response.affected_rows(), 3);
}
}
assert_eq!(requests_count, responses_count);
assert_eq!(requests_count + 1, responses_count);
}
fn create_record_batches(start: i64) -> Vec<RecordBatch> {