refactor(metric-engine): Refactor PendingRowsBatcher for better testability and benchmarking (#7902)

* perf/schema-align:
 **Refactor and Enhance Error Handling in `pending_rows_batcher.rs`**

 - **Refactored `record_failure` Macro**: Moved the `record_failure` macro outside of the `flush_batch_physical` function to improve code reuse and maintainability.
 - **Enhanced Batch Transformation**: Introduced `transform_logical_batches_to_physical` function to handle the transformation of logical table batches into physical format.
 - **Batch Concatenation**: Added `concat_modified_batches` function to concatenate modified batches into a single batch.
 - **Region Write Splitting**: Implemented `split_and_encode_region_writes` function to split combined batches into region-specific writes based on partition rules.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf/schema-align:
 Add tests for `transform_logical_batches_to_physical` in `pending_rows_batcher.rs`

 - Implemented `mock_tag_batch` function to create mock `RecordBatch` instances for testing.
 - Added multiple test cases for `transform_logical_batches_to_physical`:
   - `test_transform_logical_batches_to_physical_success`: Verifies successful transformation of logical to physical batches.
   - `test_transform_logical_batches_to_physical_taxonomy_failure`: Tests failure scenario when column IDs are missing.
   - `test_transform_logical_batches_to_physical_multiple_batches`: Checks handling of multiple batches.
   - `test_transform_logical_batches_to_physical_mixed_success_failure`: Tests mixed success and failure scenarios.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf/schema-align: refactor `flush_batch_physical` for better testability

Introduced several traits to abstract dependencies on CatalogManager, PartitionRuleManager,
and NodeManager, enabling easier unit testing with mock implementations.

- Added `PhysicalFlushCatalogProvider`, `PhysicalFlushPartitionProvider`, and `PhysicalFlushNodeRequester` traits.
- Implemented adapters for existing managers to satisfy the new traits.
- Refactored `flush_batch_physical` to use these traits instead of concrete manager references.
- Modularized region write planning, resolution, and encoding into standalone functions.
- Added comprehensive unit tests for the refactored logic, including edge cases for table lookup and region routing.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf/schema-align:
 ### Enhance Error Handling and Simplify Code in `error.rs` and `pending_rows_batcher.rs`

 - **Error Handling Improvements**:
   - Added new error variants `Partition` and `MetricEngine` in `error.rs` to handle specific error cases.
   - Updated error propagation using `ResultExt` and `context` for better error messages and handling in `pending_rows_batcher.rs`.

 - **Code Simplification**:
   - Removed `FlushWriteResult` enum and refactored `flush_region_writes_concurrently` to return `Result<()>`.
   - Simplified error handling in `flush_batch_physical` and related functions by removing `first_error` and using `Result` for error propagation.

 - **Test Adjustments**:
   - Updated tests to align with the new error handling approach, ensuring they check for specific error messages and conditions.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf/schema-align: refactor `PendingBatch` to use `Option` for cleaner state management

Refactored `PendingBatch` in `pending_rows_batcher.rs` to use `Option<PendingBatch>`
within the worker loop. This change simplifies initialization and cleanup logic
by leveraging `Option::get_or_insert_with` and `Option::take`.

- Updated `PendingBatch` fields `created_at` and `ctx` to be non-optional.
- Modified `drain_batch` to take `&mut Option<PendingBatch>` and return the
  drained batch, removing the need for `flush_with_error`.
- Simplified the worker loop logic for batch creation and flushing.
- Added a unit test `test_drain_batch_takes_initialized_pending_batch_from_option`
  to verify the new draining logic.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf/schema-align: share errors across waiters using `Arc<Error>`

Enhanced error reporting in `PendingRowsBatcher` by using `Arc<Error>` in
`FlushWaiter` and `WorkerCommand`. This allows the same error instance to be
shared among all waiters of a batch, avoiding redundant error string conversions
and providing more structured error information.

- Added `SubmitBatch` variant to `Error` in `error.rs`.
- Updated `FlushWaiter` and `WorkerCommand` to use `std::result::Result<(), Arc<Error>>`.
- Refactored `notify_waiters` to distribute the shared `Arc<Error>`.
- Added `SubmitBatchSnafu` context when receiving results from the worker.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf/schema-align: export types for benchmarking

Exported several internal types and traits from `pending_rows_batcher.rs` to enable
external benchmarking of the physical batch flushing logic.

- Made `PhysicalTableMetadata`, `PhysicalFlushCatalogProvider`,
  `PhysicalFlushPartitionProvider`, `PhysicalFlushNodeRequester`,
  `TableBatch`, and `flush_batch_physical` public.
- Added a new criterion benchmark `flush_batch_physical.rs` to measure the
  performance of physical batch flushing with varying numbers of logical
  tables and rows per table.
- Registered the new benchmark in `src/servers/Cargo.toml`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: typo

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor(servers): improve error handling and documentation in batcher

Refactored error handling in `pending_rows_batcher.rs` by using `ArrowSnafu`
for RecordBatch projection errors and simplified partition rule fetching.
Added comprehensive documentation for `flush_batch_physical` and updated
error display for `SubmitBatch`.

- Added `Location` to `Arrow` error variant for better traceability.
- Updated `SubmitBatch` display to include source error.
- Replaced manual error mapping with `context(error::ArrowSnafu)` in
  `strip_partition_columns_from_batch`.
- Added doc comments to `flush_batch_physical` outlining the pipeline steps.
- Optimized capacity allocation in `transform_logical_batches_to_physical`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor(servers): clarify physical table metadata and simplify planned batch

Renamed `name_to_ids` to `col_name_to_ids` in `PhysicalTableMetadata` to
better reflect its purpose. Refactored `PlannedRegionBatch` to use a
`num_rows()` method instead of storing a redundant `row_count` field.

- Updated `PhysicalTableMetadata` and its usages in `pending_rows_batcher.rs`
  and benchmarks.
- Removed `row_count` field from `PlannedRegionBatch` and added a `num_rows()`
  helper.
- Cleaned up manual `with_context` closures for table lookups.
- Fixed a minor formatting issue in worker command processing.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* refactor(servers): simplify flush write structs and centralize metrics

Removed redundant `row_count` fields from `FlushRegionWrite` and
`PlannedRegionBatch` (made the helper method test-only). Centralized the
incrementing of `FLUSH_TOTAL` and `FLUSH_ROWS` metrics into `flush_batch`
to avoid duplication and ensure consistency.

- Removed `row_count` from `FlushRegionWrite` and `PlannedRegionBatch`.
- Marked `PlannedRegionBatch::num_rows()` as `#[cfg(test)]`.
- Updated `flush_batch` to handle `FLUSH_TOTAL` and `FLUSH_ROWS` metrics.
- Simplified concurrent and sequential flush logic by removing local metric
  updates.
- Cleaned up related tests to match the structural changes.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
Lei, HUANG
2026-04-03 10:56:18 +08:00
committed by GitHub
parent f0ea87f52f
commit a424ee1c0a
4 changed files with 1255 additions and 409 deletions

View File

@@ -178,3 +178,7 @@ harness = false
[[bench]]
name = "loki_labels"
harness = false
[[bench]]
name = "flush_batch_physical"
harness = false

View File

@@ -0,0 +1,289 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::region::RegionResponse;
use api::v1::meta::Peer;
use api::v1::region::RegionRequest;
use arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use catalog::error::Result as CatalogResult;
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema as DtColumnSchema, Schema as DtSchema};
use partition::error::Result as PartitionResult;
use partition::partition::{PartitionRule, PartitionRuleRef, RegionMask};
use servers::error::{self, Result};
use servers::pending_rows_batcher::{
PhysicalFlushCatalogProvider, PhysicalFlushNodeRequester, PhysicalFlushPartitionProvider,
PhysicalTableMetadata, TableBatch, flush_batch_physical,
};
use store_api::storage::RegionId;
use table::test_util::table_info::test_table_info;
use tokio::runtime::Runtime;
// ---------------------------------------------------------------------------
// Mock implementations (memory-backed, no I/O)
// ---------------------------------------------------------------------------
struct BenchCatalogProvider {
table: PhysicalTableMetadata,
}
#[async_trait]
impl PhysicalFlushCatalogProvider for BenchCatalogProvider {
async fn physical_table(
&self,
_catalog: &str,
_schema: &str,
_table_name: &str,
_query_ctx: &session::context::QueryContext,
) -> CatalogResult<Option<PhysicalTableMetadata>> {
Ok(Some(self.table.clone()))
}
}
struct BenchPartitionProvider;
struct SingleRegionPartitionRule;
impl PartitionRule for SingleRegionPartitionRule {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn partition_columns(&self) -> &[String] {
&[]
}
fn find_region(
&self,
_values: &[datatypes::prelude::Value],
) -> PartitionResult<store_api::storage::RegionNumber> {
Ok(1)
}
fn split_record_batch(
&self,
record_batch: &RecordBatch,
) -> PartitionResult<HashMap<store_api::storage::RegionNumber, RegionMask>> {
let n = record_batch.num_rows();
Ok(HashMap::from([(
1,
RegionMask::new(arrow::array::BooleanArray::from(vec![true; n]), n),
)]))
}
}
#[async_trait]
impl PhysicalFlushPartitionProvider for BenchPartitionProvider {
async fn find_table_partition_rule(
&self,
_table_info: &table::metadata::TableInfo,
) -> PartitionResult<PartitionRuleRef> {
Ok(Arc::new(SingleRegionPartitionRule))
}
async fn find_region_leader(&self, _region_id: RegionId) -> Result<Peer> {
Ok(Peer {
id: 1,
addr: "bench-node".to_string(),
})
}
}
struct BenchNodeRequester;
#[async_trait]
impl PhysicalFlushNodeRequester for BenchNodeRequester {
async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> error::Result<RegionResponse> {
Ok(RegionResponse::new(0))
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
fn make_physical_table_metadata(num_tags: usize) -> PhysicalTableMetadata {
let mut columns = vec![
DtColumnSchema::new("__primary_key", ConcreteDataType::binary_datatype(), false),
DtColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
DtColumnSchema::new("greptime_value", ConcreteDataType::float64_datatype(), true),
];
let mut name_to_ids = HashMap::new();
let mut column_ids = vec![0u32, 1, 2];
for i in 0..num_tags {
let tag_name = format!("tag{}", i);
let col_id = (i + 3) as u32;
columns.push(DtColumnSchema::new(
&tag_name,
ConcreteDataType::string_datatype(),
true,
));
name_to_ids.insert(tag_name, col_id);
column_ids.push(col_id);
}
let schema = Arc::new(DtSchema::try_new(columns).unwrap());
let mut table_info = test_table_info(1, "phy", "public", "greptime", schema);
table_info.meta.column_ids = column_ids;
PhysicalTableMetadata {
table_info: Arc::new(table_info),
col_name_to_ids: Some(name_to_ids),
}
}
fn make_tag_batch(tag_names: &[&str], num_rows: usize) -> RecordBatch {
let mut fields = vec![
Field::new(
"greptime_timestamp",
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("greptime_value", ArrowDataType::Float64, true),
];
for tag in tag_names {
fields.push(Field::new(*tag, ArrowDataType::Utf8, true));
}
let schema = Arc::new(ArrowSchema::new(fields));
let ts: Vec<i64> = (0..num_rows as i64).collect();
let vals: Vec<f64> = (0..num_rows).map(|i| i as f64).collect();
let mut arrays: Vec<Arc<dyn arrow::array::Array>> = vec![
Arc::new(TimestampMillisecondArray::from(ts)),
Arc::new(Float64Array::from(vals)),
];
for (tag_idx, _tag) in tag_names.iter().enumerate() {
let values: Vec<String> = (0..num_rows)
.map(|i| format!("val-{}-{}", tag_idx, i))
.collect();
arrays.push(Arc::new(StringArray::from(values)));
}
RecordBatch::try_new(schema, arrays).unwrap()
}
fn make_table_batches(
num_logical_tables: usize,
rows_per_table: usize,
tag_names: &[&str],
) -> Vec<TableBatch> {
(0..num_logical_tables)
.map(|i| {
let batch = make_tag_batch(tag_names, rows_per_table);
let row_count = batch.num_rows();
TableBatch {
table_name: format!("logical_{}", i),
table_id: (100 + i) as u32,
batches: vec![batch],
row_count,
}
})
.collect()
}
// ---------------------------------------------------------------------------
// Benchmarks
// ---------------------------------------------------------------------------
fn bench_flush_batch_physical(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let ctx = session::context::QueryContext::arc();
let num_tags = 5;
let tag_names: Vec<String> = (0..num_tags).map(|i| format!("tag{}", i)).collect();
let tag_refs: Vec<&str> = tag_names.iter().map(|s| s.as_str()).collect();
let catalog_provider = BenchCatalogProvider {
table: make_physical_table_metadata(num_tags),
};
let partition_provider = BenchPartitionProvider;
let node_requester = BenchNodeRequester;
let mut group = c.benchmark_group("flush_batch_physical");
// Vary the number of logical tables
for num_tables in [1, 10, 50, 100] {
let rows_per_table = 100;
let table_batches = make_table_batches(num_tables, rows_per_table, &tag_refs);
group.bench_with_input(
BenchmarkId::new("tables", num_tables),
&table_batches,
|b, batches| {
b.iter(|| {
rt.block_on(async {
flush_batch_physical(
batches,
"phy",
&ctx,
&partition_provider,
&node_requester,
&catalog_provider,
)
.await
.unwrap();
});
});
},
);
}
// Vary the number of rows per table
for rows_per_table in [10, 100, 1000, 5000] {
let num_tables = 10;
let table_batches = make_table_batches(num_tables, rows_per_table, &tag_refs);
group.bench_with_input(
BenchmarkId::new("rows_per_table", rows_per_table),
&table_batches,
|b, batches| {
b.iter(|| {
rt.block_on(async {
flush_batch_physical(
batches,
"phy",
&ctx,
&partition_provider,
&node_requester,
&catalog_provider,
)
.await
.unwrap();
});
});
},
);
}
group.finish();
}
criterion_group!(benches, bench_flush_batch_physical);
criterion_main!(benches);

View File

@@ -15,6 +15,7 @@
use std::any::Any;
use std::net::SocketAddr;
use std::string::FromUtf8Error;
use std::sync::Arc;
use axum::http::StatusCode as HttpStatusCode;
use axum::response::{IntoResponse, Response};
@@ -51,6 +52,8 @@ pub enum Error {
Arrow {
#[snafu(source)]
error: arrow_schema::ArrowError,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Internal error: {}", err_msg))]
@@ -685,6 +688,23 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
Partition {
source: partition::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(transparent)]
MetricEngine {
source: metric_engine::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to submit batch: {}", source))]
SubmitBatch { source: Arc<Error> },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -818,6 +838,9 @@ impl ErrorExt for Error {
MemoryLimitExceeded { .. } => StatusCode::RateLimited,
GreptimeProto { source, .. } => source.status_code(),
Partition { source, .. } => source.status_code(),
MetricEngine { source, .. } => source.status_code(),
SubmitBatch { source, .. } => source.status_code(),
}
}

File diff suppressed because it is too large Load Diff