From dc98e0215bd19312f136dfecd5f3d64fc26023b7 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com>
Date: Tue, 17 Mar 2026 19:28:06 +0800
Subject: [PATCH] feat(metric-engine): support bulk inserts with put fallback
 (#7792)

* feat(metric-engine): support bulk inserts

Implement `RegionRequest::BulkInserts` to support efficient columnar data
ingestion in the metric engine.

Key changes:

- Implement `bulk_insert_region` to handle logical-to-physical region
  mapping and dispatch writes.
- Add `batch_modifier` for `RecordBatch` transformations, specifically for
  `__tsid` generation and sparse primary key encoding.
- Integrate `BulkInserts` into the `MetricEngine` request handling logic.
- Provide a row-based fallback mechanism if the underlying storage doesn't
  support bulk writes.

Signed-off-by: Lei, HUANG

* feat/metric-engine-bulk-insert: Update `bulk_insert.rs` to support partition expression version

- **Enhancements**:
  - Added support for `partition_expr_version` in `RegionBulkInsertsRequest`
    and `RegionPutRequest`.
  - Modified the handling of `partition_expr_version` to be dynamically set
    from the `request` object.

Files affected:
- `src/metric-engine/src/engine/bulk_insert.rs`

Signed-off-by: Lei, HUANG

* fix: cargo lock revert

Signed-off-by: Lei, HUANG

* add doc for conversions

Signed-off-by: Lei, HUANG

* chore: simplify test

Signed-off-by: Lei, HUANG

* feat/metric-engine-bulk-insert: Refactor `bulk_insert.rs` in `metric-engine`

- **Refactor Functionality**:
  - Replaced `resolve_tag_columns` with `resolve_tag_columns_from_metadata`
    to streamline tag column resolution.
  - Moved logic for resolving tag columns directly into
    `resolve_tag_columns_from_metadata`, removing the need for an external
    function call.
- **Enhancements**:
  - Improved error handling and context provision for missing physical
    regions and columns.
  - Optimized tag column sorting and index management within the batch
    processing logic.

Signed-off-by: Lei, HUANG

* feat/metric-engine-bulk-insert: Refactor `record_batch_to_rows` function in `bulk_insert.rs`

- Simplified the `record_batch_to_rows` function by removing the
  `logical_metadata` parameter and directly validating column types within
  the function.
- Enhanced error handling for timestamp, value, and tag columns by checking
  their data types and providing detailed error messages.
- Replaced the use of `Helper::try_into_vector` with direct downcasting to
  `TimestampMillisecondArray`, `Float64Array`, and `StringArray` for
  improved type safety and clarity.
- Updated the construction of `api::v1::Rows` to directly handle null
  values and construct `api::v1::Value` objects accordingly.

Signed-off-by: Lei, HUANG

* feat/metric-engine-bulk-insert: Refactor `bulk_insert.rs` to optimize state access

- Moved the state read operation inside a new block to limit its scope and
  improve code clarity.
- Adjusted logic for processing `tag_columns` and `non_tag_indices` to work
  within the new block structure.

Signed-off-by: Lei, HUANG

* feat/metric-engine-bulk-insert: Refactor `compute_tsid_array` function

- **Refactored `compute_tsid_array` function**: Modified the function
  signature to accept `tag_arrays` as a parameter instead of building it
  internally. This change affects the following files:
  - `src/metric-engine/src/batch_modifier.rs`
- **Updated test cases**: Adjusted test cases to accommodate the new
  `compute_tsid_array` function signature by passing `tag_arrays`
  explicitly.

Signed-off-by: Lei, HUANG
* docs: add doc for bulk_insert_region

Signed-off-by: Lei, HUANG

* feat/metric-engine-bulk-insert: Refactor `bulk_insert.rs` in `metric-engine`

- Removed error handling for unsupported status codes in `write_data`
  method.
- Eliminated `record_batch_to_rows` function, simplifying the data
  insertion process.
- Streamlined the `write_data` method by removing fallback logic for
  unsupported operations.

Signed-off-by: Lei, HUANG

* feat/metric-engine-bulk-insert:

- **Optimize Primary Key Construction**: Refactored `modify_batch_sparse`
  in `batch_modifier.rs` to use `BinaryBuilder` for more efficient primary
  key construction.
- **Add Fallback for Unsupported Bulk Inserts**: Updated `bulk_insert.rs`
  to handle unsupported bulk inserts by converting record batches to rows
  and using `RegionPutRequest`.
- **Implement Record Batch to Rows Conversion**: Added `record_batch_to_rows`
  function in `bulk_insert.rs` to convert `RecordBatch` to `api::v1::Rows`
  for fallback operations.

Signed-off-by: Lei, HUANG

* feat/metric-engine-bulk-insert: Add test for handling null values in `record_batch_to_rows`

- Added a new test `test_record_batch_to_rows_with_null_values` in
  `bulk_insert.rs` to verify the handling of null values in the
  `record_batch_to_rows` function.
- The test checks the conversion of a `RecordBatch` with null values in
  various fields to ensure correct row creation and schema handling.

Signed-off-by: Lei, HUANG

* feat/metric-engine-bulk-insert: Add fallback path for unsupported status and improve error context handling

- **`bulk_insert.rs`**:
  - Added a fallback path for `PartitionTreeMemtable` in case of
    unsupported status code.
  - Enhanced error handling by using `with_context` for better error
    messages when timestamp and value columns are not found in
    `RecordBatch`.

Signed-off-by: Lei, HUANG

---------

Signed-off-by: Lei, HUANG
---
 Cargo.lock                                  |   1 +
 src/metric-engine/Cargo.toml                |   1 +
 src/metric-engine/src/batch_modifier.rs     | 426 +++++++++++
 src/metric-engine/src/engine.rs             |   6 +-
 src/metric-engine/src/engine/bulk_insert.rs | 783 ++++++++++++++++++++
 src/metric-engine/src/engine/put.rs         |   2 +-
 src/metric-engine/src/lib.rs                |   1 +
 7 files changed, 1216 insertions(+), 4 deletions(-)
 create mode 100644 src/metric-engine/src/batch_modifier.rs
 create mode 100644 src/metric-engine/src/engine/bulk_insert.rs
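For reviewers unfamiliar with the `__tsid` scheme used below: here is a
minimal standalone sketch of the per-row hash when no tag value is null
(label names are hashed once with 0xff sentinels, and that hash seeds the
per-row value hash; tags with null values are excluded from both hashes, as
handled by `compute_tsid_array`). The helper name `tsid` and the
`(name, value)` pair representation are illustrative only; the only assumed
dependency is the `fxhash` crate that this patch itself adds.

    use std::hash::Hasher;

    use fxhash::FxHasher;

    /// Illustrative only: mirrors the non-null path of `compute_tsid_array`
    /// for one row, given tag (name, value) pairs sorted by name.
    fn tsid(sorted_tags: &[(&str, &str)]) -> u64 {
        // Hash the label names once, separated by 0xff sentinels.
        let mut name_hasher = FxHasher::default();
        for (name, _) in sorted_tags {
            name_hasher.write(name.as_bytes());
            name_hasher.write_u8(0xff);
        }
        // Seed the value hash with the label-name hash, then mix in values.
        let mut hasher = FxHasher::default();
        hasher.write_u64(name_hasher.finish());
        for (_, value) in sorted_tags {
            hasher.write(value.as_bytes());
            hasher.write_u8(0xff);
        }
        hasher.finish()
    }

    fn main() {
        let tags = [("host", "127.0.0.1"), ("namespace", "greptimedb")];
        println!("{}", tsid(&tags));
    }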
diff --git a/Cargo.lock b/Cargo.lock
index 1f65f1289c..605b037fc9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7886,6 +7886,7 @@ dependencies = [
  "common-base",
  "common-error",
  "common-function",
+ "common-grpc",
  "common-macro",
  "common-meta",
  "common-query",
diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml
index 567210b952..5b561997ab 100644
--- a/src/metric-engine/Cargo.toml
+++ b/src/metric-engine/Cargo.toml
@@ -17,6 +17,7 @@ bytes.workspace = true
 fxhash = "0.2"
 common-base.workspace = true
 common-error.workspace = true
+common-grpc.workspace = true
 common-macro.workspace = true
 common-query.workspace = true
 common-recordbatch.workspace = true
diff --git a/src/metric-engine/src/batch_modifier.rs b/src/metric-engine/src/batch_modifier.rs
new file mode 100644
index 0000000000..8a5774889b
--- /dev/null
+++ b/src/metric-engine/src/batch_modifier.rs
@@ -0,0 +1,426 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::hash::Hasher;
+use std::sync::Arc;
+
+use datatypes::arrow::array::{Array, BinaryBuilder, StringArray, UInt64Array};
+use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
+use datatypes::arrow::record_batch::RecordBatch;
+use datatypes::value::ValueRef;
+use fxhash::FxHasher;
+use mito_codec::row_converter::SparsePrimaryKeyCodec;
+use snafu::ResultExt;
+use store_api::storage::ColumnId;
+use store_api::storage::consts::{PRIMARY_KEY_COLUMN_NAME, ReservedColumnId};
+
+use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu};
+
+/// Info about a tag column for TSID computation and sparse primary key encoding.
+#[allow(dead_code)]
+pub(crate) struct TagColumnInfo {
+    /// Column name (used for label-name hash).
+    pub name: String,
+    /// Column index in the RecordBatch.
+    pub index: usize,
+    /// Column ID in the physical region.
+    pub column_id: ColumnId,
+}
+
+/// Computes `__tsid` values for each row.
+#[allow(dead_code)]
+pub(crate) fn compute_tsid_array(
+    batch: &RecordBatch,
+    sorted_tag_columns: &[TagColumnInfo],
+    tag_arrays: &[&StringArray],
+) -> UInt64Array {
+    let num_rows = batch.num_rows();
+
+    let label_name_hash = {
+        let mut hasher = FxHasher::default();
+        for tag_col in sorted_tag_columns {
+            hasher.write(tag_col.name.as_bytes());
+            hasher.write_u8(0xff);
+        }
+        hasher.finish()
+    };
+
+    let mut tsid_values = Vec::with_capacity(num_rows);
+    for row in 0..num_rows {
+        let has_null = tag_arrays.iter().any(|arr| arr.is_null(row));
+
+        let tsid = if !has_null {
+            let mut hasher = FxHasher::default();
+            hasher.write_u64(label_name_hash);
+            for arr in tag_arrays {
+                hasher.write(arr.value(row).as_bytes());
+                hasher.write_u8(0xff);
+            }
+            hasher.finish()
+        } else {
+            let mut name_hasher = FxHasher::default();
+            for (tc, arr) in sorted_tag_columns.iter().zip(tag_arrays.iter()) {
+                if !arr.is_null(row) {
+                    name_hasher.write(tc.name.as_bytes());
+                    name_hasher.write_u8(0xff);
+                }
+            }
+            let row_label_hash = name_hasher.finish();
+
+            let mut val_hasher = FxHasher::default();
+            val_hasher.write_u64(row_label_hash);
+            for arr in tag_arrays {
+                if !arr.is_null(row) {
+                    val_hasher.write(arr.value(row).as_bytes());
+                    val_hasher.write_u8(0xff);
+                }
+            }
+            val_hasher.finish()
+        };
+
+        tsid_values.push(tsid);
+    }
+
+    UInt64Array::from(tsid_values)
+}
+
+fn build_tag_arrays<'a>(
+    batch: &'a RecordBatch,
+    sorted_tag_columns: &[TagColumnInfo],
+) -> Vec<&'a StringArray> {
+    sorted_tag_columns
+        .iter()
+        .map(|tc| {
+            batch
+                .column(tc.index)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .expect("tag column must be utf8")
+        })
+        .collect()
+}
+
+/// Modifies a RecordBatch for sparse primary key encoding.
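+///
+/// The returned batch contains the encoded `__primary_key` binary column
+/// first, followed by the columns listed in `non_tag_column_indices`; the
+/// original tag columns are consumed into the primary key.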
+#[allow(dead_code)]
+pub(crate) fn modify_batch_sparse(
+    batch: RecordBatch,
+    table_id: u32,
+    sorted_tag_columns: &[TagColumnInfo],
+    non_tag_column_indices: &[usize],
+) -> Result<RecordBatch> {
+    let num_rows = batch.num_rows();
+    let codec = SparsePrimaryKeyCodec::schemaless();
+    let tag_arrays: Vec<&StringArray> = build_tag_arrays(&batch, sorted_tag_columns);
+    let tsid_array = compute_tsid_array(&batch, sorted_tag_columns, &tag_arrays);
+
+    let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0);
+    let mut buffer = Vec::new();
+    for row in 0..num_rows {
+        buffer.clear();
+        let internal = [
+            (ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
+            (
+                ReservedColumnId::tsid(),
+                ValueRef::UInt64(tsid_array.value(row)),
+            ),
+        ];
+        codec
+            .encode_to_vec(internal.into_iter(), &mut buffer)
+            .context(EncodePrimaryKeySnafu)?;
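+        // Tag (column_id, value) pairs follow the internal columns in the
+        // same buffer; null tags are skipped so the result matches the
+        // sparse encoding produced by the row-based `RowModifier` path.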
"host".to_string(), + index: 1, + column_id: 2, + }, + TagColumnInfo { + name: "namespace".to_string(), + index: 0, + column_id: 1, + }, + ]; + let tag_arrays = build_tag_arrays(&batch, &tag_columns); + let tsid_array = compute_tsid_array(&batch, &tag_columns, &tag_arrays); + + assert_eq!(tsid_array.value(0), 2721566936019240841); + } + + #[test] + fn test_compute_tsid_with_nulls() { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Utf8, true), + ])); + let batch_no_null = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(vec!["A"])), + Arc::new(StringArray::from(vec!["B"])), + ], + ) + .unwrap(); + let tag_cols_2: Vec = vec![ + TagColumnInfo { + name: "a".to_string(), + index: 0, + column_id: 1, + }, + TagColumnInfo { + name: "b".to_string(), + index: 1, + column_id: 2, + }, + ]; + let tag_arrays_2 = build_tag_arrays(&batch_no_null, &tag_cols_2); + let tsid_no_null = compute_tsid_array(&batch_no_null, &tag_cols_2, &tag_arrays_2); + + let schema3 = Arc::new(ArrowSchema::new(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Utf8, true), + Field::new("c", DataType::Utf8, true), + ])); + let batch_with_null = RecordBatch::try_new( + schema3, + vec![ + Arc::new(StringArray::from(vec!["A"])), + Arc::new(StringArray::from(vec!["B"])), + Arc::new(StringArray::from(vec![None as Option<&str>])), + ], + ) + .unwrap(); + let tag_cols_3: Vec = vec![ + TagColumnInfo { + name: "a".to_string(), + index: 0, + column_id: 1, + }, + TagColumnInfo { + name: "b".to_string(), + index: 1, + column_id: 2, + }, + TagColumnInfo { + name: "c".to_string(), + index: 2, + column_id: 3, + }, + ]; + let tag_arrays_3 = build_tag_arrays(&batch_with_null, &tag_cols_3); + let tsid_with_null = compute_tsid_array(&batch_with_null, &tag_cols_3, &tag_arrays_3); + + assert_eq!(tsid_no_null.value(0), tsid_with_null.value(0)); + } + + #[test] + fn test_modify_batch_sparse() { + let batch = build_sparse_test_batch(); + let tag_columns = sparse_tag_columns(); + let non_tag_indices = vec![0, 1]; + let table_id: u32 = 1025; + + let modified = + modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap(); + + assert_eq!(modified.num_columns(), 3); + assert_eq!(modified.schema().field(0).name(), PRIMARY_KEY_COLUMN_NAME); + assert_eq!(modified.schema().field(1).name(), "greptime_timestamp"); + assert_eq!(modified.schema().field(2).name(), "greptime_value"); + } + + #[test] + fn test_modify_batch_sparse_matches_row_modifier() { + let batch = build_sparse_test_batch(); + let tag_columns = sparse_tag_columns(); + let non_tag_indices = vec![0, 1]; + let table_id: u32 = 1025; + let modified = + modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap(); + + let name_to_column_id: HashMap = [ + ("greptime_timestamp".to_string(), 0), + ("greptime_value".to_string(), 1), + ("namespace".to_string(), 2), + ("host".to_string(), 3), + ] + .into_iter() + .collect(); + + let rows = Rows { + schema: vec![ + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "greptime_value".to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "namespace".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: 
+                    ..Default::default()
+                },
+                ColumnSchema {
+                    column_name: "host".to_string(),
+                    datatype: ColumnDataType::String as i32,
+                    semantic_type: SemanticType::Tag as i32,
+                    ..Default::default()
+                },
+            ],
+            rows: vec![Row {
+                values: vec![
+                    Value {
+                        value_data: Some(ValueData::TimestampMillisecondValue(1000)),
+                    },
+                    Value {
+                        value_data: Some(ValueData::F64Value(42.0)),
+                    },
+                    Value {
+                        value_data: Some(ValueData::StringValue("greptimedb".to_string())),
+                    },
+                    Value {
+                        value_data: Some(ValueData::StringValue("127.0.0.1".to_string())),
+                    },
+                ],
+            }],
+        };
+
+        let row_iter = RowsIter::new(rows, &name_to_column_id);
+        let rows = RowModifier::default()
+            .modify_rows(
+                row_iter,
+                TableIdInput::Single(table_id),
+                PrimaryKeyEncoding::Sparse,
+            )
+            .unwrap();
+        let ValueData::BinaryValue(expected_pk) =
+            rows.rows[0].values[0].value_data.clone().unwrap()
+        else {
+            panic!("expected binary primary key");
+        };
+
+        let actual_array = modified
+            .column(0)
+            .as_any()
+            .downcast_ref::<BinaryArray>()
+            .unwrap();
+        assert_eq!(actual_array.value(0), expected_pk.as_slice());
+    }
+}
diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs
index 7a1efedac4..ba90ca960d 100644
--- a/src/metric-engine/src/engine.rs
+++ b/src/metric-engine/src/engine.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 mod alter;
+mod bulk_insert;
 mod catchup;
 mod close;
 mod create;
@@ -288,9 +289,8 @@ impl RegionEngine for MetricEngine {
                 debug_assert_eq!(region_id, resp_region_id);
                 return response;
             }
-            RegionRequest::BulkInserts(_) => {
-                // todo(hl): find a way to support bulk inserts in metric engine.
-                UnsupportedRegionRequestSnafu { request }.fail()
+            RegionRequest::BulkInserts(bulk) => {
+                self.inner.bulk_insert_region(region_id, bulk).await
             }
         };
 
diff --git a/src/metric-engine/src/engine/bulk_insert.rs b/src/metric-engine/src/engine/bulk_insert.rs
new file mode 100644
index 0000000000..2a3c26c80c
--- /dev/null
+++ b/src/metric-engine/src/engine/bulk_insert.rs
@@ -0,0 +1,783 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
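+
+//! Bulk insert support for logical (metric) regions: rewrites a logical
+//! `RecordBatch` into the physical region's sparse primary key layout and
+//! forwards it as a `BulkInserts` request, with a row-based `Put` fallback.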
+
+use std::collections::HashSet;
+
+use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
+use bytes::Bytes;
+use common_error::ext::ErrorExt;
+use common_error::status_code::StatusCode;
+use common_grpc::flight::{FlightEncoder, FlightMessage};
+use common_query::prelude::{greptime_timestamp, greptime_value};
+use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
+use datatypes::arrow::record_batch::RecordBatch;
+use snafu::{OptionExt, ensure};
+use store_api::codec::PrimaryKeyEncoding;
+use store_api::metadata::RegionMetadataRef;
+use store_api::region_request::{
+    AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
+};
+use store_api::storage::RegionId;
+
+use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
+use crate::engine::MetricEngineInner;
+use crate::error;
+use crate::error::Result;
+
+impl MetricEngineInner {
+    /// Bulk-inserts logical rows into a metric region.
+    ///
+    /// This method accepts a `RegionBulkInsertsRequest` whose payload is a logical
+    /// `RecordBatch` (timestamp, value and tag columns) for the given logical `region_id`.
+    ///
+    /// The transformed batch is encoded to Arrow IPC and forwarded as a `BulkInserts`
+    /// request to the data region, along with the original `partition_expr_version`.
+    /// If the data region reports `StatusCode::Unsupported` for bulk inserts, the request
+    /// is transparently retried as a `Put` by converting the original logical batch into
+    /// `api::v1::Rows`, so callers observe the same semantics as `put_region`.
+    ///
+    /// Returns the number of affected rows, or `0` if the input batch is empty.
+    pub async fn bulk_insert_region(
+        &self,
+        region_id: RegionId,
+        request: RegionBulkInsertsRequest,
+    ) -> Result<AffectedRows> {
+        ensure!(
+            !self.is_physical_region(region_id),
+            error::UnsupportedRegionRequestSnafu {
+                request: RegionRequest::BulkInserts(request),
+            }
+        );
+
+        let (physical_region_id, data_region_id, primary_key_encoding) =
+            self.find_data_region_meta(region_id)?;
+
+        if primary_key_encoding != PrimaryKeyEncoding::Sparse {
+            return error::UnsupportedRegionRequestSnafu {
+                request: RegionRequest::BulkInserts(request),
+            }
+            .fail();
+        }
+
+        let batch = request.payload;
+        if batch.num_rows() == 0 {
+            return Ok(0);
+        }
+
+        let logical_metadata = self
+            .logical_region_metadata(physical_region_id, region_id)
+            .await?;
+        let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata(
+            region_id,
+            data_region_id,
+            &batch,
+            &logical_metadata,
+        )?;
+        let modified_batch = modify_batch_sparse(
+            batch.clone(),
+            region_id.table_id(),
+            &tag_columns,
+            &non_tag_indices,
+        )?;
+        let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?;
+
+        let partition_expr_version = request.partition_expr_version;
+        let request = RegionBulkInsertsRequest {
+            region_id: data_region_id,
+            payload: modified_batch,
+            raw_data: ArrowIpc {
+                schema,
+                data_header,
+                payload,
+            },
+            partition_expr_version,
+        };
+        match self
+            .data_region
+            .write_data(data_region_id, RegionRequest::BulkInserts(request))
+            .await
+        {
+            Ok(affected_rows) => Ok(affected_rows),
+            Err(err) if err.status_code() == StatusCode::Unsupported => {
+                // todo(hl): fallback path for PartitionTreeMemtable; remove this once we remove it
+                let rows = record_batch_to_rows(&batch, region_id)?;
+                self.put_region(
+                    region_id,
+                    RegionPutRequest {
+                        rows,
+                        hint: None,
+                        partition_expr_version,
+                    },
+                )
+                .await
+            }
+            Err(err) => Err(err),
+        }
+    }
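+
+    /// Resolves which columns of `batch` are tag columns according to the
+    /// logical region's metadata, attaching each tag's physical column id and
+    /// returning tag columns sorted by name along with the indices of all
+    /// non-tag columns.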
+    fn resolve_tag_columns_from_metadata(
+        &self,
+        logical_region_id: RegionId,
+        data_region_id: RegionId,
+        batch: &RecordBatch,
+        logical_metadata: &RegionMetadataRef,
+    ) -> Result<(Vec<TagColumnInfo>, Vec<usize>)> {
+        let tag_names: HashSet<&str> = logical_metadata
+            .column_metadatas
+            .iter()
+            .filter_map(|column| {
+                if column.semantic_type == SemanticType::Tag {
+                    Some(column.column_schema.name.as_str())
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        let mut tag_columns = Vec::new();
+        let mut non_tag_indices = Vec::new();
+        {
+            let state = self.state.read().unwrap();
+            let physical_columns = state
+                .physical_region_states()
+                .get(&data_region_id)
+                .context(error::PhysicalRegionNotFoundSnafu {
+                    region_id: data_region_id,
+                })?
+                .physical_columns();
+
+            for (index, field) in batch.schema().fields().iter().enumerate() {
+                let name = field.name();
+                let column_id =
+                    *physical_columns
+                        .get(name)
+                        .with_context(|| error::ColumnNotFoundSnafu {
+                            name: name.clone(),
+                            region_id: logical_region_id,
+                        })?;
+                if tag_names.contains(name.as_str()) {
+                    tag_columns.push(TagColumnInfo {
+                        name: name.clone(),
+                        index,
+                        column_id,
+                    });
+                } else {
+                    non_tag_indices.push(index);
+                }
+            }
+        }
+
+        tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
+        Ok((tag_columns, non_tag_indices))
+    }
+}
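+
+/// Converts a logical `RecordBatch` into `api::v1::Rows` for the row-based
+/// `Put` fallback, validating that the timestamp column is
+/// millisecond-precision, the value column is `Float64`, and every tag
+/// column is `Utf8`.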
+fn record_batch_to_rows(
+    batch: &RecordBatch,
+    logical_region_id: RegionId,
+) -> Result<api::v1::Rows> {
+    let schema_ref = batch.schema();
+    let fields = schema_ref.fields();
+
+    let mut ts_idx = None;
+    let mut val_idx = None;
+    let mut tag_indices = Vec::new();
+
+    for (idx, field) in fields.iter().enumerate() {
+        if field.name() == greptime_timestamp() {
+            ts_idx = Some(idx);
+            if !matches!(
+                field.data_type(),
+                datatypes::arrow::datatypes::DataType::Timestamp(
+                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
+                    _
+                )
+            ) {
+                return error::UnexpectedRequestSnafu {
+                    reason: format!(
+                        "Timestamp column '{}' in region {:?} has incompatible type: {:?}",
+                        field.name(),
+                        logical_region_id,
+                        field.data_type()
+                    ),
+                }
+                .fail();
+            }
+        } else if field.name() == greptime_value() {
+            val_idx = Some(idx);
+            if !matches!(
+                field.data_type(),
+                datatypes::arrow::datatypes::DataType::Float64
+            ) {
+                return error::UnexpectedRequestSnafu {
+                    reason: format!(
+                        "Value column '{}' in region {:?} has incompatible type: {:?}",
+                        field.name(),
+                        logical_region_id,
+                        field.data_type()
+                    ),
+                }
+                .fail();
+            }
+        } else {
+            if !matches!(
+                field.data_type(),
+                datatypes::arrow::datatypes::DataType::Utf8
+            ) {
+                return error::UnexpectedRequestSnafu {
+                    reason: format!(
+                        "Tag column '{}' in region {:?} must be Utf8, found: {:?}",
+                        field.name(),
+                        logical_region_id,
+                        field.data_type()
+                    ),
+                }
+                .fail();
+            }
+            tag_indices.push(idx);
+        }
+    }
+
+    let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
+        reason: format!(
+            "Timestamp column '{}' not found in RecordBatch for region {:?}",
+            greptime_timestamp(),
+            logical_region_id
+        ),
+    })?;
+    let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
+        reason: format!(
+            "Value column '{}' not found in RecordBatch for region {:?}",
+            greptime_value(),
+            logical_region_id
+        ),
+    })?;
+
+    let mut schema = Vec::with_capacity(2 + tag_indices.len());
+    schema.push(api::v1::ColumnSchema {
+        column_name: greptime_timestamp().to_string(),
+        datatype: ColumnDataType::TimestampMillisecond as i32,
+        semantic_type: SemanticType::Timestamp as i32,
+        datatype_extension: None,
+        options: None,
+    });
+    schema.push(api::v1::ColumnSchema {
+        column_name: greptime_value().to_string(),
+        datatype: ColumnDataType::Float64 as i32,
+        semantic_type: SemanticType::Field as i32,
+        datatype_extension: None,
+        options: None,
+    });
+    for &idx in &tag_indices {
+        let field = &fields[idx];
+        schema.push(api::v1::ColumnSchema {
+            column_name: field.name().clone(),
+            datatype: ColumnDataType::String as i32,
+            semantic_type: SemanticType::Tag as i32,
+            datatype_extension: None,
+            options: None,
+        });
+    }
+
+    let ts_array = batch
+        .column(ts_idx)
+        .as_any()
+        .downcast_ref::<TimestampMillisecondArray>()
+        .expect("validated as TimestampMillisecond");
+    let val_array = batch
+        .column(val_idx)
+        .as_any()
+        .downcast_ref::<Float64Array>()
+        .expect("validated as Float64");
+    let tag_arrays: Vec<&StringArray> = tag_indices
+        .iter()
+        .map(|&idx| {
+            batch
+                .column(idx)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .expect("validated as Utf8")
+        })
+        .collect();
+
+    let num_rows = batch.num_rows();
+    let mut rows = Vec::with_capacity(num_rows);
+    for row_idx in 0..num_rows {
+        let mut values = Vec::with_capacity(2 + tag_arrays.len());
+
+        if ts_array.is_null(row_idx) {
+            values.push(api::v1::Value { value_data: None });
+        } else {
+            values.push(api::v1::Value {
+                value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
+                    ts_array.value(row_idx),
+                )),
+            });
+        }
+
+        if val_array.is_null(row_idx) {
+            values.push(api::v1::Value { value_data: None });
+        } else {
+            values.push(api::v1::Value {
+                value_data: Some(api::v1::value::ValueData::F64Value(
+                    val_array.value(row_idx),
+                )),
+            });
+        }
+
+        for arr in &tag_arrays {
+            if arr.is_null(row_idx) {
+                values.push(api::v1::Value { value_data: None });
+            } else {
+                values.push(api::v1::Value {
+                    value_data: Some(api::v1::value::ValueData::StringValue(
+                        arr.value(row_idx).to_string(),
+                    )),
+                });
+            }
+        }
+
+        rows.push(api::v1::Row { values });
+    }
+
+    Ok(api::v1::Rows { schema, rows })
+}
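+
+/// Encodes a `RecordBatch` into Arrow Flight IPC buffers, returning the
+/// `(schema, data_header, payload)` byte triple used to populate `ArrowIpc`.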
+fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
+    let mut encoder = FlightEncoder::default();
+    let schema = encoder.encode_schema(record_batch.schema().as_ref());
+    let mut iter = encoder
+        .encode(FlightMessage::RecordBatch(record_batch.clone()))
+        .into_iter();
+
+    let Some(flight_data) = iter.next() else {
+        return error::UnexpectedRequestSnafu {
+            reason: "Failed to encode empty flight data",
+        }
+        .fail();
+    };
+    ensure!(
+        iter.next().is_none(),
+        error::UnexpectedRequestSnafu {
+            reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(),
+        }
+    );
+
+    Ok((
+        schema.data_header,
+        flight_data.data_header,
+        flight_data.data_body,
+    ))
+}
+
+#[cfg(test)]
+mod tests {
+    use std::assert_matches::assert_matches;
+    use std::sync::Arc;
+
+    use api::v1::ArrowIpc;
+    use common_error::ext::ErrorExt;
+    use common_query::prelude::{greptime_timestamp, greptime_value};
+    use common_recordbatch::RecordBatches;
+    use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
+    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
+    use datatypes::arrow::record_batch::RecordBatch;
+    use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
+    use store_api::path_utils::table_dir;
+    use store_api::region_engine::RegionEngine;
+    use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
+    use store_api::storage::{RegionId, ScanRequest};
+
+    use super::record_batch_to_ipc;
+    use crate::error::Error;
+    use crate::test_util::{self, TestEnv};
+
+    fn build_logical_batch(start: usize, rows: usize) -> RecordBatch {
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new(
+                greptime_timestamp(),
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new(greptime_value(), DataType::Float64, true),
+            Field::new("job", DataType::Utf8, true),
+        ]));
+
+        let mut ts = Vec::with_capacity(rows);
+        let mut values = Vec::with_capacity(rows);
+        let mut tags = Vec::with_capacity(rows);
+        for i in start..start + rows {
+            ts.push(i as i64);
+            values.push(i as f64);
+            tags.push("tag_0".to_string());
+        }
+
+        RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(TimestampMillisecondArray::from(ts)),
+                Arc::new(Float64Array::from(values)),
+                Arc::new(StringArray::from(tags)),
+            ],
+        )
+        .unwrap()
+    }
+
+    fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest {
+        let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap();
+        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
+            region_id: logical_region_id,
+            payload: batch,
+            raw_data: ArrowIpc {
+                schema,
+                data_header,
+                payload,
+            },
+            partition_expr_version: None,
+        })
+    }
+
+    async fn init_dense_metric_region(env: &TestEnv) -> RegionId {
+        let physical_region_id = env.default_physical_region_id();
+        env.create_physical_region(
+            physical_region_id,
+            &TestEnv::default_table_dir(),
+            vec![(
+                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
+                "dense".to_string(),
+            )],
+        )
+        .await;
+
+        let logical_region_id = env.default_logical_region_id();
+        let request = test_util::create_logical_region_request(
+            &["job"],
+            physical_region_id,
+            &table_dir("test", logical_region_id.table_id()),
+        );
+        env.metric()
+            .handle_request(logical_region_id, RegionRequest::Create(request))
+            .await
+            .unwrap();
+        logical_region_id
+    }
+
+    #[tokio::test]
+    async fn test_bulk_insert_empty_batch_returns_zero() {
+        let env = TestEnv::new().await;
+        env.init_metric_region().await;
+        let logical_region_id = env.default_logical_region_id();
+
+        let batch = build_logical_batch(0, 0);
+        let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
+            region_id: logical_region_id,
+            payload: batch,
+            raw_data: ArrowIpc::default(),
+            partition_expr_version: None,
+        });
+        let response = env
+            .metric()
+            .handle_request(logical_region_id, request)
+            .await
+            .unwrap();
+        assert_eq!(response.affected_rows, 0);
+    }
+
+    #[tokio::test]
+    async fn test_bulk_insert_physical_region_rejected() {
+        let env = TestEnv::new().await;
+        env.init_metric_region().await;
+
+        let physical_region_id = env.default_physical_region_id();
+        let batch = build_logical_batch(0, 2);
+        let request = build_bulk_request(physical_region_id, batch);
+
+        let err = env
+            .metric()
+            .handle_request(physical_region_id, request)
+            .await
+            .unwrap_err();
+        let Some(err) = err.as_any().downcast_ref::<Error>() else {
+            panic!("unexpected error type");
+        };
+        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
+    }
+
+    #[tokio::test]
+    async fn test_bulk_insert_unknown_column_errors() {
+        let env = TestEnv::new().await;
+        env.init_metric_region().await;
+        let logical_region_id = env.default_logical_region_id();
+
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new(
+                greptime_timestamp(),
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new(greptime_value(), DataType::Float64, true),
+            Field::new("nonexistent_column", DataType::Utf8, true),
+        ]));
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(TimestampMillisecondArray::from(vec![0i64])),
+                Arc::new(Float64Array::from(vec![1.0])),
+                Arc::new(StringArray::from(vec!["val"])),
+            ],
+        )
+        .unwrap();
+
+        let request = build_bulk_request(logical_region_id, batch);
+        let err = env
+            .metric()
+            .handle_request(logical_region_id, request)
+            .await
+            .unwrap_err();
+        let Some(err) = err.as_any().downcast_ref::<Error>() else {
+            panic!("unexpected error type");
+        };
+        assert_matches!(err, Error::ColumnNotFound { .. });
+    }
+
+    #[tokio::test]
+    async fn test_bulk_insert_multiple_tag_columns() {
+        let env = TestEnv::new().await;
+        let physical_region_id = env.default_physical_region_id();
+        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![])
+            .await;
+        let logical_region_id = env.default_logical_region_id();
+        let request = test_util::create_logical_region_request(
+            &["host", "region"],
+            physical_region_id,
+            &table_dir("test", logical_region_id.table_id()),
+        );
+        env.metric()
+            .handle_request(logical_region_id, RegionRequest::Create(request))
+            .await
+            .unwrap();
+
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new(
+                greptime_timestamp(),
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                false,
+            ),
+            Field::new(greptime_value(), DataType::Float64, true),
+            Field::new("host", DataType::Utf8, true),
+            Field::new("region", DataType::Utf8, true),
+        ]));
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])),
+                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
+                Arc::new(StringArray::from(vec!["h1", "h2", "h1"])),
+                Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])),
+            ],
+        )
+        .unwrap();
+
+        let request = build_bulk_request(logical_region_id, batch);
+        let response = env
+            .metric()
+            .handle_request(logical_region_id, request)
+            .await
+            .unwrap();
+        assert_eq!(response.affected_rows, 3);
+
+        let stream = env
+            .metric()
+            .scan_to_stream(logical_region_id, ScanRequest::default())
+            .await
+            .unwrap();
+        let batches = RecordBatches::try_collect(stream).await.unwrap();
+        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
+    }
+
+    #[tokio::test]
+    async fn test_bulk_insert_accumulates_rows() {
+        let env = TestEnv::new().await;
+        env.init_metric_region().await;
+        let logical_region_id = env.default_logical_region_id();
+
+        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 3));
+        let response = env
+            .metric()
+            .handle_request(logical_region_id, request)
+            .await
+            .unwrap();
+        assert_eq!(response.affected_rows, 3);
+
+        let request = build_bulk_request(logical_region_id, build_logical_batch(3, 5));
+        let response = env
+            .metric()
+            .handle_request(logical_region_id, request)
+            .await
+            .unwrap();
+        assert_eq!(response.affected_rows, 5);
+
+        let stream = env
+            .metric()
+            .scan_to_stream(logical_region_id, ScanRequest::default())
+            .await
+            .unwrap();
+        let batches = RecordBatches::try_collect(stream).await.unwrap();
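+        // 3 rows from the first bulk insert plus 5 from the second.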
+        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 8);
+    }
+
+    #[tokio::test]
+    async fn test_bulk_insert_sparse_encoding() {
+        let env = TestEnv::new().await;
+        env.init_metric_region().await;
+        let logical_region_id = env.default_logical_region_id();
+
+        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 4));
+        let response = env
+            .metric()
+            .handle_request(logical_region_id, request)
+            .await
+            .unwrap();
+        assert_eq!(response.affected_rows, 4);
+
+        let stream = env
+            .metric()
+            .scan_to_stream(logical_region_id, ScanRequest::default())
+            .await
+            .unwrap();
+        let batches = RecordBatches::try_collect(stream).await.unwrap();
+        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 4);
+    }
+
+    #[tokio::test]
+    async fn test_bulk_insert_dense_encoding_rejected() {
+        let env = TestEnv::new().await;
+        let logical_region_id = init_dense_metric_region(&env).await;
+
+        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2));
+        let err = env
+            .metric()
+            .handle_request(logical_region_id, request)
+            .await
+            .unwrap_err();
+        let Some(err) = err.as_any().downcast_ref::<Error>() else {
+            panic!("unexpected error type");
+        };
+        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
+    }
+
+    #[tokio::test]
+    async fn test_bulk_insert_matches_put() {
+        let env_put = TestEnv::new().await;
+        env_put.init_metric_region().await;
+        let logical_region_id = env_put.default_logical_region_id();
+        let schema = test_util::row_schema_with_tags(&["job"]);
+        let rows = test_util::build_rows(1, 5);
+        env_put
+            .metric()
+            .handle_request(
+                logical_region_id,
+                RegionRequest::Put(RegionPutRequest {
+                    rows: api::v1::Rows { schema, rows },
+                    hint: None,
+                    partition_expr_version: None,
+                }),
+            )
+            .await
+            .unwrap();
+        let put_stream = env_put
+            .metric()
+            .scan_to_stream(logical_region_id, ScanRequest::default())
+            .await
+            .unwrap();
+        let put_batches = RecordBatches::try_collect(put_stream).await.unwrap();
+        let put_output = put_batches.pretty_print().unwrap();
+
+        let env_bulk = TestEnv::new().await;
+        env_bulk.init_metric_region().await;
+        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5));
+        env_bulk
+            .metric()
+            .handle_request(logical_region_id, request)
+            .await
+            .unwrap();
+        let bulk_stream = env_bulk
+            .metric()
+            .scan_to_stream(logical_region_id, ScanRequest::default())
+            .await
+            .unwrap();
+        let bulk_batches = RecordBatches::try_collect(bulk_stream).await.unwrap();
+        let bulk_output = bulk_batches.pretty_print().unwrap();
+
+        assert_eq!(put_output, bulk_output);
+    }
+
+    #[test]
+    fn test_record_batch_to_rows_with_null_values() {
+        use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
+        use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
+        use datatypes::arrow::record_batch::RecordBatch;
+        use store_api::storage::RegionId;
+
+        use crate::engine::bulk_insert::record_batch_to_rows;
+
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new(
+                greptime_timestamp(),
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                true,
+            ),
+            Field::new(greptime_value(), DataType::Float64, true),
+            Field::new("job", DataType::Utf8, true),
+            Field::new("host", DataType::Utf8, true),
+        ]));
+
+        let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]);
+        let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]);
+        let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]);
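+        // Each column carries a null in a different row so every branch of
+        // the fallback's null handling is exercised.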
+        let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]);
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(ts_array),
+                Arc::new(val_array),
+                Arc::new(job_array),
+                Arc::new(host_array),
+            ],
+        )
+        .unwrap();
+
+        let region_id = RegionId::new(1, 1);
+        let rows = record_batch_to_rows(&batch, region_id).unwrap();
+
+        assert_eq!(rows.rows.len(), 3);
+        assert_eq!(rows.schema.len(), 4);
+
+        // Row 0: all non-null except host
+        assert!(rows.rows[0].values[0].value_data.is_some());
+        assert!(rows.rows[0].values[1].value_data.is_some());
+        assert!(rows.rows[0].values[2].value_data.is_some());
+        assert!(rows.rows[0].values[3].value_data.is_none());
+
+        // Row 1: null timestamp, null job
+        assert!(rows.rows[1].values[0].value_data.is_none());
+        assert!(rows.rows[1].values[1].value_data.is_some());
+        assert!(rows.rows[1].values[2].value_data.is_none());
+        assert!(rows.rows[1].values[3].value_data.is_some());
+
+        // Row 2: null value
+        assert!(rows.rows[2].values[0].value_data.is_some());
+        assert!(rows.rows[2].values[1].value_data.is_none());
+        assert!(rows.rows[2].values[2].value_data.is_some());
+        assert!(rows.rows[2].values[3].value_data.is_some());
+    }
+}
diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs
index 9251605aea..edae0d2bb4 100644
--- a/src/metric-engine/src/engine/put.rs
+++ b/src/metric-engine/src/engine/put.rs
@@ -460,7 +460,7 @@ impl MetricEngineInner {
         .await
     }
 
-    fn find_data_region_meta(
+    pub(crate) fn find_data_region_meta(
        &self,
        logical_region_id: RegionId,
    ) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> {
diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs
index 30daa80b91..b93029f2f4 100644
--- a/src/metric-engine/src/lib.rs
+++ b/src/metric-engine/src/lib.rs
@@ -52,6 +52,7 @@
 #![feature(assert_matches)]
 
+mod batch_modifier;
 pub mod config;
 mod data_region;
 pub mod engine;