diff --git a/Cargo.lock b/Cargo.lock index f683068b01..391b9b2c2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -273,6 +273,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "auto_ops" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7460f7dd8e100147b82a63afca1a20eb6c231ee36b90ba7272e14951cb58af59" + [[package]] name = "autocfg" version = "1.1.0" @@ -1780,6 +1786,8 @@ dependencies = [ "datanode", "datatypes", "futures", + "openmetrics-parser", + "prost 0.11.0", "serde", "servers", "snafu", @@ -3136,6 +3144,17 @@ dependencies = [ "ureq", ] +[[package]] +name = "openmetrics-parser" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5caf1ccaaf43651cc5abda77353a173869d8d8b0238f2faacb23d6b32931e860" +dependencies = [ + "auto_ops", + "pest", + "pest_derive", +] + [[package]] name = "opensrv-mysql" version = "0.1.0" @@ -3344,6 +3363,50 @@ version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e" +[[package]] +name = "pest" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc7bc69c062e492337d74d59b120c274fd3d261b6bf6d3207d499b4b379c41a" +dependencies = [ + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b75706b9642ebcb34dab3bc7750f811609a0eb1dd8b88c2d15bf628c1c65b2" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4f9272122f5979a6511a749af9db9bfc810393f63119970d7085fed1c4ea0db" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pest_meta" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c8717927f9b79515e565a64fe46c38b8cd0427e64c40680b14a7365ab09ac8d" +dependencies = [ + "once_cell", + "pest", + "sha1", +] + [[package]] name = "petgraph" version = "0.6.2" @@ -4652,14 +4715,17 @@ dependencies = [ "metrics", "mysql_async", "num_cpus", + "openmetrics-parser", "opensrv-mysql", "pgwire", + "prost 0.11.0", "query", "rand 0.8.5", "script", "serde", "serde_json", "snafu", + "snap", "table", "tokio", "tokio-postgres", @@ -5756,6 +5822,12 @@ version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987" +[[package]] +name = "ucd-trie" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e79c4d996edb816c91e4308506774452e55e95c3c9de07b6729e17e15a5ef81" + [[package]] name = "uname" version = "0.1.1" diff --git a/src/api/build.rs b/src/api/build.rs index 1b8e171cb5..e15f2f80bc 100644 --- a/src/api/build.rs +++ b/src/api/build.rs @@ -10,6 +10,7 @@ fn main() { "greptime/v1/meta/heartbeat.proto", "greptime/v1/meta/route.proto", "greptime/v1/meta/store.proto", + "prometheus/remote/remote.proto", ], &["."], ) diff --git a/src/api/prometheus/remote/remote.proto b/src/api/prometheus/remote/remote.proto new file mode 100644 index 0000000000..623f363055 --- /dev/null +++ b/src/api/prometheus/remote/remote.proto @@ -0,0 +1,85 @@ +// Copyright 2016 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package prometheus; + +option go_package = "prompb"; + +import "prometheus/remote/types.proto"; + +message WriteRequest { + repeated prometheus.TimeSeries timeseries = 1; + // Cortex uses this field to determine the source of the write request. + // We reserve it to avoid any compatibility issues. + reserved 2; + repeated prometheus.MetricMetadata metadata = 3; +} + +// ReadRequest represents a remote read request. +message ReadRequest { + repeated Query queries = 1; + + enum ResponseType { + // Server will return a single ReadResponse message with matched series that includes list of raw samples. + // It's recommended to use streamed response types instead. + // + // Response headers: + // Content-Type: "application/x-protobuf" + // Content-Encoding: "snappy" + SAMPLES = 0; + // Server will stream a delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series. + // Each message is following varint size and fixed size bigendian uint32 for CRC32 Castagnoli checksum. + // + // Response headers: + // Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse" + // Content-Encoding: "" + STREAMED_XOR_CHUNKS = 1; + } + + // accepted_response_types allows negotiating the content type of the response. + // + // Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is + // implemented by server, error is returned. + // For request that do not contain `accepted_response_types` field the SAMPLES response type will be used. + repeated ResponseType accepted_response_types = 2; +} + +// ReadResponse is a response when response_type equals SAMPLES. +message ReadResponse { + // In same order as the request's queries. + repeated QueryResult results = 1; +} + +message Query { + int64 start_timestamp_ms = 1; + int64 end_timestamp_ms = 2; + repeated prometheus.LabelMatcher matchers = 3; + prometheus.ReadHints hints = 4; +} + +message QueryResult { + // Samples within a time series must be ordered by time. + repeated prometheus.TimeSeries timeseries = 1; +} + +// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS. +// We strictly stream full series after series, optionally split by time. This means that a single frame can contain +// partition of the single series, but once a new series is started to be streamed it means that no more chunks will +// be sent for previous one. Series are returned sorted in the same way TSDB block are internally. +message ChunkedReadResponse { + repeated prometheus.ChunkedSeries chunked_series = 1; + + // query_index represents an index of the query from ReadRequest.queries these chunks relates to. + int64 query_index = 2; +} diff --git a/src/api/prometheus/remote/types.proto b/src/api/prometheus/remote/types.proto new file mode 100644 index 0000000000..0d17e88d29 --- /dev/null +++ b/src/api/prometheus/remote/types.proto @@ -0,0 +1,117 @@ +// Copyright 2017 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package prometheus; + +option go_package = "prompb"; + +message MetricMetadata { + enum MetricType { + UNKNOWN = 0; + COUNTER = 1; + GAUGE = 2; + HISTOGRAM = 3; + GAUGEHISTOGRAM = 4; + SUMMARY = 5; + INFO = 6; + STATESET = 7; + } + + // Represents the metric type, these match the set from Prometheus. + // Refer to model/textparse/interface.go for details. + MetricType type = 1; + string metric_family_name = 2; + string help = 4; + string unit = 5; +} + +message Sample { + double value = 1; + // timestamp is in ms format, see model/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + int64 timestamp = 2; +} + +message Exemplar { + // Optional, can be empty. + repeated Label labels = 1; + double value = 2; + // timestamp is in ms format, see model/timestamp/timestamp.go for + // conversion from time.Time to Prometheus timestamp. + int64 timestamp = 3; +} + +// TimeSeries represents samples and labels for a single time series. +message TimeSeries { + // For a timeseries to be valid, and for the samples and exemplars + // to be ingested by the remote system properly, the labels field is required. + repeated Label labels = 1; + repeated Sample samples = 2; + repeated Exemplar exemplars = 3; +} + +message Label { + string name = 1; + string value = 2; +} + +message Labels { + repeated Label labels = 1; +} + +// Matcher specifies a rule, which can match or set of labels or not. +message LabelMatcher { + enum Type { + EQ = 0; + NEQ = 1; + RE = 2; + NRE = 3; + } + Type type = 1; + string name = 2; + string value = 3; +} + +message ReadHints { + int64 step_ms = 1; // Query step size in milliseconds. + string func = 2; // String representation of surrounding function or aggregation. + int64 start_ms = 3; // Start time in milliseconds. + int64 end_ms = 4; // End time in milliseconds. + repeated string grouping = 5; // List of label names used in aggregation. + bool by = 6; // Indicate whether it is without or by. + int64 range_ms = 7; // Range vector selector range in milliseconds. +} + +// Chunk represents a TSDB chunk. +// Time range [min, max] is inclusive. +message Chunk { + int64 min_time_ms = 1; + int64 max_time_ms = 2; + + // We require this to match chunkenc.Encoding. + enum Encoding { + UNKNOWN = 0; + XOR = 1; + } + Encoding type = 3; + bytes data = 4; +} + +// ChunkedSeries represents single, encoded time series. +message ChunkedSeries { + // Labels should be sorted. + repeated Label labels = 1; + // Chunks will be in start time order and may overlap. + repeated Chunk chunks = 2; +} diff --git a/src/api/src/lib.rs b/src/api/src/lib.rs index 51614463aa..d5e3d6188e 100644 --- a/src/api/src/lib.rs +++ b/src/api/src/lib.rs @@ -1,5 +1,6 @@ pub mod error; pub mod helper; +pub mod prometheus; pub mod serde; pub mod v1; diff --git a/src/api/src/prometheus.rs b/src/api/src/prometheus.rs new file mode 100644 index 0000000000..31e109b909 --- /dev/null +++ b/src/api/src/prometheus.rs @@ -0,0 +1,5 @@ +#![allow(clippy::derive_partial_eq_without_eq)] + +pub mod remote { + tonic::include_proto!("prometheus"); +} diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 2986fdfc24..293533f14e 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -16,6 +16,8 @@ common-runtime = { path = "../common/runtime" } common-telemetry = { path = "../common/telemetry" } common-time = { path = "../common/time" } datatypes = { path = "../datatypes" } +openmetrics-parser = "0.4" +prost = "0.11" serde = "1.0" servers = { path = "../servers" } snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/frontend/src/frontend.rs b/src/frontend/src/frontend.rs index 29bf944ad4..20d5a7fe4f 100644 --- a/src/frontend/src/frontend.rs +++ b/src/frontend/src/frontend.rs @@ -9,6 +9,7 @@ use crate::instance::Instance; use crate::mysql::MysqlOptions; use crate::opentsdb::OpentsdbOptions; use crate::postgres::PostgresOptions; +use crate::prometheus::PrometheusOptions; use crate::server::Services; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -19,6 +20,7 @@ pub struct FrontendOptions { pub postgres_options: Option, pub opentsdb_options: Option, pub influxdb_options: Option, + pub prometheus_options: Option, } impl Default for FrontendOptions { @@ -30,6 +32,7 @@ impl Default for FrontendOptions { postgres_options: Some(PostgresOptions::default()), opentsdb_options: Some(OpentsdbOptions::default()), influxdb_options: Some(InfluxdbOptions::default()), + prometheus_options: Some(PrometheusOptions::default()), } } } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 49c489e068..29ce0f83db 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -1,5 +1,6 @@ mod influxdb; mod opentsdb; +mod prometheus; use std::collections::HashMap; use std::sync::Arc; diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 9ed7bdd5e3..f7ac68993f 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -128,13 +128,13 @@ mod tests { let pretty_print = arrow_print::write(&recordbatches); let pretty_print = pretty_print.lines().collect::>(); let expected = vec![ - "+---------------------+-------+-------+-------+-------+", - "| timestamp | value | tagk1 | tagk2 | tagk3 |", - "+---------------------+-------+-------+-------+-------+", - "| 1970-01-01 00:00:01 | 1 | tagv1 | tagv2 | |", - "| 1970-01-01 00:00:02 | 2 | | tagv2 | tagv3 |", - "| 1970-01-01 00:00:03 | 3 | | | |", - "+---------------------+-------+-------+-------+-------+", + "+---------------------+----------------+-------+-------+-------+", + "| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |", + "+---------------------+----------------+-------+-------+-------+", + "| 1970-01-01 00:00:01 | 1 | tagv1 | tagv2 | |", + "| 1970-01-01 00:00:02 | 2 | | tagv2 | tagv3 |", + "| 1970-01-01 00:00:03 | 3 | | | |", + "+---------------------+----------------+-------+-------+-------+", ]; assert_eq!(pretty_print, expected); } diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs new file mode 100644 index 0000000000..7f3a3cede3 --- /dev/null +++ b/src/frontend/src/instance/prometheus.rs @@ -0,0 +1,282 @@ +use api::prometheus::remote::{ + read_request::ResponseType, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest, +}; +use async_trait::async_trait; +use client::ObjectResult; +use client::{Database, Select}; +use common_error::prelude::BoxedError; +use common_telemetry::logging; +use prost::Message; +use servers::error::{self, Result as ServerResult}; +use servers::http::{BytesResponse, HttpResponse}; +use servers::prometheus::{self, Metrics}; +use servers::query_handler::PrometheusProtocolHandler; +use snafu::{OptionExt, ResultExt}; + +use crate::instance::Instance; + +const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32; + +#[inline] +fn is_supported(response_type: i32) -> bool { + // Only supports samples response right now + response_type == SAMPLES_RESPONSE_TYPE +} + +/// Negotiating the content type of the remote read response. +/// +/// Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is +/// implemented by server, error is returned. +/// For request that do not contain `accepted_response_types` field the SAMPLES response type will be used. +fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult { + if accepted_response_types.is_empty() { + return Ok(ResponseType::Samples); + } + + let response_type = accepted_response_types + .iter() + .find(|t| is_supported(**t)) + .with_context(|| error::NotSupportedSnafu { + feat: format!( + "server does not support any of the requested response types: {:?}", + accepted_response_types + ), + })?; + + // It's safe to unwrap here, we known that it should be SAMPLES_RESPONSE_TYPE + Ok(ResponseType::from_i32(*response_type).unwrap()) +} + +fn object_result_to_query_result( + table_name: &str, + object_result: ObjectResult, +) -> ServerResult { + let select_result = match object_result { + ObjectResult::Select(result) => result, + _ => unreachable!(), + }; + + Ok(QueryResult { + timeseries: prometheus::select_result_to_timeseries(table_name, select_result)?, + }) +} + +async fn handle_remote_queries( + db: &Database, + queries: &[Query], +) -> ServerResult> { + let mut results = Vec::with_capacity(queries.len()); + + for q in queries { + let (table_name, sql) = prometheus::query_to_sql(q)?; + + logging::debug!( + "prometheus remote read, table: {}, sql: {}", + table_name, + sql + ); + + let object_result = db + .select(Select::Sql(sql.clone())) + .await + .map_err(BoxedError::new) + .context(error::ExecuteQuerySnafu { query: sql })?; + + results.push((table_name, object_result)); + } + + Ok(results) +} + +#[async_trait] +impl PrometheusProtocolHandler for Instance { + async fn write(&self, request: WriteRequest) -> ServerResult<()> { + let exprs = prometheus::write_request_to_insert_exprs(request)?; + + self.db + .batch_insert(exprs) + .await + .map_err(BoxedError::new) + .context(error::ExecuteInsertSnafu { + msg: "failed to write prometheus remote request", + })?; + + Ok(()) + } + + async fn read(&self, request: ReadRequest) -> ServerResult { + let response_type = negotiate_response_type(&request.accepted_response_types)?; + + // TODO(dennis): use read_hints to speedup query if possible + let results = handle_remote_queries(&self.db, &request.queries).await?; + + match response_type { + ResponseType::Samples => { + let query_results = results + .into_iter() + .map(|(table_name, object_result)| { + object_result_to_query_result(&table_name, object_result) + }) + .collect::>>()?; + + let response = ReadResponse { + results: query_results, + }; + + // TODO(dennis): may consume too much memory, adds flow control + Ok(HttpResponse::Bytes(BytesResponse { + content_type: "application/x-protobuf".to_string(), + content_encoding: "snappy".to_string(), + bytes: prometheus::snappy_compress(&response.encode_to_vec())?, + })) + } + ResponseType::StreamedXorChunks => error::NotSupportedSnafu { + feat: "streamed remote read", + } + .fail(), + } + } + + async fn ingest_metrics(&self, _metrics: Metrics) -> ServerResult<()> { + todo!(); + } +} + +#[cfg(test)] +mod tests { + use api::prometheus::remote::{ + label_matcher::Type as MatcherType, Label, LabelMatcher, Sample, + }; + + use super::*; + use crate::tests; + + #[tokio::test] + async fn test_prometheus_remote_write_and_read() { + let instance = tests::create_frontend_instance().await; + + let write_request = WriteRequest { + timeseries: prometheus::mock_timeseries(), + ..Default::default() + }; + + assert!(instance.write(write_request).await.is_ok()); + + let read_request = ReadRequest { + queries: vec![ + Query { + start_timestamp_ms: 1000, + end_timestamp_ms: 2000, + matchers: vec![LabelMatcher { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric1".to_string(), + r#type: 0, + }], + ..Default::default() + }, + Query { + start_timestamp_ms: 1000, + end_timestamp_ms: 3000, + matchers: vec![ + LabelMatcher { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric3".to_string(), + r#type: 0, + }, + LabelMatcher { + name: "app".to_string(), + value: "biz".to_string(), + r#type: MatcherType::Eq as i32, + }, + ], + ..Default::default() + }, + ], + ..Default::default() + }; + + let response = instance.read(read_request).await.unwrap(); + + match response { + HttpResponse::Bytes(resp) => { + assert_eq!(resp.content_type, "application/x-protobuf"); + assert_eq!(resp.content_encoding, "snappy"); + let body = prometheus::snappy_decompress(&resp.bytes).unwrap(); + let read_response = ReadResponse::decode(&body[..]).unwrap(); + let query_results = read_response.results; + assert_eq!(2, query_results.len()); + + assert_eq!(1, query_results[0].timeseries.len()); + let timeseries = &query_results[0].timeseries[0]; + + assert_eq!( + vec![ + Label { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric1".to_string(), + }, + Label { + name: "job".to_string(), + value: "spark".to_string(), + }, + ], + timeseries.labels + ); + + assert_eq!( + timeseries.samples, + vec![ + Sample { + value: 1.0, + timestamp: 1000, + }, + Sample { + value: 2.0, + timestamp: 2000, + } + ] + ); + + assert_eq!(1, query_results[1].timeseries.len()); + let timeseries = &query_results[1].timeseries[0]; + + assert_eq!( + vec![ + Label { + name: prometheus::METRIC_NAME_LABEL.to_string(), + value: "metric3".to_string(), + }, + Label { + name: "idc".to_string(), + value: "z002".to_string(), + }, + Label { + name: "app".to_string(), + value: "biz".to_string(), + }, + ], + timeseries.labels + ); + + assert_eq!( + timeseries.samples, + vec![ + Sample { + value: 5.0, + timestamp: 1000, + }, + Sample { + value: 6.0, + timestamp: 2000, + }, + Sample { + value: 7.0, + timestamp: 3000, + } + ] + ); + } + _ => unreachable!(), + } + } +} diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index f54699178d..5dd6d1047e 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -8,6 +8,7 @@ pub mod mysql; pub mod opentsdb; pub mod partition; pub mod postgres; +pub mod prometheus; mod server; pub mod spliter; #[cfg(test)] diff --git a/src/frontend/src/prometheus.rs b/src/frontend/src/prometheus.rs new file mode 100644 index 0000000000..2d4a1633cf --- /dev/null +++ b/src/frontend/src/prometheus.rs @@ -0,0 +1,23 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct PrometheusOptions { + pub enable: bool, +} + +impl Default for PrometheusOptions { + fn default() -> Self { + Self { enable: true } + } +} + +#[cfg(test)] +mod tests { + use super::PrometheusOptions; + + #[test] + fn test_prometheus_options() { + let default = PrometheusOptions::default(); + assert!(default.enable); + } +} diff --git a/src/frontend/src/server.rs b/src/frontend/src/server.rs index 99686041d2..b34c7de5cb 100644 --- a/src/frontend/src/server.rs +++ b/src/frontend/src/server.rs @@ -15,6 +15,7 @@ use crate::error::{self, Result}; use crate::frontend::FrontendOptions; use crate::influxdb::InfluxdbOptions; use crate::instance::InstanceRef; +use crate::prometheus::PrometheusOptions; pub(crate) struct Services; @@ -99,6 +100,13 @@ impl Services { http_server.set_influxdb_handler(instance.clone()); } + if matches!( + opts.prometheus_options, + Some(PrometheusOptions { enable: true }) + ) { + http_server.set_prom_handler(instance.clone()); + } + Some((Box::new(http_server) as _, http_addr)) } else { None diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index c642c43d43..ae590a6d64 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -25,11 +25,14 @@ hyper = { version = "0.14", features = ["full"] } influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" } metrics = "0.20" num_cpus = "1.13" +openmetrics-parser = "0.4" opensrv-mysql = "0.1" pgwire = { version = "0.4" } +prost = "0.11" serde = "1.0" serde_json = "1.0" snafu = { version = "0.7", features = ["backtraces"] } +snap = "1" tokio = { version = "1.20", features = ["full"] } tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.8" diff --git a/src/servers/src/error.rs b/src/servers/src/error.rs index 91312bf9e2..af7f71f686 100644 --- a/src/servers/src/error.rs +++ b/src/servers/src/error.rs @@ -55,6 +55,13 @@ pub enum Error { source: BoxedError, }, + #[snafu(display("Failed to execute insert: {}, source: {}", msg, source))] + ExecuteInsert { + msg: String, + #[snafu(backtrace)] + source: BoxedError, + }, + #[snafu(display("Failed to insert script with name: {}, source: {}", name, source))] InsertScript { name: String, @@ -121,6 +128,24 @@ pub enum Error { #[snafu(backtrace)] source: BoxedError, }, + + #[snafu(display("Failed to decode prometheus remote request, source: {}", source))] + DecodePromRemoteRequest { + backtrace: Backtrace, + source: prost::DecodeError, + }, + + #[snafu(display("Failed to decompress prometheus remote request, source: {}", source))] + DecompressPromRemoteRequest { + backtrace: Backtrace, + source: snap::Error, + }, + + #[snafu(display("Invalid prometheus remote request, msg: {}", msg))] + InvalidPromRemoteRequest { msg: String, backtrace: Backtrace }, + + #[snafu(display("Invalid prometheus remote read query result, msg: {}", msg))] + InvalidPromRemoteReadQueryResult { msg: String, backtrace: Backtrace }, } pub type Result = std::result::Result; @@ -136,11 +161,13 @@ impl ErrorExt for Error { | CollectRecordbatch { .. } | StartHttp { .. } | StartGrpc { .. } + | InvalidPromRemoteReadQueryResult { .. } | TcpBind { .. } => StatusCode::Internal, InsertScript { source, .. } | ExecuteScript { source, .. } | ExecuteQuery { source, .. } + | ExecuteInsert { source, .. } | PutOpentsdbDataPoint { source, .. } => source.status_code(), NotSupported { .. } @@ -149,6 +176,9 @@ impl ErrorExt for Error { | ConnResetByPeer { .. } | InvalidOpentsdbLine { .. } | InvalidOpentsdbJsonRequest { .. } + | DecodePromRemoteRequest { .. } + | DecompressPromRemoteRequest { .. } + | InvalidPromRemoteRequest { .. } | TimePrecision { .. } => StatusCode::InvalidArguments, InfluxdbLinesWrite { source, .. } => source.status_code(), @@ -184,6 +214,9 @@ impl IntoResponse for Error { | Error::InfluxdbLinesWrite { .. } | Error::InvalidOpentsdbLine { .. } | Error::InvalidOpentsdbJsonRequest { .. } + | Error::DecodePromRemoteRequest { .. } + | Error::DecompressPromRemoteRequest { .. } + | Error::InvalidPromRemoteRequest { .. } | Error::InvalidQuery { .. } | Error::TimePrecision { .. } => (HttpStatusCode::BAD_REQUEST, self.to_string()), _ => (HttpStatusCode::INTERNAL_SERVER_ERROR, self.to_string()), diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 9f31d19f6b..b641bc0d1a 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -1,6 +1,7 @@ pub mod handler; pub mod influxdb; pub mod opentsdb; +pub mod prometheus; use std::net::SocketAddr; use std::time::Duration; @@ -8,6 +9,7 @@ use std::time::Duration; use async_trait::async_trait; use axum::{ error_handling::HandleErrorLayer, + http::header, response::IntoResponse, response::{Json, Response}, routing, BoxError, Router, @@ -23,7 +25,9 @@ use tower_http::trace::TraceLayer; use self::influxdb::influxdb_write; use crate::error::{Result, StartHttpSnafu}; use crate::query_handler::SqlQueryHandlerRef; -use crate::query_handler::{InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef}; +use crate::query_handler::{ + InfluxdbLineProtocolHandlerRef, OpentsdbProtocolHandlerRef, PrometheusProtocolHandlerRef, +}; use crate::server::Server; const HTTP_API_VERSION: &str = "v1"; @@ -32,6 +36,7 @@ pub struct HttpServer { sql_handler: SqlQueryHandlerRef, influxdb_handler: Option, opentsdb_handler: Option, + prom_handler: Option, } #[derive(Serialize, Debug)] @@ -40,10 +45,18 @@ pub enum JsonOutput { Rows(Vec), } +#[derive(Serialize, Debug)] +pub struct BytesResponse { + pub content_type: String, + pub content_encoding: String, + pub bytes: Vec, +} + #[derive(Serialize, Debug)] pub enum HttpResponse { Json(JsonResponse), Text(String), + Bytes(BytesResponse), } #[derive(Serialize, Debug)] @@ -60,6 +73,14 @@ impl IntoResponse for HttpResponse { match self { HttpResponse::Json(json) => Json(json).into_response(), HttpResponse::Text(text) => text.into_response(), + HttpResponse::Bytes(resp) => ( + [ + (header::CONTENT_TYPE, resp.content_type), + (header::CONTENT_ENCODING, resp.content_encoding), + ], + resp.bytes, + ) + .into_response(), } } } @@ -125,6 +146,7 @@ impl HttpServer { sql_handler, opentsdb_handler: None, influxdb_handler: None, + prom_handler: None, } } @@ -144,6 +166,14 @@ impl HttpServer { self.influxdb_handler.get_or_insert(handler); } + pub fn set_prom_handler(&mut self, handler: PrometheusProtocolHandlerRef) { + debug_assert!( + self.prom_handler.is_none(), + "Prometheus protocol handler can be set only once!" + ); + self.prom_handler.get_or_insert(handler); + } + pub fn make_app(&self) -> Router { // TODO(LFC): Use released Axum. // Axum version 0.6 introduces state within router, making router methods far more elegant @@ -174,6 +204,14 @@ impl HttpServer { router = router.nest(&format!("/{}/influxdb", HTTP_API_VERSION), influxdb_router); } + if let Some(prom_handler) = self.prom_handler.clone() { + let prom_router = Router::with_state(prom_handler) + .route("/write", routing::post(prometheus::remote_write)) + .route("/read", routing::post(prometheus::remote_read)); + + router = router.nest(&format!("/{}/prometheus", HTTP_API_VERSION), prom_router); + } + router .route("/metrics", routing::get(handler::metrics)) // middlewares diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs new file mode 100644 index 0000000000..e0074c6007 --- /dev/null +++ b/src/servers/src/http/prometheus.rs @@ -0,0 +1,54 @@ +use api::prometheus::remote::{ReadRequest, WriteRequest}; +use axum::extract::{RawBody, State}; +use axum::http::StatusCode; +use hyper::Body; +use prost::Message; +use snafu::prelude::*; + +use crate::error::Result; +use crate::error::{self}; +use crate::http::HttpResponse; +use crate::prometheus::snappy_decompress; +use crate::query_handler::PrometheusProtocolHandlerRef; + +#[axum_macros::debug_handler] +pub async fn remote_write( + State(handler): State, + RawBody(body): RawBody, +) -> Result<(StatusCode, HttpResponse)> { + let request = decode_remote_write_request(body).await?; + + handler.write(request).await?; + + Ok((StatusCode::NO_CONTENT, HttpResponse::Text("".to_string()))) +} + +#[axum_macros::debug_handler] +pub async fn remote_read( + State(handler): State, + RawBody(body): RawBody, +) -> Result { + let request = decode_remote_read_request(body).await?; + + handler.read(request).await +} + +async fn decode_remote_write_request(body: Body) -> Result { + let body = hyper::body::to_bytes(body) + .await + .context(error::HyperSnafu)?; + + let buf = snappy_decompress(&body[..])?; + + WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu) +} + +async fn decode_remote_read_request(body: Body) -> Result { + let body = hyper::body::to_bytes(body) + .await + .context(error::HyperSnafu)?; + + let buf = snappy_decompress(&body[..])?; + + ReadRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu) +} diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index af3b257882..7d32f5facb 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -7,6 +7,7 @@ pub mod influxdb; pub mod mysql; pub mod opentsdb; pub mod postgres; +pub mod prometheus; pub mod query_handler; pub mod server; mod shutdown; diff --git a/src/servers/src/opentsdb/codec.rs b/src/servers/src/opentsdb/codec.rs index b856471d1a..f02c44f135 100644 --- a/src/servers/src/opentsdb/codec.rs +++ b/src/servers/src/opentsdb/codec.rs @@ -5,8 +5,8 @@ use api::v1::{column, column::SemanticType, insert_expr, Column, ColumnDataType, use crate::error::{self, Result}; -pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "timestamp"; -pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "value"; +pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; +pub const OPENTSDB_VALUE_COLUMN_NAME: &str = "greptime_value"; #[derive(Debug)] pub struct DataPoint { diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs new file mode 100644 index 0000000000..953ea822d9 --- /dev/null +++ b/src/servers/src/prometheus.rs @@ -0,0 +1,694 @@ +//! promethues protcol supportings +use std::cmp::Ordering; +use std::collections::{BTreeMap, HashMap}; +use std::hash::{Hash, Hasher}; + +use api::prometheus::remote::{ + label_matcher::Type as MatcherType, Label, Query, Sample, TimeSeries, WriteRequest, +}; +use api::v1::codec::InsertBatch; +use api::v1::{ + codec::SelectResult, column, column::SemanticType, insert_expr, Column, ColumnDataType, + InsertExpr, +}; +use openmetrics_parser::{MetricsExposition, PrometheusType, PrometheusValue}; +use snafu::{OptionExt, ResultExt}; +use snap::raw::{Decoder, Encoder}; + +use crate::error::{self, Result}; + +const TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp"; +const VALUE_COLUMN_NAME: &str = "greptime_value"; +pub const METRIC_NAME_LABEL: &str = "__name__"; + +/// Metrics for push gateway protocol +pub struct Metrics { + pub exposition: MetricsExposition, +} + +/// Generate a sql from a remote request query +/// TODO(dennis): maybe use logical plan in future to prevent sql injection +pub fn query_to_sql(q: &Query) -> Result<(String, String)> { + let start_timestamp_ms = q.start_timestamp_ms; + let end_timestamp_ms = q.end_timestamp_ms; + + let label_matches = &q.matchers; + let table_name = label_matches + .iter() + .find_map(|m| { + if m.name == METRIC_NAME_LABEL { + Some(m.value.to_string()) + } else { + None + } + }) + .context(error::InvalidPromRemoteRequestSnafu { + msg: "missing '__name__' label in timeseries", + })?; + + let mut conditions: Vec = Vec::with_capacity(label_matches.len()); + + conditions.push(format!( + "{}>={} AND {}<={}", + TIMESTAMP_COLUMN_NAME, start_timestamp_ms, TIMESTAMP_COLUMN_NAME, end_timestamp_ms, + )); + + for m in label_matches { + let name = &m.name; + + if name == METRIC_NAME_LABEL { + continue; + } + + let value = &m.value; + let m_type = + MatcherType::from_i32(m.r#type).context(error::InvalidPromRemoteRequestSnafu { + msg: format!("invaid LabelMatcher type: {}", m.r#type), + })?; + + match m_type { + MatcherType::Eq => { + conditions.push(format!("{}='{}'", name, value)); + } + MatcherType::Neq => { + conditions.push(format!("{}!='{}'", name, value)); + } + // Case senstive regexp match + MatcherType::Re => { + conditions.push(format!("{}~'{}'", name, value)); + } + // Case senstive regexp not match + MatcherType::Nre => { + conditions.push(format!("{}!~'{}'", name, value)); + } + } + } + + let conditions = conditions.join(" AND "); + + Ok(( + table_name.to_string(), + format!( + "select * from {} where {} order by {}", + table_name, conditions, TIMESTAMP_COLUMN_NAME, + ), + )) +} + +#[inline] +fn new_label(name: String, value: String) -> Label { + Label { name, value } +} + +// A timeseries id +#[derive(Debug)] +struct TimeSeriesId { + labels: Vec