Files
greptimedb/src/operator
Lei, HUANG 2b4e12c358 feat: auto-align Prometheus schemas in pending rows batching (#7877)
* feat/auto-schema-align:
 - **Error Handling Improvements**:
   - Removed `CatalogSnafu` context from various `.await` calls in `dashboard.rs`, `influxdb.rs`, `jaeger.rs`, `prometheus.rs`, `event.rs`, and `pipeline.rs` to streamline error handling.

 - **Prometheus Store Enhancements**:
   - Added support for auto-creating tables and adding missing Prometheus tag columns in `prom_store.rs` and `pending_rows_batcher.rs`.
   - Introduced `PendingRowsSchemaAlterer` trait for schema alterations in `pending_rows_batcher.rs`.

 - **Test Additions**:
   - Added tests for new Prometheus store functionalities in `prom_store.rs` and `pending_rows_batcher.rs`.

 - **Error Message Improvements**:
   - Enhanced error messages for catalog access in `error.rs`.

 - **Server Configuration Updates**:
   - Updated server configuration to include Prometheus store options in `server.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* reformat

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Add DataTypes Error Handling and Column Renaming Logic

 - **`error.rs`**: Introduced a new `DataTypes` error variant to handle errors from `datatypes::error::Error`. Updated `ErrorExt` implementation to include `DataTypes`.
 - **`pending_rows_batcher.rs`**: Added functions `find_prom_special_column_names` and `rename_prom_special_columns_for_existing_schema` to handle renaming of special Prometheus columns. Updated `build_prom_create_table_schema` to simplify error handling with
 `ConcreteDataType`.
 - **Tests**: Added a test case `test_rename_prom_special_columns_for_existing_schema` to verify the renaming logic for Prometheus special columns.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 - Refactored `PendingRowsBatcher` to accommodate Prometheus record batches:
   - Introduced `accommodate_record_batch_for_target_schema` to normalize incoming record batches against existing table schemas.
   - Removed `collect_missing_prom_tag_columns` and `rename_prom_special_columns_for_existing_schema` in favor of the new function.
   - Added `unzip_logical_region_schema` to extract schema components.
 - Updated tests in `pending_rows_batcher.rs`:
   - Added tests for `accommodate_record_batch_for_target_schema` to verify handling of missing tag columns and renaming of special columns.
   - Ensured error handling for missing timestamp and field columns in target schema.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Commit Summary

 - **Enhancement in Table Creation Logic**: Updated `prom_store.rs` to modify the handling of `table_options` during table creation. Specifically, `table_options` are now extended differently based on the `AutoCreateTableType`. For `Physical` tables, enforced
 `sst_format=flat` to optimize pending-rows writes by leveraging bulk memtables.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 Enhance Performance Monitoring in `pending_rows_batcher.rs`

 - Added performance monitoring timers to various stages of the `PendingRowsBatcher` process, including schema cache checks, table resolution, schema creation, and record batch alignment.
 - Improved schema handling by adding timers around schema alteration and missing column addition processes.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 - **Enhance Concurrent Write Handling**: Introduced `FlushRegionWrite` and `FlushWriteResult` structs to manage region writes and their results. Added `flush_region_writes_concurrently` function to handle concurrent flushing of region writes based on
 `should_dispatch_concurrently` logic in `pending_rows_batcher.rs`.
 - **Testing Enhancements**: Added tests for concurrent dispatching of region writes and the logic for determining concurrent dispatch in `pending_rows_batcher.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Add Histogram for Flush Stage Elapsed Time

 - **`metrics.rs`**: Introduced a new `HistogramVec` named `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` to track the elapsed time of pending rows batch flush stages.
 - **`pending_rows_batcher.rs`**: Replaced instances of `PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED` with `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` to measure the elapsed time for various flush stages, including `flush_write_region`, `flush_concat_table_batches`,
 `flush_resolve_table`, `flush_fetch_partition_rule`, `flush_split_record_batch`, `flush_filter_record_batch`, `flush_resolve_region_leader`, and `flush_encode_ipc`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* Add design doc for physical table batching in PendingRowsBatcher

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* Add implementation plan for physical table batching in PendingRowsBatcher

* feat/auto-schema-align:
 ### Commit Message

 **Enhance Metric Engine with Physical Batch Processing**

 - **Add `metric-engine` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `metric-engine` as a workspace dependency.

 - **Expose Batch Modifier Functions**: Changed visibility of `TagColumnInfo`, `compute_tsid_array`, and `modify_batch_sparse` in `batch_modifier.rs` to public, and made `batch_modifier` a public module in `lib.rs`.

 - **Implement Physical Batch Processing**:
   - Added functions `bulk_insert_physical_region` and `bulk_insert_logical_region` in `bulk_insert.rs` to handle physical and logical batch insertions.
   - Updated `pending_rows_batcher.rs` to attempt physical batch processing before falling back to logical processing, including new functions `flush_batch_physical` and `flush_batch_per_logical_table`.

 - **Enhance Testing**:
   - Added tests for physical region passthrough and empty batch handling in `bulk_insert.rs`.
   - Introduced `with_mito_config` in `test_util.rs` for customized test environments.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Enhance Batch Processing for Table Creation and Alteration

 - **`prom_store.rs`**:
   - Added `create_tables_if_missing_batch` and
 `add_missing_prom_tag_columns_batch` methods to handle batch creation of tables
 and batch alteration to add missing tag columns.
   - Implemented logic to determine missing tables and columns, and perform batch
 operations accordingly.

 - **`pending_rows_batcher.rs`**:
   - Updated `PendingRowsBatcher` to utilize batch methods for creating tables an
 adding missing columns.
   - Enhanced logic to resolve table schemas and accommodate record batches after
 batch operations.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* perf: concurrent catalog lookups and eliminate redundant concat_batches on ingest path

Replace sequential catalog_manager.table() calls with concurrent
futures::future::join_all in align_table_batches_to_region_schema.
This affects all three lookup loops: initial table resolution,
post-create resolution, and post-alter schema refresh. Reduces
O(N) sequential RPC latency to O(1) wall-clock time for requests
with many distinct logical tables (e.g. Prometheus remote_write).

Remove the per-logical-table concat_batches in flush_batch_physical.
Instead of merging all chunks of a table into one RecordBatch before
calling modify_batch_sparse, apply modify_batch_sparse directly to
each chunk and collect all modified chunks for a single final concat.
This eliminates one full data copy per logical table on the flush path.

* refactor: extract Prometheus schema alignment helpers into prom_row_builder module

Move six functions and their eight unit tests from pending_rows_batcher.rs
(~2386 lines) into a new prom_row_builder.rs module (~776 lines), leaving
the batcher at ~1665 lines focused on flush/worker machinery.

Extracted functions:
- accommodate_record_batch_for_target_schema (normalize incoming batch
  against existing table schema)
- unzip_logical_region_schema (extract ts/field/tag columns)
- build_prom_create_table_schema (build ColumnSchema vec for table creation)
- align_record_batch_to_schema (reorder/fill/cast columns to target schema)
- rows_to_record_batch (convert proto Rows to Arrow RecordBatch)
- build_arrow_array (build Arrow arrays from proto values)

Cleaned up 12 now-unused imports from pending_rows_batcher.rs.

* feat/auto-schema-align:
 ### Enhance `PendingRowsBatcher` and `prom_row_builder` for Efficient Schema Handling

 - **`pending_rows_batcher.rs`:**
   - Refactored `submit` method to integrate table batch building and alignment into a single method `build_and_align_table_batches`.
   - Removed intermediate `RecordBatch` creation, optimizing the process by directly converting proto `RowInsertRequests` into aligned `RecordBatch`es.
   - Enhanced schema handling by identifying missing columns directly from proto schemas.

 - **`prom_row_builder.rs`:**
   - Introduced `rows_to_aligned_record_batch` for direct conversion of proto `Rows` into aligned `RecordBatch`es.
   - Added `identify_missing_columns_from_proto` to detect absent tag columns without intermediate `RecordBatch`.
   - Implemented `build_prom_create_table_schema_from_proto` to construct table schemas directly from proto schemas.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 Add elapsed time metrics for bulk insert operations

 - Updated `bulk_insert` method in `bulk_insert.rs` to record elapsed time metrics using `MITO_OPERATION_ELAPSED` for both physical and logical regions.
 - Added a new test `test_bulk_insert_records_elapsed_metric` to verify that the elapsed time metric is recorded correctly during bulk insert operations.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* remove flush per logical region

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 **Refactor `flush_batch` and `flush_batch_physical` functions**

 - Removed unused `catalog` and `schema` variables from `flush_batch` in `pending_rows_batcher.rs`.
 - Updated `flush_batch_physical` to directly use `ctx.current_catalog()` and `ctx.current_schema()` for resolving table names.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Remove Unused Function and Associated Test

 - **File:** `src/servers/src/prom_row_builder.rs`
   - Removed the unused function `build_prom_create_table_schema` which was responsible for building a `Vec<ColumnSchema>` from an Arrow schema.
   - Deleted the associated test `test_build_prom_create_table_schema_from_request_schema` that validated the removed function.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 - **Remove Test**: Deleted the `test_bulk_insert_records_elapsed_metric` test from `bulk_insert.rs`.
 - **Refactor Table Resolution**: Introduced `TableResolutionPlan` struct and refactored table resolution logic in `pending_rows_batcher.rs`.
 - **Enhance Table Handling**: Added functions for collecting non-empty table rows, unique table schemas, and handling table creation and alteration in `pending_rows_batcher.rs`.
 - **Add Tests**: Implemented tests for `collect_non_empty_table_rows` and `collect_unique_table_schemas` in `pending_rows_batcher.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 - **Refactor Error Handling**: Updated error handling in `pending_rows_batcher.rs` and `prom_row_builder.rs` to use `Snafu` error context for more descriptive error messages.
 - **Remove Unused Functionality**: Eliminated the `rows_to_record_batch` function and related test in `prom_row_builder.rs` as it was redundant.
 - **Simplify Function Return Types**: Modified `rows_to_aligned_record_batch` in `prom_row_builder.rs` to return only `RecordBatch` without missing columns, simplifying the function's interface and related tests.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Add Helper Function for Table Options in `prom_store.rs`

 - Introduced `fill_metric_physical_table_options` function to encapsulate logic for setting table options, ensuring the use of flat SST format and physical table metadata.
 - Updated `Instance` implementation to utilize the new helper function for setting table options.
 - Added a unit test `test_metric_physical_table_options_forces_flat_sst_format` to verify the correct application of table options.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 - **Refactor `PendingRowsBatcher`**: Simplified worker retrieval logic in `get_or_spawn_worker` method by using a more concise conditional check.
 - **Metrics Update**: Added `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` metric in `pending_rows_batcher.rs`.
 - **Remove Unused Code**: Deleted multiple test functions related to record batch alignment and schema preparation in `pending_rows_batcher.rs` and `prom_row_builder.rs`.
 - **Function Visibility Change**: Made `build_prom_create_table_schema_from_proto` public in `prom_row_builder.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore: remove plan

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Refactor and Simplify Schema Alteration Logic

 - **Removed Unused Methods**: Deleted `create_table_if_missing` and `add_missing_prom_tag_columns` methods from `PendingRowsSchemaAlterer` trait in `prom_store.rs` and `pending_rows_batcher.rs`.
 - **Error Handling Improvement**: Enhanced error handling in `create_tables_if_missing_batch` method to return a specific error message for unsupported `AutoCreateTableType` in `prom_store.rs`.
 - **Visibility Change**: Made `as_str` method public in `AutoCreateTableType` enum in `insert.rs` to support external access.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Commit Message

 Improve safety in `prom_row_builder.rs`

 - Updated `unzip_logical_region_schema` to use `saturating_sub` for safer capacity calculation of `tag_columns`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 Add TODO comments for future improvements in `pending_rows_batcher.rs`

 - Added a TODO comment to consider bounding the `flush_region_writes_concurrently` function.
 - Added a TODO comment to potentially limit the maximum rows to concatenate in the `flush_batch_physical` function.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Commit Message

 Enhance error handling in `pending_rows_batcher.rs`

 - Updated `collect_unique_table_schemas` to return a `Result` type, enabling error handling for duplicate table names.
 - Modified the function to return an error when duplicate table names are found in `table_rows`.
 - Adjusted test cases to handle the new `Result` return type in `collect_unique_table_schemas`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 - **Refactor `partition_columns` Method**: Updated the `partition_columns` method in `multi_dim.rs`, `partition.rs`, and `splitter.rs` to return a slice reference instead of a cloned vector, improving performance by avoiding unnecessary cloning.
 - **Enhance Partition Handling**: Added functions `collect_tag_columns_and_non_tag_indices` and `strip_partition_columns_from_batch` in `pending_rows_batcher.rs` to manage partition columns more efficiently, including stripping partition columns from record batches.
 - **Update Tests**: Modified existing tests and added new ones in `pending_rows_batcher.rs` to verify the functionality of partition column handling, ensuring correct behavior of the new methods.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Enhance Schema Handling and Validation in `pending_rows_batcher.rs`

 - **Schema Validation Enhancements**:
   - Added checks for essential columns (`timestamp`, `value`) in `collect_tag_columns_and_non_tag_indices`.
   - Introduced `PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT` to ensure minimum column count in `strip_partition_columns_from_batch`.
   - Improved error handling for unexpected data types and duplicated columns.

 - **Function Modifications**:
   - Updated `strip_partition_columns_from_batch` to project essential columns without lookup.
   - Modified `flush_batch_physical` to use `essential_col_indices` instead of `non_tag_indices`.

 - **Test Enhancements**:
   - Added tests for schema validation, including checks for unexpected data types and duplicated columns.
   - Verified correct projection of essential columns in `strip_partition_columns_from_batch`.

 Files affected: `pending_rows_batcher.rs`, `tests`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 - **Add `smallvec` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `smallvec` as a workspace dependency.
 - **Refactor Function**: Renamed `collect_tag_columns_and_non_tag_indices` to `columns_taxonomy` in `pending_rows_batcher.rs` and updated its return type to use `SmallVec`.
 - **Update Tests**: Modified test cases in `pending_rows_batcher.rs` to reflect changes in function name and return type.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 **Refactor `pending_rows_batcher.rs` to Simplify Table ID Handling**

 - Updated `TableBatch` struct to use `TableId` directly instead of `Option<u32>` for `table_id`.
 - Simplified logic in `flush_batch_physical` by removing the check for `None` in `table_id`.
 - Adjusted related logic in `start_worker` to accommodate the change in `table_id` handling.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Enhance Batch Processing Logic

 - **`pending_rows_batcher.rs`**:
   - Moved column taxonomy resolution inside the loop to handle schema variations across batches.
   - Added checks to skip processing if both tag columns and essential column indices are empty.

 - **Tests**:
   - Added `test_modify_batch_sparse_with_taxonomy_per_batch` to verify batch modification logic with varying schemas.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Remove Primary Key Column Check in `pending_rows_batcher.rs`

 - Removed the check for the primary key column and other essential column names in the function `strip_partition_columns_from_batch` within `pending_rows_batcher.rs`.
 - Simplified the logic by eliminating the validation of column order against expected essential names.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Refactor error handling and iteration in `otlp.rs` and `pending_rows_batcher.rs`

 - **`otlp.rs`**: Simplified error handling by removing `CatalogSnafu` context when awaiting table retrieval.
 - **`pending_rows_batcher.rs`**: Streamlined iteration over tables by removing unnecessary `into_iter()` calls, improving code readability and efficiency.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/metrics-for-bulk:
 Add timing metrics for batch processing in `pending_rows_batcher.rs`

 - Introduced `modify_elapsed` and `columns_taxonomy_elapsed` to measure time spent in `modify_batch_sparse` and `columns_taxonomy` functions.
 - Updated `flush_batch_physical` to record these metrics using `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Commit Summary

 - **Remove Unused Code**: Eliminated the `#[allow(dead_code)]` attribute from the `compute_tsid_array` function in `batch_modifier.rs`.
 - **Error Handling Improvement**: Enhanced error handling in `flush_batch_physical` function by adjusting the `match` block in `pending_rows_batcher.rs`.
 - **Simplify Logic**: Streamlined the logic in `rows_to_aligned_record_batch` by removing unnecessary type casting in `prom_row_builder.rs`.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 **Refactor `flush_batch_physical` in `pending_rows_batcher.rs`:**

 - Moved partition column stripping logic to a single location before processing region batches.
 - Updated the use of `combined_batch` to `stripped_batch` for consistency in batch processing.
 - Removed redundant partition column stripping logic within the region batch loop.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* feat/auto-schema-align:
 ### Update `batch_modifier.rs` Documentation and Parameter Naming

 - Enhanced documentation for `compute_tsid_array` and `modify_batch_sparse` functions to clarify their logic and parameters.
 - Renamed parameter `non_tag_column_indices` to `extra_column_indices` in `modify_batch_sparse` for better clarity.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2026-04-01 02:45:26 +00:00
..