mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-16 13:00:40 +00:00
feat: auto-align Prometheus schemas in pending rows batching (#7877)
* feat/auto-schema-align:
  - **Error Handling Improvements**: Removed `CatalogSnafu` context from various `.await` calls in `dashboard.rs`, `influxdb.rs`, `jaeger.rs`, `prometheus.rs`, `event.rs`, and `pipeline.rs` to streamline error handling.
  - **Prometheus Store Enhancements**: Added support for auto-creating tables and adding missing Prometheus tag columns in `prom_store.rs` and `pending_rows_batcher.rs`; introduced the `PendingRowsSchemaAlterer` trait for schema alterations in `pending_rows_batcher.rs`.
  - **Test Additions**: Added tests for the new Prometheus store functionality in `prom_store.rs` and `pending_rows_batcher.rs`.
  - **Error Message Improvements**: Enhanced error messages for catalog access in `error.rs`.
  - **Server Configuration Updates**: Updated the server configuration to include Prometheus store options in `server.rs`.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* reformat

* feat/auto-schema-align: Add DataTypes Error Handling and Column Renaming Logic
  - **`error.rs`**: Introduced a new `DataTypes` error variant to handle errors from `datatypes::error::Error`, and updated the `ErrorExt` implementation to include it.
  - **`pending_rows_batcher.rs`**: Added `find_prom_special_column_names` and `rename_prom_special_columns_for_existing_schema` to handle renaming of special Prometheus columns; updated `build_prom_create_table_schema` to simplify error handling with `ConcreteDataType`.
  - **Tests**: Added `test_rename_prom_special_columns_for_existing_schema` to verify the renaming logic for Prometheus special columns.

* feat/auto-schema-align: Refactor `PendingRowsBatcher` to accommodate Prometheus record batches
  - Introduced `accommodate_record_batch_for_target_schema` to normalize incoming record batches against existing table schemas.
  - Removed `collect_missing_prom_tag_columns` and `rename_prom_special_columns_for_existing_schema` in favor of the new function.
  - Added `unzip_logical_region_schema` to extract schema components.
  - Updated tests in `pending_rows_batcher.rs`: added tests for `accommodate_record_batch_for_target_schema` covering missing tag columns and renaming of special columns, and ensured error handling for missing timestamp and field columns in the target schema.

* feat/auto-schema-align: Enhancement in Table Creation Logic
  - Updated `prom_store.rs` so that `table_options` are extended differently based on the `AutoCreateTableType` during table creation. For `Physical` tables, enforced `sst_format=flat` to optimize pending-rows writes by leveraging bulk memtables.

* feat/auto-schema-align: Enhance Performance Monitoring in `pending_rows_batcher.rs`
  - Added performance-monitoring timers to various stages of the `PendingRowsBatcher` process, including schema cache checks, table resolution, schema creation, and record batch alignment.
  - Added timers around the schema alteration and missing-column addition processes.

* feat/auto-schema-align: Enhance Concurrent Write Handling
  - Introduced the `FlushRegionWrite` and `FlushWriteResult` structs to manage region writes and their results; added `flush_region_writes_concurrently` to flush region writes concurrently based on the `should_dispatch_concurrently` logic in `pending_rows_batcher.rs`.
  - Added tests for concurrent dispatching of region writes and for the concurrent-dispatch decision logic.
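The concurrent write dispatch described above can be sketched with plain threads. `FlushRegionWrite`, `FlushWriteResult`, and `should_dispatch_concurrently` are names from the PR, but the fields, the two-write threshold, and the thread-per-write fan-out below are simplified placeholders for the batcher's actual async implementation:

```rust
use std::thread;

/// Illustrative stand-in: the real struct carries an encoded region request.
struct FlushRegionWrite {
    region_id: u64,
    rows: usize,
}

#[derive(Debug, PartialEq)]
struct FlushWriteResult {
    region_id: u64,
    affected_rows: usize,
}

/// Stand-in for writing one region's batch; the real path issues an RPC.
fn flush_one(w: FlushRegionWrite) -> FlushWriteResult {
    FlushWriteResult {
        region_id: w.region_id,
        affected_rows: w.rows,
    }
}

/// Spawning per-write tasks only pays off when several independent region
/// writes exist; the threshold here is an assumption for illustration.
fn should_dispatch_concurrently(write_count: usize) -> bool {
    write_count >= 2
}

fn flush_region_writes(writes: Vec<FlushRegionWrite>) -> Vec<FlushWriteResult> {
    if !should_dispatch_concurrently(writes.len()) {
        return writes.into_iter().map(flush_one).collect();
    }
    // One OS thread per write for simplicity; the batcher uses async tasks
    // and (per a later TODO in this log) may bound the fan-out.
    let handles: Vec<_> = writes
        .into_iter()
        .map(|w| thread::spawn(move || flush_one(w)))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

Joining the handles in spawn order keeps the results in the same order as the input writes, whether or not the concurrent path was taken.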
* feat/auto-schema-align: Add Histogram for Flush Stage Elapsed Time
  - **`metrics.rs`**: Introduced a new `HistogramVec` named `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` to track the elapsed time of pending-rows batch flush stages.
  - **`pending_rows_batcher.rs`**: Replaced uses of `PENDING_ROWS_BATCH_INGEST_STAGE_ELAPSED` with `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` for the flush stages `flush_write_region`, `flush_concat_table_batches`, `flush_resolve_table`, `flush_fetch_partition_rule`, `flush_split_record_batch`, `flush_filter_record_batch`, `flush_resolve_region_leader`, and `flush_encode_ipc`.

* Add design doc for physical table batching in PendingRowsBatcher

* Add implementation plan for physical table batching in PendingRowsBatcher

* feat/auto-schema-align: Enhance Metric Engine with Physical Batch Processing
  - **Add `metric-engine` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `metric-engine` as a workspace dependency.
  - **Expose Batch Modifier Functions**: Made `TagColumnInfo`, `compute_tsid_array`, and `modify_batch_sparse` in `batch_modifier.rs` public, and made `batch_modifier` a public module in `lib.rs`.
  - **Implement Physical Batch Processing**: Added `bulk_insert_physical_region` and `bulk_insert_logical_region` in `bulk_insert.rs` to handle physical and logical batch insertions, and updated `pending_rows_batcher.rs` to attempt physical batch processing before falling back to logical processing via the new `flush_batch_physical` and `flush_batch_per_logical_table` functions.
  - **Enhance Testing**: Added tests for physical-region passthrough and empty-batch handling in `bulk_insert.rs`, and introduced `with_mito_config` in `test_util.rs` for customized test environments.

* feat/auto-schema-align: Enhance Batch Processing for Table Creation and Alteration
  - **`prom_store.rs`**: Added `create_tables_if_missing_batch` and `add_missing_prom_tag_columns_batch` to batch-create tables and batch-alter them to add missing tag columns, including the logic to determine which tables and columns are missing.
  - **`pending_rows_batcher.rs`**: Updated `PendingRowsBatcher` to use the batch methods for creating tables and adding missing columns, and enhanced the logic that resolves table schemas and accommodates record batches after the batch operations.

* perf: concurrent catalog lookups and eliminate redundant concat_batches on ingest path
  - Replaced sequential `catalog_manager.table()` calls with concurrent `futures::future::join_all` in `align_table_batches_to_region_schema`. This affects all three lookup loops (initial table resolution, post-create resolution, and post-alter schema refresh) and reduces O(N) sequential RPC latency to O(1) wall-clock time for requests with many distinct logical tables (e.g. Prometheus remote_write).
  - Removed the per-logical-table `concat_batches` in `flush_batch_physical`. Instead of merging all chunks of a table into one `RecordBatch` before calling `modify_batch_sparse`, `modify_batch_sparse` is applied directly to each chunk and the modified chunks are collected for a single final concat, eliminating one full data copy per logical table on the flush path.

* refactor: extract Prometheus schema alignment helpers into prom_row_builder module
  - Moved six functions and their eight unit tests from `pending_rows_batcher.rs` (~2386 lines) into a new `prom_row_builder.rs` module (~776 lines), leaving the batcher at ~1665 lines focused on flush/worker machinery. The extracted functions:
    - `accommodate_record_batch_for_target_schema` (normalize an incoming batch against an existing table schema)
    - `unzip_logical_region_schema` (extract ts/field/tag columns)
    - `build_prom_create_table_schema` (build a `ColumnSchema` vec for table creation)
    - `align_record_batch_to_schema` (reorder/fill/cast columns to a target schema)
    - `rows_to_record_batch` (convert proto `Rows` to an Arrow `RecordBatch`)
    - `build_arrow_array` (build Arrow arrays from proto values)
  - Cleaned up 12 now-unused imports from `pending_rows_batcher.rs`.

* feat/auto-schema-align: Enhance `PendingRowsBatcher` and `prom_row_builder` for Efficient Schema Handling
  - **`pending_rows_batcher.rs`**: Refactored the `submit` method to integrate table batch building and alignment into a single `build_and_align_table_batches` method, removing intermediate `RecordBatch` creation by converting proto `RowInsertRequests` directly into aligned `RecordBatch`es, and identifying missing columns directly from the proto schemas.
  - **`prom_row_builder.rs`**: Introduced `rows_to_aligned_record_batch` for direct conversion of proto `Rows` into aligned `RecordBatch`es, added `identify_missing_columns_from_proto` to detect absent tag columns without an intermediate `RecordBatch`, and implemented `build_prom_create_table_schema_from_proto` to construct table schemas directly from proto schemas.

* feat/auto-schema-align: Add elapsed-time metrics for bulk insert operations
  - Updated the `bulk_insert` method in `bulk_insert.rs` to record elapsed-time metrics with `MITO_OPERATION_ELAPSED` for both physical and logical regions.
  - Added `test_bulk_insert_records_elapsed_metric` to verify that the metric is recorded during bulk insert operations.
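Of the extracted helpers, `align_record_batch_to_schema` is the heart of the schema alignment. A minimal stand-in that models a column as a name plus nullable string values instead of an Arrow array (the real helper also casts types, and the column names below are only illustrative):

```rust
use std::collections::HashMap;

/// Simplified model of `align_record_batch_to_schema`: reorder the incoming
/// columns into the target schema's order and fill columns absent from the
/// batch with nulls, one null per row.
fn align_to_schema(
    target_schema: &[&str],
    incoming: &[(&str, Vec<Option<&str>>)],
    num_rows: usize,
) -> Vec<(String, Vec<Option<String>>)> {
    // Index the incoming columns by name for O(1) lookup per target column.
    let by_name: HashMap<&str, &Vec<Option<&str>>> =
        incoming.iter().map(|(n, v)| (*n, v)).collect();
    target_schema
        .iter()
        .map(|name| {
            let values = match by_name.get(name) {
                Some(v) => v.iter().map(|o| o.map(str::to_string)).collect(),
                // Missing tag column: fill with nulls for every row.
                None => vec![None; num_rows],
            };
            (name.to_string(), values)
        })
        .collect()
}
```

A batch carrying only `host` and a timestamp thus comes back with every target column present, in target order, with the absent columns null-filled.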
* remove flush per logical region

* feat/auto-schema-align: Refactor `flush_batch` and `flush_batch_physical`
  - Removed unused `catalog` and `schema` variables from `flush_batch` in `pending_rows_batcher.rs`.
  - Updated `flush_batch_physical` to directly use `ctx.current_catalog()` and `ctx.current_schema()` for resolving table names.

* feat/auto-schema-align: Remove Unused Function and Associated Test
  - **`src/servers/src/prom_row_builder.rs`**: Removed the unused `build_prom_create_table_schema` function, which built a `Vec<ColumnSchema>` from an Arrow schema, and deleted its associated test `test_build_prom_create_table_schema_from_request_schema`.

* feat/auto-schema-align:
  - **Remove Test**: Deleted the `test_bulk_insert_records_elapsed_metric` test from `bulk_insert.rs`.
  - **Refactor Table Resolution**: Introduced the `TableResolutionPlan` struct and refactored the table resolution logic in `pending_rows_batcher.rs`.
  - **Enhance Table Handling**: Added functions for collecting non-empty table rows and unique table schemas, and for handling table creation and alteration in `pending_rows_batcher.rs`.
  - **Add Tests**: Implemented tests for `collect_non_empty_table_rows` and `collect_unique_table_schemas` in `pending_rows_batcher.rs`.

* feat/auto-schema-align:
  - **Refactor Error Handling**: Updated error handling in `pending_rows_batcher.rs` and `prom_row_builder.rs` to use `Snafu` error context for more descriptive error messages.
  - **Remove Unused Functionality**: Eliminated the redundant `rows_to_record_batch` function and its test in `prom_row_builder.rs`.
  - **Simplify Function Return Types**: Modified `rows_to_aligned_record_batch` in `prom_row_builder.rs` to return only a `RecordBatch` without missing columns, simplifying the function's interface and related tests.

* feat/auto-schema-align: Add Helper Function for Table Options in `prom_store.rs`
  - Introduced `fill_metric_physical_table_options` to encapsulate the logic for setting table options, ensuring the use of the flat SST format and physical-table metadata.
  - Updated the `Instance` implementation to use the new helper when setting table options.
  - Added a unit test `test_metric_physical_table_options_forces_flat_sst_format` to verify the correct application of table options.

* feat/auto-schema-align:
  - **Refactor `PendingRowsBatcher`**: Simplified worker retrieval in `get_or_spawn_worker` with a more concise conditional check.
  - **Metrics Update**: Added the `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED` metric in `pending_rows_batcher.rs`.
  - **Remove Unused Code**: Deleted multiple test functions related to record batch alignment and schema preparation in `pending_rows_batcher.rs` and `prom_row_builder.rs`.
  - **Function Visibility Change**: Made `build_prom_create_table_schema_from_proto` public in `prom_row_builder.rs`.

* chore: remove plan

* feat/auto-schema-align: Refactor and Simplify Schema Alteration Logic
  - **Removed Unused Methods**: Deleted `create_table_if_missing` and `add_missing_prom_tag_columns` from the `PendingRowsSchemaAlterer` trait in `prom_store.rs` and `pending_rows_batcher.rs`.
  - **Error Handling Improvement**: Enhanced `create_tables_if_missing_batch` to return a specific error message for unsupported `AutoCreateTableType` values in `prom_store.rs`.
  - **Visibility Change**: Made the `as_str` method of the `AutoCreateTableType` enum public in `insert.rs` to support external access.

* feat/auto-schema-align: Improve safety in `prom_row_builder.rs`
  - Updated `unzip_logical_region_schema` to use `saturating_sub` for a safer capacity calculation of `tag_columns`.

* feat/auto-schema-align: Add TODO comments for future improvements in `pending_rows_batcher.rs`
  - Added a TODO to consider bounding the concurrency of `flush_region_writes_concurrently`.
  - Added a TODO to potentially limit the maximum rows to concatenate in `flush_batch_physical`.

* feat/auto-schema-align: Enhance error handling in `pending_rows_batcher.rs`
  - Updated `collect_unique_table_schemas` to return a `Result`, so duplicate table names in `table_rows` are reported as errors.
  - Adjusted test cases to handle the new `Result` return type.

* feat/auto-schema-align:
  - **Refactor `partition_columns` Method**: Updated `partition_columns` in `multi_dim.rs`, `partition.rs`, and `splitter.rs` to return a slice reference instead of a cloned vector, avoiding unnecessary cloning.
  - **Enhance Partition Handling**: Added `collect_tag_columns_and_non_tag_indices` and `strip_partition_columns_from_batch` in `pending_rows_batcher.rs` to manage partition columns more efficiently, including stripping partition columns from record batches.
  - **Update Tests**: Modified existing tests and added new ones in `pending_rows_batcher.rs` to verify the partition-column handling.

* feat/auto-schema-align: Enhance Schema Handling and Validation in `pending_rows_batcher.rs`
  - **Schema Validation Enhancements**: Added checks for the essential columns (`timestamp`, `value`) in `collect_tag_columns_and_non_tag_indices`; introduced `PHYSICAL_REGION_ESSENTIAL_COLUMN_COUNT` to enforce a minimum column count in `strip_partition_columns_from_batch`; improved error handling for unexpected data types and duplicated columns.
  - **Function Modifications**: Updated `strip_partition_columns_from_batch` to project essential columns without a lookup, and modified `flush_batch_physical` to use `essential_col_indices` instead of `non_tag_indices`.
  - **Test Enhancements**: Added tests for schema validation, including unexpected data types and duplicated columns, and verified the correct projection of essential columns in `strip_partition_columns_from_batch`.

* feat/auto-schema-align:
  - **Add `smallvec` Dependency**: Updated `Cargo.lock` and `Cargo.toml` to include `smallvec` as a workspace dependency.
  - **Refactor Function**: Renamed `collect_tag_columns_and_non_tag_indices` to `columns_taxonomy` in `pending_rows_batcher.rs` and changed its return type to use `SmallVec`.
  - **Update Tests**: Updated test cases in `pending_rows_batcher.rs` for the new function name and return type.

* feat/auto-schema-align: Refactor `pending_rows_batcher.rs` to simplify table ID handling
  - Updated the `TableBatch` struct to use `TableId` directly instead of `Option<u32>` for `table_id`.
  - Simplified `flush_batch_physical` by removing the `None` check on `table_id`, and adjusted the related logic in `start_worker`.
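The duplicate-name check added to `collect_unique_table_schemas` can be sketched with simplified types; the real function operates on proto schemas and returns a Snafu error rather than a `String`:

```rust
use std::collections::HashSet;

/// Each logical table may appear only once per flush: a repeated name would
/// make schema resolution ambiguous, so it is surfaced as an error instead
/// of silently overwriting an earlier schema.
fn collect_unique_table_schemas(
    table_rows: &[(String, Vec<String>)],
) -> Result<Vec<(String, Vec<String>)>, String> {
    let mut seen = HashSet::new();
    let mut unique = Vec::with_capacity(table_rows.len());
    for (name, schema) in table_rows {
        if !seen.insert(name.as_str()) {
            return Err(format!("duplicate table name: {}", name));
        }
        unique.push((name.clone(), schema.clone()));
    }
    Ok(unique)
}
```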
* feat/auto-schema-align: Enhance Batch Processing Logic
  - **`pending_rows_batcher.rs`**: Moved column taxonomy resolution inside the loop to handle schema variations across batches, and added checks to skip processing when both the tag columns and the essential column indices are empty.
  - **Tests**: Added `test_modify_batch_sparse_with_taxonomy_per_batch` to verify batch modification with varying schemas.

* feat/auto-schema-align: Remove Primary Key Column Check in `pending_rows_batcher.rs`
  - Removed the check for the primary key column and other essential column names in `strip_partition_columns_from_batch`, eliminating the validation of column order against the expected essential names.

* feat/auto-schema-align: Refactor error handling and iteration in `otlp.rs` and `pending_rows_batcher.rs`
  - **`otlp.rs`**: Simplified error handling by removing the `CatalogSnafu` context when awaiting table retrieval.
  - **`pending_rows_batcher.rs`**: Streamlined iteration over tables by removing unnecessary `into_iter()` calls.

* chore/metrics-for-bulk: Add timing metrics for batch processing in `pending_rows_batcher.rs`
  - Introduced `modify_elapsed` and `columns_taxonomy_elapsed` to measure the time spent in `modify_batch_sparse` and `columns_taxonomy`.
  - Updated `flush_batch_physical` to record these metrics with `PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED`.

* feat/auto-schema-align:
  - **Remove Unused Code**: Eliminated the `#[allow(dead_code)]` attribute from `compute_tsid_array` in `batch_modifier.rs`.
  - **Error Handling Improvement**: Adjusted the `match` block in `flush_batch_physical` in `pending_rows_batcher.rs`.
  - **Simplify Logic**: Streamlined `rows_to_aligned_record_batch` by removing unnecessary type casting in `prom_row_builder.rs`.

* feat/auto-schema-align: Refactor `flush_batch_physical` in `pending_rows_batcher.rs`
  - Moved the partition-column stripping logic to a single location before region batches are processed, renamed `combined_batch` to `stripped_batch` for consistency, and removed the now-redundant stripping logic inside the region batch loop.

* feat/auto-schema-align: Update `batch_modifier.rs` documentation and parameter naming
  - Enhanced the documentation of `compute_tsid_array` and `modify_batch_sparse` to clarify their logic and parameters.
  - Renamed the `non_tag_column_indices` parameter of `modify_batch_sparse` to `extra_column_indices` for clarity.

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
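The partition-column stripping that `flush_batch_physical` now does once up front reduces to an index projection. A sketch over plain column-name slices; the real `strip_partition_columns_from_batch` projects Arrow columns and also validates the essential physical-region columns, and the column names in the test are illustrative only:

```rust
/// Compute the indices of the columns to keep: everything that is not a
/// partition column. Projecting a RecordBatch down to these indices is
/// what drops the partition columns before the region write.
fn essential_column_indices(
    column_names: &[&str],
    partition_columns: &[&str],
) -> Vec<usize> {
    column_names
        .iter()
        .enumerate()
        .filter(|&(_, name)| !partition_columns.contains(name))
        .map(|(i, _)| i)
        .collect()
}
```

Computing the indices once and reusing them for every region batch is what lets the loop body skip the per-batch column lookup mentioned in the log.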
Cargo.lock (generated)
@@ -12057,6 +12057,7 @@ dependencies = [
  "local-ip-address",
  "log-query",
  "loki-proto",
+ "metric-engine",
  "mime_guess",
  "mysql_async",
  "notify",
@@ -12093,6 +12094,7 @@ dependencies = [
  "session",
  "simd-json",
  "simdutf8",
+ "smallvec",
  "snafu 0.8.6",
  "snap",
  "socket2 0.5.10",

@@ -33,7 +33,7 @@ use datafusion::sql::TableReference;
 use datafusion_expr::{DmlStatement, LogicalPlan, lit};
 use datatypes::arrow::array::{Array, AsArray};
 use servers::error::{
-    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, ExecuteQuerySnafu, NotSupportedSnafu,
+    CollectRecordbatchSnafu, DataFusionSnafu, ExecuteQuerySnafu, NotSupportedSnafu,
     TableNotFoundSnafu,
 };
 use servers::query_handler::DashboardDefinition;
@@ -139,8 +139,7 @@ impl Instance {
                 DASHBOARD_TABLE_NAME,
                 Some(&ctx),
             )
-            .await
-            .context(CatalogSnafu)?
+            .await?
         {
             return Ok(table);
         }
@@ -178,8 +177,7 @@ impl Instance {
                 DASHBOARD_TABLE_NAME,
                 Some(&ctx),
             )
-            .await
-            .context(CatalogSnafu)?
+            .await?
             .context(TableNotFoundSnafu {
                 catalog: catalog.to_string(),
                 schema: DEFAULT_PRIVATE_SCHEMA_NAME.to_string(),
@@ -255,8 +253,7 @@ impl Instance {
                 DASHBOARD_TABLE_NAME,
                 Some(&query_ctx),
             )
-            .await
-            .context(CatalogSnafu)?
+            .await?
         {
             table
         } else {

@@ -21,9 +21,7 @@ use client::Output;
 use common_error::ext::BoxedError;
 use common_time::Timestamp;
 use common_time::timestamp::TimeUnit;
-use servers::error::{
-    AuthSnafu, CatalogSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu,
-};
+use servers::error::{AuthSnafu, Error, TimestampOverflowSnafu, UnexpectedResultSnafu};
 use servers::influxdb::InfluxdbRequest;
 use servers::interceptor::{LineProtocolInterceptor, LineProtocolInterceptorRef};
 use servers::query_handler::InfluxdbLineProtocolHandler;
@@ -92,8 +90,7 @@ impl InfluxdbLineTimestampAligner<'_> {
                 &insert.table_name,
                 Some(query_context),
             )
-            .await
-            .context(CatalogSnafu)?
+            .await?
             .map(|x| x.schema())
             .and_then(|schema| {
                 schema.timestamp_column().map(|col| {

@@ -38,8 +38,7 @@ use datafusion_expr::{Expr, ExprFunctionExt, SortExpr, col, lit, lit_timestamp_n
 use query::QueryEngineRef;
 use serde_json::Value as JsonValue;
 use servers::error::{
-    CatalogSnafu, CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult,
-    TableNotFoundSnafu,
+    CollectRecordbatchSnafu, DataFusionSnafu, Result as ServerResult, TableNotFoundSnafu,
 };
 use servers::http::jaeger::{JAEGER_QUERY_TABLE_NAME_KEY, QueryTraceParams, TraceUserAgent};
 use servers::otlp::trace::{
@@ -336,8 +335,7 @@ async fn query_trace_table(
             table_name,
             Some(&ctx),
         )
-        .await
-        .context(CatalogSnafu)?
+        .await?
         .with_context(|| TableNotFoundSnafu {
             table: table_name,
             catalog: ctx.current_catalog(),
@@ -425,8 +423,7 @@ async fn get_table(
             table_name,
             Some(&ctx),
         )
-        .await
-        .context(CatalogSnafu)?
+        .await?
         .with_context(|| TableNotFoundSnafu {
             table: table_name,
             catalog: ctx.current_catalog(),

@@ -26,7 +26,7 @@ use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
 use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
 use otel_arrow_rust::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
 use pipeline::{GreptimePipelineParams, PipelineWay};
-use servers::error::{self, AuthSnafu, CatalogSnafu, Result as ServerResult};
+use servers::error::{self, AuthSnafu, Result as ServerResult};
 use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
 use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
 use servers::otlp;
@@ -255,8 +255,7 @@ impl Instance {
             let table = self
                 .catalog_manager
                 .table(catalog, &schema, &req.table_name, None)
-                .await
-                .context(CatalogSnafu)?;
+                .await?;

             let Some(rows) = req.rows.as_mut() else {
                 continue;

@@ -17,7 +17,11 @@ use std::sync::Arc;

 use api::prom_store::remote::read_request::ResponseType;
 use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
-use api::v1::RowInsertRequests;
+use api::v1::alter_table_expr::Kind;
+use api::v1::{
+    AddColumn, AddColumns, AlterTableExpr, ColumnDataType, ColumnDef, CreateTableExpr,
+    RowInsertRequests, SemanticType,
+};
 use async_trait::async_trait;
 use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use client::OutputData;
@@ -27,19 +31,25 @@ use common_query::Output;
 use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
 use common_recordbatch::RecordBatches;
 use common_telemetry::{debug, tracing};
-use operator::insert::InserterRef;
+use operator::insert::{
+    AutoCreateTableType, InserterRef, build_create_table_expr, fill_table_options_for_create,
+};
 use operator::statement::StatementExecutor;
 use prost::Message;
 use servers::error::{self, AuthSnafu, Result as ServerResult};
 use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF, collect_plan_metrics};
 use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
 use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
+use servers::pending_rows_batcher::PendingRowsSchemaAlterer;
 use servers::prom_store::{self, Metrics};
 use servers::query_handler::{
     PromStoreProtocolHandler, PromStoreProtocolHandlerRef, PromStoreResponse,
 };
 use session::context::QueryContextRef;
 use snafu::{OptionExt, ResultExt};
+use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
+use store_api::mito_engine_options::SST_FORMAT_KEY;
 use table::table_reference::TableReference;
 use tracing::instrument;

 use crate::error::{
@@ -50,6 +60,34 @@ use crate::instance::Instance;

 const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;

+fn auto_create_table_type_for_prom_remote_write(
+    ctx: &QueryContextRef,
+    with_metric_engine: bool,
+) -> AutoCreateTableType {
+    if with_metric_engine {
+        let physical_table = ctx
+            .extension(PHYSICAL_TABLE_PARAM)
+            .unwrap_or(GREPTIME_PHYSICAL_TABLE)
+            .to_string();
+        AutoCreateTableType::Logical(physical_table)
+    } else {
+        AutoCreateTableType::Physical
+    }
+}
+
+fn required_physical_table_for_create_type(create_type: &AutoCreateTableType) -> Option<&str> {
+    match create_type {
+        AutoCreateTableType::Logical(physical_table) => Some(physical_table.as_str()),
+        _ => None,
+    }
+}
+
+fn fill_metric_physical_table_options(table_options: &mut HashMap<String, String>) {
+    // We always enforce flat format in this ingestion path.
+    table_options.insert(SST_FORMAT_KEY.to_string(), "flat".to_string());
+    table_options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "true".to_string());
+}
+
 #[inline]
 fn is_supported(response_type: i32) -> bool {
     // Only supports samples response right now
@@ -159,6 +197,157 @@ impl Instance {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl PendingRowsSchemaAlterer for Instance {
|
||||
async fn create_tables_if_missing_batch(
|
||||
&self,
|
||||
catalog: &str,
|
||||
schema: &str,
|
||||
tables: &[(&str, &[api::v1::ColumnSchema])],
|
||||
with_metric_engine: bool,
|
||||
ctx: QueryContextRef,
|
||||
) -> ServerResult<()> {
|
||||
if tables.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let create_type = auto_create_table_type_for_prom_remote_write(&ctx, with_metric_engine);
|
||||
if let Some(physical_table) = required_physical_table_for_create_type(&create_type) {
|
||||
self.create_metric_physical_table_if_missing(
|
||||
catalog,
|
||||
schema,
|
||||
physical_table,
|
||||
ctx.clone(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let engine = if matches!(create_type, AutoCreateTableType::Logical(_)) {
|
||||
METRIC_ENGINE_NAME
|
||||
} else {
|
||||
common_catalog::consts::default_engine()
|
||||
};
|
||||
|
||||
// Check which tables actually still need to be created (may have been
|
||||
// concurrently created by another request).
|
||||
let mut create_exprs: Vec<CreateTableExpr> = Vec::with_capacity(tables.len());
|
||||
for &(table_name, request_schema) in tables {
|
||||
let existing = self
|
||||
.catalog_manager()
|
||||
.table(catalog, schema, table_name, Some(ctx.as_ref()))
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
if existing.is_some() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let table_ref = TableReference::full(catalog, schema, table_name);
|
||||
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema, engine)
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
|
||||
let mut table_options = std::collections::HashMap::with_capacity(4);
|
||||
fill_table_options_for_create(&mut table_options, &create_type, &ctx);
|
||||
create_table_expr.table_options.extend(table_options);
|
||||
create_exprs.push(create_table_expr);
|
||||
}
|
||||
|
||||
if create_exprs.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match create_type {
|
||||
AutoCreateTableType::Logical(_) => {
|
||||
// Use the batch API for logical tables.
|
||||
self.statement_executor
|
||||
.create_logical_tables(&create_exprs, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
}
|
||||
AutoCreateTableType::Physical => {
|
||||
// Physical tables don't have a batch DDL path; create one at a time.
|
||||
for mut expr in create_exprs {
|
||||
expr.table_options
|
||||
.insert(SST_FORMAT_KEY.to_string(), "flat".to_string());
|
||||
self.statement_executor
|
||||
.create_table_inner(&mut expr, None, ctx.clone())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
}
|
||||
}
|
||||
create_type => {
|
||||
return error::InvalidPromRemoteRequestSnafu {
|
||||
msg: format!(
|
||||
"prom remote write only supports logical or physical auto-create: {}",
|
||||
create_type.as_str()
|
||||
),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
    async fn add_missing_prom_tag_columns_batch(
        &self,
        catalog: &str,
        schema: &str,
        tables: &[(&str, &[String])],
        ctx: QueryContextRef,
    ) -> ServerResult<()> {
        if tables.is_empty() {
            return Ok(());
        }

        let alter_exprs: Vec<AlterTableExpr> = tables
            .iter()
            .filter(|(_, columns)| !columns.is_empty())
            .map(|&(table_name, columns)| {
                let add_columns = AddColumns {
                    add_columns: columns
                        .iter()
                        .map(|column_name| AddColumn {
                            column_def: Some(ColumnDef {
                                name: column_name.clone(),
                                data_type: ColumnDataType::String as i32,
                                is_nullable: true,
                                semantic_type: SemanticType::Tag as i32,
                                comment: String::new(),
                                ..Default::default()
                            }),
                            location: None,
                            add_if_not_exists: true,
                        })
                        .collect(),
                };

                AlterTableExpr {
                    catalog_name: catalog.to_string(),
                    schema_name: schema.to_string(),
                    table_name: table_name.to_string(),
                    kind: Some(Kind::AddColumns(add_columns)),
                }
            })
            .collect();

        if alter_exprs.is_empty() {
            return Ok(());
        }

        self.statement_executor
            .alter_logical_tables(alter_exprs, ctx)
            .await
            .map_err(BoxedError::new)
            .context(error::ExecuteGrpcQuerySnafu)?;

        Ok(())
    }
}

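The function above only issues `ALTER TABLE ... ADD COLUMN` for Prometheus tag columns that are not already present in the target table. A minimal sketch of that diffing step, with hypothetical names standing in for the real schema types:

```rust
use std::collections::HashSet;

/// Given the tag columns already present in a table and the label names seen
/// in an incoming Prometheus write, return the labels that still need an
/// ADD COLUMN. Input order is preserved so the generated DDL is deterministic.
fn missing_tag_columns(existing: &[&str], incoming: &[&str]) -> Vec<String> {
    let existing: HashSet<&str> = existing.iter().copied().collect();
    incoming
        .iter()
        .filter(|name| !existing.contains(*name))
        .map(|name| name.to_string())
        .collect()
}

fn main() {
    let existing = ["job", "instance"];
    let incoming = ["job", "instance", "pod", "namespace"];
    let missing = missing_tag_columns(&existing, &incoming);
    // Only the two genuinely new labels remain.
    assert_eq!(missing, vec!["pod".to_string(), "namespace".to_string()]);
}
```

In the real code the per-column `add_if_not_exists: true` makes the alter idempotent even if two writers race on the same new label.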
#[async_trait]
impl PromStoreProtocolHandler for Instance {
    async fn pre_write(
@@ -267,6 +456,61 @@ impl PromStoreProtocolHandler for Instance {
    }
}

impl Instance {
    async fn create_metric_physical_table_if_missing(
        &self,
        catalog: &str,
        schema: &str,
        physical_table: &str,
        ctx: QueryContextRef,
    ) -> ServerResult<()> {
        let table = self
            .catalog_manager()
            .table(catalog, schema, physical_table, Some(ctx.as_ref()))
            .await
            .map_err(BoxedError::new)
            .context(error::ExecuteGrpcQuerySnafu)?;
        if table.is_some() {
            return Ok(());
        }

        let table_ref = TableReference::full(catalog, schema, physical_table);
        let default_schema = vec![
            api::v1::ColumnSchema {
                column_name: common_query::prelude::greptime_timestamp().to_string(),
                datatype: api::v1::ColumnDataType::TimestampMillisecond as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                datatype_extension: None,
                options: None,
            },
            api::v1::ColumnSchema {
                column_name: common_query::prelude::greptime_value().to_string(),
                datatype: api::v1::ColumnDataType::Float64 as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                datatype_extension: None,
                options: None,
            },
        ];
        let mut create_table_expr = build_create_table_expr(
            &table_ref,
            &default_schema,
            common_catalog::consts::default_engine(),
        )
        .map_err(BoxedError::new)
        .context(error::ExecuteGrpcQuerySnafu)?;
        create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
        fill_metric_physical_table_options(&mut create_table_expr.table_options);

        self.statement_executor
            .create_table_inner(&mut create_table_expr, None, ctx)
            .await
            .map_err(BoxedError::new)
            .context(error::ExecuteGrpcQuerySnafu)?;

        Ok(())
    }
}

/// This handler is mainly used for `frontend` or `standalone` to directly import
/// the metrics collected by itself, thereby avoiding importing metrics through the network,
/// thus reducing compression and network transmission overhead,
@@ -320,3 +564,72 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
        unreachable!();
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use session::context::QueryContext;

    use super::*;

    #[test]
    fn test_auto_create_table_type_for_prom_remote_write_metric_engine() {
        let mut query_ctx = QueryContext::with(
            common_catalog::consts::DEFAULT_CATALOG_NAME,
            common_catalog::consts::DEFAULT_SCHEMA_NAME,
        );
        query_ctx.set_extension(PHYSICAL_TABLE_PARAM, "metric_physical".to_string());
        let ctx = Arc::new(query_ctx);

        let create_type = auto_create_table_type_for_prom_remote_write(&ctx, true);
        match create_type {
            AutoCreateTableType::Logical(physical) => assert_eq!(physical, "metric_physical"),
            _ => panic!("expected logical table create type"),
        }
    }

    #[test]
    fn test_auto_create_table_type_for_prom_remote_write_without_metric_engine() {
        let ctx = Arc::new(QueryContext::with(
            common_catalog::consts::DEFAULT_CATALOG_NAME,
            common_catalog::consts::DEFAULT_SCHEMA_NAME,
        ));

        let create_type = auto_create_table_type_for_prom_remote_write(&ctx, false);
        match create_type {
            AutoCreateTableType::Physical => {}
            _ => panic!("expected physical table create type"),
        }
    }

    #[test]
    fn test_required_physical_table_for_create_type() {
        let logical = AutoCreateTableType::Logical("phy_table".to_string());
        assert_eq!(
            Some("phy_table"),
            required_physical_table_for_create_type(&logical)
        );

        let physical = AutoCreateTableType::Physical;
        assert_eq!(None, required_physical_table_for_create_type(&physical));
    }

    #[test]
    fn test_metric_physical_table_options_forces_flat_sst_format() {
        let mut table_options = HashMap::new();

        fill_metric_physical_table_options(&mut table_options);

        assert_eq!(
            Some("flat"),
            table_options.get(SST_FORMAT_KEY).map(String::as_str)
        );
        assert_eq!(
            Some("true"),
            table_options
                .get(PHYSICAL_TABLE_METADATA_KEY)
                .map(String::as_str)
        );
    }
}

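The tests above exercise how a remote-write request is routed to either a logical table (metric engine enabled, with a physical table name taken from the request context) or a plain physical table. A hedged, self-contained sketch of that selection — the names `CreateType`, `select_create_type`, and the default table name are illustrative, not the actual API:

```rust
/// Routing sketch: with the metric engine enabled, writes target a logical
/// table under the chosen physical table; otherwise a plain physical table.
#[derive(Debug, PartialEq)]
enum CreateType {
    Logical(String),
    Physical,
}

fn select_create_type(physical_table_param: Option<&str>, with_metric_engine: bool) -> CreateType {
    if with_metric_engine {
        // Assume some default physical table name when the request carries none.
        let physical = physical_table_param.unwrap_or("default_physical_table");
        CreateType::Logical(physical.to_string())
    } else {
        CreateType::Physical
    }
}

fn main() {
    assert_eq!(
        select_create_type(Some("metric_physical"), true),
        CreateType::Logical("metric_physical".to_string())
    );
    assert_eq!(select_create_type(None, false), CreateType::Physical);
}
```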
@@ -130,6 +130,8 @@ where
            self.instance.partition_manager().clone(),
            self.instance.node_manager().clone(),
            self.instance.catalog_manager().clone(),
+            opts.prom_store.with_metric_engine,
            self.instance.clone(),
+            opts.prom_store.pending_rows_flush_interval,
            opts.prom_store.max_batch_rows,
            opts.prom_store.max_concurrent_flushes,

@@ -28,7 +28,7 @@ use crate::error::{EncodePrimaryKeySnafu, Result, UnexpectedRequestSnafu};

/// Info about a tag column for TSID computation and sparse primary key encoding.
-#[allow(dead_code)]
-pub(crate) struct TagColumnInfo {
+pub struct TagColumnInfo {
    /// Column name (used for label-name hash).
    pub name: String,
    /// Column index in the RecordBatch.
@@ -37,9 +37,16 @@ pub(crate) struct TagColumnInfo {
    pub column_id: ColumnId,
}

-/// Computes `__tsid` values for each row.
-#[allow(dead_code)]
-pub(crate) fn compute_tsid_array(
+/// Computes the TSID for each row in a [RecordBatch].
+///
+/// The TSID is a stable hash of the set of labels (tags) present in each row.
+/// It accounts for both the names and values of all non-null tag columns.
+///
+/// # Logic
+/// - If a row has no nulls across all `sorted_tag_columns`, it uses a precomputed hash of all label names.
+/// - If a row has nulls, it dynamically computes a hash of the names of labels that are present (non-null).
+/// - In both cases, it then hashes the values of those present labels in the order specified by `sorted_tag_columns`.
+pub fn compute_tsid_array(
    batch: &RecordBatch,
    sorted_tag_columns: &[TagColumnInfo],
    tag_arrays: &[&StringArray],
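The doc comment above describes the TSID as a hash over the names and values of the labels that are present in a row. A minimal std-only sketch of that idea (`DefaultHasher` stands in for whatever stable hash the engine actually uses, and `tsid` is a hypothetical helper, not the real `compute_tsid_array`):

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash the names of the present (non-null) labels first, then their values
/// in the same sorted column order, so rows with the same label set and
/// values map to the same series id.
fn tsid(labels: &[(&str, Option<&str>)]) -> u64 {
    let mut hasher = DefaultHasher::new();
    // Label-name hash over present labels only (nulls are skipped).
    for (name, value) in labels {
        if value.is_some() {
            name.hash(&mut hasher);
        }
    }
    // Then the corresponding values, in the same order.
    for (_, value) in labels {
        if let Some(v) = value {
            v.hash(&mut hasher);
        }
    }
    hasher.finish()
}

fn main() {
    let a = tsid(&[("instance", Some("host1")), ("job", Some("api"))]);
    let b = tsid(&[("instance", Some("host1")), ("job", Some("api"))]);
    let c = tsid(&[("instance", Some("host2")), ("job", Some("api"))]);
    assert_eq!(a, b); // same label set and values -> same series
    assert_ne!(a, c); // different value -> different series
}
```

The precomputed all-names hash mentioned in the doc is an optimization of the first loop for the common no-null case.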
@@ -110,12 +117,30 @@ fn build_tag_arrays<'a>(
        .collect()
}

-/// Modifies a RecordBatch for sparse primary key encoding.
-pub(crate) fn modify_batch_sparse(
+/// Modifies a [RecordBatch] to include a sparse primary key column.
+///
+/// This function transforms the input `batch` into a new `RecordBatch` where the first column
+/// is the generated primary key (named [PRIMARY_KEY_COLUMN_NAME]), followed by columns
+/// indicated by `extra_column_indices`.
+///
+/// The primary key uses a "sparse" encoding, which compactly represents the row's identity
+/// by only including non-null tag values. The encoding, handled by [SparsePrimaryKeyCodec],
+/// consists of:
+/// 1. The `table_id`.
+/// 2. A `tsid` (Time Series ID), which is a hash of the present tags.
+/// 3. The actual non-null tag values paired with their `column_id`.
+///
+/// # Parameters
+/// - `batch`: The source [RecordBatch].
+/// - `table_id`: The ID of the table.
+/// - `sorted_tag_columns`: Metadata for tag columns, used for both TSID computation and PK encoding.
+/// - `extra_column_indices`: Indices of columns from the original batch to keep in the output
+///   (typically the timestamp and value fields).
+pub fn modify_batch_sparse(
    batch: RecordBatch,
    table_id: u32,
    sorted_tag_columns: &[TagColumnInfo],
-    non_tag_column_indices: &[usize],
+    extra_column_indices: &[usize],
) -> Result<RecordBatch> {
    let num_rows = batch.num_rows();
    let codec = SparsePrimaryKeyCodec::schemaless();
@@ -151,7 +176,7 @@ pub(crate) fn modify_batch_sparse(
    ))];
    let mut columns: Vec<Arc<dyn Array>> = vec![Arc::new(pk_array)];

-    for &idx in non_tag_column_indices {
+    for &idx in extra_column_indices {
        fields.push(batch.schema().fields()[idx].clone());
        columns.push(batch.column(idx).clone());
    }

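The doc comment above lists the three pieces of the sparse key: table id, tsid, then `(column_id, value)` pairs for non-null tags only. A toy byte-level sketch of that layout — the real `SparsePrimaryKeyCodec` uses a different, more elaborate binary encoding, so `encode_sparse_key` here only illustrates which pieces go in and in what order:

```rust
/// Toy sparse-key layout: [table_id][tsid][(column_id, len, value)...],
/// skipping null tags entirely (that skipping is the "sparse" part).
fn encode_sparse_key(table_id: u32, tsid: u64, tags: &[(u32, Option<&str>)]) -> Vec<u8> {
    let mut key = Vec::new();
    key.extend_from_slice(&table_id.to_be_bytes());
    key.extend_from_slice(&tsid.to_be_bytes());
    for (column_id, value) in tags {
        if let Some(v) = value {
            key.extend_from_slice(&column_id.to_be_bytes());
            key.extend_from_slice(&(v.len() as u32).to_be_bytes());
            key.extend_from_slice(v.as_bytes());
        }
        // Null tags contribute nothing to the key.
    }
    key
}

fn main() {
    let with_null = encode_sparse_key(42, 7, &[(2, Some("api")), (3, None)]);
    let without = encode_sparse_key(42, 7, &[(2, Some("api"))]);
    // A null tag is indistinguishable from an absent tag.
    assert_eq!(with_null, without);
}
```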
@@ -34,18 +34,20 @@ use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
use crate::engine::MetricEngineInner;
use crate::error;
use crate::error::Result;
+use crate::metrics::MITO_OPERATION_ELAPSED;

impl MetricEngineInner {
-    /// Bulk-inserts logical rows into a metric region.
+    /// Bulk-inserts rows into a metric region.
    ///
-    /// This method accepts a `RegionBulkInsertsRequest` whose payload is a logical
-    /// `RecordBatch` (timestamp, value and tag columns) for the given logical `region_id`.
+    /// **Logical region path:** The request payload is a logical `RecordBatch`
+    /// (timestamp, value and tag columns). It is transformed to physical format
+    /// via `modify_batch_sparse`, encoded to Arrow IPC, and forwarded as a
+    /// `BulkInserts` request to the data region. If mito reports
+    /// `StatusCode::Unsupported`, the request is transparently retried as a `Put`.
    ///
-    /// The transformed batch is encoded to Arrow IPC and forwarded as a `BulkInserts`
-    /// request to the data region, along with the original `partition_expr_version`.
-    /// If the data region reports `StatusCode::Unsupported` for bulk inserts, the request
-    /// is transparently retried as a `Put` by converting the original logical batch into
-    /// `api::v1::Rows`, so callers observe the same semantics as `put_region`.
+    /// **Physical region path:** The request payload is already in physical format
+    /// (produced by the batcher's `flush_batch_physical`). It is forwarded directly
+    /// to the data region with no transformation.
    ///
    /// Returns the number of affected rows, or `0` if the input batch is empty.
    pub async fn bulk_insert_region(
@@ -53,13 +55,42 @@ impl MetricEngineInner {
        region_id: RegionId,
        request: RegionBulkInsertsRequest,
    ) -> Result<AffectedRows> {
-        ensure!(
-            !self.is_physical_region(region_id),
-            error::UnsupportedRegionRequestSnafu {
-                request: RegionRequest::BulkInserts(request),
-            }
-        );
        if request.payload.num_rows() == 0 {
            return Ok(0);
        }
+        if self.is_physical_region(region_id) {
+            let _timer = MITO_OPERATION_ELAPSED
+                .with_label_values(&["bulk_insert_physical"])
+                .start_timer();
+            return self.bulk_insert_physical_region(region_id, request).await;
+        }

+        let _timer = MITO_OPERATION_ELAPSED
+            .with_label_values(&["bulk_insert_logical"])
+            .start_timer();
+        self.bulk_insert_logical_region(region_id, request).await
+    }

+    /// Passthrough for bulk inserts targeting a physical data region.
+    ///
+    /// The batch is already in physical format (with `__primary_key`, timestamp,
+    /// value columns), so no logical-to-physical transformation is needed.
+    async fn bulk_insert_physical_region(
+        &self,
+        region_id: RegionId,
+        request: RegionBulkInsertsRequest,
+    ) -> Result<AffectedRows> {
+        self.data_region
+            .write_data(region_id, RegionRequest::BulkInserts(request))
+            .await
+    }

+    /// Bulk-inserts logical rows, transforming them to physical format first.
+    async fn bulk_insert_logical_region(
+        &self,
+        region_id: RegionId,
+        request: RegionBulkInsertsRequest,
+    ) -> Result<AffectedRows> {
        let (physical_region_id, data_region_id, primary_key_encoding) =
            self.find_data_region_meta(region_id)?;

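The doc comment in the hunk above says that when the bulk path reports `StatusCode::Unsupported`, the write is transparently retried through the row-based `Put` path. A hedged std-only sketch of that fallback pattern (all names here are illustrative, not the engine's actual types):

```rust
/// Try the fast bulk path first; only an Unsupported error triggers the
/// row-based retry, every other error propagates to the caller.
#[derive(Debug, PartialEq)]
enum WriteError {
    Unsupported,
    Other(String),
}

fn bulk_write(rows: &[u64], bulk_supported: bool) -> Result<usize, WriteError> {
    if bulk_supported { Ok(rows.len()) } else { Err(WriteError::Unsupported) }
}

fn put_write(rows: &[u64]) -> Result<usize, WriteError> {
    Ok(rows.len())
}

fn write_with_fallback(rows: &[u64], bulk_supported: bool) -> Result<usize, WriteError> {
    match bulk_write(rows, bulk_supported) {
        Ok(n) => Ok(n),
        // Transparent retry: the caller observes the same semantics either way.
        Err(WriteError::Unsupported) => put_write(rows),
        Err(e) => Err(e),
    }
}

fn main() {
    assert_eq!(write_with_fallback(&[1, 2, 3], true), Ok(3));
    assert_eq!(write_with_fallback(&[1, 2, 3], false), Ok(3));
}
```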
@@ -390,6 +421,7 @@ mod tests {
    use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
    use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;
+    use mito2::config::MitoConfig;
    use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionEngine;
@@ -397,6 +429,7 @@ mod tests {
    use store_api::storage::{RegionId, ScanRequest};

    use super::record_batch_to_ipc;
+    use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
    use crate::error::Error;
    use crate::test_util::{self, TestEnv};

@@ -492,23 +525,81 @@ mod tests {
    }

    #[tokio::test]
-    async fn test_bulk_insert_physical_region_rejected() {
-        let env = TestEnv::new().await;
+    async fn test_bulk_insert_physical_region_passthrough() {
+        // Use flat format so that BulkMemtable is used (supports write_bulk).
+        let mito_config = MitoConfig {
+            default_experimental_flat_format: true,
+            ..Default::default()
+        };
+        let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
        env.init_metric_region().await;

        let physical_region_id = env.default_physical_region_id();
-        let batch = build_logical_batch(0, 2);
-        let request = build_bulk_request(physical_region_id, batch);
+        let logical_region_id = env.default_logical_region_id();

-        let err = env
+        // First, do a normal logical bulk insert so we can compare results.
+        let logical_batch = build_logical_batch(0, 3);
+        let logical_request = build_bulk_request(logical_region_id, logical_batch.clone());
+        let response = env
            .metric()
+            .handle_request(logical_region_id, logical_request)
            .await
+            .unwrap();
+        assert_eq!(response.affected_rows, 3);

+        // Now build a physical-format batch using modify_batch_sparse (simulating
+        // what the batcher's flush_batch_physical does) and send it directly to
+        // the physical region.
+        let tag_columns = vec![TagColumnInfo {
+            name: "job".to_string(),
+            index: 2,
+            column_id: 2, // column_id for "job" in the physical table
+        }];
+        let non_tag_indices = vec![0, 1]; // timestamp, value
+        let second_batch = build_logical_batch(3, 3);
+        let physical_batch = modify_batch_sparse(
+            second_batch,
+            logical_region_id.table_id(),
+            &tag_columns,
+            &non_tag_indices,
+        )
+        .unwrap();
+        let request = build_bulk_request(physical_region_id, physical_batch);
+        let response = env
            .metric()
            .handle_request(physical_region_id, request)
            .await
-            .unwrap_err();
-        let Some(err) = err.as_any().downcast_ref::<Error>() else {
-            panic!("unexpected error type");
+            .unwrap();
+        assert_eq!(response.affected_rows, 3);

+        // Verify all 6 rows are readable from the logical region.
+        let stream = env
+            .metric()
+            .scan_to_stream(logical_region_id, ScanRequest::default())
+            .await
+            .unwrap();
+        let batches = RecordBatches::try_collect(stream).await.unwrap();
+        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 6);
+    }

+    #[tokio::test]
+    async fn test_bulk_insert_physical_region_empty_batch() {
+        // Use flat format so that BulkMemtable is used (supports write_bulk).
+        let mito_config = MitoConfig {
+            default_experimental_flat_format: true,
+            ..Default::default()
+        };
-        assert_matches!(err, Error::UnsupportedRegionRequest { .. });
+        let env = TestEnv::with_mito_config("", mito_config, Default::default()).await;
+        env.init_metric_region().await;
+        let physical_region_id = env.default_physical_region_id();

+        let batch = build_logical_batch(0, 0);
+        let request = build_bulk_request(physical_region_id, batch);
+        let response = env
+            .metric()
+            .handle_request(physical_region_id, request)
+            .await
+            .unwrap();
+        assert_eq!(response.affected_rows, 0);
    }

    #[tokio::test]

@@ -52,7 +52,7 @@

#![recursion_limit = "256"]

-mod batch_modifier;
+pub mod batch_modifier;
pub mod config;
mod data_region;
pub mod engine;

@@ -76,6 +76,22 @@ impl TestEnv {
        }
    }

+    /// Returns a new env with specific `prefix`, `mito_config`, and `config` for test.
+    pub async fn with_mito_config(
+        prefix: &str,
+        mito_config: MitoConfig,
+        config: EngineConfig,
+    ) -> Self {
+        let mut mito_env = MitoTestEnv::with_prefix(prefix).await;
+        let mito = mito_env.create_engine(mito_config).await;
+        let metric = MetricEngine::try_new(mito.clone(), config).unwrap();
+        Self {
+            mito_env,
+            mito,
+            metric,
+        }
+    }

    /// Returns a new env with specific `prefix` and `mito_env` for test.
    pub async fn with_mito_env(mut mito_env: MitoTestEnv) -> Self {
        let mito = mito_env.create_engine(MitoConfig::default()).await;

@@ -103,7 +103,7 @@ pub enum AutoCreateTableType {
}

impl AutoCreateTableType {
-    fn as_str(&self) -> &'static str {
+    pub fn as_str(&self) -> &'static str {
        match self {
            AutoCreateTableType::Logical(_) => "logical",
            AutoCreateTableType::Physical => "physical",

@@ -338,8 +338,8 @@ impl PartitionRule for MultiDimPartitionRule {
        self
    }

-    fn partition_columns(&self) -> Vec<String> {
-        self.partition_columns.clone()
+    fn partition_columns(&self) -> &[String] {
+        &self.partition_columns
    }

    fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {

@@ -29,7 +29,7 @@ pub type PartitionRuleRef = Arc<dyn PartitionRule>;
pub trait PartitionRule: Sync + Send {
    fn as_any(&self) -> &dyn Any;

-    fn partition_columns(&self) -> Vec<String>;
+    fn partition_columns(&self) -> &[String];

    /// Finds the target region by the partition values.
    ///

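The `partition_columns` signature change in the hunks above swaps an owned `Vec<String>` (which forced a clone of every column name on every call) for a borrowed `&[String]`. A minimal sketch of the difference (`Rule` is a stand-in type, not the real `PartitionRule`):

```rust
struct Rule {
    partition_columns: Vec<String>,
}

impl Rule {
    // Old shape: allocates and clones the whole vector on every call.
    fn partition_columns_owned(&self) -> Vec<String> {
        self.partition_columns.clone()
    }

    // New shape: hands out a view into existing storage, no allocation.
    fn partition_columns(&self) -> &[String] {
        &self.partition_columns
    }
}

fn main() {
    let rule = Rule {
        partition_columns: vec!["id".to_string(), "host".to_string()],
    };
    // Both expose the same data; only the borrowed form avoids the clone.
    assert_eq!(rule.partition_columns_owned(), rule.partition_columns());
    assert_eq!(rule.partition_columns().len(), 2);
}
```

The cost is that callers now iterate with `.iter()` instead of `.into_iter()`, which is exactly the follow-up change in the `SplitReadRowHelper` hunk.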
@@ -66,7 +66,7 @@ impl<'a> SplitReadRowHelper<'a> {
            .collect::<HashMap<_, _>>();
        let partition_cols = partition_rule.partition_columns();
        let partition_cols_indexes = partition_cols
-            .into_iter()
+            .iter()
            .map(|col_name| col_name_to_idx.get(&col_name).cloned())
            .collect::<Vec<_>>();

@@ -176,15 +176,25 @@ mod tests {
    }

    #[derive(Debug, Serialize, Deserialize)]
-    struct MockPartitionRule;
+    struct MockPartitionRule {
+        partition_columns: Vec<String>,
+    }

+    impl Default for MockPartitionRule {
+        fn default() -> Self {
+            Self {
+                partition_columns: vec!["id".to_string()],
+            }
+        }
+    }

    impl PartitionRule for MockPartitionRule {
        fn as_any(&self) -> &dyn Any {
            self
        }

-        fn partition_columns(&self) -> Vec<String> {
-            vec!["id".to_string()]
+        fn partition_columns(&self) -> &[String] {
+            &self.partition_columns
        }

        fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
@@ -206,15 +216,25 @@ mod tests {
    }

    #[derive(Debug, Serialize, Deserialize)]
-    struct MockMissedColPartitionRule;
+    struct MockMissedColPartitionRule {
+        partition_columns: Vec<String>,
+    }

+    impl Default for MockMissedColPartitionRule {
+        fn default() -> Self {
+            Self {
+                partition_columns: vec!["missed_col".to_string()],
+            }
+        }
+    }

    impl PartitionRule for MockMissedColPartitionRule {
        fn as_any(&self) -> &dyn Any {
            self
        }

-        fn partition_columns(&self) -> Vec<String> {
-            vec!["missed_col".to_string()]
+        fn partition_columns(&self) -> &[String] {
+            &self.partition_columns
        }

        fn find_region(&self, values: &[Value]) -> Result<RegionNumber> {
@@ -243,8 +263,8 @@ mod tests {
        self
    }

-    fn partition_columns(&self) -> Vec<String> {
-        vec![]
+    fn partition_columns(&self) -> &[String] {
+        &[]
    }

    fn find_region(&self, _values: &[Value]) -> Result<RegionNumber> {
@@ -261,7 +281,7 @@ mod tests {
    #[test]
    fn test_writer_splitter() {
        let rows = mock_rows();
-        let rule = Arc::new(MockPartitionRule) as PartitionRuleRef;
+        let rule = Arc::new(MockPartitionRule::default()) as PartitionRuleRef;
        let splitter = RowSplitter::new(rule);

        let mut splits = splitter.split(rows).unwrap();
@@ -276,7 +296,7 @@ mod tests {
    #[test]
    fn test_missed_col_writer_splitter() {
        let rows = mock_rows();
-        let rule = Arc::new(MockMissedColPartitionRule) as PartitionRuleRef;
+        let rule = Arc::new(MockMissedColPartitionRule::default()) as PartitionRuleRef;

        let splitter = RowSplitter::new(rule);
        let mut splits = splitter.split(rows).unwrap();

@@ -78,6 +78,7 @@ jsonb.workspace = true
lazy_static.workspace = true
log-query.workspace = true
loki-proto.workspace = true
+metric-engine.workspace = true
mime_guess = "2.0"
notify.workspace = true
object-pool = "0.5"
@@ -114,6 +115,7 @@ serde_json.workspace = true
session.workspace = true
simd-json.workspace = true
simdutf8 = "0.1"
+smallvec.workspace = true
snafu.workspace = true
snap = "1"
socket2 = "0.5"

@@ -392,7 +392,7 @@ pub enum Error {
        location: Location,
    },

-    #[snafu(display("Error accessing catalog"))]
+    #[snafu(transparent)]
    Catalog {
        source: catalog::error::Error,
        #[snafu(implicit)]
@@ -678,6 +678,13 @@ pub enum Error {
        #[snafu(implicit)]
        location: Location,
    },

+    #[snafu(transparent)]
+    DataTypes {
+        source: datatypes::error::Error,
+        #[snafu(implicit)]
+        location: Location,
+    },
}

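Making the `Catalog` and `DataTypes` variants `#[snafu(transparent)]` is what lets all the later hunks replace `.context(CatalogSnafu)?` with a bare `?`: a transparent variant adds no message of its own and converts automatically via `From`. A std-only sketch of the same effect, without the snafu crate (all type names here are illustrative):

```rust
use std::fmt;

#[derive(Debug)]
struct CatalogError(String);

#[derive(Debug)]
enum ServerError {
    Catalog(CatalogError),
}

// The From impl is what makes a bare `?` work at call sites.
impl From<CatalogError> for ServerError {
    fn from(source: CatalogError) -> Self {
        ServerError::Catalog(source)
    }
}

impl fmt::Display for ServerError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Transparent: delegate to the source, no extra
            // "Error accessing catalog" prefix.
            ServerError::Catalog(source) => write!(f, "{}", source.0),
        }
    }
}

fn lookup_table(exists: bool) -> Result<(), CatalogError> {
    if exists { Ok(()) } else { Err(CatalogError("table not found".to_string())) }
}

fn handler() -> Result<(), ServerError> {
    // No `.context(CatalogSnafu)` needed.
    lookup_table(false)?;
    Ok(())
}

fn main() {
    let err = handler().unwrap_err();
    assert_eq!(err.to_string(), "table not found");
}
```

The delegated `status_code()` in the `ErrorExt` hunk below the enum plays the same role for status codes that the delegated `Display` plays here for messages.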
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -756,6 +763,7 @@ impl ErrorExt for Error {

            Catalog { source, .. } => source.status_code(),
            RowWriter { source, .. } => source.status_code(),
+            DataTypes { source, .. } => source.status_code(),

            TlsRequired { .. } => StatusCode::Unknown,
            Auth { source, .. } => source.status_code(),

@@ -49,7 +49,7 @@ use table::table_reference::TableReference;
use vrl::value::{KeyString, Value as VrlValue};

use crate::error::{
-    CatalogSnafu, Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
+    Error, InvalidParameterSnafu, OtherSnafu, ParseJsonSnafu, PipelineSnafu, Result,
    status_code_to_http_status,
};
use crate::http::HttpResponse;
@@ -265,12 +265,7 @@ pub async fn query_pipeline_ddl(
        .map_err(BoxedError::new)
        .context(OtherSnafu)?;

-    let message = if handler
-        .get_table(&table_name, &query_ctx)
-        .await
-        .context(CatalogSnafu)?
-        .is_some()
-    {
+    let message = if handler.get_table(&table_name, &query_ctx).await?.is_some() {
        Some(CREATE_TABLE_SQL_TABLE_EXISTS.to_string())
    } else if pipeline.is_variant_table_name() {
        Some(CREATE_TABLE_SQL_SUFFIX_EXISTS.to_string())

@@ -65,9 +65,8 @@ use store_api::metric_engine_consts::{

pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
-    CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
-    InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
-    UnexpectedResultSnafu,
+    CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error, InvalidQuerySnafu,
+    NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu, UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{FIELD_NAME_LABEL, METRIC_NAME_LABEL, is_database_selection_label};
@@ -662,8 +661,7 @@ async fn retrieve_series_from_query_result(
        table_name,
        Some(query_ctx),
    )
-    .await
-    .context(CatalogSnafu)?
+    .await?
    .with_context(|| TableNotFoundSnafu {
        catalog: query_ctx.current_catalog(),
        schema: query_ctx.current_schema(),
@@ -1440,7 +1438,7 @@ async fn retrieve_table_names(
    });

    while let Some(table) = tables_stream.next().await {
-        let table = table.context(CatalogSnafu)?;
+        let table = table?;
        if !table
            .table_info()
            .meta
@@ -1497,7 +1495,7 @@ async fn retrieve_field_names(
        .next()
        .await
    {
-        let table = table.context(CatalogSnafu)?;
+        let table = table?;
        for column in table.field_columns() {
            field_columns.insert(column.name);
        }
@@ -1508,8 +1506,7 @@ async fn retrieve_field_names(
    for table_name in matches {
        let table = manager
            .table(catalog, &schema, &table_name, Some(query_ctx))
-            .await
-            .context(CatalogSnafu)?
+            .await?
            .with_context(|| TableNotFoundSnafu {
                catalog: catalog.to_string(),
                schema: schema.clone(),
@@ -1533,8 +1530,7 @@ async fn retrieve_schema_names(

    let candidate_schemas = catalog_manager
        .schema_names(catalog, Some(query_ctx))
-        .await
-        .context(CatalogSnafu)?;
+        .await?;

    for schema in candidate_schemas {
        let mut found = true;
@@ -1542,8 +1538,7 @@ async fn retrieve_schema_names(
        if let Some(table_name) = retrieve_metric_name_from_promql(match_item) {
            let exists = catalog_manager
                .table_exists(catalog, &schema, &table_name, Some(query_ctx))
-                .await
-                .context(CatalogSnafu)?;
+                .await?;
            if !exists {
                found = false;
                break;

@@ -42,6 +42,7 @@ pub mod pending_rows_batcher;
mod pipeline;
pub mod postgres;
pub mod prom_remote_write;
+pub(crate) mod prom_row_builder;
pub mod prom_store;
pub mod prometheus;
pub mod prometheus_handler;

@@ -177,6 +177,13 @@ lazy_static! {
        vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
    )
    .unwrap();
+    pub static ref PENDING_ROWS_BATCH_FLUSH_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
+        "greptime_prom_store_pending_rows_batch_flush_stage_elapsed",
+        "Elapsed time of pending rows batch flush stages in seconds",
+        &["stage"],
+        vec![0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0]
+    )
+    .unwrap();
    /// Http prometheus read duration per database.
    pub static ref METRIC_HTTP_PROM_STORE_READ_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_servers_http_prometheus_read_elapsed",

File diff suppressed because it is too large
@@ -28,7 +28,7 @@ use session::context::{Channel, QueryContextRef};
use snafu::ResultExt;
use vrl::value::Value as VrlValue;

-use crate::error::{CatalogSnafu, PipelineSnafu, Result};
+use crate::error::{PipelineSnafu, Result};
use crate::http::event::PipelineIngestRequest;
use crate::metrics::{
    METRIC_FAILURE_VALUE, METRIC_HTTP_LOGS_TRANSFORM_ELAPSED, METRIC_SUCCESS_VALUE,
@@ -89,10 +89,7 @@ async fn run_identity_pipeline(
    let table = if pipeline_ctx.channel == Channel::Prometheus {
        None
    } else {
-        handler
-            .get_table(&table_name, query_ctx)
-            .await
-            .context(CatalogSnafu)?
+        handler.get_table(&table_name, query_ctx).await?
    };
    identity_pipeline(data_array, table, pipeline_ctx)
        .map(|opt_map| ContextReq::from_opt_map(opt_map, table_name))
@@ -141,10 +138,7 @@ async fn run_custom_pipeline(
        }
    };

-    let table = handler
-        .get_table(&table_name, query_ctx)
-        .await
-        .context(CatalogSnafu)?;
+    let table = handler.get_table(&table_name, query_ctx).await?;
    schema_info.set_table(table);

    for pipeline_map in pipeline_maps {

src/servers/src/prom_row_builder.rs (new file, 557 lines)
@@ -0,0 +1,557 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! Prometheus row-level helpers for converting proto `Rows` into Arrow
|
||||
//! `RecordBatch`es and aligning / normalizing their schemas against
|
||||
//! existing table schemas in the catalog.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::helper::ColumnDataTypeWrapper;
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnSchema, Rows, SemanticType};
|
||||
use arrow::array::{
|
||||
ArrayRef, Float64Builder, StringBuilder, TimestampMicrosecondBuilder,
|
||||
TimestampMillisecondBuilder, TimestampNanosecondBuilder, TimestampSecondBuilder,
|
||||
new_null_array,
|
||||
};
|
||||
use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema};
|
||||
use arrow::record_batch::RecordBatch;
|
||||
use arrow_schema::TimeUnit;
|
||||
use common_query::prelude::{greptime_timestamp, greptime_value};
|
||||
use datatypes::data_type::DataType;
|
||||
use datatypes::prelude::ConcreteDataType;
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
|
||||
use crate::error;
|
||||
use crate::error::Result;
|
||||
|
||||
/// Extract timestamp, field, and tag column names from a logical region schema.
fn unzip_logical_region_schema(
    target_schema: &ArrowSchema,
) -> Result<(String, String, HashSet<String>)> {
    let mut timestamp_column = None;
    let mut field_column = None;
    let mut tag_columns = HashSet::with_capacity(target_schema.fields.len().saturating_sub(2));
    for field in target_schema.fields() {
        if field.name() == greptime_timestamp() {
            timestamp_column = Some(field.name().clone());
            continue;
        }

        if field.name() == greptime_value() {
            field_column = Some(field.name().clone());
            continue;
        }

        if timestamp_column.is_none() && matches!(field.data_type(), ArrowDataType::Timestamp(_, _))
        {
            timestamp_column = Some(field.name().clone());
            continue;
        }

        if field_column.is_none() && matches!(field.data_type(), ArrowDataType::Float64) {
            field_column = Some(field.name().clone());
            continue;
        }
        tag_columns.insert(field.name().clone());
    }

    let timestamp_column = timestamp_column.with_context(|| error::UnexpectedResultSnafu {
        reason: "Failed to locate timestamp column in target schema".to_string(),
    })?;
    let field_column = field_column.with_context(|| error::UnexpectedResultSnafu {
        reason: "Failed to locate field column in target schema".to_string(),
    })?;

    Ok((timestamp_column, field_column, tag_columns))
}
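The classification order above (canonical `greptime_timestamp` / `greptime_value` names first, then a type-based fallback for schemas whose timestamp and value columns were renamed, everything else a tag) can be sketched standalone. The `Kind` enum and `classify` helper below are illustrative stand-ins, not part of this crate:

```rust
use std::collections::HashSet;

// Hypothetical stand-in for an Arrow field's type: a coarse type tag.
#[derive(Clone, Copy, PartialEq)]
enum Kind {
    TimestampMs,
    Float64,
    Utf8,
}

// Classify (name, type) pairs the same way unzip_logical_region_schema does:
// exact special-name matches win, then the first timestamp-typed column and
// the first float64 column fill the slots; the remainder are tags.
fn classify(fields: &[(&str, Kind)]) -> Option<(String, String, HashSet<String>)> {
    let mut ts = None;
    let mut val = None;
    let mut tags = HashSet::new();
    for (name, kind) in fields {
        if *name == "greptime_timestamp" {
            ts = Some(name.to_string());
        } else if *name == "greptime_value" {
            val = Some(name.to_string());
        } else if ts.is_none() && *kind == Kind::TimestampMs {
            ts = Some(name.to_string());
        } else if val.is_none() && *kind == Kind::Float64 {
            val = Some(name.to_string());
        } else {
            tags.insert(name.to_string());
        }
    }
    // Returning None here mirrors the real function's error path when either
    // special column cannot be located.
    Some((ts?, val?, tags))
}
```

A renamed schema like `(my_ts, host, my_value)` still resolves via the type fallback, which is what lets existing tables with non-canonical column names accept Prometheus writes.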
/// Directly converts proto `Rows` into a `RecordBatch` aligned to the given
/// `target_schema`, handling Prometheus column renaming (timestamp/value),
/// reordering, and null-filling in a single pass.
pub(crate) fn rows_to_aligned_record_batch(
    rows: &Rows,
    target_schema: &ArrowSchema,
) -> Result<RecordBatch> {
    let row_count = rows.rows.len();
    let column_count = rows.schema.len();

    for (idx, row) in rows.rows.iter().enumerate() {
        ensure!(
            row.values.len() == column_count,
            error::InternalSnafu {
                err_msg: format!(
                    "Column count mismatch in row {}, expected {}, got {}",
                    idx,
                    column_count,
                    row.values.len()
                )
            }
        );
    }

    let (target_ts_name, target_field_name, _target_tags) =
        unzip_logical_region_schema(target_schema)?;

    // Map effective target column name → (source column index, source arrow type).
    // Handles prom renames: Timestamp → target ts name, Float64 → target field name.
    let mut source_map: HashMap<&str, (usize, ArrowDataType)> =
        HashMap::with_capacity(rows.schema.len());

    for (src_idx, col) in rows.schema.iter().enumerate() {
        let wrapper = ColumnDataTypeWrapper::try_new(col.datatype, col.datatype_extension.clone())?;
        let src_arrow_type = ConcreteDataType::from(wrapper).as_arrow_type();

        match &src_arrow_type {
            ArrowDataType::Float64 => {
                source_map.insert(&target_field_name, (src_idx, src_arrow_type));
            }
            ArrowDataType::Timestamp(unit, _) => {
                ensure!(
                    unit == &TimeUnit::Millisecond,
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Unexpected remote write batch timestamp unit, expect millisecond, got: {}",
                            unit
                        )
                    }
                );
                source_map.insert(&target_ts_name, (src_idx, src_arrow_type));
            }
            ArrowDataType::Utf8 => {
                source_map.insert(&col.column_name, (src_idx, src_arrow_type));
            }
            other => {
                return error::InvalidPromRemoteRequestSnafu {
                    msg: format!(
                        "Unexpected remote write batch field type {}, field name: {}",
                        other, col.column_name
                    ),
                }
                .fail();
            }
        }
    }

    // Build columns in target schema order.
    let mut columns = Vec::with_capacity(target_schema.fields().len());
    for target_field in target_schema.fields() {
        if let Some((src_idx, src_arrow_type)) = source_map.get(target_field.name().as_str()) {
            let array = build_arrow_array(
                rows,
                *src_idx,
                &rows.schema[*src_idx].column_name,
                src_arrow_type.clone(),
                row_count,
            )?;
            columns.push(array);
        } else {
            columns.push(new_null_array(target_field.data_type(), row_count));
        }
    }

    let batch = RecordBatch::try_new(Arc::new(target_schema.clone()), columns)
        .context(error::ArrowSnafu)?;
    Ok(batch)
}
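The alignment step above reduces to: key the source columns by their effective target name, then emit columns in target order, null-filling any name the source lacks. A minimal std-only sketch of that shape, with `Vec<Option<f64>>` standing in for Arrow arrays (names here are illustrative):

```rust
use std::collections::HashMap;

// A toy "column": values where None plays the role of an Arrow null.
type Column = Vec<Option<f64>>;

// Emit columns in the target ordering: names present in the source are
// carried over, names absent from the source become all-null columns,
// mirroring the new_null_array branch in rows_to_aligned_record_batch.
fn align(source: &HashMap<String, Column>, target_order: &[&str], row_count: usize) -> Vec<Column> {
    target_order
        .iter()
        .map(|name| {
            source
                .get(*name)
                .cloned()
                .unwrap_or_else(|| vec![None; row_count])
        })
        .collect()
}
```

This is why a write that omits a tag the table already has still succeeds: the absent tag simply materializes as a null column of the right length.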
/// Identify tag columns in the proto `rows_schema` that are absent from the
/// target region schema, without building an intermediate `RecordBatch`.
pub(crate) fn identify_missing_columns_from_proto(
    rows_schema: &[ColumnSchema],
    target_schema: &ArrowSchema,
) -> Result<Vec<String>> {
    let (_, _, target_tags) = unzip_logical_region_schema(target_schema)?;
    let mut missing = Vec::new();
    for col in rows_schema {
        let wrapper = ColumnDataTypeWrapper::try_new(col.datatype, col.datatype_extension.clone())?;
        let arrow_type = ConcreteDataType::from(wrapper).as_arrow_type();
        if matches!(arrow_type, ArrowDataType::Utf8)
            && !target_tags.contains(&col.column_name)
            && target_schema.column_with_name(&col.column_name).is_none()
        {
            missing.push(col.column_name.clone());
        }
    }
    Ok(missing)
}
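Stripped of the proto and Arrow plumbing, this is a set difference over tag names, and its result is what drives the auto-ALTER path described in the commit message. A toy version, with plain string sets standing in for the two schemas:

```rust
use std::collections::HashSet;

// Return the incoming tag names that the target table does not yet know
// about; each of these would become an ADD COLUMN in the schema alteration.
fn missing_tags(incoming: &[&str], existing: &HashSet<&str>) -> Vec<String> {
    incoming
        .iter()
        .filter(|t| !existing.contains(**t))
        .map(|t| t.to_string())
        .collect()
}
```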
/// Build a `Vec<ColumnSchema>` suitable for creating a new Prometheus logical table
/// directly from the proto `rows.schema`, avoiding the round-trip through Arrow schema.
pub fn build_prom_create_table_schema_from_proto(
    rows_schema: &[ColumnSchema],
) -> Result<Vec<ColumnSchema>> {
    rows_schema
        .iter()
        .map(|col| {
            let semantic_type = if col.datatype
                == api::v1::ColumnDataType::TimestampMillisecond as i32
            {
                SemanticType::Timestamp
            } else if col.datatype == api::v1::ColumnDataType::Float64 as i32 {
                SemanticType::Field
            } else {
                // Tag columns must be String type.
                ensure!(
                    col.datatype == api::v1::ColumnDataType::String as i32,
                    error::InvalidPromRemoteRequestSnafu {
                        msg: format!(
                            "Failed to build create table schema, tag column '{}' must be String but got datatype {}",
                            col.column_name, col.datatype
                        )
                    }
                );
                SemanticType::Tag
            };

            Ok(ColumnSchema {
                column_name: col.column_name.clone(),
                datatype: col.datatype,
                semantic_type: semantic_type as i32,
                datatype_extension: col.datatype_extension.clone(),
                options: None,
            })
        })
        .collect()
}
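The datatype → semantic-type mapping above can be sketched in isolation. The `Dt` and `Semantic` enums below are simplified stand-ins for the `api::v1` types, and the `Int64` variant (not a real case in the source) only exists to illustrate the rejected-tag path:

```rust
#[derive(Debug, PartialEq)]
enum Semantic {
    Timestamp,
    Field,
    Tag,
}

#[derive(Clone, Copy)]
enum Dt {
    TimestampMillisecond,
    Float64,
    String,
    Int64,
}

// Infer the Prometheus role of a column from its datatype alone, as
// build_prom_create_table_schema_from_proto does: millisecond timestamps
// become the time index, float64 the value field, strings become tags,
// and any other type is rejected.
fn infer(dt: Dt) -> Result<Semantic, String> {
    match dt {
        Dt::TimestampMillisecond => Ok(Semantic::Timestamp),
        Dt::Float64 => Ok(Semantic::Field),
        Dt::String => Ok(Semantic::Tag),
        _ => Err("tag columns must be String".to_string()),
    }
}
```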
/// Build a single Arrow array for the given column index from proto `Rows`.
fn build_arrow_array(
    rows: &Rows,
    col_idx: usize,
    column_name: &String,
    column_data_type: arrow::datatypes::DataType,
    row_count: usize,
) -> Result<ArrayRef> {
    macro_rules! build_array {
        ($builder:expr, $( $pattern:pat => $value:expr ),+ $(,)?) => {{
            let mut builder = $builder;
            for row in &rows.rows {
                match row.values[col_idx].value_data.as_ref() {
                    $(Some($pattern) => builder.append_value($value),)+
                    Some(v) => {
                        return error::InvalidPromRemoteRequestSnafu {
                            msg: format!("Unexpected value: {:?}", v),
                        }
                        .fail();
                    }
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish()) as ArrayRef
        }};
    }

    let array: ArrayRef = match column_data_type {
        arrow::datatypes::DataType::Float64 => {
            build_array!(Float64Builder::with_capacity(row_count), ValueData::F64Value(v) => *v)
        }
        arrow::datatypes::DataType::Utf8 => build_array!(
            StringBuilder::with_capacity(row_count, 0),
            ValueData::StringValue(v) => v
        ),
        arrow::datatypes::DataType::Timestamp(u, _) => match u {
            TimeUnit::Second => build_array!(
                TimestampSecondBuilder::with_capacity(row_count),
                ValueData::TimestampSecondValue(v) => *v
            ),
            TimeUnit::Millisecond => build_array!(
                TimestampMillisecondBuilder::with_capacity(row_count),
                ValueData::TimestampMillisecondValue(v) => *v
            ),
            TimeUnit::Microsecond => build_array!(
                TimestampMicrosecondBuilder::with_capacity(row_count),
                ValueData::DatetimeValue(v) => *v,
                ValueData::TimestampMicrosecondValue(v) => *v
            ),
            TimeUnit::Nanosecond => build_array!(
                TimestampNanosecondBuilder::with_capacity(row_count),
                ValueData::TimestampNanosecondValue(v) => *v
            ),
        },
        ty => {
            return error::InvalidPromRemoteRequestSnafu {
                msg: format!(
                    "Unexpected column type {:?}, column name: {}",
                    ty, column_name
                ),
            }
            .fail();
        }
    };

    Ok(array)
}
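For each concrete builder, the `build_array!` macro expands to a loop of the same shape: append on the expected value variant, append null on `None`, and fail on any other variant. A miniature of that expansion, with a `Vec<Option<f64>>` in place of an Arrow builder and `Val` as a hypothetical stand-in for `ValueData`:

```rust
#[derive(Debug)]
enum Val {
    F64(f64),
    Str(String),
}

// What build_array! generates for the Float64 case, in simplified form:
// matching variants are appended, None becomes a null slot, and a
// mismatched variant aborts the whole column build with an error.
fn build_f64_column(rows: &[Option<Val>]) -> Result<Vec<Option<f64>>, String> {
    let mut out = Vec::with_capacity(rows.len());
    for v in rows {
        match v {
            Some(Val::F64(x)) => out.push(Some(*x)),
            Some(other) => return Err(format!("Unexpected value: {:?}", other)),
            None => out.push(None),
        }
    }
    Ok(out)
}
```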
#[cfg(test)]
mod tests {
    use api::v1::value::ValueData;
    use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
    use arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};

    use super::{
        build_prom_create_table_schema_from_proto, identify_missing_columns_from_proto,
        rows_to_aligned_record_batch,
    };

    #[test]
    fn test_rows_to_aligned_record_batch_renames_and_reorders() {
        let rows = Rows {
            schema: vec![
                ColumnSchema {
                    column_name: "greptime_timestamp".to_string(),
                    datatype: ColumnDataType::TimestampMillisecond as i32,
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "host".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "greptime_value".to_string(),
                    datatype: ColumnDataType::Float64 as i32,
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            rows: vec![
                Row {
                    values: vec![
                        Value {
                            value_data: Some(ValueData::TimestampMillisecondValue(1000)),
                        },
                        Value {
                            value_data: Some(ValueData::StringValue("h1".to_string())),
                        },
                        Value {
                            value_data: Some(ValueData::F64Value(42.0)),
                        },
                    ],
                },
                Row {
                    values: vec![
                        Value {
                            value_data: Some(ValueData::TimestampMillisecondValue(2000)),
                        },
                        Value {
                            value_data: Some(ValueData::StringValue("h2".to_string())),
                        },
                        Value {
                            value_data: Some(ValueData::F64Value(99.0)),
                        },
                    ],
                },
            ],
        };

        // Target schema has renamed columns and different ordering.
        let target = ArrowSchema::new(vec![
            Field::new(
                "my_ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("host", DataType::Utf8, true),
            Field::new("my_value", DataType::Float64, true),
        ]);

        let batch = rows_to_aligned_record_batch(&rows, &target).unwrap();
        assert_eq!(batch.schema().as_ref(), &target);
        assert_eq!(2, batch.num_rows());
        assert_eq!(3, batch.num_columns());

        let ts = batch
            .column(0)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1000);
        assert_eq!(ts.value(1), 2000);

        let hosts = batch
            .column(1)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(hosts.value(0), "h1");
        assert_eq!(hosts.value(1), "h2");

        let values = batch
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert_eq!(values.value(0), 42.0);
        assert_eq!(values.value(1), 99.0);
    }

    #[test]
    fn test_rows_to_aligned_record_batch_fills_nulls() {
        let rows = Rows {
            schema: vec![
                ColumnSchema {
                    column_name: "greptime_timestamp".to_string(),
                    datatype: ColumnDataType::TimestampMillisecond as i32,
                    semantic_type: SemanticType::Timestamp as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "host".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "instance".to_string(),
                    datatype: ColumnDataType::String as i32,
                    semantic_type: SemanticType::Tag as i32,
                    ..Default::default()
                },
                ColumnSchema {
                    column_name: "greptime_value".to_string(),
                    datatype: ColumnDataType::Float64 as i32,
                    semantic_type: SemanticType::Field as i32,
                    ..Default::default()
                },
            ],
            rows: vec![Row {
                values: vec![
                    Value {
                        value_data: Some(ValueData::TimestampMillisecondValue(1000)),
                    },
                    Value {
                        value_data: Some(ValueData::StringValue("h1".to_string())),
                    },
                    Value {
                        value_data: Some(ValueData::StringValue("i1".to_string())),
                    },
                    Value {
                        value_data: Some(ValueData::F64Value(1.0)),
                    },
                ],
            }],
        };

        // Target schema has "host" but not "instance"; also has "region" which is missing from source.
        let target = ArrowSchema::new(vec![
            Field::new(
                "my_ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("host", DataType::Utf8, true),
            Field::new("region", DataType::Utf8, true),
            Field::new("my_value", DataType::Float64, true),
        ]);

        let batch = rows_to_aligned_record_batch(&rows, &target).unwrap();
        assert_eq!(batch.schema().as_ref(), &target);
        assert_eq!(1, batch.num_rows());
        assert_eq!(4, batch.num_columns());

        // "region" column should be null-filled.
        let region = batch
            .column(2)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(region.is_null(0));
    }

    #[test]
    fn test_identify_missing_columns_from_proto() {
        let rows_schema = vec![
            ColumnSchema {
                column_name: "greptime_timestamp".to_string(),
                datatype: ColumnDataType::TimestampMillisecond as i32,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            },
            ColumnSchema {
                column_name: "host".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            ColumnSchema {
                column_name: "instance".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            ColumnSchema {
                column_name: "greptime_value".to_string(),
                datatype: ColumnDataType::Float64 as i32,
                semantic_type: SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let target = ArrowSchema::new(vec![
            Field::new(
                "my_ts",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("host", DataType::Utf8, true),
            Field::new("my_value", DataType::Float64, true),
        ]);

        let missing = identify_missing_columns_from_proto(&rows_schema, &target).unwrap();
        assert_eq!(missing, vec!["instance".to_string()]);
    }

    #[test]
    fn test_build_prom_create_table_schema_from_proto() {
        let rows_schema = vec![
            ColumnSchema {
                column_name: "greptime_timestamp".to_string(),
                datatype: ColumnDataType::TimestampMillisecond as i32,
                semantic_type: SemanticType::Timestamp as i32,
                ..Default::default()
            },
            ColumnSchema {
                column_name: "job".to_string(),
                datatype: ColumnDataType::String as i32,
                semantic_type: SemanticType::Tag as i32,
                ..Default::default()
            },
            ColumnSchema {
                column_name: "greptime_value".to_string(),
                datatype: ColumnDataType::Float64 as i32,
                semantic_type: SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        let schema = build_prom_create_table_schema_from_proto(&rows_schema).unwrap();
        assert_eq!(3, schema.len());

        assert_eq!("greptime_timestamp", schema[0].column_name);
        assert_eq!(SemanticType::Timestamp as i32, schema[0].semantic_type);
        assert_eq!(
            ColumnDataType::TimestampMillisecond as i32,
            schema[0].datatype
        );

        assert_eq!("job", schema[1].column_name);
        assert_eq!(SemanticType::Tag as i32, schema[1].semantic_type);
        assert_eq!(ColumnDataType::String as i32, schema[1].datatype);

        assert_eq!("greptime_value", schema[2].column_name);
        assert_eq!(SemanticType::Field as i32, schema[2].semantic_type);
        assert_eq!(ColumnDataType::Float64 as i32, schema[2].datatype);
    }
}