From 109b70750afd289b7bc84929d93a4f03c3b7769c Mon Sep 17 00:00:00 2001 From: LFC <990479+MichaelScofield@users.noreply.github.com> Date: Thu, 30 Oct 2025 18:24:12 +0800 Subject: [PATCH] refactor: convert to prometheus values directly from arrow (#7153) * refactor: convert to prometheus values directly from arrow Signed-off-by: luofucong * resolve PR comments Signed-off-by: luofucong --------- Signed-off-by: luofucong --- src/servers/src/http/prometheus.rs | 402 +++++++++++++++++++++++++++-- tests-integration/tests/http.rs | 8 +- 2 files changed, 388 insertions(+), 22 deletions(-) diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index e4c9677a4c..f9d1e1c21b 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -13,22 +13,43 @@ // limitations under the License. //! prom supply the prometheus HTTP API Server compliance + +use std::borrow::Borrow; use std::collections::{BTreeMap, HashMap, HashSet}; +use std::hash::{Hash, Hasher}; use std::sync::Arc; +use arrow::array::AsArray; +use arrow::datatypes::{ + Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType, + DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type, + Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, + Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type, +}; +use arrow_schema::{DataType, IntervalUnit, TimeUnit}; use axum::extract::{Path, Query, State}; use axum::{Extension, Form}; use catalog::CatalogManagerRef; use common_catalog::parse_catalog_and_schema_from_db_string; +use common_decimal::Decimal128; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; use common_query::{Output, OutputData}; -use common_recordbatch::RecordBatches; +use common_recordbatch::{RecordBatch, RecordBatches}; use common_telemetry::{debug, tracing}; +use common_time::time::Time; use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; +use common_time::{ + Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp, +}; use common_version::OwnedBuildInfo; +use datafusion_common::ScalarValue; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; +use datatypes::schema::{ColumnSchema, SchemaRef}; +use datatypes::types::jsonb_to_string; use datatypes::vectors::Float64Vector; use futures::StreamExt; use futures::future::join_all; @@ -53,8 +74,9 @@ use store_api::metric_engine_consts::{ pub use super::result::prometheus_resp::PrometheusJsonResponse; use crate::error::{ - CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result, - TableNotFoundSnafu, UnexpectedResultSnafu, + CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error, + InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu, + UnexpectedResultSnafu, }; use crate::http::header::collect_plan_metrics; use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL}; @@ -98,12 +120,23 @@ pub struct PromData { pub result: PromQueryResult, } +/// A "holder" for the reference([Arc]) to a column name, +/// to help avoiding cloning [String]s when used as a [HashMap] key. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub struct Column(Arc); + +impl From<&str> for Column { + fn from(s: &str) -> Self { + Self(Arc::new(s.to_string())) + } +} + #[derive(Debug, Default, Serialize, Deserialize, PartialEq)] #[serde(untagged)] pub enum PrometheusResponse { PromData(PromData), Labels(Vec), - Series(Vec>), + Series(Vec>), LabelValues(Vec), FormatQuery(String), BuildInfo(OwnedBuildInfo), @@ -622,7 +655,7 @@ async fn get_all_column_names( async fn retrieve_series_from_query_result( result: Result, - series: &mut Vec>, + series: &mut Vec>, query_ctx: &QueryContext, table_name: &str, manager: &CatalogManagerRef, @@ -700,7 +733,7 @@ async fn retrieve_labels_name_from_query_result( fn record_batches_to_series( batches: RecordBatches, - series: &mut Vec>, + series: &mut Vec>, table_name: &str, tag_columns: &HashSet, ) -> Result<()> { @@ -723,22 +756,355 @@ fn record_batches_to_series( .try_project(&projection) .context(CollectRecordbatchSnafu)?; - for row in batch.rows() { - let mut element: HashMap = row - .iter() - .enumerate() - .map(|(idx, column)| { - let column_name = batch.schema.column_name_by_index(idx); - (column_name.to_string(), column.to_string()) - }) - .collect(); - let _ = element.insert("__name__".to_string(), table_name.to_string()); - series.push(element); - } + let mut writer = RowWriter::new(&batch.schema, table_name); + writer.write(batch, series)?; } Ok(()) } +/// Writer from a row in the record batch to a Prometheus time series: +/// +/// `{__name__="",