feat(metric-engine): support bulk inserts with put fallback (#7792)

* feat(metric-engine): support bulk inserts

Implement `RegionRequest::BulkInserts` to support efficient columnar data
ingestion in the metric engine.

Key changes:
- Implement `bulk_insert_region` to handle logical-to-physical region mapping
  and dispatch writes.
- Add `batch_modifier` for `RecordBatch` transformations, specifically for
  `__tsid` generation and sparse primary key encoding.
- Integrate `BulkInserts` into the `MetricEngine` request handling logic.
- Provide a row-based fallback mechanism when the underlying storage doesn't
  support bulk writes (see the sketch below).
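
A condensed sketch of the dispatch-with-fallback flow, simplified from the
`bulk_insert_region` implementation in this change (error plumbing elided):

```rust
// On `Unsupported`, retry the same data through the row-based `Put` path.
match self
    .data_region
    .write_data(data_region_id, RegionRequest::BulkInserts(request))
    .await
{
    Ok(affected_rows) => Ok(affected_rows),
    Err(err) if err.status_code() == StatusCode::Unsupported => {
        // Convert the original logical batch to rows and fall back to `put_region`.
        let rows = record_batch_to_rows(&batch, region_id)?;
        self.put_region(
            region_id,
            RegionPutRequest { rows, hint: None, partition_expr_version },
        )
        .await
    }
    Err(err) => Err(err),
}
```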

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/metric-engine-bulk-insert:
 ### Update `bulk_insert.rs` to Support Partition Expression Version

 - **Enhancements**:
   - Added support for `partition_expr_version` in `RegionBulkInsertsRequest` and `RegionPutRequest`.
   - Set `partition_expr_version` dynamically from the incoming `request` object (see the excerpt below).
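
 The forwarded request carries the version straight from the incoming one,
 excerpted from `bulk_insert_region`:

```rust
// Propagate the caller's partition expression version unchanged.
let partition_expr_version = request.partition_expr_version;
let request = RegionBulkInsertsRequest {
    region_id: data_region_id,
    payload: modified_batch,
    raw_data: ArrowIpc { schema, data_header, payload },
    partition_expr_version,
};
```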

 Files affected:
 - `src/metric-engine/src/engine/bulk_insert.rs`

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: cargo lock revert

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* add doc for conversions

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore: simplify test

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/metric-engine-bulk-insert:
 ### Refactor `bulk_insert.rs` in `metric-engine`

 - **Refactor Functionality**:
   - Replaced `resolve_tag_columns` with `resolve_tag_columns_from_metadata`, inlining the tag-column resolution logic and removing the external function call.
 - **Enhancements**:
   - Improved error handling and context provision for missing physical regions and columns.
   - Optimized tag column sorting and index management within the batch processing logic.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/metric-engine-bulk-insert:
 ### Refactor `record_batch_to_rows` Function in `bulk_insert.rs`

 - Simplified the `record_batch_to_rows` function by removing the `logical_metadata` parameter and directly validating column types within the function.
 - Enhanced error handling for timestamp, value, and tag columns by checking their data types and providing detailed error messages.
 - Replaced the use of `Helper::try_into_vector` with direct downcasting to `TimestampMillisecondArray`, `Float64Array`, and `StringArray` for improved type safety and clarity (see the excerpt below).
 - Updated the construction of `api::v1::Rows` to directly handle null values and construct `api::v1::Value` objects accordingly.
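
 The downcasting pattern in question, excerpted from `record_batch_to_rows`
 (column types are validated against the schema first, so the `expect`s cannot
 fire):

```rust
// Safe downcasts: the validation loop has already checked the Arrow data types.
let ts_array = batch
    .column(ts_idx)
    .as_any()
    .downcast_ref::<TimestampMillisecondArray>()
    .expect("validated as TimestampMillisecond");
let val_array = batch
    .column(val_idx)
    .as_any()
    .downcast_ref::<Float64Array>()
    .expect("validated as Float64");
```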

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/metric-engine-bulk-insert:
 ### Refactor `bulk_insert.rs` to optimize state access

 - Moved the state read operation inside a new block to limit the lock's scope and improve code clarity (see the excerpt below).
 - Adjusted the `tag_columns` and `non_tag_indices` processing to work within the new block structure.
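
 The scoping pattern, condensed from `resolve_tag_columns_from_metadata` (the
 lookup body is elided):

```rust
// Hold the engine state read lock only while resolving column ids.
{
    let state = self.state.read().unwrap();
    // ... look up physical columns and fill tag_columns / non_tag_indices ...
}
// Lock released; sorting happens outside the critical section.
tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
```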

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/metric-engine-bulk-insert:
 ### Refactor `compute_tsid_array` Function

 - **Refactored `compute_tsid_array` function**: Changed the signature to accept `tag_arrays` as a parameter instead of building the arrays internally. This change affects the following files:
   - `src/metric-engine/src/batch_modifier.rs`

 - **Updated test cases**: Adjusted tests to the new `compute_tsid_array` signature by passing `tag_arrays` explicitly (see the call shape below).
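
 The new call shape, as exercised by the updated tests in `batch_modifier.rs`:

```rust
// Build the tag arrays once, then hand them to compute_tsid_array.
let tag_arrays = build_tag_arrays(&batch, &tag_columns);
let tsid_array = compute_tsid_array(&batch, &tag_columns, &tag_arrays);
```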

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* docs: add doc for bulk_insert_region

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/metric-engine-bulk-insert:
 ### Refactor `bulk_insert.rs` in `metric-engine`

 - Removed the fallback handling for unsupported status codes around the `write_data` call.
 - Eliminated the `record_batch_to_rows` function, simplifying the data insertion process.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/metric-engine-bulk-insert:
 - **Optimize Primary Key Construction**: Refactored `modify_batch_sparse` in `batch_modifier.rs` to use `BinaryBuilder` for more efficient primary key construction (condensed below).
 - **Add Fallback for Unsupported Bulk Inserts**: Updated `bulk_insert.rs` to handle unsupported bulk inserts by converting record batches to rows and using `RegionPutRequest`.
 - **Implement Record Batch to Rows Conversion**: Added `record_batch_to_rows` function in `bulk_insert.rs` to convert `RecordBatch` to `api::v1::Rows` for fallback operations.
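
 The builder-based construction, condensed from `modify_batch_sparse` (the
 encoding calls are unchanged and elided here):

```rust
// Reuse one scratch buffer and append each encoded key into a BinaryBuilder
// instead of materializing a Vec<Vec<u8>>.
let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0);
let mut buffer = Vec::new();
for row in 0..num_rows {
    buffer.clear();
    // ... encode internal columns and tags into `buffer` ...
    pk_builder.append_value(&buffer);
}
let pk_array = pk_builder.finish();
```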

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/metric-engine-bulk-insert:
 Add test for handling null values in `record_batch_to_rows`

 - Added `test_record_batch_to_rows_with_null_values` in `bulk_insert.rs` to verify null handling in `record_batch_to_rows`.
 - The test converts a `RecordBatch` with nulls in the timestamp, value, and tag columns and checks the resulting rows and schema.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/metric-engine-bulk-insert:
 Add fallback path for unsupported status codes and improve error context handling

 - **`bulk_insert.rs`**:
   - Added a fallback path for `PartitionTreeMemtable` when the data region reports an unsupported status code.
   - Enhanced error handling by using `with_context` for better error messages when the timestamp and value columns are not found in the `RecordBatch` (see the excerpt below).
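
 The lazy-context pattern, excerpted from `record_batch_to_rows`:

```rust
// Build the error context lazily, only when the column is actually missing.
let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
    reason: format!(
        "Timestamp column '{}' not found in RecordBatch for region {:?}",
        greptime_timestamp(),
        logical_region_id
    ),
})?;
```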

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Lei, HUANG <mrsatangel@gmail.com>
committed by GitHub, 2026-03-17 19:28:06 +08:00
parent e0aadffb91 · commit dc98e0215b
7 changed files with 1216 additions and 4 deletions

Cargo.lock (generated)

@@ -7886,6 +7886,7 @@ dependencies = [
"common-base",
"common-error",
"common-function",
"common-grpc",
"common-macro",
"common-meta",
"common-query",

src/metric-engine/Cargo.toml

@@ -17,6 +17,7 @@ bytes.workspace = true
fxhash = "0.2"
common-base.workspace = true
common-error.workspace = true
common-grpc.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true

src/metric-engine/src/batch_modifier.rs

@@ -0,0 +1,426 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::hash::Hasher;
use std::sync::Arc;

use datatypes::arrow::array::{Array, BinaryBuilder, StringArray, UInt64Array};
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::value::ValueRef;
use fxhash::FxHasher;
use mito_codec::row_converter::SparsePrimaryKeyCodec;
use snafu::ResultExt;
use store_api::storage::ColumnId;
use store_api::storage::consts::{PRIMARY_KEY_COLUMN_NAME, ReservedColumnId};

use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu};

/// Info about a tag column for TSID computation and sparse primary key encoding.
#[allow(dead_code)]
pub(crate) struct TagColumnInfo {
    /// Column name (used for label-name hash).
    pub name: String,
    /// Column index in the RecordBatch.
    pub index: usize,
    /// Column ID in the physical region.
    pub column_id: ColumnId,
}

/// Computes `__tsid` values for each row.
#[allow(dead_code)]
pub(crate) fn compute_tsid_array(
    batch: &RecordBatch,
    sorted_tag_columns: &[TagColumnInfo],
    tag_arrays: &[&StringArray],
) -> UInt64Array {
    let num_rows = batch.num_rows();
    let label_name_hash = {
        let mut hasher = FxHasher::default();
        for tag_col in sorted_tag_columns {
            hasher.write(tag_col.name.as_bytes());
            hasher.write_u8(0xff);
        }
        hasher.finish()
    };
    let mut tsid_values = Vec::with_capacity(num_rows);
    for row in 0..num_rows {
        let has_null = tag_arrays.iter().any(|arr| arr.is_null(row));
        let tsid = if !has_null {
            let mut hasher = FxHasher::default();
            hasher.write_u64(label_name_hash);
            for arr in tag_arrays {
                hasher.write(arr.value(row).as_bytes());
                hasher.write_u8(0xff);
            }
            hasher.finish()
        } else {
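            // Some tags are null in this row: recompute the label-name hash over
            // the non-null tags only, then hash their values, so the TSID matches
            // the one produced when the null tags are absent entirely.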
            let mut name_hasher = FxHasher::default();
            for (tc, arr) in sorted_tag_columns.iter().zip(tag_arrays.iter()) {
                if !arr.is_null(row) {
                    name_hasher.write(tc.name.as_bytes());
                    name_hasher.write_u8(0xff);
                }
            }
            let row_label_hash = name_hasher.finish();
            let mut val_hasher = FxHasher::default();
            val_hasher.write_u64(row_label_hash);
            for arr in tag_arrays {
                if !arr.is_null(row) {
                    val_hasher.write(arr.value(row).as_bytes());
                    val_hasher.write_u8(0xff);
                }
            }
            val_hasher.finish()
        };
        tsid_values.push(tsid);
    }
    UInt64Array::from(tsid_values)
}

fn build_tag_arrays<'a>(
    batch: &'a RecordBatch,
    sorted_tag_columns: &[TagColumnInfo],
) -> Vec<&'a StringArray> {
    sorted_tag_columns
        .iter()
        .map(|tc| {
            batch
                .column(tc.index)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("tag column must be utf8")
        })
        .collect()
}

/// Modifies a RecordBatch for sparse primary key encoding.
#[allow(dead_code)]
pub(crate) fn modify_batch_sparse(
    batch: RecordBatch,
    table_id: u32,
    sorted_tag_columns: &[TagColumnInfo],
    non_tag_column_indices: &[usize],
) -> Result<RecordBatch> {
    let num_rows = batch.num_rows();
    let codec = SparsePrimaryKeyCodec::schemaless();
    let tag_arrays: Vec<&StringArray> = build_tag_arrays(&batch, sorted_tag_columns);
    let tsid_array = compute_tsid_array(&batch, sorted_tag_columns, &tag_arrays);
    let mut pk_builder = BinaryBuilder::with_capacity(num_rows, 0);
    let mut buffer = Vec::new();
    for row in 0..num_rows {
        buffer.clear();
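        // Encode the internal columns (__table_id, __tsid) first, then the tag
        // columns, so the sparse primary key layout matches the row-based path.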
        let internal = [
            (ReservedColumnId::table_id(), ValueRef::UInt32(table_id)),
            (
                ReservedColumnId::tsid(),
                ValueRef::UInt64(tsid_array.value(row)),
            ),
        ];
        codec
            .encode_to_vec(internal.into_iter(), &mut buffer)
            .context(EncodePrimaryKeySnafu)?;
        let tags = sorted_tag_columns
            .iter()
            .zip(tag_arrays.iter())
            .filter(|(_, arr)| !arr.is_null(row))
            .map(|(tc, arr)| (tc.column_id, ValueRef::String(arr.value(row))));
        codec
            .encode_to_vec(tags, &mut buffer)
            .context(EncodePrimaryKeySnafu)?;
        pk_builder.append_value(&buffer);
    }
    let pk_array = pk_builder.finish();
    let mut fields = vec![Arc::new(Field::new(
        PRIMARY_KEY_COLUMN_NAME,
        DataType::Binary,
        false,
    ))];
    let mut columns: Vec<Arc<dyn Array>> = vec![Arc::new(pk_array)];
    for &idx in non_tag_column_indices {
        fields.push(batch.schema().fields()[idx].clone());
        columns.push(batch.column(idx).clone());
    }
    let new_schema = Arc::new(ArrowSchema::new(fields));
    RecordBatch::try_new(new_schema, columns).map_err(|e| {
        UnexpectedRequestSnafu {
            reason: format!("Failed to build modified sparse RecordBatch: {e}"),
        }
        .build()
    })
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use datatypes::arrow::array::{BinaryArray, Int64Array, StringArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::row_modifier::{RowModifier, RowsIter, TableIdInput};

    fn build_sparse_test_batch() -> RecordBatch {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("greptime_timestamp", DataType::Int64, false),
            Field::new("greptime_value", DataType::Float64, true),
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1000])),
                Arc::new(datatypes::arrow::array::Float64Array::from(vec![42.0])),
                Arc::new(StringArray::from(vec!["greptimedb"])),
                Arc::new(StringArray::from(vec!["127.0.0.1"])),
            ],
        )
        .unwrap()
    }

    fn sparse_tag_columns() -> Vec<TagColumnInfo> {
        vec![
            TagColumnInfo {
                name: "host".to_string(),
                index: 3,
                column_id: 3,
            },
            TagColumnInfo {
                name: "namespace".to_string(),
                index: 2,
                column_id: 2,
            },
        ]
    }

    #[test]
    fn test_compute_tsid_basic() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("namespace", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(StringArray::from(vec!["greptimedb"])),
                Arc::new(StringArray::from(vec!["127.0.0.1"])),
            ],
        )
        .unwrap();
        let tag_columns: Vec<TagColumnInfo> = vec![
            TagColumnInfo {
                name: "host".to_string(),
                index: 1,
                column_id: 2,
            },
            TagColumnInfo {
                name: "namespace".to_string(),
                index: 0,
                column_id: 1,
            },
        ];
        let tag_arrays = build_tag_arrays(&batch, &tag_columns);
        let tsid_array = compute_tsid_array(&batch, &tag_columns, &tag_arrays);
        assert_eq!(tsid_array.value(0), 2721566936019240841);
    }

    #[test]
    fn test_compute_tsid_with_nulls() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batch_no_null = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(StringArray::from(vec!["A"])),
                Arc::new(StringArray::from(vec!["B"])),
            ],
        )
        .unwrap();
        let tag_cols_2: Vec<TagColumnInfo> = vec![
            TagColumnInfo {
                name: "a".to_string(),
                index: 0,
                column_id: 1,
            },
            TagColumnInfo {
                name: "b".to_string(),
                index: 1,
                column_id: 2,
            },
        ];
        let tag_arrays_2 = build_tag_arrays(&batch_no_null, &tag_cols_2);
        let tsid_no_null = compute_tsid_array(&batch_no_null, &tag_cols_2, &tag_arrays_2);
        let schema3 = Arc::new(ArrowSchema::new(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));
        let batch_with_null = RecordBatch::try_new(
            schema3,
            vec![
                Arc::new(StringArray::from(vec!["A"])),
                Arc::new(StringArray::from(vec!["B"])),
                Arc::new(StringArray::from(vec![None as Option<&str>])),
            ],
        )
        .unwrap();
        let tag_cols_3: Vec<TagColumnInfo> = vec![
            TagColumnInfo {
                name: "a".to_string(),
                index: 0,
                column_id: 1,
            },
            TagColumnInfo {
                name: "b".to_string(),
                index: 1,
                column_id: 2,
            },
            TagColumnInfo {
                name: "c".to_string(),
                index: 2,
                column_id: 3,
            },
        ];
        let tag_arrays_3 = build_tag_arrays(&batch_with_null, &tag_cols_3);
        let tsid_with_null = compute_tsid_array(&batch_with_null, &tag_cols_3, &tag_arrays_3);
        assert_eq!(tsid_no_null.value(0), tsid_with_null.value(0));
    }

    #[test]
    fn test_modify_batch_sparse() {
        let batch = build_sparse_test_batch();
        let tag_columns = sparse_tag_columns();
        let non_tag_indices = vec![0, 1];
        let table_id: u32 = 1025;
        let modified =
            modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap();
        assert_eq!(modified.num_columns(), 3);
        assert_eq!(modified.schema().field(0).name(), PRIMARY_KEY_COLUMN_NAME);
        assert_eq!(modified.schema().field(1).name(), "greptime_timestamp");
        assert_eq!(modified.schema().field(2).name(), "greptime_value");
    }

    #[test]
    fn test_modify_batch_sparse_matches_row_modifier() {
        let batch = build_sparse_test_batch();
        let tag_columns = sparse_tag_columns();
        let non_tag_indices = vec![0, 1];
        let table_id: u32 = 1025;
        let modified =
            modify_batch_sparse(batch, table_id, &tag_columns, &non_tag_indices).unwrap();
        let name_to_column_id: HashMap<String, ColumnId> = [
            ("greptime_timestamp".to_string(), 0),
            ("greptime_value".to_string(), 1),
            ("namespace".to_string(), 2),
            ("host".to_string(), 3),
        ]
        .into_iter()
        .collect();
        let rows = Rows {
            schema: vec![
                ColumnSchema {
                    column_name: "greptime_timestamp".to_string(),
                    datatype: ColumnDataType::TimestampMillisecond as i32,
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "greptime_value".to_string(),
                    datatype: ColumnDataType::Float64 as i32,
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "namespace".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "host".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
            ],
            rows: vec![Row {
                values: vec![
                    Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(1000)),
                    },
                    Value {
                        value_data: Some(ValueData::F64Value(42.0)),
                    },
                    Value {
                        value_data: Some(ValueData::StringValue("greptimedb".to_string())),
                    },
                    Value {
                        value_data: Some(ValueData::StringValue("127.0.0.1".to_string())),
                    },
                ],
            }],
        };
        let row_iter = RowsIter::new(rows, &name_to_column_id);
        let rows = RowModifier::default()
            .modify_rows(
                row_iter,
                TableIdInput::Single(table_id),
                PrimaryKeyEncoding::Sparse,
            )
            .unwrap();
        let ValueData::BinaryValue(expected_pk) =
            rows.rows[0].values[0].value_data.clone().unwrap()
        else {
            panic!("expected binary primary key");
        };
        let actual_array = modified
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(actual_array.value(0), expected_pk.as_slice());
    }
}

src/metric-engine/src/engine.rs

@@ -13,6 +13,7 @@
 // limitations under the License.

 mod alter;
+mod bulk_insert;
 mod catchup;
 mod close;
 mod create;
@@ -288,9 +289,8 @@ impl RegionEngine for MetricEngine {
                 debug_assert_eq!(region_id, resp_region_id);
                 return response;
             }
-            RegionRequest::BulkInserts(_) => {
-                // todo(hl): find a way to support bulk inserts in metric engine.
-                UnsupportedRegionRequestSnafu { request }.fail()
+            RegionRequest::BulkInserts(bulk) => {
+                self.inner.bulk_insert_region(region_id, bulk).await
             }
         };


src/metric-engine/src/engine/bulk_insert.rs

@@ -0,0 +1,783 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;

use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
use bytes::Bytes;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightEncoder, FlightMessage};
use common_query::prelude::{greptime_timestamp, greptime_value};
use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::record_batch::RecordBatch;
use snafu::{OptionExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::{
    AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;

use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
use crate::engine::MetricEngineInner;
use crate::error;
use crate::error::Result;

impl MetricEngineInner {
    /// Bulk-inserts logical rows into a metric region.
    ///
    /// This method accepts a `RegionBulkInsertsRequest` whose payload is a logical
    /// `RecordBatch` (timestamp, value and tag columns) for the given logical `region_id`.
    ///
    /// The transformed batch is encoded to Arrow IPC and forwarded as a `BulkInserts`
    /// request to the data region, along with the original `partition_expr_version`.
    /// If the data region reports `StatusCode::Unsupported` for bulk inserts, the request
    /// is transparently retried as a `Put` by converting the original logical batch into
    /// `api::v1::Rows`, so callers observe the same semantics as `put_region`.
    ///
    /// Returns the number of affected rows, or `0` if the input batch is empty.
    pub async fn bulk_insert_region(
        &self,
        region_id: RegionId,
        request: RegionBulkInsertsRequest,
    ) -> Result<AffectedRows> {
        ensure!(
            !self.is_physical_region(region_id),
            error::UnsupportedRegionRequestSnafu {
                request: RegionRequest::BulkInserts(request),
            }
        );
        let (physical_region_id, data_region_id, primary_key_encoding) =
            self.find_data_region_meta(region_id)?;
        if primary_key_encoding != PrimaryKeyEncoding::Sparse {
            return error::UnsupportedRegionRequestSnafu {
                request: RegionRequest::BulkInserts(request),
            }
            .fail();
        }
        let batch = request.payload;
        if batch.num_rows() == 0 {
            return Ok(0);
        }
        let logical_metadata = self
            .logical_region_metadata(physical_region_id, region_id)
            .await?;
        let (tag_columns, non_tag_indices) = self.resolve_tag_columns_from_metadata(
            region_id,
            data_region_id,
            &batch,
            &logical_metadata,
        )?;
        let modified_batch = modify_batch_sparse(
            batch.clone(),
            region_id.table_id(),
            &tag_columns,
            &non_tag_indices,
        )?;
        let (schema, data_header, payload) = record_batch_to_ipc(&modified_batch)?;
        let partition_expr_version = request.partition_expr_version;
        let request = RegionBulkInsertsRequest {
            region_id: data_region_id,
            payload: modified_batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version,
        };
        match self
            .data_region
            .write_data(data_region_id, RegionRequest::BulkInserts(request))
            .await
        {
            Ok(affected_rows) => Ok(affected_rows),
            Err(err) if err.status_code() == StatusCode::Unsupported => {
                // todo(hl): fallback path for PartitionTreeMemtable, remove this once we remove it
                let rows = record_batch_to_rows(&batch, region_id)?;
                self.put_region(
                    region_id,
                    RegionPutRequest {
                        rows,
                        hint: None,
                        partition_expr_version,
                    },
                )
                .await
            }
            Err(err) => Err(err),
        }
    }

    fn resolve_tag_columns_from_metadata(
        &self,
        logical_region_id: RegionId,
        data_region_id: RegionId,
        batch: &RecordBatch,
        logical_metadata: &RegionMetadataRef,
    ) -> Result<(Vec<TagColumnInfo>, Vec<usize>)> {
        let tag_names: HashSet<&str> = logical_metadata
            .column_metadatas
            .iter()
            .filter_map(|column| {
                if column.semantic_type == SemanticType::Tag {
                    Some(column.column_schema.name.as_str())
                } else {
                    None
                }
            })
            .collect();
        let mut tag_columns = Vec::new();
        let mut non_tag_indices = Vec::new();
        {
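            // Hold the engine state read lock only for the column-id lookups.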
            let state = self.state.read().unwrap();
            let physical_columns = state
                .physical_region_states()
                .get(&data_region_id)
                .context(error::PhysicalRegionNotFoundSnafu {
                    region_id: data_region_id,
                })?
                .physical_columns();
            for (index, field) in batch.schema().fields().iter().enumerate() {
                let name = field.name();
                let column_id =
                    *physical_columns
                        .get(name)
                        .with_context(|| error::ColumnNotFoundSnafu {
                            name: name.clone(),
                            region_id: logical_region_id,
                        })?;
                if tag_names.contains(name.as_str()) {
                    tag_columns.push(TagColumnInfo {
                        name: name.clone(),
                        index,
                        column_id,
                    });
                } else {
                    non_tag_indices.push(index);
                }
            }
        }
        tag_columns.sort_by(|a, b| a.name.cmp(&b.name));
        Ok((tag_columns, non_tag_indices))
    }
}

fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result<api::v1::Rows> {
    let schema_ref = batch.schema();
    let fields = schema_ref.fields();
    let mut ts_idx = None;
    let mut val_idx = None;
    let mut tag_indices = Vec::new();
    for (idx, field) in fields.iter().enumerate() {
        if field.name() == greptime_timestamp() {
            ts_idx = Some(idx);
            if !matches!(
                field.data_type(),
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    _
                )
            ) {
                return error::UnexpectedRequestSnafu {
                    reason: format!(
                        "Timestamp column '{}' in region {:?} has incompatible type: {:?}",
                        field.name(),
                        logical_region_id,
                        field.data_type()
                    ),
                }
                .fail();
            }
        } else if field.name() == greptime_value() {
            val_idx = Some(idx);
            if !matches!(
                field.data_type(),
                datatypes::arrow::datatypes::DataType::Float64
            ) {
                return error::UnexpectedRequestSnafu {
                    reason: format!(
                        "Value column '{}' in region {:?} has incompatible type: {:?}",
                        field.name(),
                        logical_region_id,
                        field.data_type()
                    ),
                }
                .fail();
            }
        } else {
            if !matches!(
                field.data_type(),
                datatypes::arrow::datatypes::DataType::Utf8
            ) {
                return error::UnexpectedRequestSnafu {
                    reason: format!(
                        "Tag column '{}' in region {:?} must be Utf8, found: {:?}",
                        field.name(),
                        logical_region_id,
                        field.data_type()
                    ),
                }
                .fail();
            }
            tag_indices.push(idx);
        }
    }
    let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
        reason: format!(
            "Timestamp column '{}' not found in RecordBatch for region {:?}",
            greptime_timestamp(),
            logical_region_id
        ),
    })?;
    let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
        reason: format!(
            "Value column '{}' not found in RecordBatch for region {:?}",
            greptime_value(),
            logical_region_id
        ),
    })?;
    let mut schema = Vec::with_capacity(2 + tag_indices.len());
    schema.push(api::v1::ColumnSchema {
        column_name: greptime_timestamp().to_string(),
        datatype: ColumnDataType::TimestampMillisecond as i32,
        semantic_type: SemanticType::Timestamp as i32,
        datatype_extension: None,
        options: None,
    });
    schema.push(api::v1::ColumnSchema {
        column_name: greptime_value().to_string(),
        datatype: ColumnDataType::Float64 as i32,
        semantic_type: SemanticType::Field as i32,
        datatype_extension: None,
        options: None,
    });
    for &idx in &tag_indices {
        let field = &fields[idx];
        schema.push(api::v1::ColumnSchema {
            column_name: field.name().clone(),
            datatype: ColumnDataType::String as i32,
            semantic_type: SemanticType::Tag as i32,
            datatype_extension: None,
            options: None,
        });
    }
    let ts_array = batch
        .column(ts_idx)
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .expect("validated as TimestampMillisecond");
    let val_array = batch
        .column(val_idx)
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("validated as Float64");
    let tag_arrays: Vec<&StringArray> = tag_indices
        .iter()
        .map(|&idx| {
            batch
                .column(idx)
                .as_any()
                .downcast_ref::<StringArray>()
                .expect("validated as Utf8")
        })
        .collect();
    let num_rows = batch.num_rows();
    let mut rows = Vec::with_capacity(num_rows);
    for row_idx in 0..num_rows {
        let mut values = Vec::with_capacity(2 + tag_arrays.len());
        if ts_array.is_null(row_idx) {
            values.push(api::v1::Value { value_data: None });
        } else {
            values.push(api::v1::Value {
                value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
                    ts_array.value(row_idx),
                )),
            });
        }
        if val_array.is_null(row_idx) {
            values.push(api::v1::Value { value_data: None });
        } else {
            values.push(api::v1::Value {
                value_data: Some(api::v1::value::ValueData::F64Value(
                    val_array.value(row_idx),
                )),
            });
        }
        for arr in &tag_arrays {
            if arr.is_null(row_idx) {
                values.push(api::v1::Value { value_data: None });
            } else {
                values.push(api::v1::Value {
                    value_data: Some(api::v1::value::ValueData::StringValue(
                        arr.value(row_idx).to_string(),
                    )),
                });
            }
        }
        rows.push(api::v1::Row { values });
    }
    Ok(api::v1::Rows { schema, rows })
}

fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
    let mut encoder = FlightEncoder::default();
    let schema = encoder.encode_schema(record_batch.schema().as_ref());
    let mut iter = encoder
        .encode(FlightMessage::RecordBatch(record_batch.clone()))
        .into_iter();
    let Some(flight_data) = iter.next() else {
        return error::UnexpectedRequestSnafu {
            reason: "Failed to encode empty flight data",
        }
        .fail();
    };
    ensure!(
        iter.next().is_none(),
        error::UnexpectedRequestSnafu {
            reason: "Bulk insert RecordBatch with dictionary arrays is unsupported".to_string(),
        }
    );
    Ok((
        schema.data_header,
        flight_data.data_header,
        flight_data.data_body,
    ))
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use api::v1::ArrowIpc;
    use common_error::ext::ErrorExt;
    use common_query::prelude::{greptime_timestamp, greptime_value};
    use common_recordbatch::RecordBatches;
    use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;
    use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionEngine;
    use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
    use store_api::storage::{RegionId, ScanRequest};

    use super::record_batch_to_ipc;
    use crate::error::Error;
    use crate::test_util::{self, TestEnv};

    fn build_logical_batch(start: usize, rows: usize) -> RecordBatch {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("job", DataType::Utf8, true),
        ]));
        let mut ts = Vec::with_capacity(rows);
        let mut values = Vec::with_capacity(rows);
        let mut tags = Vec::with_capacity(rows);
        for i in start..start + rows {
            ts.push(i as i64);
            values.push(i as f64);
            tags.push("tag_0".to_string());
        }
        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(ts)),
                Arc::new(Float64Array::from(values)),
                Arc::new(StringArray::from(tags)),
            ],
        )
        .unwrap()
    }

    fn build_bulk_request(logical_region_id: RegionId, batch: RecordBatch) -> RegionRequest {
        let (schema, data_header, payload) = record_batch_to_ipc(&batch).unwrap();
        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc {
                schema,
                data_header,
                payload,
            },
            partition_expr_version: None,
        })
    }

    async fn init_dense_metric_region(env: &TestEnv) -> RegionId {
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(
            physical_region_id,
            &TestEnv::default_table_dir(),
            vec![(
                MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
                "dense".to_string(),
            )],
        )
        .await;
        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["job"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();
        logical_region_id
    }

    #[tokio::test]
    async fn test_bulk_insert_empty_batch_returns_zero() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();
        let batch = build_logical_batch(0, 0);
        let request = RegionRequest::BulkInserts(RegionBulkInsertsRequest {
            region_id: logical_region_id,
            payload: batch,
            raw_data: ArrowIpc::default(),
            partition_expr_version: None,
        });
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 0);
    }

    #[tokio::test]
    async fn test_bulk_insert_physical_region_rejected() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let physical_region_id = env.default_physical_region_id();
        let batch = build_logical_batch(0, 2);
        let request = build_bulk_request(physical_region_id, batch);
        let err = env
            .metric()
            .handle_request(physical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_unknown_column_errors() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("nonexistent_column", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64])),
                Arc::new(Float64Array::from(vec![1.0])),
                Arc::new(StringArray::from(vec!["val"])),
            ],
        )
        .unwrap();
        let request = build_bulk_request(logical_region_id, batch);
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::ColumnNotFound { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_multiple_tag_columns() {
        let env = TestEnv::new().await;
        let physical_region_id = env.default_physical_region_id();
        env.create_physical_region(physical_region_id, &TestEnv::default_table_dir(), vec![])
            .await;
        let logical_region_id = env.default_logical_region_id();
        let request = test_util::create_logical_region_request(
            &["host", "region"],
            physical_region_id,
            &table_dir("test", logical_region_id.table_id()),
        );
        env.metric()
            .handle_request(logical_region_id, RegionRequest::Create(request))
            .await
            .unwrap();
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("host", DataType::Utf8, true),
            Field::new("region", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(TimestampMillisecondArray::from(vec![0i64, 1, 2])),
                Arc::new(Float64Array::from(vec![10.0, 20.0, 30.0])),
                Arc::new(StringArray::from(vec!["h1", "h2", "h1"])),
                Arc::new(StringArray::from(vec!["us-east", "us-west", "eu-west"])),
            ],
        )
        .unwrap();
        let request = build_bulk_request(logical_region_id, batch);
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);
        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
    }

    #[tokio::test]
    async fn test_bulk_insert_accumulates_rows() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();
        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 3));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 3);
        let request = build_bulk_request(logical_region_id, build_logical_batch(3, 5));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 5);
        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 8);
    }

    #[tokio::test]
    async fn test_bulk_insert_sparse_encoding() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let logical_region_id = env.default_logical_region_id();
        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 4));
        let response = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        assert_eq!(response.affected_rows, 4);
        let stream = env
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let batches = RecordBatches::try_collect(stream).await.unwrap();
        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 4);
    }

    #[tokio::test]
    async fn test_bulk_insert_dense_encoding_rejected() {
        let env = TestEnv::new().await;
        let logical_region_id = init_dense_metric_region(&env).await;
        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 2));
        let err = env
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap_err();
        let Some(err) = err.as_any().downcast_ref::<Error>() else {
            panic!("unexpected error type");
        };
        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
    }

    #[tokio::test]
    async fn test_bulk_insert_matches_put() {
        let env_put = TestEnv::new().await;
        env_put.init_metric_region().await;
        let logical_region_id = env_put.default_logical_region_id();
        let schema = test_util::row_schema_with_tags(&["job"]);
        let rows = test_util::build_rows(1, 5);
        env_put
            .metric()
            .handle_request(
                logical_region_id,
                RegionRequest::Put(RegionPutRequest {
                    rows: api::v1::Rows { schema, rows },
                    hint: None,
                    partition_expr_version: None,
                }),
            )
            .await
            .unwrap();
        let put_stream = env_put
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let put_batches = RecordBatches::try_collect(put_stream).await.unwrap();
        let put_output = put_batches.pretty_print().unwrap();
        let env_bulk = TestEnv::new().await;
        env_bulk.init_metric_region().await;
        let request = build_bulk_request(logical_region_id, build_logical_batch(0, 5));
        env_bulk
            .metric()
            .handle_request(logical_region_id, request)
            .await
            .unwrap();
        let bulk_stream = env_bulk
            .metric()
            .scan_to_stream(logical_region_id, ScanRequest::default())
            .await
            .unwrap();
        let bulk_batches = RecordBatches::try_collect(bulk_stream).await.unwrap();
        let bulk_output = bulk_batches.pretty_print().unwrap();
        assert_eq!(put_output, bulk_output);
    }

    #[test]
    fn test_record_batch_to_rows_with_null_values() {
        use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
        use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
        use datatypes::arrow::record_batch::RecordBatch;
        use store_api::storage::RegionId;

        use crate::engine::bulk_insert::record_batch_to_rows;

        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new(
                greptime_timestamp(),
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            ),
            Field::new(greptime_value(), DataType::Float64, true),
            Field::new("job", DataType::Utf8, true),
            Field::new("host", DataType::Utf8, true),
        ]));
        let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]);
        let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]);
        let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]);
        let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]);
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(ts_array),
                Arc::new(val_array),
                Arc::new(job_array),
                Arc::new(host_array),
            ],
        )
        .unwrap();
        let region_id = RegionId::new(1, 1);
        let rows = record_batch_to_rows(&batch, region_id).unwrap();
        assert_eq!(rows.rows.len(), 3);
        assert_eq!(rows.schema.len(), 4);
        // Row 0: all non-null except host
        assert!(rows.rows[0].values[0].value_data.is_some());
        assert!(rows.rows[0].values[1].value_data.is_some());
        assert!(rows.rows[0].values[2].value_data.is_some());
        assert!(rows.rows[0].values[3].value_data.is_none());
        // Row 1: null timestamp, null job
        assert!(rows.rows[1].values[0].value_data.is_none());
        assert!(rows.rows[1].values[1].value_data.is_some());
        assert!(rows.rows[1].values[2].value_data.is_none());
        assert!(rows.rows[1].values[3].value_data.is_some());
        // Row 2: null value
        assert!(rows.rows[2].values[0].value_data.is_some());
        assert!(rows.rows[2].values[1].value_data.is_none());
        assert!(rows.rows[2].values[2].value_data.is_some());
        assert!(rows.rows[2].values[3].value_data.is_some());
    }
}

src/metric-engine/src/engine.rs

@@ -460,7 +460,7 @@ impl MetricEngineInner {
             .await
     }

-    fn find_data_region_meta(
+    pub(crate) fn find_data_region_meta(
         &self,
         logical_region_id: RegionId,
     ) -> Result<(RegionId, RegionId, PrimaryKeyEncoding)> {

src/metric-engine/src/lib.rs

@@ -52,6 +52,7 @@
 #![feature(assert_matches)]
+mod batch_modifier;
 pub mod config;
 mod data_region;
 pub mod engine;