From 2b4e12c358818ef0829cc524aa56bbe38cc37980 Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Wed, 1 Apr 2026 10:45:26 +0800 Subject: [PATCH] feat: auto-align Prometheus schemas in pending rows batching (#7877) * feat/auto-schema-align: - **Error Handling Improvements**: - Removed `CatalogSnafu` context from various `.await` calls in `dashboard.rs`, `influxdb.rs`, `jaeger.rs`, `prometheus.rs`, `event.rs`, and `pipeline.rs` to streamline error handling. - **Prometheus Store Enhancements**: - Added support for auto-creating tables and adding missing Prometheus tag columns in `prom_store.rs` and `pending_rows_batcher.rs`. - Introduced `PendingRowsSchemaAlterer` trait for schema alterations in `pending_rows_batcher.rs`. - **Test Additions**: - Added tests for new Prometheus store functionalities in `prom_store.rs` and `pending_rows_batcher.rs`. - **Error Message Improvements**: - Enhanced error messages for catalog access in `error.rs`. - **Server Configuration Updates**: - Updated server configuration to include Prometheus store options in `server.rs`. Signed-off-by: Lei, HUANG * reformat Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Add DataTypes Error Handling and Column Renaming Logic - **`error.rs`**: Introduced a new `DataTypes` error variant to handle errors from `datatypes::error::Error`. Updated `ErrorExt` implementation to include `DataTypes`. - **`pending_rows_batcher.rs`**: Added functions `find_prom_special_column_names` and `rename_prom_special_columns_for_existing_schema` to handle renaming of special Prometheus columns. Updated `build_prom_create_table_schema` to simplify error handling with `ConcreteDataType`. - **Tests**: Added a test case `test_rename_prom_special_columns_for_existing_schema` to verify the renaming logic for Prometheus special columns. 
Signed-off-by: Lei, HUANG * feat/auto-schema-align: - Refactored `PendingRowsBatcher` to accommodate Prometheus record batches: - Introduced `accommodate_record_batch_for_target_schema` to normalize incoming record batches against existing table schemas. - Removed `collect_missing_prom_tag_columns` and `rename_prom_special_columns_for_existing_schema` in favor of the new function. - Added `unzip_logical_region_schema` to extract schema components. - Updated tests in `pending_rows_batcher.rs`: - Added tests for `accommodate_record_batch_for_target_schema` to verify handling of missing tag columns and renaming of special columns. - Ensured error handling for missing timestamp and field columns in target schema. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Commit Summary - **Enhancement in Table Creation Logic**: Updated `prom_store.rs` to modify the handling of `table_options` during table creation. Specifically, `table_options` are now extended differently based on the `AutoCreateTableType`. For `Physical` tables, enforced `sst_format=flat` to optimize pending-rows writes by leveraging bulk memtables. Signed-off-by: Lei, HUANG * feat/auto-schema-align: Enhance Performance Monitoring in `pending_rows_batcher.rs` - Added performance monitoring timers to various stages of the `PendingRowsBatcher` process, including schema cache checks, table resolution, schema creation, and record batch alignment. - Improved schema handling by adding timers around schema alteration and missing column addition processes. Signed-off-by: Lei, HUANG * feat/auto-schema-align: - **Enhance Concurrent Write Handling**: Introduced `FlushRegionWrite` and `FlushWriteResult` structs to manage region writes and their results. Added `flush_region_writes_concurrently` function to handle concurrent flushing of region writes based on `should_dispatch_concurrently` logic in `pending_rows_batcher.rs`. 
- **Testing Enhancements**: Added tests for concurrent dispatching of region writes and the logic for determining concurrent dispatch in `pending_rows_batcher.rs`. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Add Histogram for Flush Stage Elapsed Time - **`metrics.rs`**: Introduced a new `HistogramVec` named `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` to track the elapsed time of pending rows batch flush stages. - **`pending_rows_batcher.rs`**: Replaced instances of `PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED` with `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` to measure the elapsed time for various flush stages, including `flush_write_region`, `flush_concat_table_batches`, `flush_resolve_table`, `flush_fetch_partition_rule`, `flush_split_record_batch`, `flush_filter_record_batch`, `flush_resolve_region_leader`, and `flush_encode_ipc`. Signed-off-by: Lei, HUANG * Add design doc for physical table batching in PendingRowsBatcher Signed-off-by: Lei, HUANG * Add implementation plan for physical table batching in PendingRowsBatcher * feat/auto-schema-align: ### Commit Message **Enhance Metric Engine with Physical Batch Processing** - **Add `metric-engine` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `metric-engine` as a workspace dependency. - **Expose Batch Modifier Functions**: Changed visibility of `TagColumnInfo`, `compute_tsid_array`, and `modify_batch_sparse` in `batch_modifier.rs` to public, and made `batch_modifier` a public module in `lib.rs`. - **Implement Physical Batch Processing**: - Added functions `bulk_insert_physical_region` and `bulk_insert_logical_region` in `bulk_insert.rs` to handle physical and logical batch insertions. - Updated `pending_rows_batcher.rs` to attempt physical batch processing before falling back to logical processing, including new functions `flush_batch_physical` and `flush_batch_per_logical_table`. - **Enhance Testing**: - Added tests for physical region passthrough and empty batch handling in `bulk_insert.rs`. 
- Introduced `with_mito_config` in `test_util.rs` for customized test environments. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Enhance Batch Processing for Table Creation and Alteration - **`prom_store.rs`**: - Added `create_tables_if_missing_batch` and `add_missing_prom_tag_columns_batch` methods to handle batch creation of tables and batch alteration to add missing tag columns. - Implemented logic to determine missing tables and columns, and perform batch operations accordingly. - **`pending_rows_batcher.rs`**: - Updated `PendingRowsBatcher` to utilize batch methods for creating tables and adding missing columns. - Enhanced logic to resolve table schemas and accommodate record batches after batch operations. Signed-off-by: Lei, HUANG * perf: concurrent catalog lookups and eliminate redundant concat_batches on ingest path Replace sequential catalog_manager.table() calls with concurrent futures::future::join_all in align_table_batches_to_region_schema. This affects all three lookup loops: initial table resolution, post-create resolution, and post-alter schema refresh. Reduces O(N) sequential RPC latency to O(1) wall-clock time for requests with many distinct logical tables (e.g. Prometheus remote_write). Remove the per-logical-table concat_batches in flush_batch_physical. Instead of merging all chunks of a table into one RecordBatch before calling modify_batch_sparse, apply modify_batch_sparse directly to each chunk and collect all modified chunks for a single final concat. This eliminates one full data copy per logical table on the flush path. * refactor: extract Prometheus schema alignment helpers into prom_row_builder module Move six functions and their eight unit tests from pending_rows_batcher.rs (~2386 lines) into a new prom_row_builder.rs module (~776 lines), leaving the batcher at ~1665 lines focused on flush/worker machinery. 
Extracted functions: - accommodate_record_batch_for_target_schema (normalize incoming batch against existing table schema) - unzip_logical_region_schema (extract ts/field/tag columns) - build_prom_create_table_schema (build ColumnSchema vec for table creation) - align_record_batch_to_schema (reorder/fill/cast columns to target schema) - rows_to_record_batch (convert proto Rows to Arrow RecordBatch) - build_arrow_array (build Arrow arrays from proto values) Cleaned up 12 now-unused imports from pending_rows_batcher.rs. * feat/auto-schema-align: ### Enhance `PendingRowsBatcher` and `prom_row_builder` for Efficient Schema Handling - **`pending_rows_batcher.rs`:** - Refactored `submit` method to integrate table batch building and alignment into a single method `build_and_align_table_batches`. - Removed intermediate `RecordBatch` creation, optimizing the process by directly converting proto `RowInsertRequests` into aligned `RecordBatch`es. - Enhanced schema handling by identifying missing columns directly from proto schemas. - **`prom_row_builder.rs`:** - Introduced `rows_to_aligned_record_batch` for direct conversion of proto `Rows` into aligned `RecordBatch`es. - Added `identify_missing_columns_from_proto` to detect absent tag columns without intermediate `RecordBatch`. - Implemented `build_prom_create_table_schema_from_proto` to construct table schemas directly from proto schemas. Signed-off-by: Lei, HUANG * feat/auto-schema-align: Add elapsed time metrics for bulk insert operations - Updated `bulk_insert` method in `bulk_insert.rs` to record elapsed time metrics using `MITO_OPERATION_ELAPSED` for both physical and logical regions. - Added a new test `test_bulk_insert_records_elapsed_metric` to verify that the elapsed time metric is recorded correctly during bulk insert operations. 
Signed-off-by: Lei, HUANG * remove flush per logical region Signed-off-by: Lei, HUANG * feat/auto-schema-align: **Refactor `flush_batch` and `flush_batch_physical` functions** - Removed unused `catalog` and `schema` variables from `flush_batch` in `pending_rows_batcher.rs`. - Updated `flush_batch_physical` to directly use `ctx.current_catalog()` and `ctx.current_schema()` for resolving table names. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Remove Unused Function and Associated Test - **File:** `src/servers/src/prom_row_builder.rs` - Removed the unused function `build_prom_create_table_schema` which was responsible for building a `Vec` from an Arrow schema. - Deleted the associated test `test_build_prom_create_table_schema_from_request_schema` that validated the removed function. Signed-off-by: Lei, HUANG * feat/auto-schema-align: - **Remove Test**: Deleted the `test_bulk_insert_records_elapsed_metric` test from `bulk_insert.rs`. - **Refactor Table Resolution**: Introduced `TableResolutionPlan` struct and refactored table resolution logic in `pending_rows_batcher.rs`. - **Enhance Table Handling**: Added functions for collecting non-empty table rows, unique table schemas, and handling table creation and alteration in `pending_rows_batcher.rs`. - **Add Tests**: Implemented tests for `collect_non_empty_table_rows` and `collect_unique_table_schemas` in `pending_rows_batcher.rs`. Signed-off-by: Lei, HUANG * feat/auto-schema-align: - **Refactor Error Handling**: Updated error handling in `pending_rows_batcher.rs` and `prom_row_builder.rs` to use `Snafu` error context for more descriptive error messages. - **Remove Unused Functionality**: Eliminated the `rows_to_record_batch` function and related test in `prom_row_builder.rs` as it was redundant. - **Simplify Function Return Types**: Modified `rows_to_aligned_record_batch` in `prom_row_builder.rs` to return only `RecordBatch` without missing columns, simplifying the function's interface and related tests. 
Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Add Helper Function for Table Options in `prom_store.rs` - Introduced `fill_metric_physical_table_options` function to encapsulate logic for setting table options, ensuring the use of flat SST format and physical table metadata. - Updated `Instance` implementation to utilize the new helper function for setting table options. - Added a unit test `test_metric_physical_table_options_forces_flat_sst_format` to verify the correct application of table options. Signed-off-by: Lei, HUANG * feat/auto-schema-align: - **Refactor `PendingRowsBatcher`**: Simplified worker retrieval logic in `get_or_spawn_worker` method by using a more concise conditional check. - **Metrics Update**: Added `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` metric in `pending_rows_batcher.rs`. - **Remove Unused Code**: Deleted multiple test functions related to record batch alignment and schema preparation in `pending_rows_batcher.rs` and `prom_row_builder.rs`. - **Function Visibility Change**: Made `build_prom_create_table_schema_from_proto` public in `prom_row_builder.rs`. Signed-off-by: Lei, HUANG * chore: remove plan Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Refactor and Simplify Schema Alteration Logic - **Removed Unused Methods**: Deleted `create_table_if_missing` and `add_missing_prom_tag_columns` methods from `PendingRowsSchemaAlterer` trait in `prom_store.rs` and `pending_rows_batcher.rs`. - **Error Handling Improvement**: Enhanced error handling in `create_tables_if_missing_batch` method to return a specific error message for unsupported `AutoCreateTableType` in `prom_store.rs`. - **Visibility Change**: Made `as_str` method public in `AutoCreateTableType` enum in `insert.rs` to support external access. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Commit Message Improve safety in `prom_row_builder.rs` - Updated `unzip_logical_region_schema` to use `saturating_sub` for safer capacity calculation of `tag_columns`. 
Signed-off-by: Lei, HUANG * feat/auto-schema-align: Add TODO comments for future improvements in `pending_rows_batcher.rs` - Added a TODO comment to consider bounding the `flush_region_writes_concurrently` function. - Added a TODO comment to potentially limit the maximum rows to concatenate in the `flush_batch_physical` function. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Commit Message Enhance error handling in `pending_rows_batcher.rs` - Updated `collect_unique_table_schemas` to return a `Result` type, enabling error handling for duplicate table names. - Modified the function to return an error when duplicate table names are found in `table_rows`. - Adjusted test cases to handle the new `Result` return type in `collect_unique_table_schemas`. Signed-off-by: Lei, HUANG * feat/auto-schema-align: - **Refactor `partition_columns` Method**: Updated the `partition_columns` method in `multi_dim.rs`, `partition.rs`, and `splitter.rs` to return a slice reference instead of a cloned vector, improving performance by avoiding unnecessary cloning. - **Enhance Partition Handling**: Added functions `collect_tag_columns_and_non_tag_indices` and `strip_partition_columns_from_batch` in `pending_rows_batcher.rs` to manage partition columns more efficiently, including stripping partition columns from record batches. - **Update Tests**: Modified existing tests and added new ones in `pending_rows_batcher.rs` to verify the functionality of partition column handling, ensuring correct behavior of the new methods. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Enhance Schema Handling and Validation in `pending_rows_batcher.rs` - **Schema Validation Enhancements**: - Added checks for essential columns (`timestamp`, `value`) in `collect_tag_columns_and_non_tag_indices`. - Introduced `PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT` to ensure minimum column count in `strip_partition_columns_from_batch`. - Improved error handling for unexpected data types and duplicated columns. 
- **Function Modifications**: - Updated `strip_partition_columns_from_batch` to project essential columns without lookup. - Modified `flush_batch_physical` to use `essential_col_indices` instead of `non_tag_indices`. - **Test Enhancements**: - Added tests for schema validation, including checks for unexpected data types and duplicated columns. - Verified correct projection of essential columns in `strip_partition_columns_from_batch`. Files affected: `pending_rows_batcher.rs`, `tests`. Signed-off-by: Lei, HUANG * feat/auto-schema-align: - **Add `smallvec` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `smallvec` as a workspace dependency. - **Refactor Function**: Renamed `collect_tag_columns_and_non_tag_indices` to `columns_taxonomy` in `pending_rows_batcher.rs` and updated its return type to use `SmallVec`. - **Update Tests**: Modified test cases in `pending_rows_batcher.rs` to reflect changes in function name and return type. Signed-off-by: Lei, HUANG * feat/auto-schema-align: **Refactor `pending_rows_batcher.rs` to Simplify Table ID Handling** - Updated `TableBatch` struct to use `TableId` directly instead of `Option` for `table_id`. - Simplified logic in `flush_batch_physical` by removing the check for `None` in `table_id`. - Adjusted related logic in `start_worker` to accommodate the change in `table_id` handling. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Enhance Batch Processing Logic - **`pending_rows_batcher.rs`**: - Moved column taxonomy resolution inside the loop to handle schema variations across batches. - Added checks to skip processing if both tag columns and essential column indices are empty. - **Tests**: - Added `test_modify_batch_sparse_with_taxonomy_per_batch` to verify batch modification logic with varying schemas. 
Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Remove Primary Key Column Check in `pending_rows_batcher.rs` - Removed the check for the primary key column and other essential column names in the function `strip_partition_columns_from_batch` within `pending_rows_batcher.rs`. - Simplified the logic by eliminating the validation of column order against expected essential names. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Refactor error handling and iteration in `otlp.rs` and `pending_rows_batcher.rs` - **`otlp.rs`**: Simplified error handling by removing `CatalogSnafu` context when awaiting table retrieval. - **`pending_rows_batcher.rs`**: Streamlined iteration over tables by removing unnecessary `into_iter()` calls, improving code readability and efficiency. Signed-off-by: Lei, HUANG * chore/metrics-for-bulk: Add timing metrics for batch processing in `pending_rows_batcher.rs` - Introduced `modify_elapsed` and `columns_taxonomy_elapsed` to measure time spent in `modify_batch_sparse` and `columns_taxonomy` functions. - Updated `flush_batch_physical` to record these metrics using `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED`. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Commit Summary - **Remove Unused Code**: Eliminated the `#[allow(dead_code)]` attribute from the `compute_tsid_array` function in `batch_modifier.rs`. - **Error Handling Improvement**: Enhanced error handling in `flush_batch_physical` function by adjusting the `match` block in `pending_rows_batcher.rs`. - **Simplify Logic**: Streamlined the logic in `rows_to_aligned_record_batch` by removing unnecessary type casting in `prom_row_builder.rs`. Signed-off-by: Lei, HUANG * feat/auto-schema-align: **Refactor `flush_batch_physical` in `pending_rows_batcher.rs`:** - Moved partition column stripping logic to a single location before processing region batches. - Updated the use of `combined_batch` to `stripped_batch` for consistency in batch processing. 
- Removed redundant partition column stripping logic within the region batch loop. Signed-off-by: Lei, HUANG * feat/auto-schema-align: ### Update `batch_modifier.rs` Documentation and Parameter Naming - Enhanced documentation for `compute_tsid_array` and `modify_batch_sparse` functions to clarify their logic and parameters. - Renamed parameter `non_tag_column_indices` to `extra_column_indices` in `modify_batch_sparse` for better clarity. Signed-off-by: Lei, HUANG --------- Signed-off-by: Lei, HUANG --- Cargo.lock | 2 + src/frontend/src/instance/dashboard.rs | 11 +- src/frontend/src/instance/influxdb.rs | 7 +- src/frontend/src/instance/jaeger.rs | 9 +- src/frontend/src/instance/otlp.rs | 5 +- src/frontend/src/instance/prom_store.rs | 317 ++- src/frontend/src/server.rs | 2 + src/metric-engine/src/batch_modifier.rs | 41 +- src/metric-engine/src/engine/bulk_insert.rs | 139 +- src/metric-engine/src/lib.rs | 2 +- src/metric-engine/src/test_util.rs | 16 + src/operator/src/insert.rs | 2 +- src/partition/src/multi_dim.rs | 4 +- src/partition/src/partition.rs | 2 +- src/partition/src/splitter.rs | 42 +- src/servers/Cargo.toml | 2 + src/servers/src/error.rs | 10 +- src/servers/src/http/event.rs | 9 +- src/servers/src/http/prometheus.rs | 21 +- src/servers/src/lib.rs | 1 + src/servers/src/metrics.rs | 7 + src/servers/src/pending_rows_batcher.rs | 2006 +++++++++++++------ src/servers/src/pipeline.rs | 12 +- src/servers/src/prom_row_builder.rs | 557 +++++ 24 files changed, 2551 insertions(+), 675 deletions(-) create mode 100644 src/servers/src/prom_row_builder.rs diff --git a/Cargo.lock b/Cargo.lock index 401ac3b1ca..54be9bbdcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12057,6 +12057,7 @@ dependencies = [ "local-ip-address", "log-query", "loki-proto", + "metric-engine", "mime_guess", "mysql_async", "notify", @@ -12093,6 +12094,7 @@ dependencies = [ "session", "simd-json", "simdutf8", + "smallvec", "snafu 0.8.6", "snap", "socket2 0.5.10", diff --git 
a/src/frontend/src/instance/dashboard.rs b/src/frontend/src/instance/dashboard.rs index 373961dbfa..5b83a31f20 100644 --- a/src/frontend/src/instance/dashboard.rs +++ b/src/frontend/src/instance/dashboard.rs @@ -33,7 +33,7 @@ use datafusion::sql::TableReference; use datafusion_expr::{DmlStatement, LogicalPlan, lit}; use datatypes::arrow::array::{Array, AsArray}; use servers::error::{ - CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, ExecuteQuerySnafu, NotSupportedSnafu, + CollectRecordbatchSnafu, DataFusionSnafu, ExecuteQuerySnafu, NotSupportedSnafu, TableNotFoundSnafu, }; use servers::query_handler::DashboardDefinition; @@ -139,8 +139,7 @@ impl Instance { DASHBOARD_TABLE_NAME, Some(&ctx), ) - .await - .context(CatalogSnafu)? + .await? { return Ok(table); } @@ -178,8 +177,7 @@ impl Instance { DASHBOARD_TABLE_NAME, Some(&ctx), ) - .await - .context(CatalogSnafu)? + .await? .context(TableNotFoundSnafu { catalog: catalog.to_string(), schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(), @@ -255,8 +253,7 @@ impl Instance { DASHBOARD_TABLE_NAME, Some(&query_ctx), ) - .await - .context(CatalogSnafu)? + .await? 
{ table } else { diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 0c63688262..fe5fdeac77 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -21,9 +21,7 @@ use client::Output; use common_error::ext::BoxedError; use common_time::Timestamp; use common_time::timestamp::TimeUnit; -use servers::error::{ - AuthSnafu, CatalogSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu, -}; +use servers::error::{AuthSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu}; use servers::influxdb::InfluxdbRequest; use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef}; use servers::query_handler::InfluxdbLineProtocolHandler; @@ -92,8 +90,7 @@ impl InfluxdbLineTimestampAligner<'_> { &insert.table_name, Some(query_context), ) - .await - .context(CatalogSnafu)? + .await? .map(|x| x.schema()) .and_then(|schema| { schema.timestamp_column().map(|col| { diff --git a/src/frontend/src/instance/jaeger.rs b/src/frontend/src/instance/jaeger.rs index 607ed80098..e500de7a85 100644 --- a/src/frontend/src/instance/jaeger.rs +++ b/src/frontend/src/instance/jaeger.rs @@ -38,8 +38,7 @@ use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_n use query::QueryEngineRef; use serde_json::Value as JsonValue; use servers::error::{ - CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult, - TableNotFoundSnafu, + CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult, TableNotFoundSnafu, }; use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent}; use servers::otlp::trace::{ @@ -336,8 +335,7 @@ async fn query_trace_table( table_name, Some(&ctx), ) - .await - .context(CatalogSnafu)? + .await? .with_context(|| TableNotFoundSnafu { table: table_name, catalog: ctx.current_catalog(), @@ -425,8 +423,7 @@ async fn get_table( table_name, Some(&ctx), ) - .await - .context(CatalogSnafu)? + .await? 
.with_context(|| TableNotFoundSnafu { table: table_name, catalog: ctx.current_catalog(), diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 9b21f9924f..59174aa89a 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -26,7 +26,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest; use pipeline::{GreptimePipelineParams, PipelineWay}; -use servers::error::{self, AuthSnafu, CatalogSnafu, Result as ServerResult}; +use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; @@ -255,8 +255,7 @@ impl Instance { let table = self .catalog_manager .table(catalog, &schema, &req.table_name, None) - .await - .context(CatalogSnafu)?; + .await?; let Some(rows) = req.rows.as_mut() else { continue; diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index c8f76753af..10fc2f1790 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -17,7 +17,11 @@ use std::sync::Arc; use api::prom_store::remote::read_request::ResponseType; use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse}; -use api::v1::RowInsertRequests; +use api::v1::alter_table_expr::Kind; +use api::v1::{ + AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef, CreateTableExpr, + RowInsertRequests, SemanticType, +}; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use client::OutputData; @@ -27,19 +31,25 @@ use common_query::Output; use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use 
common_recordbatch::RecordBatches; use common_telemetry::{debug, tracing}; -use operator::insert::InserterRef; +use operator::insert::{ + AutoCreateTableType, InserterRef, build_create_table_expr, fill_table_options_for_create, +}; use operator::statement::StatementExecutor; use prost::Message; use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF, collect_plan_metrics}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef}; +use servers::pending_rows_batcher::PendingRowsSchemaAlterer; use servers::prom_store::{self, Metrics}; use servers::query_handler::{ PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse, }; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; +use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY}; +use store_api::mito_engine_options::SST_FORMAT_KEY; +use table::table_reference::TableReference; use tracing::instrument; use crate::error::{ @@ -50,6 +60,34 @@ use crate::instance::Instance; const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32; +fn auto_create_table_type_for_prom_remote_write( + ctx: &QueryContextRef, + with_metric_engine: bool, +) -> AutoCreateTableType { + if with_metric_engine { + let physical_table = ctx + .extension(PHYSICAL_TABLE_PARAM) + .unwrap_or(GREPTIME_PHYSICAL_TABLE) + .to_string(); + AutoCreateTableType::Logical(physical_table) + } else { + AutoCreateTableType::Physical + } +} + +fn required_physical_table_for_create_type(create_type: &AutoCreateTableType) -> Option<&str> { + match create_type { + AutoCreateTableType::Logical(physical_table) => Some(physical_table.as_str()), + _ => None, + } +} + +fn fill_metric_physical_table_options(table_options: &mut HashMap) { + // We always enforce flat format in this ingestion path. 
+ table_options.insert(SST_FORMAT_KEY.to_string(), "flat".to_string()); + table_options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string()); +} + #[inline] fn is_supported(response_type: i32) -> bool { // Only supports samples response right now @@ -159,6 +197,157 @@ impl Instance { } } +#[async_trait] +impl PendingRowsSchemaAlterer for Instance { + async fn create_tables_if_missing_batch( + &self, + catalog: &str, + schema: &str, + tables: &[(&str, &[api::v1::ColumnSchema])], + with_metric_engine: bool, + ctx: QueryContextRef, + ) -> ServerResult<()> { + if tables.is_empty() { + return Ok(()); + } + + let create_type = auto_create_table_type_for_prom_remote_write(&ctx, with_metric_engine); + if let Some(physical_table) = required_physical_table_for_create_type(&create_type) { + self.create_metric_physical_table_if_missing( + catalog, + schema, + physical_table, + ctx.clone(), + ) + .await?; + } + + let engine = if matches!(create_type, AutoCreateTableType::Logical(_)) { + METRIC_ENGINE_NAME + } else { + common_catalog::consts::default_engine() + }; + + // Check which tables actually still need to be created (may have been + // concurrently created by another request). 
+ let mut create_exprs: Vec = Vec::with_capacity(tables.len()); + for &(table_name, request_schema) in tables { + let existing = self + .catalog_manager() + .table(catalog, schema, table_name, Some(ctx.as_ref())) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + if existing.is_some() { + continue; + } + + let table_ref = TableReference::full(catalog, schema, table_name); + let mut create_table_expr = build_create_table_expr(&table_ref, request_schema, engine) + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + + let mut table_options = std::collections::HashMap::with_capacity(4); + fill_table_options_for_create(&mut table_options, &create_type, &ctx); + create_table_expr.table_options.extend(table_options); + create_exprs.push(create_table_expr); + } + + if create_exprs.is_empty() { + return Ok(()); + } + + match create_type { + AutoCreateTableType::Logical(_) => { + // Use the batch API for logical tables. + self.statement_executor + .create_logical_tables(&create_exprs, ctx) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + } + AutoCreateTableType::Physical => { + // Physical tables don't have a batch DDL path; create one at a time. 
+ for mut expr in create_exprs { + expr.table_options + .insert(SST_FORMAT_KEY.to_string(), "flat".to_string()); + self.statement_executor + .create_table_inner(&mut expr, None, ctx.clone()) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + } + } + create_type => { + return error::InvalidPromRemoteRequestSnafu { + msg: format!( + "prom remote write only supports logical or physical auto-create: {}", + create_type.as_str() + ), + } + .fail(); + } + } + + Ok(()) + } + + async fn add_missing_prom_tag_columns_batch( + &self, + catalog: &str, + schema: &str, + tables: &[(&str, &[String])], + ctx: QueryContextRef, + ) -> ServerResult<()> { + if tables.is_empty() { + return Ok(()); + } + + let alter_exprs: Vec = tables + .iter() + .filter(|(_, columns)| !columns.is_empty()) + .map(|&(table_name, columns)| { + let add_columns = AddColumns { + add_columns: columns + .iter() + .map(|column_name| AddColumn { + column_def: Some(ColumnDef { + name: column_name.clone(), + data_type: ColumnDataType::String as i32, + is_nullable: true, + semantic_type: SemanticType::Tag as i32, + comment: String::new(), + ..Default::default() + }), + location: None, + add_if_not_exists: true, + }) + .collect(), + }; + + AlterTableExpr { + catalog_name: catalog.to_string(), + schema_name: schema.to_string(), + table_name: table_name.to_string(), + kind: Some(Kind::AddColumns(add_columns)), + } + }) + .collect(); + + if alter_exprs.is_empty() { + return Ok(()); + } + + self.statement_executor + .alter_logical_tables(alter_exprs, ctx) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + + Ok(()) + } +} + #[async_trait] impl PromStoreProtocolHandler for Instance { async fn pre_write( @@ -267,6 +456,61 @@ impl PromStoreProtocolHandler for Instance { } } +impl Instance { + async fn create_metric_physical_table_if_missing( + &self, + catalog: &str, + schema: &str, + physical_table: &str, + ctx: QueryContextRef, + ) -> ServerResult<()> { + let 
table = self + .catalog_manager() + .table(catalog, schema, physical_table, Some(ctx.as_ref())) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + if table.is_some() { + return Ok(()); + } + + let table_ref = TableReference::full(catalog, schema, physical_table); + let default_schema = vec![ + api::v1::ColumnSchema { + column_name: common_query::prelude::greptime_timestamp().to_string(), + datatype: api::v1::ColumnDataType::TimestampMillisecond as i32, + semantic_type: api::v1::SemanticType::Timestamp as i32, + datatype_extension: None, + options: None, + }, + api::v1::ColumnSchema { + column_name: common_query::prelude::greptime_value().to_string(), + datatype: api::v1::ColumnDataType::Float64 as i32, + semantic_type: api::v1::SemanticType::Field as i32, + datatype_extension: None, + options: None, + }, + ]; + let mut create_table_expr = build_create_table_expr( + &table_ref, + &default_schema, + common_catalog::consts::default_engine(), + ) + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + create_table_expr.engine = METRIC_ENGINE_NAME.to_string(); + fill_metric_physical_table_options(&mut create_table_expr.table_options); + + self.statement_executor + .create_table_inner(&mut create_table_expr, None, ctx) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + + Ok(()) + } +} + /// This handler is mainly used for `frontend` or `standalone` to directly import /// the metrics collected by itself, thereby avoiding importing metrics through the network, /// thus reducing compression and network transmission overhead, @@ -320,3 +564,72 @@ impl PromStoreProtocolHandler for ExportMetricHandler { unreachable!(); } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use session::context::QueryContext; + + use super::*; + + #[test] + fn test_auto_create_table_type_for_prom_remote_write_metric_engine() { + let mut query_ctx = QueryContext::with( + 
common_catalog::consts::DEFAULT_CATALOG_NAME, + common_catalog::consts::DEFAULT_SCHEMA_NAME, + ); + query_ctx.set_extension(PHYSICAL_TABLE_PARAM, "metric_physical".to_string()); + let ctx = Arc::new(query_ctx); + + let create_type = auto_create_table_type_for_prom_remote_write(&ctx, true); + match create_type { + AutoCreateTableType::Logical(physical) => assert_eq!(physical, "metric_physical"), + _ => panic!("expected logical table create type"), + } + } + + #[test] + fn test_auto_create_table_type_for_prom_remote_write_without_metric_engine() { + let ctx = Arc::new(QueryContext::with( + common_catalog::consts::DEFAULT_CATALOG_NAME, + common_catalog::consts::DEFAULT_SCHEMA_NAME, + )); + + let create_type = auto_create_table_type_for_prom_remote_write(&ctx, false); + match create_type { + AutoCreateTableType::Physical => {} + _ => panic!("expected physical table create type"), + } + } + + #[test] + fn test_required_physical_table_for_create_type() { + let logical = AutoCreateTableType::Logical("phy_table".to_string()); + assert_eq!( + Some("phy_table"), + required_physical_table_for_create_type(&logical) + ); + + let physical = AutoCreateTableType::Physical; + assert_eq!(None, required_physical_table_for_create_type(&physical)); + } + + #[test] + fn test_metric_physical_table_options_forces_flat_sst_format() { + let mut table_options = HashMap::new(); + + fill_metric_physical_table_options(&mut table_options); + + assert_eq!( + Some("flat"), + table_options.get(SST_FORMAT_KEY).map(String::as_str) + ); + assert_eq!( + Some("true"), + table_options + .get(PHYSICAL_TABLE_METADATA_KEY) + .map(String::as_str) + ); + } +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 4d0db700d1..e66ae718ba 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -130,6 +130,8 @@ where self.instance.partition_manager().clone(), self.instance.node_manager().clone(), self.instance.catalog_manager().clone(), + opts.prom_store.with_metric_engine, 
+ self.instance.clone(), opts.prom_store.pending_rows_flush_interval, opts.prom_store.max_batch_rows, opts.prom_store.max_concurrent_flushes, diff --git a/src/metric-engine/src/batch_modifier.rs b/src/metric-engine/src/batch_modifier.rs index d06eaa976b..76d9bb418a 100644 --- a/src/metric-engine/src/batch_modifier.rs +++ b/src/metric-engine/src/batch_modifier.rs @@ -28,7 +28,7 @@ use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu}; /// Info about a tag column for TSID computation and sparse primary key encoding. #[allow(dead_code)] -pub(crate) struct TagColumnInfo { +pub struct TagColumnInfo { /// Column name (used for label-name hash). pub name: String, /// Column index in the RecordBatch. @@ -37,9 +37,16 @@ pub(crate) struct TagColumnInfo { pub column_id: ColumnId, } -/// Computes `__tsid` values for each row. -#[allow(dead_code)] -pub(crate) fn compute_tsid_array( +/// Computes the TSID for each row in a [RecordBatch]. +/// +/// The TSID is a stable hash of the set of labels (tags) present in each row. +/// It accounts for both the names and values of all non-null tag columns. +/// +/// # Logic +/// - If a row has no nulls across all `sorted_tag_columns`, it uses a precomputed hash of all label names. +/// - If a row has nulls, it dynamically computes a hash of the names of labels that are present (non-null). +/// - In both cases, it then hashes the values of those present labels in the order specified by `sorted_tag_columns`. +pub fn compute_tsid_array( batch: &RecordBatch, sorted_tag_columns: &[TagColumnInfo], tag_arrays: &[&StringArray], @@ -110,12 +117,30 @@ fn build_tag_arrays<'a>( .collect() } -/// Modifies a RecordBatch for sparse primary key encoding. -pub(crate) fn modify_batch_sparse( +/// Modifies a [RecordBatch] to include a sparse primary key column. 
+/// +/// This function transforms the input `batch` into a new `RecordBatch` where the first column +/// is the generated primary key (named [PRIMARY_KEY_COLUMN_NAME]), followed by columns +/// indicated by `extra_column_indices`. +/// +/// The primary key uses a "sparse" encoding, which compactly represents the row's identity +/// by only including non-null tag values. The encoding, handled by [SparsePrimaryKeyCodec], +/// consists of: +/// 1. The `table_id`. +/// 2. A `tsid` (Time Series ID), which is a hash of the present tags. +/// 3. The actual non-null tag values paired with their `column_id`. +/// +/// # Parameters +/// - `batch`: The source [RecordBatch]. +/// - `table_id`: The ID of the table. +/// - `sorted_tag_columns`: Metadata for tag columns, used for both TSID computation and PK encoding. +/// - `extra_column_indices`: Indices of columns from the original batch to keep in the output +/// (typically the timestamp and value fields). +pub fn modify_batch_sparse( batch: RecordBatch, table_id: u32, sorted_tag_columns: &[TagColumnInfo], - non_tag_column_indices: &[usize], + extra_column_indices: &[usize], ) -> Result { let num_rows = batch.num_rows(); let codec = SparsePrimaryKeyCodec::schemaless(); @@ -151,7 +176,7 @@ pub(crate) fn modify_batch_sparse( ))]; let mut columns: Vec> = vec![Arc::new(pk_array)]; - for &idx in non_tag_column_indices { + for &idx in extra_column_indices { fields.push(batch.schema().fields()[idx].clone()); columns.push(batch.column(idx).clone()); } diff --git a/src/metric-engine/src/engine/bulk_insert.rs b/src/metric-engine/src/engine/bulk_insert.rs index 8122cdc958..300bd34647 100644 --- a/src/metric-engine/src/engine/bulk_insert.rs +++ b/src/metric-engine/src/engine/bulk_insert.rs @@ -34,18 +34,20 @@ use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse}; use crate::engine::MetricEngineInner; use crate::error; use crate::error::Result; +use crate::metrics::MITO_OPERATION_ELAPSED; impl MetricEngineInner { - /// 
Bulk-inserts logical rows into a metric region. + /// Bulk-inserts rows into a metric region. /// - /// This method accepts a `RegionBulkInsertsRequest` whose payload is a logical - /// `RecordBatch` (timestamp, value and tag columns) for the given logical `region_id`. + /// **Logical region path:** The request payload is a logical `RecordBatch` + /// (timestamp, value and tag columns). It is transformed to physical format + /// via `modify_batch_sparse`, encoded to Arrow IPC, and forwarded as a + /// `BulkInserts` request to the data region. If mito reports + /// `StatusCode::Unsupported`, the request is transparently retried as a `Put`. /// - /// The transformed batch is encoded to Arrow IPC and forwarded as a `BulkInserts` - /// request to the data region, along with the original `partition_expr_version`. - /// If the data region reports `StatusCode::Unsupported` for bulk inserts, the request - /// is transparently retried as a `Put` by converting the original logical batch into - /// `api::v1::Rows`, so callers observe the same semantics as `put_region`. + /// **Physical region path:** The request payload is already in physical format + /// (produced by the batcher's `flush_batch_physical`). It is forwarded directly + /// to the data region with no transformation. /// /// Returns the number of affected rows, or `0` if the input batch is empty. 
pub async fn bulk_insert_region( @@ -53,13 +55,42 @@ impl MetricEngineInner { region_id: RegionId, request: RegionBulkInsertsRequest, ) -> Result { - ensure!( - !self.is_physical_region(region_id), - error::UnsupportedRegionRequestSnafu { - request: RegionRequest::BulkInserts(request), - } - ); + if request.payload.num_rows() == 0 { + return Ok(0); + } + if self.is_physical_region(region_id) { + let _timer = MITO_OPERATION_ELAPSED + .with_label_values(&["bulk_insert_physical"]) + .start_timer(); + return self.bulk_insert_physical_region(region_id, request).await; + } + let _timer = MITO_OPERATION_ELAPSED + .with_label_values(&["bulk_insert_logical"]) + .start_timer(); + self.bulk_insert_logical_region(region_id, request).await + } + + /// Passthrough for bulk inserts targeting a physical data region. + /// + /// The batch is already in physical format (with `__primary_key`, timestamp, + /// value columns), so no logical-to-physical transformation is needed. + async fn bulk_insert_physical_region( + &self, + region_id: RegionId, + request: RegionBulkInsertsRequest, + ) -> Result { + self.data_region + .write_data(region_id, RegionRequest::BulkInserts(request)) + .await + } + + /// Bulk-inserts logical rows, transforming them to physical format first. 
+ async fn bulk_insert_logical_region( + &self, + region_id: RegionId, + request: RegionBulkInsertsRequest, + ) -> Result { let (physical_region_id, data_region_id, primary_key_encoding) = self.find_data_region_meta(region_id)?; @@ -390,6 +421,7 @@ mod tests { use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray}; use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit}; use datatypes::arrow::record_batch::RecordBatch; + use mito2::config::MitoConfig; use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING; use store_api::path_utils::table_dir; use store_api::region_engine::RegionEngine; @@ -397,6 +429,7 @@ mod tests { use store_api::storage::{RegionId, ScanRequest}; use super::record_batch_to_ipc; + use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse}; use crate::error::Error; use crate::test_util::{self, TestEnv}; @@ -492,23 +525,81 @@ mod tests { } #[tokio::test] - async fn test_bulk_insert_physical_region_rejected() { - let env = TestEnv::new().await; + async fn test_bulk_insert_physical_region_passthrough() { + // Use flat format so that BulkMemtable is used (supports write_bulk). + let mito_config = MitoConfig { + default_experimental_flat_format: true, + ..Default::default() + }; + let env = TestEnv::with_mito_config("", mito_config, Default::default()).await; env.init_metric_region().await; - let physical_region_id = env.default_physical_region_id(); - let batch = build_logical_batch(0, 2); - let request = build_bulk_request(physical_region_id, batch); + let logical_region_id = env.default_logical_region_id(); - let err = env + // First, do a normal logical bulk insert so we can compare results. 
+ let logical_batch = build_logical_batch(0, 3); + let logical_request = build_bulk_request(logical_region_id, logical_batch.clone()); + let response = env + .metric() + .handle_request(logical_region_id, logical_request) + .await + .unwrap(); + assert_eq!(response.affected_rows, 3); + + // Now build a physical-format batch using modify_batch_sparse (simulating + // what the batcher's flush_batch_physical does) and send it directly to + // the physical region. + let tag_columns = vec![TagColumnInfo { + name: "job".to_string(), + index: 2, + column_id: 2, // column_id for "job" in the physical table + }]; + let non_tag_indices = vec![0, 1]; // timestamp, value + let second_batch = build_logical_batch(3, 3); + let physical_batch = modify_batch_sparse( + second_batch, + logical_region_id.table_id(), + &tag_columns, + &non_tag_indices, + ) + .unwrap(); + let request = build_bulk_request(physical_region_id, physical_batch); + let response = env .metric() .handle_request(physical_region_id, request) .await - .unwrap_err(); - let Some(err) = err.as_any().downcast_ref::() else { - panic!("unexpected error type"); + .unwrap(); + assert_eq!(response.affected_rows, 3); + + // Verify all 6 rows are readable from the logical region. + let stream = env + .metric() + .scan_to_stream(logical_region_id, ScanRequest::default()) + .await + .unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(batches.iter().map(|b| b.num_rows()).sum::(), 6); + } + + #[tokio::test] + async fn test_bulk_insert_physical_region_empty_batch() { + // Use flat format so that BulkMemtable is used (supports write_bulk). + let mito_config = MitoConfig { + default_experimental_flat_format: true, + ..Default::default() }; - assert_matches!(err, Error::UnsupportedRegionRequest { .. 
}); + let env = TestEnv::with_mito_config("", mito_config, Default::default()).await; + env.init_metric_region().await; + let physical_region_id = env.default_physical_region_id(); + + let batch = build_logical_batch(0, 0); + let request = build_bulk_request(physical_region_id, batch); + let response = env + .metric() + .handle_request(physical_region_id, request) + .await + .unwrap(); + assert_eq!(response.affected_rows, 0); } #[tokio::test] diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index 557baba25a..d209eb7588 100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -52,7 +52,7 @@ #![recursion_limit = "256"] -mod batch_modifier; +pub mod batch_modifier; pub mod config; mod data_region; pub mod engine; diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index d3e929cf63..ec55a01903 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -76,6 +76,22 @@ impl TestEnv { } } + /// Returns a new env with specific `prefix`, `mito_config`, and `config` for test. + pub async fn with_mito_config( + prefix: &str, + mito_config: MitoConfig, + config: EngineConfig, + ) -> Self { + let mut mito_env = MitoTestEnv::with_prefix(prefix).await; + let mito = mito_env.create_engine(mito_config).await; + let metric = MetricEngine::try_new(mito.clone(), config).unwrap(); + Self { + mito_env, + mito, + metric, + } + } + /// Returns a new env with specific `prefix` and `mito_env` for test. 
pub async fn with_mito_env(mut mito_env: MitoTestEnv) -> Self { let mito = mito_env.create_engine(MitoConfig::default()).await; diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index e1f121699e..aecca50e09 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -103,7 +103,7 @@ pub enum AutoCreateTableType { } impl AutoCreateTableType { - fn as_str(&self) -> &'static str { + pub fn as_str(&self) -> &'static str { match self { AutoCreateTableType::Logical(_) => "logical", AutoCreateTableType::Physical => "physical", diff --git a/src/partition/src/multi_dim.rs b/src/partition/src/multi_dim.rs index 8825c6de59..a68479888b 100644 --- a/src/partition/src/multi_dim.rs +++ b/src/partition/src/multi_dim.rs @@ -338,8 +338,8 @@ impl PartitionRule for MultiDimPartitionRule { self } - fn partition_columns(&self) -> Vec { - self.partition_columns.clone() + fn partition_columns(&self) -> &[String] { + &self.partition_columns } fn find_region(&self, values: &[Value]) -> Result { diff --git a/src/partition/src/partition.rs b/src/partition/src/partition.rs index 110f61a39e..f4c585e404 100644 --- a/src/partition/src/partition.rs +++ b/src/partition/src/partition.rs @@ -29,7 +29,7 @@ pub type PartitionRuleRef = Arc; pub trait PartitionRule: Sync + Send { fn as_any(&self) -> &dyn Any; - fn partition_columns(&self) -> Vec; + fn partition_columns(&self) -> &[String]; /// Finds the target region by the partition values. 
/// diff --git a/src/partition/src/splitter.rs b/src/partition/src/splitter.rs index fa19b74ad3..176422a173 100644 --- a/src/partition/src/splitter.rs +++ b/src/partition/src/splitter.rs @@ -66,7 +66,7 @@ impl<'a> SplitReadRowHelper<'a> { .collect::>(); let partition_cols = partition_rule.partition_columns(); let partition_cols_indexes = partition_cols - .into_iter() + .iter() .map(|col_name| col_name_to_idx.get(&col_name).cloned()) .collect::>(); @@ -176,15 +176,25 @@ mod tests { } #[derive(Debug, Serialize, Deserialize)] - struct MockPartitionRule; + struct MockPartitionRule { + partition_columns: Vec, + } + + impl Default for MockPartitionRule { + fn default() -> Self { + Self { + partition_columns: vec!["id".to_string()], + } + } + } impl PartitionRule for MockPartitionRule { fn as_any(&self) -> &dyn Any { self } - fn partition_columns(&self) -> Vec { - vec!["id".to_string()] + fn partition_columns(&self) -> &[String] { + &self.partition_columns } fn find_region(&self, values: &[Value]) -> Result { @@ -206,15 +216,25 @@ mod tests { } #[derive(Debug, Serialize, Deserialize)] - struct MockMissedColPartitionRule; + struct MockMissedColPartitionRule { + partition_columns: Vec, + } + + impl Default for MockMissedColPartitionRule { + fn default() -> Self { + Self { + partition_columns: vec!["missed_col".to_string()], + } + } + } impl PartitionRule for MockMissedColPartitionRule { fn as_any(&self) -> &dyn Any { self } - fn partition_columns(&self) -> Vec { - vec!["missed_col".to_string()] + fn partition_columns(&self) -> &[String] { + &self.partition_columns } fn find_region(&self, values: &[Value]) -> Result { @@ -243,8 +263,8 @@ mod tests { self } - fn partition_columns(&self) -> Vec { - vec![] + fn partition_columns(&self) -> &[String] { + &[] } fn find_region(&self, _values: &[Value]) -> Result { @@ -261,7 +281,7 @@ mod tests { #[test] fn test_writer_splitter() { let rows = mock_rows(); - let rule = Arc::new(MockPartitionRule) as PartitionRuleRef; + let rule = 
Arc::new(MockPartitionRule::default()) as PartitionRuleRef; let splitter = RowSplitter::new(rule); let mut splits = splitter.split(rows).unwrap(); @@ -276,7 +296,7 @@ mod tests { #[test] fn test_missed_col_writer_splitter() { let rows = mock_rows(); - let rule = Arc::new(MockMissedColPartitionRule) as PartitionRuleRef; + let rule = Arc::new(MockMissedColPartitionRule::default()) as PartitionRuleRef; let splitter = RowSplitter::new(rule); let mut splits = splitter.split(rows).unwrap(); diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 6531390ca3..55bb41ee51 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -78,6 +78,7 @@ jsonb.workspace = true lazy_static.workspace = true log-query.workspace = true loki-proto.workspace = true +metric-engine.workspace = true mime_guess = "2.0" notify.workspace = true object-pool = "0.5" @@ -114,6 +115,7 @@ serde_json.workspace = true session.workspace = true simd-json.workspace = true simdutf8 = "0.1" +smallvec.workspace = true snafu.workspace = true snap = "1" socket2 = "0.5" diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 5fae7a82db..682288b271 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -392,7 +392,7 @@ pub enum Error { location: Location, }, - #[snafu(display("Error accessing catalog"))] + #[snafu(transparent)] Catalog { source: catalog::error::Error, #[snafu(implicit)] @@ -678,6 +678,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(transparent)] + DataTypes { + source: datatypes::error::Error, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -756,6 +763,7 @@ impl ErrorExt for Error { Catalog { source, .. } => source.status_code(), RowWriter { source, .. } => source.status_code(), + DataTypes { source, .. } => source.status_code(), TlsRequired { .. } => StatusCode::Unknown, Auth { source, .. 
} => source.status_code(), diff --git a/src/servers/src/http/event.rs b/src/servers/src/http/event.rs index 24bb844dc7..dc468a9b75 100644 --- a/src/servers/src/http/event.rs +++ b/src/servers/src/http/event.rs @@ -49,7 +49,7 @@ use table::table_reference::TableReference; use vrl::value::{KeyString, Value as VrlValue}; use crate::error::{ - CatalogSnafu, Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result, + Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result, status_code_to_http_status, }; use crate::http::HttpResponse; @@ -265,12 +265,7 @@ pub async fn query_pipeline_ddl( .map_err(BoxedError::new) .context(OtherSnafu)?; - let message = if handler - .get_table(&table_name, &query_ctx) - .await - .context(CatalogSnafu)? - .is_some() - { + let message = if handler.get_table(&table_name, &query_ctx).await?.is_some() { Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string()) } else if pipeline.is_variant_table_name() { Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string()) diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index 60ad780beb..63149948a5 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -65,9 +65,8 @@ use store_api::metric_engine_consts::{ pub use super::result::prometheus_resp::PrometheusJsonResponse; use crate::error::{ - CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error, - InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu, - UnexpectedResultSnafu, + CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error, InvalidQuerySnafu, + NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu, UnexpectedResultSnafu, }; use crate::http::header::collect_plan_metrics; use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL, is_database_selection_label}; @@ -662,8 +661,7 @@ async fn retrieve_series_from_query_result( table_name, Some(query_ctx), ) - 
.await - .context(CatalogSnafu)? + .await? .with_context(|| TableNotFoundSnafu { catalog: query_ctx.current_catalog(), schema: query_ctx.current_schema(), @@ -1440,7 +1438,7 @@ async fn retrieve_table_names( }); while let Some(table) = tables_stream.next().await { - let table = table.context(CatalogSnafu)?; + let table = table?; if !table .table_info() .meta @@ -1497,7 +1495,7 @@ async fn retrieve_field_names( .next() .await { - let table = table.context(CatalogSnafu)?; + let table = table?; for column in table.field_columns() { field_columns.insert(column.name); } @@ -1508,8 +1506,7 @@ async fn retrieve_field_names( for table_name in matches { let table = manager .table(catalog, &schema, &table_name, Some(query_ctx)) - .await - .context(CatalogSnafu)? + .await? .with_context(|| TableNotFoundSnafu { catalog: catalog.to_string(), schema: schema.clone(), @@ -1533,8 +1530,7 @@ async fn retrieve_schema_names( let candidate_schemas = catalog_manager .schema_names(catalog, Some(query_ctx)) - .await - .context(CatalogSnafu)?; + .await?; for schema in candidate_schemas { let mut found = true; @@ -1542,8 +1538,7 @@ async fn retrieve_schema_names( if let Some(table_name) = retrieve_metric_name_from_promql(match_item) { let exists = catalog_manager .table_exists(catalog, &schema, &table_name, Some(query_ctx)) - .await - .context(CatalogSnafu)?; + .await?; if !exists { found = false; break; diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 41d73b109f..44587783be 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -42,6 +42,7 @@ pub mod pending_rows_batcher; mod pipeline; pub mod postgres; pub mod prom_remote_write; +pub(crate) mod prom_row_builder; pub mod prom_store; pub mod prometheus; pub mod prometheus_handler; diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index e3bff7fdbc..6f151db539 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -177,6 +177,13 @@ lazy_static! 
{ vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0] ) .unwrap(); + pub static ref PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED: HistogramVec = register_histogram_vec!( + "greptime_prom_store_pending_rows_batch_flush_stage_elapsed", + "Elapsed time of pending rows batch flush stages in seconds", + &["stage"], + vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0] + ) + .unwrap(); /// Http prometheus read duration per database. pub static ref METRIC_HTTP_PROM_STORE_READ_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_servers_http_prometheus_read_elapsed", diff --git a/src/servers/src/pending_rows_batcher.rs b/src/servers/src/pending_rows_batcher.rs index f8486e3636..b6e07d2a81 100644 --- a/src/servers/src/pending_rows_batcher.rs +++ b/src/servers/src/pending_rows_batcher.rs @@ -12,53 +12,79 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::{Duration, Instant}; -use api::helper::ColumnDataTypeWrapper; +use api::v1::meta::Peer; use api::v1::region::{ BulkInsertRequest, RegionRequest, RegionRequestHeader, bulk_insert_request, region_request, }; -use api::v1::value::ValueData; -use api::v1::{ArrowIpc, RowInsertRequests, Rows}; -use arrow::array::{ - ArrayRef, Float64Builder, StringBuilder, TimestampMicrosecondBuilder, - TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, - new_null_array, -}; -use arrow::compute::{cast, concat_batches, filter_record_batch}; -use arrow::datatypes::{Field, Schema as ArrowSchema}; +use api::v1::{ArrowIpc, ColumnSchema, RowInsertRequests, Rows}; +use arrow::compute::{concat_batches, filter_record_batch}; +use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, TimeUnit}; use arrow::record_batch::RecordBatch; -use arrow_schema::TimeUnit; +use async_trait::async_trait; use bytes::Bytes; use 
catalog::CatalogManagerRef; use common_grpc::flight::{FlightEncoder, FlightMessage}; use common_meta::node_manager::NodeManagerRef; -use common_query::prelude::GREPTIME_PHYSICAL_TABLE; +use common_query::prelude::{GREPTIME_PHYSICAL_TABLE, greptime_timestamp, greptime_value}; use common_telemetry::tracing_context::TracingContext; -use common_telemetry::{debug, error, info, warn}; +use common_telemetry::{debug, error, warn}; use dashmap::DashMap; use dashmap::mapref::entry::Entry; -use datatypes::data_type::DataType; -use datatypes::prelude::ConcreteDataType; +use metric_engine::batch_modifier::{TagColumnInfo, modify_batch_sparse}; use partition::manager::PartitionRuleManagerRef; use session::context::QueryContextRef; -use snafu::{ResultExt, ensure}; -use store_api::storage::RegionId; +use smallvec::SmallVec; +use snafu::{OptionExt, ensure}; +use store_api::storage::{RegionId, TableId}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, mpsc, oneshot}; use crate::error; use crate::error::{Error, Result}; use crate::metrics::{ FLUSH_DROPPED_ROWS, FLUSH_ELAPSED, FLUSH_FAILURES, FLUSH_ROWS, FLUSH_TOTAL, PENDING_BATCHES, - PENDING_ROWS, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED, PENDING_WORKERS, + PENDING_ROWS, PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED, PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED, + PENDING_WORKERS, +}; +use crate::prom_row_builder::{ + build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto, + rows_to_aligned_record_batch, }; const PHYSICAL_TABLE_KEY: &str = "physical_table"; /// Whether wait for ingestion result before reply to client. const PENDING_ROWS_BATCH_SYNC_ENV: &str = "PENDING_ROWS_BATCH_SYNC"; const WORKER_IDLE_TIMEOUT_MULTIPLIER: u32 = 3; +const PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT: usize = 3; + +#[async_trait] +pub trait PendingRowsSchemaAlterer: Send + Sync { + /// Batch-create multiple logical tables that are missing. + /// Each entry is `(table_name, request_schema)`. 
+ async fn create_tables_if_missing_batch( + &self, + catalog: &str, + schema: &str, + tables: &[(&str, &[ColumnSchema])], + with_metric_engine: bool, + ctx: QueryContextRef, + ) -> Result<()>; + + /// Batch-alter multiple logical tables to add missing tag columns. + /// Each entry is `(table_name, missing_column_names)`. + async fn add_missing_prom_tag_columns_batch( + &self, + catalog: &str, + schema: &str, + tables: &[(&str, &[String])], + ctx: QueryContextRef, + ) -> Result<()>; +} + +pub type PendingRowsSchemaAltererRef = Arc; #[derive(Debug, Clone, Hash, Eq, PartialEq)] struct BatchKey { @@ -70,10 +96,22 @@ struct BatchKey { #[derive(Debug)] struct TableBatch { table_name: String, + table_id: TableId, batches: Vec, row_count: usize, } +/// Intermediate planning state for resolving and preparing logical tables +/// before row-to-batch alignment. +struct TableResolutionPlan { + /// Resolved table schema and table id by logical table name. + region_schemas: HashMap, u32)>, + /// Missing tables that need to be created before alignment. + tables_to_create: Vec<(String, Vec)>, + /// Existing tables that need tag-column schema evolution. 
+ tables_to_alter: Vec<(String, Vec)>, +} + struct PendingBatch { tables: HashMap, created_at: Option, @@ -101,7 +139,7 @@ struct PendingWorker { enum WorkerCommand { Submit { - table_batches: Vec<(String, RecordBatch)>, + table_batches: Vec<(String, u32, RecordBatch)>, total_rows: usize, ctx: QueryContextRef, response_tx: oneshot::Sender>, @@ -134,6 +172,8 @@ pub struct PendingRowsBatcher { flush_semaphore: Arc, inflight_semaphore: Arc, worker_channel_capacity: usize, + prom_store_with_metric_engine: bool, + schema_alterer: PendingRowsSchemaAltererRef, pending_rows_batch_sync: bool, shutdown: broadcast::Sender<()>, } @@ -144,6 +184,8 @@ impl PendingRowsBatcher { partition_manager: PartitionRuleManagerRef, node_manager: NodeManagerRef, catalog_manager: CatalogManagerRef, + prom_store_with_metric_engine: bool, + schema_alterer: PendingRowsSchemaAltererRef, flush_interval: Duration, max_batch_rows: usize, max_concurrent_flushes: usize, @@ -178,6 +220,8 @@ impl PendingRowsBatcher { partition_manager, node_manager, catalog_manager, + prom_store_with_metric_engine, + schema_alterer, flush_semaphore: Arc::new(Semaphore::new(max_concurrent_flushes)), inflight_semaphore: Arc::new(Semaphore::new(max_inflight_requests)), worker_channel_capacity, @@ -189,20 +233,13 @@ impl PendingRowsBatcher { pub async fn submit(&self, requests: RowInsertRequests, ctx: QueryContextRef) -> Result { let (table_batches, total_rows) = { let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["submit_build_table_batches"]) + .with_label_values(&["submit_build_and_align"]) .start_timer(); - build_table_batches(requests)? + self.build_and_align_table_batches(requests, &ctx).await? }; if total_rows == 0 { return Ok(0); } - let table_batches = { - let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["submit_align_region_schema"]) - .start_timer(); - self.align_table_batches_to_region_schema(table_batches, &ctx) - .await? 
- }; let permit = { let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED @@ -212,7 +249,7 @@ impl PendingRowsBatcher { .clone() .acquire_owned() .await - .map_err(|_| Error::BatcherChannelClosed)? + .map_err(|_| error::BatcherChannelClosedSnafu.build())? }; let (response_tx, response_rx) = oneshot::channel(); @@ -260,7 +297,9 @@ impl PendingRowsBatcher { let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED .with_label_values(&["submit_wait_flush_result"]) .start_timer(); - response_rx.await.map_err(|_| Error::BatcherChannelClosed)? + response_rx + .await + .map_err(|_| error::BatcherChannelClosedSnafu.build())? }; result.map(|()| total_rows as u64) } else { @@ -268,43 +307,337 @@ impl PendingRowsBatcher { } } - async fn align_table_batches_to_region_schema( + /// Converts proto `RowInsertRequests` directly into aligned `RecordBatch`es + /// in a single pass, handling table creation, schema alteration, column + /// renaming, reordering, and null-filling without building intermediate + /// RecordBatches. 
+ async fn build_and_align_table_batches( &self, - table_batches: Vec<(String, RecordBatch)>, + requests: RowInsertRequests, ctx: &QueryContextRef, - ) -> Result> { + ) -> Result<(Vec<(String, u32, RecordBatch)>, usize)> { let catalog = ctx.current_catalog().to_string(); let schema = ctx.current_schema(); - let mut region_schemas: HashMap> = HashMap::new(); - let mut aligned_batches = Vec::with_capacity(table_batches.len()); - for (table_name, record_batch) in table_batches { - let region_schema = if let Some(region_schema) = region_schemas.get(&table_name) { - region_schema.clone() + let (table_rows, total_rows) = Self::collect_non_empty_table_rows(requests); + if total_rows == 0 { + return Ok((Vec::new(), 0)); + } + + let unique_tables = Self::collect_unique_table_schemas(&table_rows)?; + let mut plan = self + .plan_table_resolution(&catalog, &schema, ctx, &unique_tables) + .await?; + + self.create_missing_tables_and_refresh_schemas( + &catalog, + &schema, + ctx, + &table_rows, + &mut plan, + ) + .await?; + + self.alter_tables_and_refresh_schemas(&catalog, &schema, ctx, &mut plan) + .await?; + + let aligned_batches = Self::build_aligned_batches(&table_rows, &plan.region_schemas)?; + + Ok((aligned_batches, total_rows)) + } + + /// Extracts non-empty `(table_name, rows)` pairs and computes total row + /// count across the retained entries. + fn collect_non_empty_table_rows(requests: RowInsertRequests) -> (Vec<(String, Rows)>, usize) { + let mut table_rows: Vec<(String, Rows)> = Vec::with_capacity(requests.inserts.len()); + let mut total_rows = 0; + + for request in requests.inserts { + let Some(rows) = request.rows else { + continue; + }; + if rows.rows.is_empty() { + continue; + } + + total_rows += rows.rows.len(); + table_rows.push((request.table_name, rows)); + } + + (table_rows, total_rows) + } + + /// Returns unique `(table_name, proto_schema)` pairs while keeping the + /// first-seen schema for duplicate table names. 
+ fn collect_unique_table_schemas( + table_rows: &[(String, Rows)], + ) -> Result> { + let mut unique_tables: Vec<(&str, &[ColumnSchema])> = Vec::with_capacity(table_rows.len()); + let mut seen = HashSet::new(); + + for (table_name, rows) in table_rows { + if seen.insert(table_name.as_str()) { + unique_tables.push((table_name.as_str(), &rows.schema)); } else { - let table = self - .catalog_manager - .table(&catalog, &schema, &table_name, Some(ctx.as_ref())) - .await - .map_err(|err| Error::Internal { - err_msg: format!( - "Failed to resolve table {} for pending batch alignment: {}", - table_name, err - ), - })? - .ok_or_else(|| Error::Internal { - err_msg: format!( - "Table not found during pending batch alignment: {}", - table_name - ), - })?; - let region_schema = table.table_info().meta.schema.arrow_schema().clone(); - region_schemas.insert(table_name.clone(), region_schema.clone()); - region_schema + // table_rows should group rows by table name. + return error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Found duplicated table name in RowInsertRequest: {}", + table_name + ), + } + .fail(); + } + } + + Ok(unique_tables) + } + + /// Resolves table metadata and classifies each table into existing, + /// to-create, and to-alter groups used by subsequent DDL steps. 
+ async fn plan_table_resolution( + &self, + catalog: &str, + schema: &str, + ctx: &QueryContextRef, + unique_tables: &[(&str, &[ColumnSchema])], + ) -> Result { + let mut plan = TableResolutionPlan { + region_schemas: HashMap::with_capacity(unique_tables.len()), + tables_to_create: Vec::new(), + tables_to_alter: Vec::new(), + }; + + let resolved_tables = { + let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED + .with_label_values(&["align_resolve_table"]) + .start_timer(); + futures::future::join_all(unique_tables.iter().map(|(table_name, _)| { + self.catalog_manager + .table(catalog, schema, table_name, Some(ctx.as_ref())) + })) + .await + }; + + for ((table_name, rows_schema), table_result) in unique_tables.iter().zip(resolved_tables) { + let table = table_result?; + + if let Some(table) = table { + let table_info = table.table_info(); + let table_id = table_info.ident.table_id; + let region_schema = table_info.meta.schema.arrow_schema().clone(); + + let missing_columns = { + let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED + .with_label_values(&["align_identify_missing_columns"]) + .start_timer(); + identify_missing_columns_from_proto(rows_schema, region_schema.as_ref())? + }; + if !missing_columns.is_empty() { + plan.tables_to_alter + .push(((*table_name).to_string(), missing_columns)); + } + plan.region_schemas + .insert((*table_name).to_string(), (region_schema, table_id)); + } else { + let request_schema = { + let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED + .with_label_values(&["align_build_create_table_schema"]) + .start_timer(); + build_prom_create_table_schema_from_proto(rows_schema)? + }; + plan.tables_to_create + .push(((*table_name).to_string(), request_schema)); + } + } + + Ok(plan) + } + + /// Batch-creates missing tables, refreshes their schema metadata, and + /// enqueues follow-up alters for extra tag columns discovered in later rows. 
+ async fn create_missing_tables_and_refresh_schemas( + &self, + catalog: &str, + schema: &str, + ctx: &QueryContextRef, + table_rows: &[(String, Rows)], + plan: &mut TableResolutionPlan, + ) -> Result<()> { + if plan.tables_to_create.is_empty() { + return Ok(()); + } + + let create_refs: Vec<(&str, &[ColumnSchema])> = plan + .tables_to_create + .iter() + .map(|(name, schema)| (name.as_str(), schema.as_slice())) + .collect(); + + { + let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED + .with_label_values(&["align_batch_create_tables"]) + .start_timer(); + self.schema_alterer + .create_tables_if_missing_batch( + catalog, + schema, + &create_refs, + self.prom_store_with_metric_engine, + ctx.clone(), + ) + .await?; + } + + let created_table_results = { + let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED + .with_label_values(&["align_resolve_table_after_create"]) + .start_timer(); + futures::future::join_all(plan.tables_to_create.iter().map(|(table_name, _)| { + self.catalog_manager + .table(catalog, schema, table_name, Some(ctx.as_ref())) + })) + .await + }; + + for ((table_name, _), table_result) in + plan.tables_to_create.iter().zip(created_table_results) + { + let table = table_result?.with_context(|| error::UnexpectedResultSnafu { + reason: format!( + "Table not found after pending batch create attempt: {}", + table_name + ), + })?; + let table_info = table.table_info(); + let table_id = table_info.ident.table_id; + let region_schema = table_info.meta.schema.arrow_schema().clone(); + plan.region_schemas + .insert(table_name.clone(), (region_schema, table_id)); + } + + Self::enqueue_alter_for_new_tables(table_rows, plan)?; + + Ok(()) + } + + /// For newly created tables, re-checks all row schemas and appends alter + /// operations when additional tag columns are still missing. 
+ fn enqueue_alter_for_new_tables( + table_rows: &[(String, Rows)], + plan: &mut TableResolutionPlan, + ) -> Result<()> { + let created_tables: HashSet<&str> = plan + .tables_to_create + .iter() + .map(|(table_name, _)| table_name.as_str()) + .collect(); + + for (table_name, rows) in table_rows { + if !created_tables.contains(table_name.as_str()) { + continue; + } + + let Some((region_schema, _)) = plan.region_schemas.get(table_name) else { + continue; }; - let record_batch = align_record_batch_to_schema(record_batch, region_schema.as_ref())?; - aligned_batches.push((table_name, record_batch)); + let missing_columns = identify_missing_columns_from_proto(&rows.schema, region_schema)?; + if missing_columns.is_empty() + || plan + .tables_to_alter + .iter() + .any(|(existing_name, _)| existing_name == table_name) + { + continue; + } + + plan.tables_to_alter + .push((table_name.clone(), missing_columns)); + } + + Ok(()) + } + + /// Batch-alters tables that have missing tag columns and refreshes the + /// in-memory schema map used for row alignment. 
+ async fn alter_tables_and_refresh_schemas( + &self, + catalog: &str, + schema: &str, + ctx: &QueryContextRef, + plan: &mut TableResolutionPlan, + ) -> Result<()> { + if plan.tables_to_alter.is_empty() { + return Ok(()); + } + + let alter_refs: Vec<(&str, &[String])> = plan + .tables_to_alter + .iter() + .map(|(name, cols)| (name.as_str(), cols.as_slice())) + .collect(); + { + let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED + .with_label_values(&["align_batch_add_missing_columns"]) + .start_timer(); + self.schema_alterer + .add_missing_prom_tag_columns_batch(catalog, schema, &alter_refs, ctx.clone()) + .await?; + } + + let altered_table_results = { + let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED + .with_label_values(&["align_resolve_table_after_schema_alter"]) + .start_timer(); + futures::future::join_all(plan.tables_to_alter.iter().map(|(table_name, _)| { + self.catalog_manager + .table(catalog, schema, table_name, Some(ctx.as_ref())) + })) + .await + }; + + for ((table_name, _), table_result) in + plan.tables_to_alter.iter().zip(altered_table_results) + { + let table = table_result?.with_context(|| error::UnexpectedResultSnafu { + reason: format!( + "Table not found after pending batch schema alter: {}", + table_name + ), + })?; + let table_info = table.table_info(); + let table_id = table_info.ident.table_id; + let refreshed_region_schema = table_info.meta.schema.arrow_schema().clone(); + plan.region_schemas + .insert(table_name.clone(), (refreshed_region_schema, table_id)); + } + + Ok(()) + } + + /// Converts proto rows to `RecordBatch` values aligned to resolved region + /// schemas and returns `(table_name, table_id, batch)` tuples. 
+ fn build_aligned_batches( + table_rows: &[(String, Rows)], + region_schemas: &HashMap, u32)>, + ) -> Result> { + let mut aligned_batches = Vec::with_capacity(table_rows.len()); + for (table_name, rows) in table_rows { + let (region_schema, table_id) = + region_schemas.get(table_name).cloned().with_context(|| { + error::UnexpectedResultSnafu { + reason: format!("Region schema not resolved for table: {}", table_name), + } + })?; + + let record_batch = { + let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED + .with_label_values(&["align_rows_to_record_batch"]) + .start_timer(); + rows_to_aligned_record_batch(rows, region_schema.as_ref())? + }; + aligned_batches.push((table_name.clone(), table_id, record_batch)); } Ok(aligned_batches) @@ -422,9 +755,10 @@ fn start_worker( batch.waiters.push(FlushWaiter { response_tx, _permit }); - for (table_name, record_batch) in table_batches { + for (table_name, table_id, record_batch) in table_batches { let entry = batch.tables.entry(table_name.clone()).or_insert_with(|| TableBatch { table_name, + table_id, batches: Vec::new(), row_count: 0, }); @@ -579,6 +913,199 @@ async fn spawn_flush( } } +struct FlushRegionWrite { + region_id: RegionId, + row_count: usize, + datanode: Peer, + request: RegionRequest, +} + +enum FlushWriteResult { + Success { row_count: usize }, + Failed { row_count: usize, message: String }, +} + +fn should_dispatch_concurrently(region_write_count: usize) -> bool { + region_write_count > 1 +} + +/// Classifies columns in a logical-table batch for sparse primary-key conversion. +/// +/// Returns: +/// - `Vec`: all Utf8 tag columns sorted by tag name, used for +/// TSID and sparse primary-key encoding. +/// - `SmallVec<[usize; 3]>`: indices of columns copied into the physical batch +/// after `__primary_key`, ordered as `[greptime_timestamp, greptime_value, +/// partition_tag_columns...]`. 
+fn columns_taxonomy( + batch_schema: &Arc, + table_name: &str, + name_to_ids: &HashMap, + partition_columns: &HashSet<&str>, +) -> Result<(Vec, SmallVec<[usize; 3]>)> { + let mut tag_columns = Vec::new(); + let mut essential_column_indices = + SmallVec::<[usize; 3]>::with_capacity(2 + partition_columns.len()); + // Placeholder for greptime_timestamp and greptime_value + essential_column_indices.push(0); + essential_column_indices.push(0); + + let mut timestamp_index = None; + let mut value_index = None; + + for (index, field) in batch_schema.fields().iter().enumerate() { + match field.data_type() { + ArrowDataType::Utf8 => { + let column_id = name_to_ids.get(field.name()).copied().with_context(|| { + error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Column '{}' from logical table '{}' not found in physical table column IDs", + field.name(), + table_name + ), + } + })?; + tag_columns.push(TagColumnInfo { + name: field.name().clone(), + index, + column_id, + }); + + if partition_columns.contains(field.name().as_str()) { + essential_column_indices.push(index); + } + } + ArrowDataType::Timestamp(TimeUnit::Millisecond, _) => { + ensure!( + timestamp_index.replace(index).is_none(), + error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Duplicated timestamp column in logical table '{}' batch schema", + table_name + ), + } + ); + } + ArrowDataType::Float64 => { + ensure!( + value_index.replace(index).is_none(), + error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Duplicated value column in logical table '{}' batch schema", + table_name + ), + } + ); + } + datatype => { + return error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Unexpected data type '{datatype:?}' in logical table '{}' batch schema", + table_name + ), + } + .fail(); + } + } + } + + let timestamp_index = + timestamp_index.with_context(|| error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Missing essential column '{}' in logical table '{}' batch schema", + 
greptime_timestamp(), + table_name + ), + })?; + let value_index = value_index.with_context(|| error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Missing essential column '{}' in logical table '{}' batch schema", + greptime_value(), + table_name + ), + })?; + + tag_columns.sort_by(|a, b| a.name.cmp(&b.name)); + + essential_column_indices[0] = timestamp_index; + essential_column_indices[1] = value_index; + + Ok((tag_columns, essential_column_indices)) +} + +fn strip_partition_columns_from_batch(batch: RecordBatch) -> Result { + ensure!( + batch.num_columns() >= PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT, + error::InternalSnafu { + err_msg: format!( + "Expected at least {} columns in physical batch, got {}", + PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT, + batch.num_columns() + ), + } + ); + let essential_indices: Vec = (0..PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT).collect(); + batch + .project(&essential_indices) + .map_err(|err| Error::Internal { + err_msg: format!("Failed to project essential columns from RecordBatch: {err}"), + }) +} + +async fn flush_region_writes_concurrently( + node_manager: NodeManagerRef, + writes: Vec, +) -> Vec { + if !should_dispatch_concurrently(writes.len()) { + let mut results = Vec::with_capacity(writes.len()); + for write in writes { + let datanode = node_manager.datanode(&write.datanode).await; + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_write_region"]) + .start_timer(); + match datanode.handle(write.request).await { + Ok(_) => results.push(FlushWriteResult::Success { + row_count: write.row_count, + }), + Err(err) => results.push(FlushWriteResult::Failed { + row_count: write.row_count, + message: format!( + "Bulk insert flush failed for region {}: {:?}", + write.region_id, err + ), + }), + } + } + return results; + } + + let write_futures = writes.into_iter().map(|write| { + let node_manager = node_manager.clone(); + async move { + let datanode = node_manager.datanode(&write.datanode).await; + let 
_timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_write_region"]) + .start_timer(); + + match datanode.handle(write.request).await { + Ok(_) => FlushWriteResult::Success { + row_count: write.row_count, + }, + Err(err) => FlushWriteResult::Failed { + row_count: write.row_count, + message: format!( + "Bulk insert flush failed for region {}: {:?}", + write.region_id, err + ), + }, + } + } + }); + + // todo(hl): should be bounded. + futures::future::join_all(write_futures).await +} + async fn flush_batch( flush: FlushBatch, partition_manager: PartitionRuleManagerRef, @@ -594,231 +1121,27 @@ async fn flush_batch( let start = Instant::now(); let mut first_error: Option = None; - let catalog = ctx.current_catalog().to_string(); - let schema = ctx.current_schema(); - - macro_rules! record_failure { - ($row_count:expr, $msg:expr) => {{ - let msg = $msg; - if first_error.is_none() { - first_error = Some(msg.clone()); - } - mark_flush_failure($row_count, &msg); - }}; - } - - for table_batch in table_batches { - let Some(first_batch) = table_batch.batches.first() else { - continue; - }; - - let schema_ref = first_batch.schema(); - let record_batch = { - let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["flush_concat_table_batches"]) - .start_timer(); - match concat_batches(&schema_ref, &table_batch.batches) { - Ok(batch) => batch, - Err(err) => { - record_failure!( - table_batch.row_count, - format!( - "Failed to concat table batch {}: {:?}", - table_batch.table_name, err - ) - ); - continue; - } - } - }; - - let table = { - let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["flush_resolve_table"]) - .start_timer(); - match catalog_manager - .table( - &catalog, - &schema, - &table_batch.table_name, - Some(ctx.as_ref()), - ) - .await - { - Ok(Some(table)) => table, - Ok(None) => { - record_failure!( - table_batch.row_count, - format!( - "Table not found during pending flush: {}", - 
table_batch.table_name - ) - ); - continue; - } - Err(err) => { - record_failure!( - table_batch.row_count, - format!( - "Failed to resolve table {} for pending flush: {:?}", - table_batch.table_name, err - ) - ); - continue; - } - } - }; - let table_info = table.table_info(); - - let partition_rule = { - let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["flush_fetch_partition_rule"]) - .start_timer(); - match partition_manager - .find_table_partition_rule(&table_info) - .await - { - Ok(rule) => rule, - Err(err) => { - record_failure!( - table_batch.row_count, - format!( - "Failed to fetch partition rule for table {}: {:?}", - table_batch.table_name, err - ) - ); - continue; - } - } - }; - - let region_masks = { - let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["flush_split_record_batch"]) - .start_timer(); - match partition_rule.0.split_record_batch(&record_batch) { - Ok(masks) => masks, - Err(err) => { - record_failure!( - table_batch.row_count, - format!( - "Failed to split record batch for table {}: {:?}", - table_batch.table_name, err - ) - ); - continue; - } - } - }; - - for (region_number, mask) in region_masks { - if mask.select_none() { - continue; - } - - let region_batch = if mask.select_all() { - record_batch.clone() - } else { - let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["flush_filter_record_batch"]) - .start_timer(); - match filter_record_batch(&record_batch, mask.array()) { - Ok(batch) => batch, - Err(err) => { - record_failure!( - table_batch.row_count, - format!( - "Failed to filter record batch for table {}: {:?}", - table_batch.table_name, err - ) - ); - continue; - } - } - }; - - let row_count = region_batch.num_rows(); - if row_count == 0 { - continue; - } - - let region_id = RegionId::new(table_info.table_id(), region_number); - let datanode = { - let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["flush_resolve_region_leader"]) - 
.start_timer(); - match partition_manager.find_region_leader(region_id).await { - Ok(peer) => peer, - Err(err) => { - record_failure!( - row_count, - format!("Failed to resolve region leader {}: {:?}", region_id, err) - ); - continue; - } - } - }; - - let (schema_bytes, data_header, payload) = { - let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["flush_encode_ipc"]) - .start_timer(); - match record_batch_to_ipc(region_batch) { - Ok(encoded) => encoded, - Err(err) => { - record_failure!( - row_count, - format!( - "Failed to encode Arrow IPC for region {}: {:?}", - region_id, err - ) - ); - continue; - } - } - }; - - let request = RegionRequest { - header: Some(RegionRequestHeader { - tracing_context: TracingContext::from_current_span().to_w3c(), - ..Default::default() - }), - body: Some(region_request::Body::BulkInsert(BulkInsertRequest { - region_id: region_id.as_u64(), - partition_expr_version: None, - body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc { - schema: schema_bytes, - data_header, - payload, - })), - })), - }; - - let datanode = node_manager.datanode(&datanode).await; - let _timer = PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED - .with_label_values(&["flush_write_region"]) - .start_timer(); - match datanode.handle(request).await { - Ok(_) => { - FLUSH_TOTAL.inc(); - FLUSH_ROWS.observe(row_count as f64); - } - Err(err) => { - record_failure!( - row_count, - format!( - "Bulk insert flush failed for region {}: {:?}", - region_id, err - ) - ); - } - } - } - } + // Physical-table-level flush: transform all logical table batches + // into physical format and write them together. 
+ let physical_table_name = ctx + .extension(PHYSICAL_TABLE_KEY) + .unwrap_or(GREPTIME_PHYSICAL_TABLE) + .to_string(); + flush_batch_physical( + &table_batches, + total_row_count, + &physical_table_name, + &ctx, + &partition_manager, + &node_manager, + &catalog_manager, + &mut first_error, + ) + .await; let elapsed = start.elapsed().as_secs_f64(); FLUSH_ELAPSED.observe(elapsed); - info!( + debug!( "Pending rows batch flushed, total rows: {}, elapsed time: {}s", total_row_count, elapsed ); @@ -826,6 +1149,370 @@ async fn flush_batch( notify_waiters(waiters, &first_error); } +/// Attempts to flush all table batches by transforming them into the physical +/// table format (sparse primary key encoding) and writing directly to the +/// physical data regions. +/// +/// This is the only flush path. Any failure in resolving or transforming the +/// physical flush inputs is recorded as flush failure and reported to waiters. +#[allow(clippy::too_many_arguments)] +async fn flush_batch_physical( + table_batches: &[TableBatch], + total_row_count: usize, + physical_table_name: &str, + ctx: &QueryContextRef, + partition_manager: &PartitionRuleManagerRef, + node_manager: &NodeManagerRef, + catalog_manager: &CatalogManagerRef, + first_error: &mut Option, +) { + macro_rules! record_failure { + ($row_count:expr, $msg:expr) => {{ + let msg = $msg; + if first_error.is_none() { + *first_error = Some(msg.clone()); + } + mark_flush_failure($row_count, &msg); + }}; + } + + // 1. 
Resolve the physical table and get column ID mapping + let physical_table = { + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_resolve_table"]) + .start_timer(); + match catalog_manager + .table( + ctx.current_catalog(), + &ctx.current_schema(), + physical_table_name, + Some(ctx.as_ref()), + ) + .await + { + Ok(Some(table)) => table, + Ok(None) => { + record_failure!( + total_row_count, + format!( + "Physical table '{}' not found during pending flush", + physical_table_name + ) + ); + return; + } + Err(err) => { + record_failure!( + total_row_count, + format!( + "Failed to resolve physical table '{}' for pending flush: {:?}", + physical_table_name, err + ) + ); + return; + } + } + }; + + let physical_table_info = physical_table.table_info(); + let name_to_ids = match physical_table_info.name_to_ids() { + Some(ids) => ids, + None => { + record_failure!( + total_row_count, + format!( + "Physical table '{}' has no column IDs for pending flush", + physical_table_name + ) + ); + return; + } + }; + + // 2. Get the physical table's partition rule (one lookup instead of N) + let partition_rule = { + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_fetch_partition_rule"]) + .start_timer(); + match partition_manager + .find_table_partition_rule(&physical_table_info) + .await + { + Ok(rule) => rule, + Err(err) => { + record_failure!( + total_row_count, + format!( + "Failed to fetch partition rule for physical table '{}': {:?}", + physical_table_name, err + ) + ); + return; + } + } + }; + let partition_columns = partition_rule.0.partition_columns(); + let partition_columns_set: HashSet<&str> = + partition_columns.iter().map(String::as_str).collect(); + + // 3. 
Transform each logical table batch into physical format + let mut modified_batches: Vec = Vec::with_capacity(table_batches.len()); + let mut modified_row_count: usize = 0; + + let mut modify_elapsed = Duration::ZERO; + let mut columns_taxonomy_elapsed = Duration::ZERO; + + 'next_table: for table_batch in table_batches { + let table_id = table_batch.table_id; + + // Transform each chunk to physical format directly, avoiding an + // intermediate concat_batches per logical table. + for batch in &table_batch.batches { + // Identify tag columns and non-tag columns from the logical batch schema. + // Chunks within a table_batch may have different schemas if new tag columns + // are added between submits. + // In prom batches, Float64 = value, Timestamp = timestamp, Utf8 = tags. + let batch_schema = batch.schema(); + let start = Instant::now(); + let (tag_columns, essential_col_indices) = match columns_taxonomy( + &batch_schema, + &table_batch.table_name, + &name_to_ids, + &partition_columns_set, + ) { + Ok(columns) => columns, + Err(err) => { + warn!( + "Failed to resolve columns for logical table '{}': {:?}", + table_batch.table_name, err + ); + record_failure!(table_batch.row_count, err.to_string()); + continue 'next_table; + } + }; + + columns_taxonomy_elapsed += start.elapsed(); + if tag_columns.is_empty() && essential_col_indices.is_empty() { + continue; + } + + let modified = { + let start = Instant::now(); + match modify_batch_sparse( + batch.clone(), + table_id, + &tag_columns, + &essential_col_indices, + ) { + Ok(batch) => { + modify_elapsed += start.elapsed(); + batch + } + Err(err) => { + record_failure!( + table_batch.row_count, + format!( + "Failed to modify batch for logical table '{}': {:?}", + table_batch.table_name, err + ) + ); + continue 'next_table; + } + } + }; + + modified_row_count += modified.num_rows(); + modified_batches.push(modified); + } + } + + PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_modify_batch"]) + 
.observe(modify_elapsed.as_secs_f64()); + PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_columns_taxonomy"]) + .observe(columns_taxonomy_elapsed.as_secs_f64()); + + if modified_batches.is_empty() { + if first_error.is_none() { + record_failure!( + total_row_count, + format!( + "No batches can be transformed for physical table '{}' during pending flush", + physical_table_name + ) + ); + } + return; + } + + // 4. Concatenate all modified batches (all share the same physical schema) + let combined_batch = { + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_concat_all"]) + .start_timer(); + let combined_schema = modified_batches[0].schema(); + // todo(hl): maybe limit max rows to concat. + match concat_batches(&combined_schema, &modified_batches) { + Ok(batch) => batch, + Err(err) => { + record_failure!( + modified_row_count, + format!("Failed to concat modified batches: {:?}", err) + ); + return; + } + } + }; + + // 5. Split by physical partition rule and send to regions + let physical_table_id = physical_table_info.table_id(); + let region_masks = { + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_split_record_batch"]) + .start_timer(); + match partition_rule.0.split_record_batch(&combined_batch) { + Ok(masks) => masks, + Err(err) => { + record_failure!( + total_row_count, + format!( + "Failed to split combined batch for physical table '{}': {:?}", + physical_table_name, err + ) + ); + return; + } + } + }; + + let stripped_batch = if partition_columns.is_empty() { + combined_batch + } else { + // Strip partition columns before encoding and sending requests. 
+ match strip_partition_columns_from_batch(combined_batch) { + Ok(batch) => batch, + Err(err) => { + record_failure!( + total_row_count, + format!( + "Failed to strip partition columns for physical table '{}': {:?}", + physical_table_name, err + ) + ); + return; + } + } + }; + + let mut region_writes = Vec::new(); + for (region_number, mask) in region_masks { + if mask.select_none() { + continue; + } + + let region_batch = if mask.select_all() { + stripped_batch.clone() + } else { + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_filter_record_batch"]) + .start_timer(); + match filter_record_batch(&stripped_batch, mask.array()) { + Ok(batch) => batch, + Err(err) => { + record_failure!( + total_row_count, + format!( + "Failed to filter combined batch for physical table '{}': {:?}", + physical_table_name, err + ) + ); + continue; + } + } + }; + + let row_count = region_batch.num_rows(); + if row_count == 0 { + continue; + } + + let region_id = RegionId::new(physical_table_id, region_number); + let datanode = { + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_resolve_region_leader"]) + .start_timer(); + match partition_manager.find_region_leader(region_id).await { + Ok(peer) => peer, + Err(err) => { + record_failure!( + row_count, + format!( + "Failed to resolve region leader for physical region {}: {:?}", + region_id, err + ) + ); + continue; + } + } + }; + + let (schema_bytes, data_header, payload) = { + let _timer = PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED + .with_label_values(&["flush_physical_encode_ipc"]) + .start_timer(); + match record_batch_to_ipc(region_batch) { + Ok(encoded) => encoded, + Err(err) => { + record_failure!( + row_count, + format!( + "Failed to encode Arrow IPC for physical region {}: {:?}", + region_id, err + ) + ); + continue; + } + } + }; + + let request = RegionRequest { + header: Some(RegionRequestHeader { + tracing_context: 
TracingContext::from_current_span().to_w3c(), + ..Default::default() + }), + body: Some(region_request::Body::BulkInsert(BulkInsertRequest { + region_id: region_id.as_u64(), + partition_expr_version: None, + body: Some(bulk_insert_request::Body::ArrowIpc(ArrowIpc { + schema: schema_bytes, + data_header, + payload, + })), + })), + }; + + region_writes.push(FlushRegionWrite { + region_id, + row_count, + datanode, + request, + }); + } + + for result in flush_region_writes_concurrently(node_manager.clone(), region_writes).await { + match result { + FlushWriteResult::Success { row_count } => { + FLUSH_TOTAL.inc(); + FLUSH_ROWS.observe(row_count as f64); + } + FlushWriteResult::Failed { row_count, message } => { + record_failure!(row_count, message); + } + } + } +} + fn notify_waiters(waiters: Vec, first_error: &Option) { for waiter in waiters { let result = match first_error { @@ -865,192 +1552,6 @@ fn flush_with_error(batch: &mut PendingBatch, message: &str) { mark_flush_failure(row_count, message); } -fn build_table_batches(requests: RowInsertRequests) -> Result<(Vec<(String, RecordBatch)>, usize)> { - let mut table_batches = Vec::with_capacity(requests.inserts.len()); - let mut total_rows = 0; - - for request in requests.inserts { - let Some(rows) = request.rows else { - continue; - }; - if rows.rows.is_empty() { - continue; - } - - let record_batch = rows_to_record_batch(&rows)?; - total_rows += record_batch.num_rows(); - table_batches.push((request.table_name, record_batch)); - } - - Ok((table_batches, total_rows)) -} - -fn align_record_batch_to_schema( - record_batch: RecordBatch, - target_schema: &ArrowSchema, -) -> Result { - let source_schema = record_batch.schema(); - if source_schema.as_ref() == target_schema { - return Ok(record_batch); - } - - for source_field in source_schema.fields() { - if target_schema - .column_with_name(source_field.name()) - .is_none() - { - return Err(Error::Internal { - err_msg: format!( - "Failed to align record batch schema, 
column '{}' not found in target schema", - source_field.name() - ), - }); - } - } - - let row_count = record_batch.num_rows(); - let mut columns = Vec::with_capacity(target_schema.fields().len()); - for target_field in target_schema.fields() { - let column = if let Some((index, source_field)) = - source_schema.column_with_name(target_field.name()) - { - let source_column = record_batch.column(index).clone(); - if source_field.data_type() == target_field.data_type() { - source_column - } else { - cast(source_column.as_ref(), target_field.data_type()).map_err(|err| { - Error::Internal { - err_msg: format!( - "Failed to cast column '{}' to target type {:?}: {}", - target_field.name(), - target_field.data_type(), - err - ), - } - })? - } - } else { - new_null_array(target_field.data_type(), row_count) - }; - columns.push(column); - } - - RecordBatch::try_new(Arc::new(target_schema.clone()), columns).map_err(|err| Error::Internal { - err_msg: format!("Failed to build aligned record batch: {}", err), - }) -} - -fn rows_to_record_batch(rows: &Rows) -> Result { - let row_count = rows.rows.len(); - let column_count = rows.schema.len(); - - for (idx, row) in rows.rows.iter().enumerate() { - ensure!( - row.values.len() == column_count, - error::InternalSnafu { - err_msg: format!( - "Column count mismatch in row {}, expected {}, got {}", - idx, - column_count, - row.values.len() - ) - } - ); - } - - let mut fields = Vec::with_capacity(column_count); - let mut columns = Vec::with_capacity(column_count); - - for (idx, column_schema) in rows.schema.iter().enumerate() { - let datatype_wrapper = ColumnDataTypeWrapper::try_new( - column_schema.datatype, - column_schema.datatype_extension.clone(), - )?; - let data_type = ConcreteDataType::from(datatype_wrapper); - fields.push(Field::new( - column_schema.column_name.clone(), - data_type.as_arrow_type(), - true, - )); - columns.push(build_arrow_array( - rows, - idx, - &column_schema.column_name, - data_type.as_arrow_type(), - 
row_count, - )?); - } - - RecordBatch::try_new(Arc::new(ArrowSchema::new(fields)), columns).context(error::ArrowSnafu) -} - -fn build_arrow_array( - rows: &Rows, - col_idx: usize, - column_name: &String, - column_data_type: arrow::datatypes::DataType, - row_count: usize, -) -> Result { - macro_rules! build_array { - ($builder:expr, $( $pattern:pat => $value:expr ),+ $(,)?) => {{ - let mut builder = $builder; - for row in &rows.rows { - match row.values[col_idx].value_data.as_ref() { - $(Some($pattern) => builder.append_value($value),)+ - Some(v) => { - return error::InvalidPromRemoteRequestSnafu { - msg: format!("Unexpected value: {:?}", v), - } - .fail(); - } - None => builder.append_null(), - } - } - Arc::new(builder.finish()) as ArrayRef - }}; - } - - let array: ArrayRef = match column_data_type { - arrow::datatypes::DataType::Float64 => { - build_array!(Float64Builder::with_capacity(row_count), ValueData::F64Value(v) => *v) - } - arrow::datatypes::DataType::Utf8 => build_array!( - StringBuilder::with_capacity(row_count, 0), - ValueData::StringValue(v) => v - ), - arrow::datatypes::DataType::Timestamp(u, _) => match u { - TimeUnit::Second => build_array!( - TimestampSecondBuilder::with_capacity(row_count), - ValueData::TimestampSecondValue(v) => *v - ), - TimeUnit::Millisecond => build_array!( - TimestampMillisecondBuilder::with_capacity(row_count), - ValueData::TimestampMillisecondValue(v) => *v - ), - TimeUnit::Microsecond => build_array!( - TimestampMicrosecondBuilder::with_capacity(row_count), - ValueData::DatetimeValue(v) => *v, - ValueData::TimestampMicrosecondValue(v) => *v - ), - TimeUnit::Nanosecond => build_array!( - TimestampNanosecondBuilder::with_capacity(row_count), - ValueData::TimestampNanosecondValue(v) => *v - ), - }, - ty => { - return error::InvalidPromRemoteRequestSnafu { - msg: format!( - "Unexpected column type {:?}, column name: {}", - ty, column_name - ), - } - .fail(); - } - }; - - Ok(array) -} - fn record_batch_to_ipc(record_batch: 
RecordBatch) -> Result<(Bytes, Bytes, Bytes)> { let mut encoder = FlightEncoder::default(); let schema = encoder.encode_schema(record_batch.schema().as_ref()); @@ -1077,132 +1578,154 @@ fn record_batch_to_ipc(record_batch: RecordBatch) -> Result<(Bytes, Bytes, Bytes #[cfg(test)] mod tests { + use std::collections::{HashMap, HashSet}; use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::time::Duration; - use api::v1::value::ValueData; - use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; - use arrow::array::{Array, Float64Array, Int32Array, Int64Array, StringArray}; - use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use api::region::RegionResponse; + use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse}; + use api::v1::meta::Peer; + use api::v1::region::{InsertRequests, RegionRequest}; + use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows}; + use arrow::array::{BinaryArray, StringArray, TimestampMillisecondArray}; + use arrow::datatypes::{DataType as ArrowDataType, Field, Schema as ArrowSchema}; use arrow::record_batch::RecordBatch; + use async_trait::async_trait; + use common_meta::error::Result as MetaResult; + use common_meta::node_manager::{ + Datanode, DatanodeManager, DatanodeRef, Flownode, FlownodeManager, FlownodeRef, + }; + use common_query::request::QueryRequest; + use common_recordbatch::SendableRecordBatchStream; use dashmap::DashMap; + use smallvec::SmallVec; + use store_api::storage::RegionId; use tokio::sync::mpsc; + use tokio::time::sleep; use super::{ - BatchKey, PendingWorker, WorkerCommand, align_record_batch_to_schema, - remove_worker_if_same_channel, rows_to_record_batch, should_close_worker_on_idle_timeout, + BatchKey, Error, FlushRegionWrite, FlushWriteResult, PendingRowsBatcher, PendingWorker, + WorkerCommand, columns_taxonomy, flush_region_writes_concurrently, + remove_worker_if_same_channel, should_close_worker_on_idle_timeout, + 
should_dispatch_concurrently, strip_partition_columns_from_batch, }; + fn mock_rows(row_count: usize, schema_name: &str) -> Rows { + Rows { + schema: vec![ColumnSchema { + column_name: schema_name.to_string(), + ..Default::default() + }], + rows: (0..row_count).map(|_| Row { values: vec![] }).collect(), + } + } + #[test] - fn test_rows_to_record_batch() { - let rows = Rows { - schema: vec![ - ColumnSchema { - column_name: "ts".to_string(), - datatype: ColumnDataType::TimestampMillisecond as i32, - semantic_type: SemanticType::Timestamp as i32, - ..Default::default() + fn test_collect_non_empty_table_rows_filters_empty_payloads() { + let requests = RowInsertRequests { + inserts: vec![ + RowInsertRequest { + table_name: "cpu".to_string(), + rows: Some(mock_rows(2, "host")), }, - ColumnSchema { - column_name: "value".to_string(), - datatype: ColumnDataType::Float64 as i32, - semantic_type: SemanticType::Field as i32, - ..Default::default() + RowInsertRequest { + table_name: "mem".to_string(), + rows: Some(mock_rows(0, "host")), }, - ColumnSchema { - column_name: "host".to_string(), - datatype: ColumnDataType::String as i32, - semantic_type: SemanticType::Tag as i32, - ..Default::default() - }, - ], - rows: vec![ - Row { - values: vec![ - Value { - value_data: Some(ValueData::TimestampMillisecondValue(1000)), - }, - Value { - value_data: Some(ValueData::F64Value(42.0)), - }, - Value { - value_data: Some(ValueData::StringValue("h1".to_string())), - }, - ], - }, - Row { - values: vec![ - Value { - value_data: Some(ValueData::TimestampMillisecondValue(2000)), - }, - Value { value_data: None }, - Value { - value_data: Some(ValueData::StringValue("h2".to_string())), - }, - ], + RowInsertRequest { + table_name: "disk".to_string(), + rows: None, }, ], }; - let rb = rows_to_record_batch(&rows).unwrap(); - assert_eq!(2, rb.num_rows()); - assert_eq!(3, rb.num_columns()); + let (table_rows, total_rows) = PendingRowsBatcher::collect_non_empty_table_rows(requests); + + 
assert_eq!(2, total_rows); + assert_eq!(1, table_rows.len()); + assert_eq!("cpu", table_rows[0].0); + assert_eq!(2, table_rows[0].1.rows.len()); } - #[test] - fn test_align_record_batch_to_schema_reorder_and_fill_missing() { - let source_schema = Arc::new(ArrowSchema::new(vec![ - Field::new("host", DataType::Utf8, true), - Field::new("value", DataType::Float64, true), - ])); - let source = RecordBatch::try_new( - source_schema, - vec![ - Arc::new(StringArray::from(vec!["h1"])), - Arc::new(Float64Array::from(vec![42.0])), - ], - ) - .unwrap(); - - let target = ArrowSchema::new(vec![ - Field::new("ts", DataType::Int64, true), - Field::new("host", DataType::Utf8, true), - Field::new("value", DataType::Float64, true), - ]); - - let aligned = align_record_batch_to_schema(source, &target).unwrap(); - assert_eq!(aligned.schema().as_ref(), &target); - assert_eq!(1, aligned.num_rows()); - assert_eq!(3, aligned.num_columns()); - let ts = aligned - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert!(ts.is_null(0)); + #[derive(Clone)] + struct ConcurrentMockDatanode { + delay: Duration, + inflight: Arc, + max_inflight: Arc, } - #[test] - fn test_align_record_batch_to_schema_cast_column_type() { - let source_schema = Arc::new(ArrowSchema::new(vec![Field::new( - "value", - DataType::Int32, - true, - )])); - let source = RecordBatch::try_new( - source_schema, - vec![Arc::new(Int32Array::from(vec![Some(7), None]))], - ) - .unwrap(); + #[async_trait] + impl Datanode for ConcurrentMockDatanode { + async fn handle(&self, _request: RegionRequest) -> MetaResult { + let now = self.inflight.fetch_add(1, Ordering::SeqCst) + 1; + loop { + let max = self.max_inflight.load(Ordering::SeqCst); + if now <= max { + break; + } + if self + .max_inflight + .compare_exchange(max, now, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + break; + } + } - let target = ArrowSchema::new(vec![Field::new("value", DataType::Int64, true)]); - let aligned = align_record_batch_to_schema(source, 
&target).unwrap(); - let value = aligned - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(Some(7), value.iter().next().flatten()); - assert!(value.is_null(1)); + sleep(self.delay).await; + self.inflight.fetch_sub(1, Ordering::SeqCst); + Ok(RegionResponse::new(0)) + } + + async fn handle_query( + &self, + _request: QueryRequest, + ) -> MetaResult { + unimplemented!() + } + } + + #[derive(Clone)] + struct ConcurrentMockNodeManager { + datanodes: Arc>, + } + + #[async_trait] + impl DatanodeManager for ConcurrentMockNodeManager { + async fn datanode(&self, node: &Peer) -> DatanodeRef { + self.datanodes + .get(&node.id) + .expect("datanode not found") + .clone() + } + } + + struct NoopFlownode; + + #[async_trait] + impl Flownode for NoopFlownode { + async fn handle(&self, _request: FlowRequest) -> MetaResult { + unimplemented!() + } + + async fn handle_inserts(&self, _request: InsertRequests) -> MetaResult { + unimplemented!() + } + + async fn handle_mark_window_dirty( + &self, + _req: DirtyWindowRequests, + ) -> MetaResult { + unimplemented!() + } + } + + #[async_trait] + impl FlownodeManager for ConcurrentMockNodeManager { + async fn flownode(&self, _node: &Peer) -> FlownodeRef { + Arc::new(NoopFlownode) + } } #[test] @@ -1250,4 +1773,339 @@ mod tests { assert!(!should_close_worker_on_idle_timeout(1, 0)); assert!(!should_close_worker_on_idle_timeout(0, 1)); } + + #[tokio::test] + async fn test_flush_region_writes_concurrently_dispatches_multiple_datanodes() { + let inflight = Arc::new(AtomicUsize::new(0)); + let max_inflight = Arc::new(AtomicUsize::new(0)); + let datanode1: DatanodeRef = Arc::new(ConcurrentMockDatanode { + delay: Duration::from_millis(100), + inflight: inflight.clone(), + max_inflight: max_inflight.clone(), + }); + let datanode2: DatanodeRef = Arc::new(ConcurrentMockDatanode { + delay: Duration::from_millis(100), + inflight, + max_inflight: max_inflight.clone(), + }); + + let mut datanodes = HashMap::new(); + datanodes.insert(1, 
datanode1); + datanodes.insert(2, datanode2); + let node_manager = Arc::new(ConcurrentMockNodeManager { + datanodes: Arc::new(datanodes), + }); + + let writes = vec![ + FlushRegionWrite { + region_id: RegionId::new(1024, 1), + row_count: 10, + datanode: Peer { + id: 1, + addr: "node1".to_string(), + }, + request: RegionRequest::default(), + }, + FlushRegionWrite { + region_id: RegionId::new(1024, 2), + row_count: 12, + datanode: Peer { + id: 2, + addr: "node2".to_string(), + }, + request: RegionRequest::default(), + }, + ]; + + let results = flush_region_writes_concurrently(node_manager, writes).await; + assert_eq!(2, results.len()); + assert!( + results + .iter() + .all(|result| matches!(result, FlushWriteResult::Success { .. })) + ); + assert!(max_inflight.load(Ordering::SeqCst) >= 2); + } + + #[test] + fn test_should_dispatch_concurrently_by_region_count() { + assert!(!should_dispatch_concurrently(0)); + assert!(!should_dispatch_concurrently(1)); + assert!(should_dispatch_concurrently(2)); + } + + #[test] + fn test_strip_partition_columns_from_batch_removes_partition_tags() { + let batch = RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![ + Field::new("__primary_key", ArrowDataType::Binary, false), + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new("host", ArrowDataType::Utf8, true), + ])), + vec![ + Arc::new(BinaryArray::from(vec![b"k1".as_slice()])), + Arc::new(TimestampMillisecondArray::from(vec![1000_i64])), + Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])), + Arc::new(StringArray::from(vec!["node-1"])), + ], + ) + .unwrap(); + + let stripped = strip_partition_columns_from_batch(batch).unwrap(); + + assert_eq!(3, stripped.num_columns()); + assert_eq!("__primary_key", stripped.schema().field(0).name()); + assert_eq!("greptime_timestamp", stripped.schema().field(1).name()); + 
assert_eq!("greptime_value", stripped.schema().field(2).name()); + } + + #[test] + fn test_strip_partition_columns_from_batch_projects_essential_columns_without_lookup() { + let batch = RecordBatch::try_new( + Arc::new(ArrowSchema::new(vec![ + Field::new("__primary_key", ArrowDataType::Binary, false), + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new("host", ArrowDataType::Utf8, true), + ])), + vec![ + Arc::new(BinaryArray::from(vec![b"k1".as_slice()])), + Arc::new(TimestampMillisecondArray::from(vec![1000_i64])), + Arc::new(arrow::array::Float64Array::from(vec![42.0_f64])), + Arc::new(StringArray::from(vec!["node-1"])), + ], + ) + .unwrap(); + + let stripped = strip_partition_columns_from_batch(batch).unwrap(); + + assert_eq!(3, stripped.num_columns()); + assert_eq!("__primary_key", stripped.schema().field(0).name()); + assert_eq!("greptime_timestamp", stripped.schema().field(1).name()); + assert_eq!("greptime_value", stripped.schema().field(2).name()); + } + + #[test] + fn test_collect_tag_columns_and_non_tag_indices_keeps_partition_tag_column() { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new("host", ArrowDataType::Utf8, true), + Field::new("region", ArrowDataType::Utf8, true), + ])); + let name_to_ids = + HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]); + let partition_columns = HashSet::from(["host"]); + + let (tag_columns, non_tag_indices) = + columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap(); + + assert_eq!(2, tag_columns.len()); + assert_eq!(&[0, 1, 2], non_tag_indices.as_slice()); + } + + #[test] + fn 
test_collect_tag_columns_and_non_tag_indices_prioritizes_essential_columns() { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("host", ArrowDataType::Utf8, true), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("region", ArrowDataType::Utf8, true), + ])); + let name_to_ids = + HashMap::from([("host".to_string(), 1_u32), ("region".to_string(), 2_u32)]); + let partition_columns = HashSet::from(["host", "region"]); + + let (_tag_columns, non_tag_indices): (_, SmallVec<[usize; 3]>) = + columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns).unwrap(); + + assert_eq!(&[2, 1, 0, 3], non_tag_indices.as_slice()); + } + + #[test] + fn test_collect_tag_columns_and_non_tag_indices_rejects_unexpected_data_type() { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new("host", ArrowDataType::Utf8, true), + Field::new("invalid", ArrowDataType::Boolean, true), + ])); + let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]); + let partition_columns = HashSet::from(["host"]); + + let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns); + + assert!(matches!( + result, + Err(Error::InvalidPromRemoteRequest { .. 
}) + )); + } + + #[test] + fn test_collect_tag_columns_and_non_tag_indices_rejects_int64_timestamp_column() { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("greptime_timestamp", ArrowDataType::Int64, false), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new("host", ArrowDataType::Utf8, true), + ])); + let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]); + let partition_columns = HashSet::from(["host"]); + + let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns); + + assert!(matches!( + result, + Err(Error::InvalidPromRemoteRequest { .. }) + )); + } + + #[test] + fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_timestamp_column() { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "ts1", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new( + "ts2", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new("host", ArrowDataType::Utf8, true), + ])); + let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]); + let partition_columns = HashSet::from(["host"]); + + let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns); + + assert!(matches!( + result, + Err(Error::InvalidPromRemoteRequest { .. 
}) + )); + } + + #[test] + fn test_collect_tag_columns_and_non_tag_indices_rejects_duplicated_value_column() { + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("value1", ArrowDataType::Float64, true), + Field::new("value2", ArrowDataType::Float64, true), + Field::new("host", ArrowDataType::Utf8, true), + ])); + let name_to_ids = HashMap::from([("host".to_string(), 1_u32)]); + let partition_columns = HashSet::from(["host"]); + + let result = columns_taxonomy(&schema, "cpu", &name_to_ids, &partition_columns); + + assert!(matches!( + result, + Err(Error::InvalidPromRemoteRequest { .. }) + )); + } + + #[test] + fn test_modify_batch_sparse_with_taxonomy_per_batch() { + use arrow::array::BinaryArray; + use metric_engine::batch_modifier::modify_batch_sparse; + + let schema1 = Arc::new(ArrowSchema::new(vec![ + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new("tag1", ArrowDataType::Utf8, true), + ])); + + let schema2 = Arc::new(ArrowSchema::new(vec![ + Field::new( + "greptime_timestamp", + ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None), + false, + ), + Field::new("greptime_value", ArrowDataType::Float64, true), + Field::new("tag1", ArrowDataType::Utf8, true), + Field::new("tag2", ArrowDataType::Utf8, true), + ])); + let batch2 = RecordBatch::try_new( + schema2.clone(), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![2000])), + Arc::new(arrow::array::Float64Array::from(vec![2.0])), + Arc::new(StringArray::from(vec!["v1"])), + Arc::new(StringArray::from(vec!["v2"])), + ], + ) + .unwrap(); + + let name_to_ids = HashMap::from([("tag1".to_string(), 1), ("tag2".to_string(), 2)]); + let partition_columns = HashSet::new(); + + // A batch that only has tag1, same 
values as batch2 for ts and val. + let batch3 = RecordBatch::try_new( + schema1.clone(), + vec![ + Arc::new(TimestampMillisecondArray::from(vec![2000])), + Arc::new(arrow::array::Float64Array::from(vec![2.0])), + Arc::new(StringArray::from(vec!["v1"])), + ], + ) + .unwrap(); + + // Simulate the new loop logic in flush_batch_physical: + // Resolve taxonomy FOR EACH BATCH. + let (tag_columns2, indices2) = + columns_taxonomy(&batch2.schema(), "table", &name_to_ids, &partition_columns).unwrap(); + let modified2 = modify_batch_sparse(batch2, 123, &tag_columns2, &indices2).unwrap(); + + let (tag_columns3, indices3) = + columns_taxonomy(&batch3.schema(), "table", &name_to_ids, &partition_columns).unwrap(); + let modified3 = modify_batch_sparse(batch3, 123, &tag_columns3, &indices3).unwrap(); + + let pk2 = modified2 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let pk3 = modified3 + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + + // Now they SHOULD be different because tag2 is included in pk2 but not in pk3. + assert_ne!( + pk2.value(0), + pk3.value(0), + "PK should be different because batch2 has tag2!" + ); + } } diff --git a/src/servers/src/pipeline.rs b/src/servers/src/pipeline.rs index fe7a2f48f3..9de2d63b97 100644 --- a/src/servers/src/pipeline.rs +++ b/src/servers/src/pipeline.rs @@ -28,7 +28,7 @@ use session::context::{Channel, QueryContextRef}; use snafu::ResultExt; use vrl::value::Value as VrlValue; -use crate::error::{CatalogSnafu, PipelineSnafu, Result}; +use crate::error::{PipelineSnafu, Result}; use crate::http::event::PipelineIngestRequest; use crate::metrics::{ METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE, @@ -89,10 +89,7 @@ async fn run_identity_pipeline( let table = if pipeline_ctx.channel == Channel::Prometheus { None } else { - handler - .get_table(&table_name, query_ctx) - .await - .context(CatalogSnafu)? + handler.get_table(&table_name, query_ctx).await? 
}; identity_pipeline(data_array, table, pipeline_ctx) .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name)) @@ -141,10 +138,7 @@ async fn run_custom_pipeline( } }; - let table = handler - .get_table(&table_name, query_ctx) - .await - .context(CatalogSnafu)?; + let table = handler.get_table(&table_name, query_ctx).await?; schema_info.set_table(table); for pipeline_map in pipeline_maps { diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs new file mode 100644 index 0000000000..0fddc0938a --- /dev/null +++ b/src/servers/src/prom_row_builder.rs @@ -0,0 +1,557 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Prometheus row-level helpers for converting proto `Rows` into Arrow +//! `RecordBatch`es and aligning / normalizing their schemas against +//! existing table schemas in the catalog. 
+ +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use api::helper::ColumnDataTypeWrapper; +use api::v1::value::ValueData; +use api::v1::{ColumnSchema, Rows, SemanticType}; +use arrow::array::{ + ArrayRef, Float64Builder, StringBuilder, TimestampMicrosecondBuilder, + TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder, + new_null_array, +}; +use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema}; +use arrow::record_batch::RecordBatch; +use arrow_schema::TimeUnit; +use common_query::prelude::{greptime_timestamp, greptime_value}; +use datatypes::data_type::DataType; +use datatypes::prelude::ConcreteDataType; +use snafu::{OptionExt, ResultExt, ensure}; + +use crate::error; +use crate::error::Result; + +/// Extract timestamp, field, and tag column names from a logical region schema. +fn unzip_logical_region_schema( + target_schema: &ArrowSchema, +) -> Result<(String, String, HashSet)> { + let mut timestamp_column = None; + let mut field_column = None; + let mut tag_columns = HashSet::with_capacity(target_schema.fields.len().saturating_sub(2)); + for field in target_schema.fields() { + if field.name() == greptime_timestamp() { + timestamp_column = Some(field.name().clone()); + continue; + } + + if field.name() == greptime_value() { + field_column = Some(field.name().clone()); + continue; + } + + if timestamp_column.is_none() && matches!(field.data_type(), ArrowDataType::Timestamp(_, _)) + { + timestamp_column = Some(field.name().clone()); + continue; + } + + if field_column.is_none() && matches!(field.data_type(), ArrowDataType::Float64) { + field_column = Some(field.name().clone()); + continue; + } + tag_columns.insert(field.name().clone()); + } + + let timestamp_column = timestamp_column.with_context(|| error::UnexpectedResultSnafu { + reason: "Failed to locate timestamp column in target schema".to_string(), + })?; + let field_column = field_column.with_context(|| error::UnexpectedResultSnafu { + reason: 
"Failed to locate field column in target schema".to_string(), + })?; + + Ok((timestamp_column, field_column, tag_columns)) +} + +/// Directly converts proto `Rows` into a `RecordBatch` aligned to the given +/// `target_schema`, handling Prometheus column renaming (timestamp/value), +/// reordering, type casting, and null-filling in a single pass. +pub(crate) fn rows_to_aligned_record_batch( + rows: &Rows, + target_schema: &ArrowSchema, +) -> Result { + let row_count = rows.rows.len(); + let column_count = rows.schema.len(); + + for (idx, row) in rows.rows.iter().enumerate() { + ensure!( + row.values.len() == column_count, + error::InternalSnafu { + err_msg: format!( + "Column count mismatch in row {}, expected {}, got {}", + idx, + column_count, + row.values.len() + ) + } + ); + } + + let (target_ts_name, target_field_name, _target_tags) = + unzip_logical_region_schema(target_schema)?; + + // Map effective target column name → (source column index, source arrow type). + // Handles prom renames: Timestamp → target ts name, Float64 → target field name. 
+ let mut source_map: HashMap<&str, (usize, ArrowDataType)> = + HashMap::with_capacity(rows.schema.len()); + + for (src_idx, col) in rows.schema.iter().enumerate() { + let wrapper = ColumnDataTypeWrapper::try_new(col.datatype, col.datatype_extension.clone())?; + let src_arrow_type = ConcreteDataType::from(wrapper).as_arrow_type(); + + match &src_arrow_type { + ArrowDataType::Float64 => { + source_map.insert(&target_field_name, (src_idx, src_arrow_type)); + } + ArrowDataType::Timestamp(unit, _) => { + ensure!( + unit == &TimeUnit::Millisecond, + error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Unexpected remote write batch timestamp unit, expect millisecond, got: {}", + unit + ) + } + ); + source_map.insert(&target_ts_name, (src_idx, src_arrow_type)); + } + ArrowDataType::Utf8 => { + source_map.insert(&col.column_name, (src_idx, src_arrow_type)); + } + other => { + return error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Unexpected remote write batch field type {}, field name: {}", + other, col.column_name + ), + } + .fail(); + } + } + } + + // Build columns in target schema order + let mut columns = Vec::with_capacity(target_schema.fields().len()); + for target_field in target_schema.fields() { + if let Some((src_idx, src_arrow_type)) = source_map.get(target_field.name().as_str()) { + let array = build_arrow_array( + rows, + *src_idx, + &rows.schema[*src_idx].column_name, + src_arrow_type.clone(), + row_count, + )?; + columns.push(array); + } else { + columns.push(new_null_array(target_field.data_type(), row_count)); + } + } + + let batch = RecordBatch::try_new(Arc::new(target_schema.clone()), columns) + .context(error::ArrowSnafu)?; + Ok(batch) +} + +/// Identify tag columns in the proto `rows_schema` that are absent from the +/// target region schema, without building an intermediate `RecordBatch`. 
+pub(crate) fn identify_missing_columns_from_proto( + rows_schema: &[ColumnSchema], + target_schema: &ArrowSchema, +) -> Result> { + let (_, _, target_tags) = unzip_logical_region_schema(target_schema)?; + let mut missing = Vec::new(); + for col in rows_schema { + let wrapper = ColumnDataTypeWrapper::try_new(col.datatype, col.datatype_extension.clone())?; + let arrow_type = ConcreteDataType::from(wrapper).as_arrow_type(); + if matches!(arrow_type, ArrowDataType::Utf8) + && !target_tags.contains(&col.column_name) + && target_schema.column_with_name(&col.column_name).is_none() + { + missing.push(col.column_name.clone()); + } + } + Ok(missing) +} + +/// Build a `Vec` suitable for creating a new Prometheus logical table +/// directly from the proto `rows.schema`, avoiding the round-trip through Arrow schema. +pub fn build_prom_create_table_schema_from_proto( + rows_schema: &[ColumnSchema], +) -> Result> { + rows_schema + .iter() + .map(|col| { + let semantic_type = if col.datatype == api::v1::ColumnDataType::TimestampMillisecond as i32 { + SemanticType::Timestamp + } else if col.datatype == api::v1::ColumnDataType::Float64 as i32 { + SemanticType::Field + } else { + // tag columns must be String type + ensure!(col.datatype == api::v1::ColumnDataType::String as i32, error::InvalidPromRemoteRequestSnafu{ + msg: format!( + "Failed to build create table schema, tag column '{}' must be String but got datatype {}", + col.column_name, col.datatype + ) + }); + SemanticType::Tag + }; + + Ok(ColumnSchema { + column_name: col.column_name.clone(), + datatype: col.datatype, + semantic_type: semantic_type as i32, + datatype_extension: col.datatype_extension.clone(), + options: None, + }) + }) + .collect() +} + +/// Build a single Arrow array for the given column index from proto `Rows`. +fn build_arrow_array( + rows: &Rows, + col_idx: usize, + column_name: &String, + column_data_type: arrow::datatypes::DataType, + row_count: usize, +) -> Result { + macro_rules! 
build_array { + ($builder:expr, $( $pattern:pat => $value:expr ),+ $(,)?) => {{ + let mut builder = $builder; + for row in &rows.rows { + match row.values[col_idx].value_data.as_ref() { + $(Some($pattern) => builder.append_value($value),)+ + Some(v) => { + return error::InvalidPromRemoteRequestSnafu { + msg: format!("Unexpected value: {:?}", v), + } + .fail(); + } + None => builder.append_null(), + } + } + Arc::new(builder.finish()) as ArrayRef + }}; + } + + let array: ArrayRef = match column_data_type { + arrow::datatypes::DataType::Float64 => { + build_array!(Float64Builder::with_capacity(row_count), ValueData::F64Value(v) => *v) + } + arrow::datatypes::DataType::Utf8 => build_array!( + StringBuilder::with_capacity(row_count, 0), + ValueData::StringValue(v) => v + ), + arrow::datatypes::DataType::Timestamp(u, _) => match u { + TimeUnit::Second => build_array!( + TimestampSecondBuilder::with_capacity(row_count), + ValueData::TimestampSecondValue(v) => *v + ), + TimeUnit::Millisecond => build_array!( + TimestampMillisecondBuilder::with_capacity(row_count), + ValueData::TimestampMillisecondValue(v) => *v + ), + TimeUnit::Microsecond => build_array!( + TimestampMicrosecondBuilder::with_capacity(row_count), + ValueData::DatetimeValue(v) => *v, + ValueData::TimestampMicrosecondValue(v) => *v + ), + TimeUnit::Nanosecond => build_array!( + TimestampNanosecondBuilder::with_capacity(row_count), + ValueData::TimestampNanosecondValue(v) => *v + ), + }, + ty => { + return error::InvalidPromRemoteRequestSnafu { + msg: format!( + "Unexpected column type {:?}, column name: {}", + ty, column_name + ), + } + .fail(); + } + }; + + Ok(array) +} + +#[cfg(test)] +mod tests { + use api::v1::value::ValueData; + use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; + use arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray}; + use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit}; + + use super::{ + 
build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto, + rows_to_aligned_record_batch, + }; + + #[test] + fn test_rows_to_aligned_record_batch_renames_and_reorders() { + let rows = Rows { + schema: vec![ + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "host".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "greptime_value".to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + ..Default::default() + }, + ], + rows: vec![ + Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(1000)), + }, + Value { + value_data: Some(ValueData::StringValue("h1".to_string())), + }, + Value { + value_data: Some(ValueData::F64Value(42.0)), + }, + ], + }, + Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(2000)), + }, + Value { + value_data: Some(ValueData::StringValue("h2".to_string())), + }, + Value { + value_data: Some(ValueData::F64Value(99.0)), + }, + ], + }, + ], + }; + + // Target schema has renamed columns and different ordering. 
+ let target = ArrowSchema::new(vec![ + Field::new( + "my_ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("host", DataType::Utf8, true), + Field::new("my_value", DataType::Float64, true), + ]); + + let batch = rows_to_aligned_record_batch(&rows, &target).unwrap(); + assert_eq!(batch.schema().as_ref(), &target); + assert_eq!(2, batch.num_rows()); + assert_eq!(3, batch.num_columns()); + + let ts = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ts.value(0), 1000); + assert_eq!(ts.value(1), 2000); + + let hosts = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(hosts.value(0), "h1"); + assert_eq!(hosts.value(1), "h2"); + + let values = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(values.value(0), 42.0); + assert_eq!(values.value(1), 99.0); + } + + #[test] + fn test_rows_to_aligned_record_batch_fills_nulls() { + let rows = Rows { + schema: vec![ + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "host".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "instance".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "greptime_value".to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + ..Default::default() + }, + ], + rows: vec![Row { + values: vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(1000)), + }, + Value { + value_data: Some(ValueData::StringValue("h1".to_string())), + }, + Value { + value_data: Some(ValueData::StringValue("i1".to_string())), + }, + Value { + value_data: 
Some(ValueData::F64Value(1.0)), + }, + ], + }], + }; + + // Target schema has "host" but not "instance"; also has "region" which is missing from source. + let target = ArrowSchema::new(vec![ + Field::new( + "my_ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("host", DataType::Utf8, true), + Field::new("region", DataType::Utf8, true), + Field::new("my_value", DataType::Float64, true), + ]); + + let batch = rows_to_aligned_record_batch(&rows, &target).unwrap(); + assert_eq!(batch.schema().as_ref(), &target); + assert_eq!(1, batch.num_rows()); + assert_eq!(4, batch.num_columns()); + + // "region" column should be null-filled. + let region = batch + .column(2) + .as_any() + .downcast_ref::() + .unwrap(); + assert!(region.is_null(0)); + } + + #[test] + fn test_identify_missing_columns_from_proto() { + let rows_schema = vec![ + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "host".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "instance".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "greptime_value".to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + ..Default::default() + }, + ]; + + let target = ArrowSchema::new(vec![ + Field::new( + "my_ts", + DataType::Timestamp(TimeUnit::Millisecond, None), + false, + ), + Field::new("host", DataType::Utf8, true), + Field::new("my_value", DataType::Float64, true), + ]); + + let missing = identify_missing_columns_from_proto(&rows_schema, &target).unwrap(); + assert_eq!(missing, vec!["instance".to_string()]); + } + + #[test] + fn 
test_build_prom_create_table_schema_from_proto() { + let rows_schema = vec![ + ColumnSchema { + column_name: "greptime_timestamp".to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "job".to_string(), + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + ..Default::default() + }, + ColumnSchema { + column_name: "greptime_value".to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + ..Default::default() + }, + ]; + + let schema = build_prom_create_table_schema_from_proto(&rows_schema).unwrap(); + assert_eq!(3, schema.len()); + + assert_eq!("greptime_timestamp", schema[0].column_name); + assert_eq!(SemanticType::Timestamp as i32, schema[0].semantic_type); + assert_eq!( + ColumnDataType::TimestampMillisecond as i32, + schema[0].datatype + ); + + assert_eq!("job", schema[1].column_name); + assert_eq!(SemanticType::Tag as i32, schema[1].semantic_type); + assert_eq!(ColumnDataType::String as i32, schema[1].datatype); + + assert_eq!("greptime_value", schema[2].column_name); + assert_eq!(SemanticType::Field as i32, schema[2].semantic_type); + assert_eq!(ColumnDataType::Float64 as i32, schema[2].datatype); + } +}