refactor: make prometheus http apis compatible with recent changes

This commit is contained in:
Ning Sun
2022-10-28 23:05:00 +08:00
parent 54485863b2
commit db2cb63267
5 changed files with 108 additions and 106 deletions

View File

@@ -8,9 +8,8 @@ use common_error::prelude::BoxedError;
use common_telemetry::logging;
use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::http::{BytesResponse, HttpResponse};
use servers::prometheus::{self, Metrics};
use servers::query_handler::PrometheusProtocolHandler;
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse};
use snafu::{OptionExt, ResultExt};
use crate::instance::Instance;
@@ -104,7 +103,7 @@ impl PrometheusProtocolHandler for Instance {
Ok(())
}
async fn read(&self, request: ReadRequest) -> ServerResult<HttpResponse> {
async fn read(&self, request: ReadRequest) -> ServerResult<PrometheusResponse> {
let response_type = negotiate_response_type(&request.accepted_response_types)?;
// TODO(dennis): use read_hints to speedup query if possible
@@ -124,11 +123,11 @@ impl PrometheusProtocolHandler for Instance {
};
// TODO(dennis): may consume too much memory, adds flow control
Ok(HttpResponse::Bytes(BytesResponse {
Ok(PrometheusResponse {
content_type: "application/x-protobuf".to_string(),
content_encoding: "snappy".to_string(),
bytes: prometheus::snappy_compress(&response.encode_to_vec())?,
}))
body: prometheus::snappy_compress(&response.encode_to_vec())?,
})
}
ResponseType::StreamedXorChunks => error::NotSupportedSnafu {
feat: "streamed remote read",
@@ -195,88 +194,82 @@ mod tests {
..Default::default()
};
let response = instance.read(read_request).await.unwrap();
let resp = instance.read(read_request).await.unwrap();
assert_eq!(resp.content_type, "application/x-protobuf");
assert_eq!(resp.content_encoding, "snappy");
let body = prometheus::snappy_decompress(&resp.body).unwrap();
let read_response = ReadResponse::decode(&body[..]).unwrap();
let query_results = read_response.results;
assert_eq!(2, query_results.len());
match response {
HttpResponse::Bytes(resp) => {
assert_eq!(resp.content_type, "application/x-protobuf");
assert_eq!(resp.content_encoding, "snappy");
let body = prometheus::snappy_decompress(&resp.bytes).unwrap();
let read_response = ReadResponse::decode(&body[..]).unwrap();
let query_results = read_response.results;
assert_eq!(2, query_results.len());
assert_eq!(1, query_results[0].timeseries.len());
let timeseries = &query_results[0].timeseries[0];
assert_eq!(1, query_results[0].timeseries.len());
let timeseries = &query_results[0].timeseries[0];
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
},
Label {
name: "job".to_string(),
value: "spark".to_string(),
},
],
timeseries.labels
);
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
},
Label {
name: "job".to_string(),
value: "spark".to_string(),
},
],
timeseries.labels
);
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 1.0,
timestamp: 1000,
},
Sample {
value: 2.0,
timestamp: 2000,
}
]
);
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 1.0,
timestamp: 1000,
},
Sample {
value: 2.0,
timestamp: 2000,
}
]
);
assert_eq!(1, query_results[1].timeseries.len());
let timeseries = &query_results[1].timeseries[0];
assert_eq!(1, query_results[1].timeseries.len());
let timeseries = &query_results[1].timeseries[0];
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
},
Label {
name: "idc".to_string(),
value: "z002".to_string(),
},
Label {
name: "app".to_string(),
value: "biz".to_string(),
},
],
timeseries.labels
);
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
},
Label {
name: "idc".to_string(),
value: "z002".to_string(),
},
Label {
name: "app".to_string(),
value: "biz".to_string(),
},
],
timeseries.labels
);
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 5.0,
timestamp: 1000,
},
Sample {
value: 6.0,
timestamp: 2000,
},
Sample {
value: 7.0,
timestamp: 3000,
}
]
);
}
_ => unreachable!(),
}
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 5.0,
timestamp: 1000,
},
Sample {
value: 6.0,
timestamp: 2000,
},
Sample {
value: 7.0,
timestamp: 3000,
}
]
);
}
}

View File

@@ -48,13 +48,6 @@ pub struct ColumnSchema {
data_type: String,
}
#[derive(Serialize, Debug)]
pub struct BytesResponse {
pub content_type: String,
pub content_encoding: String,
pub bytes: Vec<u8>,
}
#[derive(Debug, Serialize, JsonSchema)]
pub struct Schema {
column_schemas: Vec<ColumnSchema>,

View File

@@ -1,33 +1,46 @@
use api::prometheus::remote::{ReadRequest, WriteRequest};
use axum::extract::{RawBody, State};
use axum::http::header;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use hyper::Body;
use prost::Message;
use snafu::prelude::*;
use crate::error::Result;
use crate::error::{self};
use crate::http::HttpResponse;
use crate::error::{self, Result};
use crate::prometheus::snappy_decompress;
use crate::query_handler::PrometheusProtocolHandlerRef;
use crate::query_handler::{PrometheusProtocolHandlerRef, PrometheusResponse};
#[axum_macros::debug_handler]
pub async fn remote_write(
State(handler): State<PrometheusProtocolHandlerRef>,
RawBody(body): RawBody,
) -> Result<(StatusCode, HttpResponse)> {
) -> Result<(StatusCode, ())> {
let request = decode_remote_write_request(body).await?;
handler.write(request).await?;
Ok((StatusCode::NO_CONTENT, HttpResponse::Text("".to_string())))
Ok((StatusCode::NO_CONTENT, ()))
}
impl IntoResponse for PrometheusResponse {
fn into_response(self) -> axum::response::Response {
(
[
(header::CONTENT_TYPE, self.content_type),
(header::CONTENT_ENCODING, self.content_encoding),
],
self.body,
)
.into_response()
}
}
#[axum_macros::debug_handler]
pub async fn remote_read(
State(handler): State<PrometheusProtocolHandlerRef>,
RawBody(body): RawBody,
) -> Result<HttpResponse> {
) -> Result<PrometheusResponse> {
let request = decode_remote_read_request(body).await?;
handler.read(request).await

View File

@@ -6,7 +6,6 @@ use async_trait::async_trait;
use common_query::Output;
use crate::error::Result;
use crate::http::HttpResponse;
use crate::influxdb::InfluxdbRequest;
use crate::opentsdb::codec::DataPoint;
use crate::prometheus::Metrics;
@@ -59,12 +58,18 @@ pub trait OpentsdbProtocolHandler {
async fn exec(&self, data_point: &DataPoint) -> Result<()>;
}
pub struct PrometheusResponse {
pub content_type: String,
pub content_encoding: String,
pub body: Vec<u8>,
}
#[async_trait]
pub trait PrometheusProtocolHandler {
/// Handling prometheus remote write requests
async fn write(&self, request: WriteRequest) -> Result<()>;
/// Handling prometheus remote read requests
async fn read(&self, request: ReadRequest) -> Result<HttpResponse>;
async fn read(&self, request: ReadRequest) -> Result<PrometheusResponse>;
/// Handling push gateway requests
async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
}

View File

@@ -9,13 +9,11 @@ use axum_test_helper::TestClient;
use common_query::Output;
use prost::Message;
use servers::error::Result;
use servers::http::BytesResponse;
use servers::http::HttpResponse;
use servers::http::HttpServer;
use servers::prometheus;
use servers::prometheus::snappy_compress;
use servers::prometheus::Metrics;
use servers::query_handler::{PrometheusProtocolHandler, SqlQueryHandler};
use servers::query_handler::{PrometheusProtocolHandler, PrometheusResponse, SqlQueryHandler};
use tokio::sync::mpsc;
struct DummyInstance {
@@ -29,7 +27,7 @@ impl PrometheusProtocolHandler for DummyInstance {
Ok(())
}
async fn read(&self, request: ReadRequest) -> Result<HttpResponse> {
async fn read(&self, request: ReadRequest) -> Result<PrometheusResponse> {
let _ = self.tx.send(request.encode_to_vec()).await;
let response = ReadResponse {
@@ -38,11 +36,11 @@ impl PrometheusProtocolHandler for DummyInstance {
}],
};
Ok(HttpResponse::Bytes(BytesResponse {
Ok(PrometheusResponse {
content_type: "application/x-protobuf".to_string(),
content_encoding: "snappy".to_string(),
bytes: response.encode_to_vec(),
}))
body: response.encode_to_vec(),
})
}
async fn ingest_metrics(&self, _metrics: Metrics) -> Result<()> {