refactor: convert to prometheus values directly from arrow (#7153)

* refactor: convert to prometheus values directly from arrow

Signed-off-by: luofucong <luofc@foxmail.com>

* resolve PR comments

Signed-off-by: luofucong <luofc@foxmail.com>

---------

Signed-off-by: luofucong <luofc@foxmail.com>
Author: LFC
Date: 2025-10-30 18:24:12 +08:00
Committed by: GitHub
Parent: ee5b7ff3c8
Commit: 109b70750a
2 changed files with 388 additions and 22 deletions


@@ -13,22 +13,43 @@
// limitations under the License.
//! prom supplies the Prometheus HTTP API server compliance.
use std::borrow::Borrow;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
use arrow::array::AsArray;
use arrow::datatypes::{
Date32Type, Date64Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
};
use arrow_schema::{DataType, IntervalUnit, TimeUnit};
use axum::extract::{Path, Query, State};
use axum::{Extension, Form};
use catalog::CatalogManagerRef;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_decimal::Decimal128;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::{Output, OutputData};
-use common_recordbatch::RecordBatches;
+use common_recordbatch::{RecordBatch, RecordBatches};
use common_telemetry::{debug, tracing};
use common_time::time::Time;
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use common_time::{
Date, Duration, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp,
};
use common_version::OwnedBuildInfo;
use datafusion_common::ScalarValue;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::types::jsonb_to_string;
use datatypes::vectors::Float64Vector;
use futures::StreamExt;
use futures::future::join_all;
@@ -53,8 +74,9 @@ use store_api::metric_engine_consts::{
pub use super::result::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
-CatalogSnafu, CollectRecordbatchSnafu, Error, InvalidQuerySnafu, ParseTimestampSnafu, Result,
-TableNotFoundSnafu, UnexpectedResultSnafu,
+CatalogSnafu, CollectRecordbatchSnafu, ConvertScalarValueSnafu, DataFusionSnafu, Error,
+InvalidQuerySnafu, NotSupportedSnafu, ParseTimestampSnafu, Result, TableNotFoundSnafu,
+UnexpectedResultSnafu,
};
use crate::http::header::collect_plan_metrics;
use crate::prom_store::{DATABASE_LABEL, FIELD_NAME_LABEL, METRIC_NAME_LABEL, SCHEMA_LABEL};
@@ -98,12 +120,23 @@ pub struct PromData {
pub result: PromQueryResult,
}
/// A "holder" for the reference([Arc]) to a column name,
/// to help avoiding cloning [String]s when used as a [HashMap] key.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct Column(Arc<String>);
impl From<&str> for Column {
fn from(s: &str) -> Self {
Self(Arc::new(s.to_string()))
}
}
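
Since `Column` derives `Serialize` as a newtype over `Arc<String>`, serde_json should emit it as the inner string, so the JSON wire format of the `Series` variant below is unchanged from the old `HashMap<String, String>` shape. A minimal sketch (hypothetical `labels` map, assuming `serde_json` and `std::collections::HashMap` are in scope):

let labels: HashMap<Column, String> = HashMap::from([
    ("__name__".into(), "demo".to_string()),
    ("host".into(), "host1".to_string()),
]);
// Key order aside, this prints: {"__name__":"demo","host":"host1"}
println!("{}", serde_json::to_string(&labels).unwrap());
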
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum PrometheusResponse {
PromData(PromData),
Labels(Vec<String>),
-Series(Vec<HashMap<String, String>>),
+Series(Vec<HashMap<Column, String>>),
LabelValues(Vec<String>),
FormatQuery(String),
BuildInfo(OwnedBuildInfo),
@@ -622,7 +655,7 @@ async fn get_all_column_names(
async fn retrieve_series_from_query_result(
result: Result<Output>,
-series: &mut Vec<HashMap<String, String>>,
+series: &mut Vec<HashMap<Column, String>>,
query_ctx: &QueryContext,
table_name: &str,
manager: &CatalogManagerRef,
@@ -700,7 +733,7 @@ async fn retrieve_labels_name_from_query_result(
fn record_batches_to_series(
batches: RecordBatches,
-series: &mut Vec<HashMap<String, String>>,
+series: &mut Vec<HashMap<Column, String>>,
table_name: &str,
tag_columns: &HashSet<String>,
) -> Result<()> {
@@ -723,22 +756,355 @@ fn record_batches_to_series(
.try_project(&projection)
.context(CollectRecordbatchSnafu)?;
-for row in batch.rows() {
-let mut element: HashMap<String, String> = row
-.iter()
-.enumerate()
-.map(|(idx, column)| {
-let column_name = batch.schema.column_name_by_index(idx);
-(column_name.to_string(), column.to_string())
-})
-.collect();
-let _ = element.insert("__name__".to_string(), table_name.to_string());
-series.push(element);
-}
+let mut writer = RowWriter::new(&batch.schema, table_name);
+writer.write(batch, series)?;
}
Ok(())
}
/// Writes each row of a record batch as a Prometheus time series:
///
/// `{__name__="<metric name>", <label name>="<label value>", ...}`
///
/// The metric name is the table name; label names are the column names and
/// the label values are the corresponding row values (all converted to strings).
struct RowWriter {
/// The template for producing a Prometheus time series. It is pre-filled with the
/// metric name and label names, to be filled with row values afterward.
template: HashMap<Column, Option<String>>,
/// The row currently being filled.
current: Option<HashMap<Column, Option<String>>>,
}
impl RowWriter {
fn new(schema: &SchemaRef, table: &str) -> Self {
let mut template = schema
.column_schemas()
.iter()
.map(|x| (x.name.as_str().into(), None))
.collect::<HashMap<Column, Option<String>>>();
template.insert("__name__".into(), Some(table.to_string()));
Self {
template,
current: None,
}
}
fn insert(&mut self, column: ColumnRef, value: impl ToString) {
let current = self.current.get_or_insert_with(|| self.template.clone());
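// Probe with a borrowed key first (via the `Borrow<dyn AsColumnRef>` bridge
// defined below) so the common hit path allocates no owned `String`.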
match current.get_mut(&column as &dyn AsColumnRef) {
Some(x) => {
let _ = x.insert(value.to_string());
}
None => {
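// Defensive fallback: only reachable if a column is absent from the template.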
let _ = current.insert(column.0.into(), Some(value.to_string()));
}
}
}
fn insert_bytes(&mut self, column_schema: &ColumnSchema, bytes: &[u8]) -> Result<()> {
let column_name = column_schema.name.as_str().into();
if column_schema.data_type.is_json() {
let s = jsonb_to_string(bytes).context(ConvertScalarValueSnafu)?;
self.insert(column_name, s);
} else {
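// Non-JSON binary values are rendered as lowercase hex strings.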
let hex = bytes
.iter()
.map(|b| format!("{b:02x}"))
.collect::<Vec<String>>()
.join("");
self.insert(column_name, hex);
}
Ok(())
}
fn finish(&mut self) -> HashMap<Column, String> {
let Some(current) = self.current.take() else {
return HashMap::new();
};
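// Drop labels whose value was never filled for this row.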
current
.into_iter()
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect()
}
fn write(
&mut self,
record_batch: RecordBatch,
series: &mut Vec<HashMap<Column, String>>,
) -> Result<()> {
let schema = record_batch.schema.clone();
let record_batch = record_batch.into_df_record_batch();
for i in 0..record_batch.num_rows() {
for (j, array) in record_batch.columns().iter().enumerate() {
let column = schema.column_name_by_index(j).into();
if array.is_null(i) {
self.insert(column, "Null");
continue;
}
match array.data_type() {
DataType::Null => {
self.insert(column, "Null");
}
DataType::Boolean => {
let array = array.as_boolean();
let v = array.value(i);
self.insert(column, v);
}
DataType::UInt8 => {
let array = array.as_primitive::<UInt8Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::UInt16 => {
let array = array.as_primitive::<UInt16Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::UInt32 => {
let array = array.as_primitive::<UInt32Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::UInt64 => {
let array = array.as_primitive::<UInt64Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::Int8 => {
let array = array.as_primitive::<Int8Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::Int16 => {
let array = array.as_primitive::<Int16Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::Int32 => {
let array = array.as_primitive::<Int32Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::Int64 => {
let array = array.as_primitive::<Int64Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::Float32 => {
let array = array.as_primitive::<Float32Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::Float64 => {
let array = array.as_primitive::<Float64Type>();
let v = array.value(i);
self.insert(column, v);
}
DataType::Utf8 => {
let array = array.as_string::<i32>();
let v = array.value(i);
self.insert(column, v);
}
DataType::LargeUtf8 => {
let array = array.as_string::<i64>();
let v = array.value(i);
self.insert(column, v);
}
DataType::Utf8View => {
let array = array.as_string_view();
let v = array.value(i);
self.insert(column, v);
}
DataType::Binary => {
let array = array.as_binary::<i32>();
let v = array.value(i);
let column_schema = &schema.column_schemas()[j];
self.insert_bytes(column_schema, v)?;
}
DataType::LargeBinary => {
let array = array.as_binary::<i64>();
let v = array.value(i);
let column_schema = &schema.column_schemas()[j];
self.insert_bytes(column_schema, v)?;
}
DataType::BinaryView => {
let array = array.as_binary_view();
let v = array.value(i);
let column_schema = &schema.column_schemas()[j];
self.insert_bytes(column_schema, v)?;
}
DataType::Date32 => {
let array = array.as_primitive::<Date32Type>();
let v = Date::new(array.value(i));
self.insert(column, v);
}
DataType::Date64 => {
let array = array.as_primitive::<Date64Type>();
// `Date64` values are milliseconds since the Unix epoch per the Arrow
// specification, while `Date32` values are days. So we divide by the
// milliseconds in a day (86,400,000) to handle both uniformly as `Date32`.
let v = Date::new((array.value(i) / 86_400_000) as i32);
self.insert(column, v);
}
DataType::Timestamp(time_unit, _) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<TimestampSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<TimestampMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<TimestampMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<TimestampNanosecondType>();
array.value(i)
}
};
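// Bridge the Arrow `TimeUnit` into the `common_time` one before formatting.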
let v = Timestamp::new(v, time_unit.into());
self.insert(column, v.to_iso8601_string());
}
DataType::Time32(time_unit) | DataType::Time64(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<Time32SecondType>();
Time::new_second(array.value(i) as i64)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<Time32MillisecondType>();
Time::new_millisecond(array.value(i) as i64)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<Time64MicrosecondType>();
Time::new_microsecond(array.value(i))
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<Time64NanosecondType>();
Time::new_nanosecond(array.value(i))
}
};
self.insert(column, v.to_iso8601_string());
}
DataType::Interval(interval_unit) => match interval_unit {
IntervalUnit::YearMonth => {
let array = array.as_primitive::<IntervalYearMonthType>();
let v: IntervalYearMonth = array.value(i).into();
self.insert(column, v.to_iso8601_string());
}
IntervalUnit::DayTime => {
let array = array.as_primitive::<IntervalDayTimeType>();
let v: IntervalDayTime = array.value(i).into();
self.insert(column, v.to_iso8601_string());
}
IntervalUnit::MonthDayNano => {
let array = array.as_primitive::<IntervalMonthDayNanoType>();
let v: IntervalMonthDayNano = array.value(i).into();
self.insert(column, v.to_iso8601_string());
}
},
DataType::Duration(time_unit) => {
let v = match time_unit {
TimeUnit::Second => {
let array = array.as_primitive::<DurationSecondType>();
array.value(i)
}
TimeUnit::Millisecond => {
let array = array.as_primitive::<DurationMillisecondType>();
array.value(i)
}
TimeUnit::Microsecond => {
let array = array.as_primitive::<DurationMicrosecondType>();
array.value(i)
}
TimeUnit::Nanosecond => {
let array = array.as_primitive::<DurationNanosecondType>();
array.value(i)
}
};
let d = Duration::new(v, time_unit.into());
self.insert(column, d);
}
DataType::List(_) => {
let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
self.insert(column, v);
}
DataType::Struct(_) => {
let v = ScalarValue::try_from_array(array, i).context(DataFusionSnafu)?;
self.insert(column, v);
}
DataType::Decimal128(precision, scale) => {
let array = array.as_primitive::<Decimal128Type>();
let v = Decimal128::new(array.value(i), *precision, *scale);
self.insert(column, v);
}
_ => {
return NotSupportedSnafu {
feat: format!("convert {} to http value", array.data_type()),
}
.fail();
}
}
}
series.push(self.finish())
}
Ok(())
}
}
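
For reference, the call pattern from `record_batches_to_series` above (hypothetical `batch` and `series` bindings): the template is built once per batch, each row fills a clone of it, and `finish` yields one completed label map per row.

// One writer per record batch; `write` pushes one label map per row.
let mut writer = RowWriter::new(&batch.schema, "demo");
writer.write(batch, &mut series)?;
// `series` now ends with entries like:
// {"__name__": "demo", "host": "host1", "val": "0.5"}
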
#[derive(Debug, Clone, Copy, PartialEq)]
struct ColumnRef<'a>(&'a str);
impl<'a> From<&'a str> for ColumnRef<'a> {
fn from(s: &'a str) -> Self {
Self(s)
}
}
trait AsColumnRef {
fn as_ref(&self) -> ColumnRef<'_>;
}
impl AsColumnRef for Column {
fn as_ref(&self) -> ColumnRef<'_> {
self.0.as_str().into()
}
}
impl AsColumnRef for ColumnRef<'_> {
fn as_ref(&self) -> ColumnRef<'_> {
*self
}
}
impl<'a> PartialEq for dyn AsColumnRef + 'a {
fn eq(&self, other: &Self) -> bool {
self.as_ref() == other.as_ref()
}
}
impl<'a> Eq for dyn AsColumnRef + 'a {}
impl<'a> Hash for dyn AsColumnRef + 'a {
fn hash<H: Hasher>(&self, state: &mut H) {
self.as_ref().0.hash(state);
}
}
impl<'a> Borrow<dyn AsColumnRef + 'a> for Column {
fn borrow(&self) -> &(dyn AsColumnRef + 'a) {
self
}
}
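
Together, the `PartialEq`/`Eq`/`Hash` impls on `dyn AsColumnRef` and the `Borrow` bridge implement the standard borrowed-key lookup trick: a `HashMap<Column, _>` can be probed with a stack-allocated `ColumnRef` instead of an owned `Column`. A minimal sketch (hypothetical `labels` map):

let mut labels: std::collections::HashMap<Column, String> = Default::default();
labels.insert("host".into(), "host1".to_string());

// Look up with a borrowed key: no `String`/`Arc` allocation on the hit path.
let key: ColumnRef<'_> = "host".into();
assert_eq!(
    labels.get(&key as &dyn AsColumnRef).map(String::as_str),
    Some("host1"),
);

This is sound because `Column`'s derived `Hash` ultimately hashes the same underlying `str` bytes as the `dyn AsColumnRef` impl does.
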
/// Retrieve label names from record batches
fn record_batches_to_labels_name(
batches: RecordBatches,


@@ -49,7 +49,7 @@ use servers::http::header::constants::{
GREPTIME_LOG_TABLE_NAME_HEADER_NAME, GREPTIME_PIPELINE_NAME_HEADER_NAME,
};
use servers::http::header::{GREPTIME_DB_HEADER_NAME, GREPTIME_TIMEZONE_HEADER_NAME};
-use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
+use servers::http::prometheus::{Column, PrometheusJsonResponse, PrometheusResponse};
use servers::http::result::error_result::ErrorResponse;
use servers::http::result::greptime_result_v1::GreptimedbV1Response;
use servers::http::result::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
@@ -849,10 +849,10 @@ pub async fn test_prom_http_api(store_type: StorageType) {
let actual = series
.remove(0)
.into_iter()
-.collect::<BTreeMap<String, String>>();
+.collect::<BTreeMap<Column, String>>();
let expected = BTreeMap::from([
("__name__".to_string(), "demo".to_string()),
("host".to_string(), "host1".to_string()),
("__name__".into(), "demo".to_string()),
("host".into(), "host1".to_string()),
]);
assert_eq!(actual, expected);