chore: sort range query return values (#6474)

* chore: sort range query return values

* chore: add comments

* chore: add is_sorted check

* fix: test
This commit is contained in:
shuiyisong
2025-07-09 10:27:12 +08:00
committed by GitHub
parent a4bd11fb9c
commit f7282fde28
3 changed files with 29 additions and 11 deletions

View File

@@ -13,7 +13,7 @@
// limitations under the License.
//! prom supply the prometheus HTTP API Server compliance
use std::collections::{HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use axum::extract::{Path, Query, State};
@@ -62,7 +62,7 @@ use crate::prometheus_handler::PrometheusHandlerRef;
/// For [ValueType::Vector] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesVector {
pub metric: HashMap<String, String>,
pub metric: BTreeMap<String, String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub value: Option<(f64, String)>,
}
@@ -70,7 +70,7 @@ pub struct PromSeriesVector {
/// For [ValueType::Matrix] result type
#[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct PromSeriesMatrix {
pub metric: HashMap<String, String>,
pub metric: BTreeMap<String, String>,
pub values: Vec<(f64, String)>,
}

View File

@@ -13,7 +13,8 @@
// limitations under the License.
//! prom supply the prometheus HTTP API Server compliance
use std::collections::HashMap;
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
@@ -311,7 +312,7 @@ impl PrometheusJsonResponse {
let metric = tags
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect::<HashMap<_, _>>();
.collect::<BTreeMap<_, _>>();
match result {
PromQueryResult::Vector(ref mut v) => {
v.push(PromSeriesVector {
@@ -320,6 +321,11 @@ impl PrometheusJsonResponse {
});
}
PromQueryResult::Matrix(ref mut v) => {
// sort values by timestamp
if !values.is_sorted_by(|a, b| a.0 <= b.0) {
values.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(Ordering::Equal));
}
v.push(PromSeriesMatrix { metric, values });
}
PromQueryResult::Scalar(ref mut v) => {
@@ -331,6 +337,12 @@ impl PrometheusJsonResponse {
}
});
// sort matrix by metric
// see: https://prometheus.io/docs/prometheus/3.5/querying/api/#range-vectors
if let PromQueryResult::Matrix(ref mut v) = result {
v.sort_by(|a, b| a.metric.cmp(&b.metric));
}
let result_type_string = result_type.to_string();
let data = PrometheusResponse::PromData(PromData {
result_type: result_type_string,

View File

@@ -2405,14 +2405,19 @@ processors:
ignore_missing: true
- vrl:
source: |
.log_id = .id
del(.id)
.from_source = "channel_2"
cond, err = .id1 > .id2
if (cond) {
.from_source = "channel_1"
}
del(.id1)
del(.id2)
.
transform:
- fields:
- log_id
type: int32
- from_source
type: string
- field: time
type: time
index: timestamp
@@ -2432,7 +2437,8 @@ transform:
let data_body = r#"
[
{
"id": "2436",
"id1": 2436,
"id2": 123,
"time": "2024-05-25 20:16:37.217"
}
]
@@ -2449,7 +2455,7 @@ transform:
"test_pipeline_with_vrl",
&client,
"select * from d_table",
"[[2436,1716668197217000000]]",
"[[\"channel_1\",1716668197217000000]]",
)
.await;