feat: prometheus remote write and read (#346)

* feat: scaffold for prometheus protocol handler

* feat: impl remote write and read for prometheus

* chore: make label matchers work in remote reading

* chore: case sensitive regexp matching for labels and tweak RESTful API

* test: prometheus test

* test: adds test for prometheus handler and http server

* fix: typo in comment

* refactor: move snappy_compress and snappy_decompress

* fix: address code review feedback

* fix: collect_timeseries_ids

* fix: timestamp and value column's value may be null
dennis zhuang
2022-10-28 18:47:16 +08:00
committed by GitHub
parent 81716d622e
commit 0604eb7509
25 changed files with 1593 additions and 16 deletions

Cargo.lock (generated)

@@ -273,6 +273,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "auto_ops"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7460f7dd8e100147b82a63afca1a20eb6c231ee36b90ba7272e14951cb58af59"
[[package]]
name = "autocfg"
version = "1.1.0"
@@ -1780,6 +1786,8 @@ dependencies = [
"datanode",
"datatypes",
"futures",
"openmetrics-parser",
"prost 0.11.0",
"serde",
"servers",
"snafu",
@@ -3136,6 +3144,17 @@ dependencies = [
"ureq",
]
[[package]]
name = "openmetrics-parser"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5caf1ccaaf43651cc5abda77353a173869d8d8b0238f2faacb23d6b32931e860"
dependencies = [
"auto_ops",
"pest",
"pest_derive",
]
[[package]]
name = "opensrv-mysql"
version = "0.1.0"
@@ -3344,6 +3363,50 @@ version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
name = "pest"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbc7bc69c062e492337d74d59b120c274fd3d261b6bf6d3207d499b4b379c41a"
dependencies = [
"thiserror",
"ucd-trie",
]
[[package]]
name = "pest_derive"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60b75706b9642ebcb34dab3bc7750f811609a0eb1dd8b88c2d15bf628c1c65b2"
dependencies = [
"pest",
"pest_generator",
]
[[package]]
name = "pest_generator"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4f9272122f5979a6511a749af9db9bfc810393f63119970d7085fed1c4ea0db"
dependencies = [
"pest",
"pest_meta",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pest_meta"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c8717927f9b79515e565a64fe46c38b8cd0427e64c40680b14a7365ab09ac8d"
dependencies = [
"once_cell",
"pest",
"sha1",
]
[[package]]
name = "petgraph"
version = "0.6.2"
@@ -4652,14 +4715,17 @@ dependencies = [
"metrics",
"mysql_async",
"num_cpus",
"openmetrics-parser",
"opensrv-mysql",
"pgwire",
"prost 0.11.0",
"query",
"rand 0.8.5",
"script",
"serde",
"serde_json",
"snafu",
"snap",
"table",
"tokio",
"tokio-postgres",
@@ -5756,6 +5822,12 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "ucd-trie"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e79c4d996edb816c91e4308506774452e55e95c3c9de07b6729e17e15a5ef81"
[[package]]
name = "uname"
version = "0.1.1"


@@ -10,6 +10,7 @@ fn main() {
"greptime/v1/meta/heartbeat.proto",
"greptime/v1/meta/route.proto",
"greptime/v1/meta/store.proto",
"prometheus/remote/remote.proto",
],
&["."],
)
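
Registering the new proto here lets the existing tonic/prost build step generate Rust bindings for it; the `api` crate then pulls in the generated `prometheus` package via `tonic::include_proto!` (see below).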


@@ -0,0 +1,85 @@
// Copyright 2016 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prometheus;
option go_package = "prompb";
import "prometheus/remote/types.proto";
message WriteRequest {
repeated prometheus.TimeSeries timeseries = 1;
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
repeated prometheus.MetricMetadata metadata = 3;
}
// ReadRequest represents a remote read request.
message ReadRequest {
repeated Query queries = 1;
enum ResponseType {
// Server will return a single ReadResponse message with matched series that includes a list of raw samples.
// It's recommended to use streamed response types instead.
//
// Response headers:
// Content-Type: "application/x-protobuf"
// Content-Encoding: "snappy"
SAMPLES = 0;
// Server will stream a delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series.
// Each message is preceded by a varint size and a fixed-size big-endian uint32 CRC32 Castagnoli checksum.
//
// Response headers:
// Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
// Content-Encoding: ""
STREAMED_XOR_CHUNKS = 1;
}
// accepted_response_types allows negotiating the content type of the response.
//
// Response types are taken from the list in FIFO order. If no response type in `accepted_response_types` is
// implemented by the server, an error is returned.
// For requests that do not contain the `accepted_response_types` field, the SAMPLES response type will be used.
repeated ResponseType accepted_response_types = 2;
}
// ReadResponse is a response when response_type equals SAMPLES.
message ReadResponse {
// In the same order as the request's queries.
repeated QueryResult results = 1;
}
message Query {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated prometheus.LabelMatcher matchers = 3;
prometheus.ReadHints hints = 4;
}
message QueryResult {
// Samples within a time series must be ordered by time.
repeated prometheus.TimeSeries timeseries = 1;
}
// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS.
// We strictly stream full series after series, optionally split by time. This means that a single frame can contain
// a partition of a single series, but once a new series starts streaming, no more chunks will
// be sent for the previous one. Series are returned sorted in the same way TSDB blocks are internally.
message ChunkedReadResponse {
repeated prometheus.ChunkedSeries chunked_series = 1;
// query_index represents the index of the query from ReadRequest.queries these chunks relate to.
int64 query_index = 2;
}
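
As a reference for the handler code below, a minimal sketch of building a remote read request with the prost-generated types from this change (field values are illustrative):

use api::prometheus::remote::{label_matcher::Type as MatcherType, LabelMatcher, Query, ReadRequest};
use prost::Message;

fn build_read_request() -> Vec<u8> {
    let request = ReadRequest {
        queries: vec![Query {
            start_timestamp_ms: 1000,
            end_timestamp_ms: 2000,
            matchers: vec![LabelMatcher {
                // "__name__" is the special label carrying the metric (table) name.
                name: "__name__".to_string(),
                value: "metric1".to_string(),
                r#type: MatcherType::Eq as i32,
            }],
            ..Default::default()
        }],
        // Leaving accepted_response_types empty makes the server fall back to SAMPLES.
        ..Default::default()
    };
    // Protobuf-encoded; the body is additionally snappy-compressed on the wire.
    request.encode_to_vec()
}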


@@ -0,0 +1,117 @@
// Copyright 2017 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prometheus;
option go_package = "prompb";
message MetricMetadata {
enum MetricType {
UNKNOWN = 0;
COUNTER = 1;
GAUGE = 2;
HISTOGRAM = 3;
GAUGEHISTOGRAM = 4;
SUMMARY = 5;
INFO = 6;
STATESET = 7;
}
// Represents the metric type; these match the set from Prometheus.
// Refer to model/textparse/interface.go for details.
MetricType type = 1;
string metric_family_name = 2;
string help = 4;
string unit = 5;
}
message Sample {
double value = 1;
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 2;
}
message Exemplar {
// Optional, can be empty.
repeated Label labels = 1;
double value = 2;
// timestamp is in ms format, see model/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 3;
}
// TimeSeries represents samples and labels for a single time series.
message TimeSeries {
// For a timeseries to be valid, and for the samples and exemplars
// to be ingested by the remote system properly, the labels field is required.
repeated Label labels = 1;
repeated Sample samples = 2;
repeated Exemplar exemplars = 3;
}
message Label {
string name = 1;
string value = 2;
}
message Labels {
repeated Label labels = 1;
}
// Matcher specifies a rule, which can match a set of labels or not.
message LabelMatcher {
enum Type {
EQ = 0;
NEQ = 1;
RE = 2;
NRE = 3;
}
Type type = 1;
string name = 2;
string value = 3;
}
message ReadHints {
int64 step_ms = 1; // Query step size in milliseconds.
string func = 2; // String representation of surrounding function or aggregation.
int64 start_ms = 3; // Start time in milliseconds.
int64 end_ms = 4; // End time in milliseconds.
repeated string grouping = 5; // List of label names used in aggregation.
bool by = 6; // Indicates whether the grouping is `by` (true) or `without` (false).
int64 range_ms = 7; // Range vector selector range in milliseconds.
}
// Chunk represents a TSDB chunk.
// Time range [min, max] is inclusive.
message Chunk {
int64 min_time_ms = 1;
int64 max_time_ms = 2;
// We require this to match chunkenc.Encoding.
enum Encoding {
UNKNOWN = 0;
XOR = 1;
}
Encoding type = 3;
bytes data = 4;
}
// ChunkedSeries represents a single, encoded time series.
message ChunkedSeries {
// Labels should be sorted.
repeated Label labels = 1;
// Chunks will be in start time order and may overlap.
repeated Chunk chunks = 2;
}


@@ -1,5 +1,6 @@
pub mod error;
pub mod helper;
pub mod prometheus;
pub mod serde;
pub mod v1;


@@ -0,0 +1,5 @@
#![allow(clippy::derive_partial_eq_without_eq)]
pub mod remote {
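// Includes the prost/tonic-generated code for the `prometheus` proto package at compile time.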
tonic::include_proto!("prometheus");
}


@@ -16,6 +16,8 @@ common-runtime = { path = "../common/runtime" }
common-telemetry = { path = "../common/telemetry" }
common-time = { path = "../common/time" }
datatypes = { path = "../datatypes" }
openmetrics-parser = "0.4"
prost = "0.11"
serde = "1.0"
servers = { path = "../servers" }
snafu = { version = "0.7", features = ["backtraces"] }


@@ -9,6 +9,7 @@ use crate::instance::Instance;
use crate::mysql::MysqlOptions;
use crate::opentsdb::OpentsdbOptions;
use crate::postgres::PostgresOptions;
use crate::prometheus::PrometheusOptions;
use crate::server::Services;
#[derive(Clone, Debug, Serialize, Deserialize)]
@@ -19,6 +20,7 @@ pub struct FrontendOptions {
pub postgres_options: Option<PostgresOptions>,
pub opentsdb_options: Option<OpentsdbOptions>,
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
}
impl Default for FrontendOptions {
@@ -30,6 +32,7 @@ impl Default for FrontendOptions {
postgres_options: Some(PostgresOptions::default()),
opentsdb_options: Some(OpentsdbOptions::default()),
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
}
}
}


@@ -1,5 +1,6 @@
mod influxdb;
mod opentsdb;
mod prometheus;
use std::collections::HashMap;
use std::sync::Arc;


@@ -128,13 +128,13 @@ mod tests {
let pretty_print = arrow_print::write(&recordbatches);
let pretty_print = pretty_print.lines().collect::<Vec<&str>>();
let expected = vec![
"+---------------------+-------+-------+-------+-------+",
"| timestamp | value | tagk1 | tagk2 | tagk3 |",
"+---------------------+-------+-------+-------+-------+",
"| 1970-01-01 00:00:01 | 1 | tagv1 | tagv2 | |",
"| 1970-01-01 00:00:02 | 2 | | tagv2 | tagv3 |",
"| 1970-01-01 00:00:03 | 3 | | | |",
"+---------------------+-------+-------+-------+-------+",
"+---------------------+----------------+-------+-------+-------+",
"| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |",
"+---------------------+----------------+-------+-------+-------+",
"| 1970-01-01 00:00:01 | 1 | tagv1 | tagv2 | |",
"| 1970-01-01 00:00:02 | 2 | | tagv2 | tagv3 |",
"| 1970-01-01 00:00:03 | 3 | | | |",
"+---------------------+----------------+-------+-------+-------+",
];
assert_eq!(pretty_print, expected);
}


@@ -0,0 +1,282 @@
use api::prometheus::remote::{
read_request::ResponseType, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest,
};
use async_trait::async_trait;
use client::{Database, ObjectResult, Select};
use common_error::prelude::BoxedError;
use common_telemetry::logging;
use prost::Message;
use servers::error::{self, Result as ServerResult};
use servers::http::{BytesResponse, HttpResponse};
use servers::prometheus::{self, Metrics};
use servers::query_handler::PrometheusProtocolHandler;
use snafu::{OptionExt, ResultExt};
use crate::instance::Instance;
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
#[inline]
fn is_supported(response_type: i32) -> bool {
// Only supports samples response right now
response_type == SAMPLES_RESPONSE_TYPE
}
/// Negotiates the content type of the remote read response.
///
/// Response types are taken from the list in FIFO order. If no response type in `accepted_response_types` is
/// implemented by the server, an error is returned.
/// For requests that do not contain the `accepted_response_types` field, the SAMPLES response type will be used.
fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<ResponseType> {
if accepted_response_types.is_empty() {
return Ok(ResponseType::Samples);
}
let response_type = accepted_response_types
.iter()
.find(|t| is_supported(**t))
.with_context(|| error::NotSupportedSnafu {
feat: format!(
"server does not support any of the requested response types: {:?}",
accepted_response_types
),
})?;
// It's safe to unwrap here, we know it must be SAMPLES_RESPONSE_TYPE
Ok(ResponseType::from_i32(*response_type).unwrap())
}
fn object_result_to_query_result(
table_name: &str,
object_result: ObjectResult,
) -> ServerResult<QueryResult> {
let select_result = match object_result {
ObjectResult::Select(result) => result,
_ => unreachable!(),
};
Ok(QueryResult {
timeseries: prometheus::select_result_to_timeseries(table_name, select_result)?,
})
}
async fn handle_remote_queries(
db: &Database,
queries: &[Query],
) -> ServerResult<Vec<(String, ObjectResult)>> {
let mut results = Vec::with_capacity(queries.len());
for q in queries {
let (table_name, sql) = prometheus::query_to_sql(q)?;
logging::debug!(
"prometheus remote read, table: {}, sql: {}",
table_name,
sql
);
let object_result = db
.select(Select::Sql(sql.clone()))
.await
.map_err(BoxedError::new)
.context(error::ExecuteQuerySnafu { query: sql })?;
results.push((table_name, object_result));
}
Ok(results)
}
#[async_trait]
impl PrometheusProtocolHandler for Instance {
async fn write(&self, request: WriteRequest) -> ServerResult<()> {
let exprs = prometheus::write_request_to_insert_exprs(request)?;
self.db
.batch_insert(exprs)
.await
.map_err(BoxedError::new)
.context(error::ExecuteInsertSnafu {
msg: "failed to write prometheus remote request",
})?;
Ok(())
}
async fn read(&self, request: ReadRequest) -> ServerResult<HttpResponse> {
let response_type = negotiate_response_type(&request.accepted_response_types)?;
// TODO(dennis): use read_hints to speed up the query if possible
let results = handle_remote_queries(&self.db, &request.queries).await?;
match response_type {
ResponseType::Samples => {
let query_results = results
.into_iter()
.map(|(table_name, object_result)| {
object_result_to_query_result(&table_name, object_result)
})
.collect::<ServerResult<Vec<_>>>()?;
let response = ReadResponse {
results: query_results,
};
// TODO(dennis): may consume too much memory, add flow control
Ok(HttpResponse::Bytes(BytesResponse {
content_type: "application/x-protobuf".to_string(),
content_encoding: "snappy".to_string(),
bytes: prometheus::snappy_compress(&response.encode_to_vec())?,
}))
}
ResponseType::StreamedXorChunks => error::NotSupportedSnafu {
feat: "streamed remote read",
}
.fail(),
}
}
async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> {
todo!();
}
}
#[cfg(test)]
mod tests {
use api::prometheus::remote::{
label_matcher::Type as MatcherType, Label, LabelMatcher, Sample,
};
use super::*;
use crate::tests;
#[tokio::test]
async fn test_prometheus_remote_write_and_read() {
let instance = tests::create_frontend_instance().await;
let write_request = WriteRequest {
timeseries: prometheus::mock_timeseries(),
..Default::default()
};
assert!(instance.write(write_request).await.is_ok());
let read_request = ReadRequest {
queries: vec![
Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
r#type: 0,
}],
..Default::default()
},
Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 3000,
matchers: vec![
LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
r#type: 0,
},
LabelMatcher {
name: "app".to_string(),
value: "biz".to_string(),
r#type: MatcherType::Eq as i32,
},
],
..Default::default()
},
],
..Default::default()
};
let response = instance.read(read_request).await.unwrap();
match response {
HttpResponse::Bytes(resp) => {
assert_eq!(resp.content_type, "application/x-protobuf");
assert_eq!(resp.content_encoding, "snappy");
let body = prometheus::snappy_decompress(&resp.bytes).unwrap();
let read_response = ReadResponse::decode(&body[..]).unwrap();
let query_results = read_response.results;
assert_eq!(2, query_results.len());
assert_eq!(1, query_results[0].timeseries.len());
let timeseries = &query_results[0].timeseries[0];
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
},
Label {
name: "job".to_string(),
value: "spark".to_string(),
},
],
timeseries.labels
);
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 1.0,
timestamp: 1000,
},
Sample {
value: 2.0,
timestamp: 2000,
}
]
);
assert_eq!(1, query_results[1].timeseries.len());
let timeseries = &query_results[1].timeseries[0];
assert_eq!(
vec![
Label {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric3".to_string(),
},
Label {
name: "idc".to_string(),
value: "z002".to_string(),
},
Label {
name: "app".to_string(),
value: "biz".to_string(),
},
],
timeseries.labels
);
assert_eq!(
timeseries.samples,
vec![
Sample {
value: 5.0,
timestamp: 1000,
},
Sample {
value: 6.0,
timestamp: 2000,
},
Sample {
value: 7.0,
timestamp: 3000,
}
]
);
}
_ => unreachable!(),
}
}
}


@@ -8,6 +8,7 @@ pub mod mysql;
pub mod opentsdb;
pub mod partition;
pub mod postgres;
pub mod prometheus;
mod server;
pub mod spliter;
#[cfg(test)]


@@ -0,0 +1,23 @@
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PrometheusOptions {
pub enable: bool,
}
impl Default for PrometheusOptions {
fn default() -> Self {
Self { enable: true }
}
}
#[cfg(test)]
mod tests {
use super::PrometheusOptions;
#[test]
fn test_prometheus_options() {
let default = PrometheusOptions::default();
assert!(default.enable);
}
}
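
For illustration, a sketch of disabling the endpoints through the frontend options (module paths assumed from the `use` statements in this change):

use crate::frontend::FrontendOptions;
use crate::prometheus::PrometheusOptions;

fn options_without_prometheus() -> FrontendOptions {
    FrontendOptions {
        // Turns off the /v1/prometheus routes; other services keep their defaults.
        prometheus_options: Some(PrometheusOptions { enable: false }),
        ..Default::default()
    }
}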


@@ -15,6 +15,7 @@ use crate::error::{self, Result};
use crate::frontend::FrontendOptions;
use crate::influxdb::InfluxdbOptions;
use crate::instance::InstanceRef;
use crate::prometheus::PrometheusOptions;
pub(crate) struct Services;
@@ -99,6 +100,13 @@ impl Services {
http_server.set_influxdb_handler(instance.clone());
}
if matches!(
opts.prometheus_options,
Some(PrometheusOptions { enable: true })
) {
http_server.set_prom_handler(instance.clone());
}
Some((Box::new(http_server) as _, http_addr))
} else {
None


@@ -25,11 +25,14 @@ hyper = { version = "0.14", features = ["full"] }
influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" }
metrics = "0.20"
num_cpus = "1.13"
openmetrics-parser = "0.4"
opensrv-mysql = "0.1"
pgwire = { version = "0.4" }
prost = "0.11"
serde = "1.0"
serde_json = "1.0"
snafu = { version = "0.7", features = ["backtraces"] }
snap = "1"
tokio = { version = "1.20", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.8"


@@ -55,6 +55,13 @@ pub enum Error {
source: BoxedError,
},
#[snafu(display("Failed to execute insert: {}, source: {}", msg, source))]
ExecuteInsert {
msg: String,
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to insert script with name: {}, source: {}", name, source))]
InsertScript {
name: String,
@@ -121,6 +128,24 @@ pub enum Error {
#[snafu(backtrace)]
source: BoxedError,
},
#[snafu(display("Failed to decode prometheus remote request, source: {}", source))]
DecodePromRemoteRequest {
backtrace: Backtrace,
source: prost::DecodeError,
},
#[snafu(display("Failed to decompress prometheus remote request, source: {}", source))]
DecompressPromRemoteRequest {
backtrace: Backtrace,
source: snap::Error,
},
#[snafu(display("Invalid prometheus remote request, msg: {}", msg))]
InvalidPromRemoteRequest { msg: String, backtrace: Backtrace },
#[snafu(display("Invalid prometheus remote read query result, msg: {}", msg))]
InvalidPromRemoteReadQueryResult { msg: String, backtrace: Backtrace },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -136,11 +161,13 @@ impl ErrorExt for Error {
| CollectRecordbatch { .. }
| StartHttp { .. }
| StartGrpc { .. }
| InvalidPromRemoteReadQueryResult { .. }
| TcpBind { .. } => StatusCode::Internal,
InsertScript { source, .. }
| ExecuteScript { source, .. }
| ExecuteQuery { source, .. }
| ExecuteInsert { source, .. }
| PutOpentsdbDataPoint { source, .. } => source.status_code(),
NotSupported { .. }
@@ -149,6 +176,9 @@ impl ErrorExt for Error {
| ConnResetByPeer { .. }
| InvalidOpentsdbLine { .. }
| InvalidOpentsdbJsonRequest { .. }
| DecodePromRemoteRequest { .. }
| DecompressPromRemoteRequest { .. }
| InvalidPromRemoteRequest { .. }
| TimePrecision { .. } => StatusCode::InvalidArguments,
InfluxdbLinesWrite { source, .. } => source.status_code(),
@@ -184,6 +214,9 @@ impl IntoResponse for Error {
| Error::InfluxdbLinesWrite { .. }
| Error::InvalidOpentsdbLine { .. }
| Error::InvalidOpentsdbJsonRequest { .. }
| Error::DecodePromRemoteRequest { .. }
| Error::DecompressPromRemoteRequest { .. }
| Error::InvalidPromRemoteRequest { .. }
| Error::InvalidQuery { .. }
| Error::TimePrecision { .. } => (HttpStatusCode::BAD_REQUEST, self.to_string()),
_ => (HttpStatusCode::INTERNAL_SERVER_ERROR, self.to_string()),


@@ -1,6 +1,7 @@
pub mod handler;
pub mod influxdb;
pub mod opentsdb;
pub mod prometheus;
use std::net::SocketAddr;
use std::time::Duration;
@@ -8,6 +9,7 @@ use std::time::Duration;
use async_trait::async_trait;
use axum::{
error_handling::HandleErrorLayer,
http::header,
response::IntoResponse,
response::{Json, Response},
routing, BoxError, Router,
@@ -23,7 +25,9 @@ use tower_http::trace::TraceLayer;
use self::influxdb::influxdb_write;
use crate::error::{Result, StartHttpSnafu};
use crate::query_handler::SqlQueryHandlerRef;
use crate::query_handler::{InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef};
use crate::query_handler::{
InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef,
};
use crate::server::Server;
const HTTP_API_VERSION: &str = "v1";
@@ -32,6 +36,7 @@ pub struct HttpServer {
sql_handler: SqlQueryHandlerRef,
influxdb_handler: Option<InfluxdbLineProtocolHandlerRef>,
opentsdb_handler: Option<OpentsdbProtocolHandlerRef>,
prom_handler: Option<PrometheusProtocolHandlerRef>,
}
#[derive(Serialize, Debug)]
@@ -40,10 +45,18 @@ pub enum JsonOutput {
Rows(Vec<RecordBatch>),
}
#[derive(Serialize, Debug)]
pub struct BytesResponse {
pub content_type: String,
pub content_encoding: String,
pub bytes: Vec<u8>,
}
#[derive(Serialize, Debug)]
pub enum HttpResponse {
Json(JsonResponse),
Text(String),
Bytes(BytesResponse),
}
#[derive(Serialize, Debug)]
@@ -60,6 +73,14 @@ impl IntoResponse for HttpResponse {
match self {
HttpResponse::Json(json) => Json(json).into_response(),
HttpResponse::Text(text) => text.into_response(),
HttpResponse::Bytes(resp) => (
[
(header::CONTENT_TYPE, resp.content_type),
(header::CONTENT_ENCODING, resp.content_encoding),
],
resp.bytes,
)
.into_response(),
}
}
}
@@ -125,6 +146,7 @@ impl HttpServer {
sql_handler,
opentsdb_handler: None,
influxdb_handler: None,
prom_handler: None,
}
}
@@ -144,6 +166,14 @@ impl HttpServer {
self.influxdb_handler.get_or_insert(handler);
}
pub fn set_prom_handler(&mut self, handler: PrometheusProtocolHandlerRef) {
debug_assert!(
self.prom_handler.is_none(),
"Prometheus protocol handler can be set only once!"
);
self.prom_handler.get_or_insert(handler);
}
pub fn make_app(&self) -> Router {
// TODO(LFC): Use released Axum.
// Axum version 0.6 introduces state within router, making router methods far more elegant
@@ -174,6 +204,14 @@ impl HttpServer {
router = router.nest(&format!("/{}/influxdb", HTTP_API_VERSION), influxdb_router);
}
if let Some(prom_handler) = self.prom_handler.clone() {
let prom_router = Router::with_state(prom_handler)
.route("/write", routing::post(prometheus::remote_write))
.route("/read", routing::post(prometheus::remote_read));
router = router.nest(&format!("/{}/prometheus", HTTP_API_VERSION), prom_router);
}
router
.route("/metrics", routing::get(handler::metrics))
// middlewares


@@ -0,0 +1,54 @@
use api::prometheus::remote::{ReadRequest, WriteRequest};
use axum::extract::{RawBody, State};
use axum::http::StatusCode;
use hyper::Body;
use prost::Message;
use snafu::prelude::*;
use crate::error::{self, Result};
use crate::http::HttpResponse;
use crate::prometheus::snappy_decompress;
use crate::query_handler::PrometheusProtocolHandlerRef;
#[axum_macros::debug_handler]
pub async fn remote_write(
State(handler): State<PrometheusProtocolHandlerRef>,
RawBody(body): RawBody,
) -> Result<(StatusCode, HttpResponse)> {
let request = decode_remote_write_request(body).await?;
handler.write(request).await?;
Ok((StatusCode::NO_CONTENT, HttpResponse::Text("".to_string())))
}
#[axum_macros::debug_handler]
pub async fn remote_read(
State(handler): State<PrometheusProtocolHandlerRef>,
RawBody(body): RawBody,
) -> Result<HttpResponse> {
let request = decode_remote_read_request(body).await?;
handler.read(request).await
}
async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;
let buf = snappy_decompress(&body[..])?;
WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
}
async fn decode_remote_read_request(body: Body) -> Result<ReadRequest> {
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;
let buf = snappy_decompress(&body[..])?;
ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)
}
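
On the wire this matches Prometheus remote write: a snappy-compressed protobuf body with the matching headers. A client-side sketch (the `reqwest` dependency and the listen address are assumptions, not part of this change):

use api::prometheus::remote::WriteRequest;
use prost::Message;
use servers::prometheus::snappy_compress;

async fn send_remote_write(request: WriteRequest) -> Result<(), Box<dyn std::error::Error>> {
    let body = snappy_compress(&request.encode_to_vec())?;
    // `reqwest` is a hypothetical client dependency, used here only for illustration.
    reqwest::Client::new()
        .post("http://127.0.0.1:4000/v1/prometheus/write") // hypothetical frontend address
        .header("Content-Type", "application/x-protobuf")
        .header("Content-Encoding", "snappy")
        .body(body)
        .send()
        .await?;
    Ok(())
}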


@@ -7,6 +7,7 @@ pub mod influxdb;
pub mod mysql;
pub mod opentsdb;
pub mod postgres;
pub mod prometheus;
pub mod query_handler;
pub mod server;
mod shutdown;


@@ -5,8 +5,8 @@ use api::v1::{column, column::SemanticType, insert_expr, Column, ColumnDataType,
use crate::error::{self, Result};
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "timestamp";
pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "value";
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "greptime_value";
#[derive(Debug)]
pub struct DataPoint {


@@ -0,0 +1,694 @@
//! Prometheus protocol support
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};
use api::prometheus::remote::{
label_matcher::Type as MatcherType, Label, Query, Sample, TimeSeries, WriteRequest,
};
use api::v1::codec::InsertBatch;
use api::v1::{
codec::SelectResult, column, column::SemanticType, insert_expr, Column, ColumnDataType,
InsertExpr,
};
use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue};
use snafu::{OptionExt, ResultExt};
use snap::raw::{Decoder, Encoder};
use crate::error::{self, Result};
const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
const VALUE_COLUMN_NAME: &str = "greptime_value";
pub const METRIC_NAME_LABEL: &str = "__name__";
/// Metrics for push gateway protocol
pub struct Metrics {
pub exposition: MetricsExposition<PrometheusType, PrometheusValue>,
}
/// Generate SQL from a remote read request query.
/// TODO(dennis): maybe use a logical plan in the future to prevent SQL injection
pub fn query_to_sql(q: &Query) -> Result<(String, String)> {
let start_timestamp_ms = q.start_timestamp_ms;
let end_timestamp_ms = q.end_timestamp_ms;
let label_matches = &q.matchers;
let table_name = label_matches
.iter()
.find_map(|m| {
if m.name == METRIC_NAME_LABEL {
Some(m.value.to_string())
} else {
None
}
})
.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?;
let mut conditions: Vec<String> = Vec::with_capacity(label_matches.len());
conditions.push(format!(
"{}>={} AND {}<={}",
TIMESTAMP_COLUMN_NAME, start_timestamp_ms, TIMESTAMP_COLUMN_NAME, end_timestamp_ms,
));
for m in label_matches {
let name = &m.name;
if name == METRIC_NAME_LABEL {
continue;
}
let value = &m.value;
let m_type =
MatcherType::from_i32(m.r#type).context(error::InvalidPromRemoteRequestSnafu {
msg: format!("invaid LabelMatcher type: {}", m.r#type),
})?;
match m_type {
MatcherType::Eq => {
conditions.push(format!("{}='{}'", name, value));
}
MatcherType::Neq => {
conditions.push(format!("{}!='{}'", name, value));
}
// Case-sensitive regexp match
MatcherType::Re => {
conditions.push(format!("{}~'{}'", name, value));
}
// Case-sensitive regexp non-match
MatcherType::Nre => {
conditions.push(format!("{}!~'{}'", name, value));
}
}
}
let conditions = conditions.join(" AND ");
Ok((
table_name.to_string(),
format!(
"select * from {} where {} order by {}",
table_name, conditions, TIMESTAMP_COLUMN_NAME,
),
))
}
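// For example, matchers `__name__="metric1"` and `job="spark"` over [1000, 2000] produce:
// select * from metric1 where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job='spark' order by greptime_timestamp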
#[inline]
fn new_label(name: String, value: String) -> Label {
Label { name, value }
}
// A timeseries id
#[derive(Debug)]
struct TimeSeriesId {
labels: Vec<Label>,
}
/// Because `Label` in protobuf doesn't impl `Eq`, we have to implement it ourselves.
impl PartialEq for TimeSeriesId {
fn eq(&self, other: &Self) -> bool {
if self.labels.len() != other.labels.len() {
return false;
}
self.labels
.iter()
.zip(other.labels.iter())
.all(|(l, r)| l.name == r.name && l.value == r.value)
}
}
impl Eq for TimeSeriesId {}
impl Hash for TimeSeriesId {
fn hash<H: Hasher>(&self, state: &mut H) {
for label in &self.labels {
label.name.hash(state);
label.value.hash(state);
}
}
}
/// For sorting timeseries
impl Ord for TimeSeriesId {
fn cmp(&self, other: &Self) -> Ordering {
let ordering = self.labels.len().cmp(&other.labels.len());
if ordering != Ordering::Equal {
return ordering;
}
for (l, r) in self.labels.iter().zip(other.labels.iter()) {
let ordering = l.name.cmp(&r.name);
if ordering != Ordering::Equal {
return ordering;
}
let ordering = l.value.cmp(&r.value);
if ordering != Ordering::Equal {
return ordering;
}
}
Ordering::Equal
}
}
impl PartialOrd for TimeSeriesId {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}
/// Collect each row's timeseries id.
/// This processing is ugly; hopefully https://github.com/GreptimeTeam/greptimedb/issues/336 makes some progress in the future.
fn collect_timeseries_ids(
table_name: &str,
row_count: usize,
columns: &[Column],
) -> Vec<TimeSeriesId> {
let mut timeseries_ids = Vec::with_capacity(row_count);
let mut columns_rows = vec![0; columns.len()];
for row in 0..row_count {
let mut labels = Vec::with_capacity(columns.len() - 1);
labels.push(new_label(
METRIC_NAME_LABEL.to_string(),
table_name.to_string(),
));
for (i, column) in columns.iter().enumerate() {
let column_name = &column.column_name;
let null_mask = &column.null_mask;
let values = &column.values;
if column_name == VALUE_COLUMN_NAME || column_name == TIMESTAMP_COLUMN_NAME {
continue;
}
// A label with an empty label value is considered equivalent to a label that does not exist.
if !null_mask.is_empty() && null_mask[row] == 0 {
continue;
}
let row = columns_rows[i];
columns_rows[i] += 1;
let column_value = values.as_ref().map(|vs| vs.string_values[row].to_string());
if let Some(value) = column_value {
labels.push(new_label(column_name.to_string(), value));
}
}
timeseries_ids.push(TimeSeriesId { labels });
}
timeseries_ids
}
pub fn select_result_to_timeseries(
table_name: &str,
select_result: SelectResult,
) -> Result<Vec<TimeSeries>> {
let row_count = select_result.row_count as usize;
let columns = select_result.columns;
let ts_column = columns
.iter()
.find(|c| c.column_name == TIMESTAMP_COLUMN_NAME)
.context(error::InvalidPromRemoteReadQueryResultSnafu {
msg: "missing greptime_timestamp column in query result",
})?;
let value_column = columns
.iter()
.find(|c| c.column_name == VALUE_COLUMN_NAME)
.context(error::InvalidPromRemoteReadQueryResultSnafu {
msg: "missing greptime_value column in query result",
})?;
// First, collect each row's timeseries id
let timeseries_ids = collect_timeseries_ids(table_name, row_count, &columns);
// Then, group timeseries by its id.
let mut timeseries_map: BTreeMap<&TimeSeriesId, TimeSeries> = BTreeMap::default();
let mut value_column_row = 0;
let mut ts_column_row = 0;
let value_null_mask = &value_column.null_mask;
let ts_null_mask = &ts_column.null_mask;
for (row, timeseries_id) in timeseries_ids.iter().enumerate() {
let timeseries = timeseries_map
.entry(timeseries_id)
.or_insert_with(|| TimeSeries {
labels: timeseries_id.labels.clone(),
..Default::default()
});
if !ts_null_mask.is_empty() && ts_null_mask[row] == 0 {
continue;
}
let ts_row = ts_column_row;
ts_column_row += 1;
if !value_null_mask.is_empty() && value_null_mask[row] == 0 {
continue;
}
let value_row = value_column_row;
value_column_row += 1;
let sample = Sample {
value: value_column
.values
.as_ref()
.map(|vs| vs.f64_values[value_row])
.unwrap_or(0.0f64),
timestamp: ts_column
.values
.as_ref()
.map(|vs| vs.ts_millis_values[ts_row])
.unwrap_or(0i64),
};
timeseries.samples.push(sample);
}
Ok(timeseries_map.into_values().collect())
}
/// Convert a remote write request into gRPC `InsertExpr`s.
pub fn write_request_to_insert_exprs(mut request: WriteRequest) -> Result<Vec<InsertExpr>> {
let timeseries = std::mem::take(&mut request.timeseries);
timeseries
.into_iter()
.map(timeseries_to_insert_expr)
.collect()
}
fn timeseries_to_insert_expr(mut timeseries: TimeSeries) -> Result<InsertExpr> {
// TODO(dennis): save exemplars into a column
let labels = std::mem::take(&mut timeseries.labels);
let samples = std::mem::take(&mut timeseries.samples);
let row_count = samples.len();
let mut columns = Vec::with_capacity(2 + labels.len());
let ts_column = Column {
column_name: TIMESTAMP_COLUMN_NAME.to_string(),
values: Some(column::Values {
ts_millis_values: samples.iter().map(|x| x.timestamp).collect(),
..Default::default()
}),
semantic_type: SemanticType::Timestamp as i32,
datatype: ColumnDataType::Timestamp as i32,
..Default::default()
};
columns.push(ts_column);
let value_column = Column {
column_name: VALUE_COLUMN_NAME.to_string(),
values: Some(column::Values {
f64_values: samples.iter().map(|x| x.value).collect(),
..Default::default()
}),
semantic_type: SemanticType::Field as i32,
datatype: ColumnDataType::Float64 as i32,
..Default::default()
};
columns.push(value_column);
let mut table_name = None;
for label in labels {
let tagk = label.name;
let tagv = label.value;
// The metric name is a special label
if tagk == METRIC_NAME_LABEL {
table_name = Some(tagv);
continue;
}
columns.push(Column {
column_name: tagk.to_string(),
values: Some(column::Values {
string_values: std::iter::repeat(tagv).take(row_count).collect(),
..Default::default()
}),
semantic_type: SemanticType::Tag as i32,
datatype: ColumnDataType::String as i32,
..Default::default()
});
}
let batch = InsertBatch {
columns,
row_count: row_count as u32,
};
Ok(InsertExpr {
table_name: table_name.context(error::InvalidPromRemoteRequestSnafu {
msg: "missing '__name__' label in timeseries",
})?,
expr: Some(insert_expr::Expr::Values(insert_expr::Values {
values: vec![batch.into()],
})),
options: HashMap::default(),
})
}
#[inline]
pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
let mut decoder = Decoder::new();
decoder
.decompress_vec(buf)
.context(error::DecompressPromRemoteRequestSnafu)
}
#[inline]
pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
let mut encoder = Encoder::new();
encoder
.compress_vec(buf)
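// NOTE: reuses the decompress error variant; no dedicated compress variant exists yet.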
.context(error::DecompressPromRemoteRequestSnafu)
}
/// Mock timeseries for tests; it is used in both the servers and frontend crates,
/// so we define it here.
pub fn mock_timeseries() -> Vec<TimeSeries> {
vec![
TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "metric1".to_string()),
new_label("job".to_string(), "spark".to_string()),
],
samples: vec![
Sample {
value: 1.0f64,
timestamp: 1000,
},
Sample {
value: 2.0f64,
timestamp: 2000,
},
],
..Default::default()
},
TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "metric2".to_string()),
new_label("instance".to_string(), "test_host1".to_string()),
new_label("idc".to_string(), "z001".to_string()),
],
samples: vec![
Sample {
value: 3.0f64,
timestamp: 1000,
},
Sample {
value: 4.0f64,
timestamp: 2000,
},
],
..Default::default()
},
TimeSeries {
labels: vec![
new_label(METRIC_NAME_LABEL.to_string(), "metric3".to_string()),
new_label("idc".to_string(), "z002".to_string()),
new_label("app".to_string(), "biz".to_string()),
],
samples: vec![
Sample {
value: 5.0f64,
timestamp: 1000,
},
Sample {
value: 6.0f64,
timestamp: 2000,
},
Sample {
value: 7.0f64,
timestamp: 3000,
},
],
..Default::default()
},
]
}
#[cfg(test)]
mod tests {
use api::prometheus::remote::LabelMatcher;
use super::*;
const EQ_TYPE: i32 = MatcherType::Eq as i32;
const NEQ_TYPE: i32 = MatcherType::Neq as i32;
const RE_TYPE: i32 = MatcherType::Re as i32;
#[test]
fn test_query_to_sql() {
let q = Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![],
..Default::default()
};
let err = query_to_sql(&q).unwrap_err();
assert!(matches!(err, error::Error::InvalidPromRemoteRequest { .. }));
let q = Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: METRIC_NAME_LABEL.to_string(),
value: "test".to_string(),
r#type: EQ_TYPE,
}],
..Default::default()
};
let (table, sql) = query_to_sql(&q).unwrap();
assert_eq!("test", table);
assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 order by greptime_timestamp", sql);
let q = Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![
LabelMatcher {
name: METRIC_NAME_LABEL.to_string(),
value: "test".to_string(),
r#type: EQ_TYPE,
},
LabelMatcher {
name: "job".to_string(),
value: "*prom*".to_string(),
r#type: RE_TYPE,
},
LabelMatcher {
name: "instance".to_string(),
value: "localhost".to_string(),
r#type: NEQ_TYPE,
},
],
..Default::default()
};
let (table, sql) = query_to_sql(&q).unwrap();
assert_eq!("test", table);
assert_eq!("select * from test where greptime_timestamp>=1000 AND greptime_timestamp<=2000 AND job~'*prom*' AND instance!='localhost' order by greptime_timestamp", sql);
}
#[test]
fn test_write_request_to_insert_exprs() {
let write_request = WriteRequest {
timeseries: mock_timeseries(),
..Default::default()
};
let exprs = write_request_to_insert_exprs(write_request).unwrap();
assert_eq!(3, exprs.len());
assert_eq!("metric1", exprs[0].table_name);
assert_eq!("metric2", exprs[1].table_name);
assert_eq!("metric3", exprs[2].table_name);
let values = exprs[0].clone().expr.unwrap();
match values {
insert_expr::Expr::Values(insert_expr::Values { values }) => {
assert_eq!(1, values.len());
let batch = InsertBatch::try_from(values[0].as_slice()).unwrap();
assert_eq!(2, batch.row_count);
let columns = batch.columns;
assert_eq!(columns.len(), 3);
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000, 2000]
);
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
vec![1.0, 2.0]
);
assert_eq!(columns[2].column_name, "job");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["spark", "spark"]
);
}
_ => unreachable!(),
}
let values = exprs[1].clone().expr.unwrap();
match values {
insert_expr::Expr::Values(insert_expr::Values { values }) => {
assert_eq!(1, values.len());
let batch = InsertBatch::try_from(values[0].as_slice()).unwrap();
assert_eq!(2, batch.row_count);
let columns = batch.columns;
assert_eq!(columns.len(), 4);
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000, 2000]
);
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
vec![3.0, 4.0]
);
assert_eq!(columns[2].column_name, "instance");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["test_host1", "test_host1"]
);
assert_eq!(columns[3].column_name, "idc");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["z001", "z001"]
);
}
_ => unreachable!(),
}
let values = exprs[2].clone().expr.unwrap();
match values {
insert_expr::Expr::Values(insert_expr::Values { values }) => {
assert_eq!(1, values.len());
let batch = InsertBatch::try_from(values[0].as_slice()).unwrap();
assert_eq!(3, batch.row_count);
let columns = batch.columns;
assert_eq!(columns.len(), 4);
assert_eq!(columns[0].column_name, TIMESTAMP_COLUMN_NAME);
assert_eq!(
columns[0].values.as_ref().unwrap().ts_millis_values,
vec![1000, 2000, 3000]
);
assert_eq!(columns[1].column_name, VALUE_COLUMN_NAME);
assert_eq!(
columns[1].values.as_ref().unwrap().f64_values,
vec![5.0, 6.0, 7.0]
);
assert_eq!(columns[2].column_name, "idc");
assert_eq!(
columns[2].values.as_ref().unwrap().string_values,
vec!["z002", "z002", "z002"]
);
assert_eq!(columns[3].column_name, "app");
assert_eq!(
columns[3].values.as_ref().unwrap().string_values,
vec!["biz", "biz", "biz"]
);
}
_ => unreachable!(),
}
}
#[test]
fn test_select_result_to_timeseries() {
let select_result = SelectResult {
row_count: 2,
columns: vec![
Column {
column_name: TIMESTAMP_COLUMN_NAME.to_string(),
values: Some(column::Values {
ts_millis_values: vec![1000, 2000],
..Default::default()
}),
..Default::default()
},
Column {
column_name: VALUE_COLUMN_NAME.to_string(),
values: Some(column::Values {
f64_values: vec![3.0, 7.0],
..Default::default()
}),
..Default::default()
},
Column {
column_name: "instance".to_string(),
values: Some(column::Values {
string_values: vec!["host1".to_string(), "host2".to_string()],
..Default::default()
}),
..Default::default()
},
],
};
let timeseries = select_result_to_timeseries("metric1", select_result).unwrap();
assert_eq!(2, timeseries.len());
assert_eq!(
vec![
Label {
name: METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
},
Label {
name: "instance".to_string(),
value: "host1".to_string(),
},
],
timeseries[0].labels
);
assert_eq!(
timeseries[0].samples,
vec![Sample {
value: 3.0,
timestamp: 1000,
}]
);
assert_eq!(
vec![
Label {
name: METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
},
Label {
name: "instance".to_string(),
value: "host2".to_string(),
},
],
timeseries[1].labels
);
assert_eq!(
timeseries[1].samples,
vec![Sample {
value: 7.0,
timestamp: 2000,
}]
);
}
}


@@ -1,12 +1,15 @@
use std::sync::Arc;
use api::prometheus::remote::{ReadRequest, WriteRequest};
use api::v1::{AdminExpr, AdminResult, ObjectExpr, ObjectResult};
use async_trait::async_trait;
use common_query::Output;
use crate::error::Result;
use crate::http::HttpResponse;
use crate::influxdb::InfluxdbRequest;
use crate::opentsdb::codec::DataPoint;
use crate::prometheus::Metrics;
/// All query handler traits for various request protocols, like SQL or GRPC.
/// Instance that wishes to support certain request protocol, just implement the corresponding
@@ -23,6 +26,7 @@ pub type GrpcQueryHandlerRef = Arc<dyn GrpcQueryHandler + Send + Sync>;
pub type GrpcAdminHandlerRef = Arc<dyn GrpcAdminHandler + Send + Sync>;
pub type OpentsdbProtocolHandlerRef = Arc<dyn OpentsdbProtocolHandler + Send + Sync>;
pub type InfluxdbLineProtocolHandlerRef = Arc<dyn InfluxdbLineProtocolHandler + Send + Sync>;
pub type PrometheusProtocolHandlerRef = Arc<dyn PrometheusProtocolHandler + Send + Sync>;
#[async_trait]
pub trait SqlQueryHandler {
@@ -54,3 +58,13 @@ pub trait OpentsdbProtocolHandler {
/// Only on error will the socket return a line of data.
async fn exec(&self, data_point: &DataPoint) -> Result<()>;
}
#[async_trait]
pub trait PrometheusProtocolHandler {
/// Handling prometheus remote write requests
async fn write(&self, request: WriteRequest) -> Result<()>;
/// Handling prometheus remote read requests
async fn read(&self, request: ReadRequest) -> Result<HttpResponse>;
/// Handling push gateway requests
async fn ingest_metrics(&self, metrics: Metrics) -> Result<()>;
}


@@ -1,3 +1,4 @@
mod http_handler_test;
mod influxdb_test;
mod opentsdb_test;
mod prometheus_test;


@@ -0,0 +1,144 @@
use std::sync::Arc;
use api::prometheus::remote::{
LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest,
};
use async_trait::async_trait;
use axum::Router;
use axum_test_helper::TestClient;
use common_query::Output;
use prost::Message;
use servers::error::Result;
use servers::http::{BytesResponse, HttpResponse, HttpServer};
use servers::prometheus::{self, snappy_compress, Metrics};
use servers::query_handler::{PrometheusProtocolHandler, SqlQueryHandler};
use tokio::sync::mpsc;
struct DummyInstance {
tx: mpsc::Sender<Vec<u8>>,
}
#[async_trait]
impl PrometheusProtocolHandler for DummyInstance {
async fn write(&self, request: WriteRequest) -> Result<()> {
let _ = self.tx.send(request.encode_to_vec()).await;
Ok(())
}
async fn read(&self, request: ReadRequest) -> Result<HttpResponse> {
let _ = self.tx.send(request.encode_to_vec()).await;
let response = ReadResponse {
results: vec![QueryResult {
timeseries: prometheus::mock_timeseries(),
}],
};
Ok(HttpResponse::Bytes(BytesResponse {
content_type: "application/x-protobuf".to_string(),
content_encoding: "snappy".to_string(),
bytes: response.encode_to_vec(),
}))
}
async fn ingest_metrics(&self, _metrics: Metrics) -> Result<()> {
unimplemented!();
}
}
#[async_trait]
impl SqlQueryHandler for DummyInstance {
async fn do_query(&self, _query: &str) -> Result<Output> {
unimplemented!()
}
async fn insert_script(&self, _name: &str, _script: &str) -> Result<()> {
unimplemented!()
}
async fn execute_script(&self, _name: &str) -> Result<Output> {
unimplemented!()
}
}
fn make_test_app(tx: mpsc::Sender<Vec<u8>>) -> Router {
let instance = Arc::new(DummyInstance { tx });
let mut server = HttpServer::new(instance.clone());
server.set_prom_handler(instance);
server.make_app()
}
#[tokio::test]
async fn test_prometheus_remote_write_read() {
let (tx, mut rx) = mpsc::channel(100);
let app = make_test_app(tx);
let client = TestClient::new(app);
let write_request = WriteRequest {
timeseries: prometheus::mock_timeseries(),
..Default::default()
};
let result = client
.post("/v1/prometheus/write")
.body(snappy_compress(&write_request.clone().encode_to_vec()[..]).unwrap())
.send()
.await;
assert_eq!(result.status(), 204);
assert!(result.text().await.is_empty());
let read_request = ReadRequest {
queries: vec![Query {
start_timestamp_ms: 1000,
end_timestamp_ms: 2000,
matchers: vec![LabelMatcher {
name: prometheus::METRIC_NAME_LABEL.to_string(),
value: "metric1".to_string(),
r#type: 0,
}],
..Default::default()
}],
..Default::default()
};
let mut result = client
.post("/v1/prometheus/read")
.body(snappy_compress(&read_request.clone().encode_to_vec()[..]).unwrap())
.send()
.await;
assert_eq!(result.status(), 200);
let headers = result.headers();
assert_eq!(
Some("application/x-protobuf"),
headers.get("content-type").map(|x| x.to_str().unwrap())
);
assert_eq!(
Some("snappy"),
headers.get("content-encoding").map(|x| x.to_str().unwrap())
);
let response = result.chunk().await.unwrap();
let response = ReadResponse::decode(&response[..]).unwrap();
assert_eq!(response.results.len(), 1);
assert_eq!(
response.results[0].timeseries,
prometheus::mock_timeseries()
);
let mut requests = vec![];
while let Ok(s) = rx.try_recv() {
requests.push(s);
}
assert_eq!(2, requests.len());
assert_eq!(
write_request,
WriteRequest::decode(&requests[0][..]).unwrap()
);
assert_eq!(read_request, ReadRequest::decode(&requests[1][..]).unwrap());
}


@@ -96,12 +96,6 @@ async fn test_shutdown_pg_server() -> Result<()> {
for handle in join_handles.iter_mut() {
let result = handle.await.unwrap();
assert!(result.is_err());
let error = result.unwrap_err().to_string();
assert!(
error.contains("Connection refused")
|| error.contains("Connection reset by peer")
|| error.contains("close")
);
}
Ok(())