From db2cb6326777df1d967c2e9cb87ecfd76274020d Mon Sep 17 00:00:00 2001 From: Ning Sun Date: Fri, 28 Oct 2022 23:05:00 +0800 Subject: [PATCH] refactor: make prometheus http apis compatible with recent changes --- src/frontend/src/instance/prometheus.rs | 159 +++++++++++----------- src/servers/src/http.rs | 7 - src/servers/src/http/prometheus.rs | 27 +++- src/servers/src/query_handler.rs | 9 +- src/servers/tests/http/prometheus_test.rs | 12 +- 5 files changed, 108 insertions(+), 106 deletions(-) diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index 7f3a3cede3..bc87ed3ab8 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -8,9 +8,8 @@ use common_error::prelude::BoxedError; use common_telemetry::logging; use prost::Message; use servers::error::{self, Result as ServerResult}; -use servers::http::{BytesResponse, HttpResponse}; use servers::prometheus::{self, Metrics}; -use servers::query_handler::PrometheusProtocolHandler; +use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse}; use snafu::{OptionExt, ResultExt}; use crate::instance::Instance; @@ -104,7 +103,7 @@ impl PrometheusProtocolHandler for Instance { Ok(()) } - async fn read(&self, request: ReadRequest) -> ServerResult { + async fn read(&self, request: ReadRequest) -> ServerResult { let response_type = negotiate_response_type(&request.accepted_response_types)?; // TODO(dennis): use read_hints to speedup query if possible @@ -124,11 +123,11 @@ impl PrometheusProtocolHandler for Instance { }; // TODO(dennis): may consume too much memory, adds flow control - Ok(HttpResponse::Bytes(BytesResponse { + Ok(PrometheusResponse { content_type: "application/x-protobuf".to_string(), content_encoding: "snappy".to_string(), - bytes: prometheus::snappy_compress(&response.encode_to_vec())?, - })) + body: prometheus::snappy_compress(&response.encode_to_vec())?, + }) } ResponseType::StreamedXorChunks => error::NotSupportedSnafu { feat: "streamed remote read", @@ -195,88 +194,82 @@ mod tests { ..Default::default() }; - let response = instance.read(read_request).await.unwrap(); + let resp = instance.read(read_request).await.unwrap(); + assert_eq!(resp.content_type, "application/x-protobuf"); + assert_eq!(resp.content_encoding, "snappy"); + let body = prometheus::snappy_decompress(&resp.body).unwrap(); + let read_response = ReadResponse::decode(&body[..]).unwrap(); + let query_results = read_response.results; + assert_eq!(2, query_results.len()); - match response { - HttpResponse::Bytes(resp) => { - assert_eq!(resp.content_type, "application/x-protobuf"); - assert_eq!(resp.content_encoding, "snappy"); - let body = prometheus::snappy_decompress(&resp.bytes).unwrap(); - let read_response = ReadResponse::decode(&body[..]).unwrap(); - let query_results = read_response.results; - assert_eq!(2, query_results.len()); + assert_eq!(1, query_results[0].timeseries.len()); + let timeseries = &query_results[0].timeseries[0]; - assert_eq!(1, query_results[0].timeseries.len()); - let timeseries = &query_results[0].timeseries[0]; + assert_eq!( + vec![ + Label { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric1".to_string(), + }, + Label { + name: "job".to_string(), + value: "spark".to_string(), + }, + ], + timeseries.labels + ); - assert_eq!( - vec![ - Label { - name: prometheus::METRIC_NAME_LABEL.to_string(), - value: "metric1".to_string(), - }, - Label { - name: "job".to_string(), - value: "spark".to_string(), - }, - ], - timeseries.labels - ); + assert_eq!( + timeseries.samples, + vec![ + Sample { + value: 1.0, + timestamp: 1000, + }, + Sample { + value: 2.0, + timestamp: 2000, + } + ] + ); - assert_eq!( - timeseries.samples, - vec![ - Sample { - value: 1.0, - timestamp: 1000, - }, - Sample { - value: 2.0, - timestamp: 2000, - } - ] - ); + assert_eq!(1, query_results[1].timeseries.len()); + let timeseries = &query_results[1].timeseries[0]; - assert_eq!(1, query_results[1].timeseries.len()); - let timeseries = &query_results[1].timeseries[0]; + assert_eq!( + vec![ + Label { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric3".to_string(), + }, + Label { + name: "idc".to_string(), + value: "z002".to_string(), + }, + Label { + name: "app".to_string(), + value: "biz".to_string(), + }, + ], + timeseries.labels + ); - assert_eq!( - vec![ - Label { - name: prometheus::METRIC_NAME_LABEL.to_string(), - value: "metric3".to_string(), - }, - Label { - name: "idc".to_string(), - value: "z002".to_string(), - }, - Label { - name: "app".to_string(), - value: "biz".to_string(), - }, - ], - timeseries.labels - ); - - assert_eq!( - timeseries.samples, - vec![ - Sample { - value: 5.0, - timestamp: 1000, - }, - Sample { - value: 6.0, - timestamp: 2000, - }, - Sample { - value: 7.0, - timestamp: 3000, - } - ] - ); - } - _ => unreachable!(), - } + assert_eq!( + timeseries.samples, + vec![ + Sample { + value: 5.0, + timestamp: 1000, + }, + Sample { + value: 6.0, + timestamp: 2000, + }, + Sample { + value: 7.0, + timestamp: 3000, + } + ] + ); } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 95e4c98eac..14a17b5271 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -48,13 +48,6 @@ pub struct ColumnSchema { data_type: String, } -#[derive(Serialize, Debug)] -pub struct BytesResponse { - pub content_type: String, - pub content_encoding: String, - pub bytes: Vec, -} - #[derive(Debug, Serialize, JsonSchema)] pub struct Schema { column_schemas: Vec, diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index e0074c6007..f740eb322f 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -1,33 +1,46 @@ use api::prometheus::remote::{ReadRequest, WriteRequest}; use axum::extract::{RawBody, State}; +use axum::http::header; use axum::http::StatusCode; +use axum::response::IntoResponse; use hyper::Body; use prost::Message; use snafu::prelude::*; -use crate::error::Result; -use crate::error::{self}; -use crate::http::HttpResponse; +use crate::error::{self, Result}; use crate::prometheus::snappy_decompress; -use crate::query_handler::PrometheusProtocolHandlerRef; +use crate::query_handler::{PrometheusProtocolHandlerRef, PrometheusResponse}; #[axum_macros::debug_handler] pub async fn remote_write( State(handler): State, RawBody(body): RawBody, -) -> Result<(StatusCode, HttpResponse)> { +) -> Result<(StatusCode, ())> { let request = decode_remote_write_request(body).await?; handler.write(request).await?; - Ok((StatusCode::NO_CONTENT, HttpResponse::Text("".to_string()))) + Ok((StatusCode::NO_CONTENT, ())) +} + +impl IntoResponse for PrometheusResponse { + fn into_response(self) -> axum::response::Response { + ( + [ + (header::CONTENT_TYPE, self.content_type), + (header::CONTENT_ENCODING, self.content_encoding), + ], + self.body, + ) + .into_response() + } } #[axum_macros::debug_handler] pub async fn remote_read( State(handler): State, RawBody(body): RawBody, -) -> Result { +) -> Result { let request = decode_remote_read_request(body).await?; handler.read(request).await diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 4bfb8ffc94..46b194169b 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -6,7 +6,6 @@ use async_trait::async_trait; use common_query::Output; use crate::error::Result; -use crate::http::HttpResponse; use crate::influxdb::InfluxdbRequest; use crate::opentsdb::codec::DataPoint; use crate::prometheus::Metrics; @@ -59,12 +58,18 @@ pub trait OpentsdbProtocolHandler { async fn exec(&self, data_point: &DataPoint) -> Result<()>; } +pub struct PrometheusResponse { + pub content_type: String, + pub content_encoding: String, + pub body: Vec, +} + #[async_trait] pub trait PrometheusProtocolHandler { /// Handling prometheus remote write requests async fn write(&self, request: WriteRequest) -> Result<()>; /// Handling prometheus remote read requests - async fn read(&self, request: ReadRequest) -> Result; + async fn read(&self, request: ReadRequest) -> Result; /// Handling push gateway requests async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>; } diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 9c3cba019a..3415110be7 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -9,13 +9,11 @@ use axum_test_helper::TestClient; use common_query::Output; use prost::Message; use servers::error::Result; -use servers::http::BytesResponse; -use servers::http::HttpResponse; use servers::http::HttpServer; use servers::prometheus; use servers::prometheus::snappy_compress; use servers::prometheus::Metrics; -use servers::query_handler::{PrometheusProtocolHandler, SqlQueryHandler}; +use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse, SqlQueryHandler}; use tokio::sync::mpsc; struct DummyInstance { @@ -29,7 +27,7 @@ impl PrometheusProtocolHandler for DummyInstance { Ok(()) } - async fn read(&self, request: ReadRequest) -> Result { + async fn read(&self, request: ReadRequest) -> Result { let _ = self.tx.send(request.encode_to_vec()).await; let response = ReadResponse { @@ -38,11 +36,11 @@ impl PrometheusProtocolHandler for DummyInstance { }], }; - Ok(HttpResponse::Bytes(BytesResponse { + Ok(PrometheusResponse { content_type: "application/x-protobuf".to_string(), content_encoding: "snappy".to_string(), - bytes: response.encode_to_vec(), - })) + body: response.encode_to_vec(), + }) } async fn ingest_metrics(&self, _metrics: Metrics) -> Result<()> {