feat: Support export metric in remote write format (#2928)

* feat: remote write metric task

* chore: pass standalone task to frontend

* chore: change name to system metric

* fix: add header and rename to export metrics
This commit is contained in:
WU Jingdi
2023-12-22 10:09:12 +08:00
committed by GitHub
parent 054bca359e
commit 675767c023
22 changed files with 708 additions and 2 deletions

2
Cargo.lock generated
View File

@@ -1945,6 +1945,7 @@ dependencies = [
"backtrace",
"common-error",
"console-subscriber",
"greptime-proto",
"lazy_static",
"once_cell",
"opentelemetry 0.21.0 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -8463,6 +8464,7 @@ dependencies = [
"query",
"rand",
"regex",
"reqwest",
"rust-embed",
"rustls 0.22.1",
"rustls-pemfile 2.0.0",

View File

@@ -127,3 +127,19 @@ parallel_scan_channel_size = 32
# [logging]
# dir = "/tmp/greptimedb/logs"
# level = "info"
# The datanode exports the metrics generated by itself,
# encoded in the Prometheus remote-write format,
# and sends them to a Prometheus remote-write compatible receiver (e.g. `greptimedb` itself).
# This is only used for `greptimedb` to export its own metrics internally; it is different from a Prometheus scrape.
# [export_metrics]
# Whether to enable exporting of metrics; defaults to false.
# enable = false
# The URL of the metrics export endpoint; defaults to the `frontend`'s default HTTP endpoint.
# endpoint = "127.0.0.1:4000"
# The database that the exported metrics are stored in; the user needs to specify a valid database.
# db = ""
# The interval at which metrics are exported.
# write_interval = "30s"
# HTTP headers carried by the Prometheus remote-write requests.
# headers = {}

View File

@@ -77,3 +77,19 @@ tcp_nodelay = true
timeout = "10s"
connect_timeout = "10s"
tcp_nodelay = true
# The frontend exports the metrics generated by itself,
# encoded in the Prometheus remote-write format,
# and sends them to a Prometheus remote-write compatible receiver (e.g. `greptimedb` itself).
# This is only used for `greptimedb` to export its own metrics internally; it is different from a Prometheus scrape.
# [export_metrics]
# Whether to enable exporting of metrics; defaults to false.
# enable = false
# The URL of the metrics export endpoint; defaults to the `frontend`'s default HTTP endpoint.
# endpoint = "127.0.0.1:4000"
# The database that the exported metrics are stored in; the user needs to specify a valid database.
# db = ""
# The interval at which metrics are exported.
# write_interval = "30s"
# HTTP headers carried by the Prometheus remote-write requests.
# headers = {}

View File

@@ -66,3 +66,19 @@ provider = "raft_engine"
# num_partitions = 1
# Expected number of replicas of each partition.
# replication_factor = 3
# The metasrv exports the metrics generated by itself,
# encoded in the Prometheus remote-write format,
# and sends them to a Prometheus remote-write compatible receiver (e.g. `greptimedb` itself).
# This is only used for `greptimedb` to export its own metrics internally; it is different from a Prometheus scrape.
# [export_metrics]
# Whether to enable exporting of metrics; defaults to false.
# enable = false
# The URL of the metrics export endpoint; defaults to the `frontend`'s default HTTP endpoint.
# endpoint = "127.0.0.1:4000"
# The database that the exported metrics are stored in; the user needs to specify a valid database.
# db = ""
# The interval at which metrics are exported.
# write_interval = "30s"
# HTTP headers carried by the Prometheus remote-write requests.
# headers = {}

View File

@@ -184,3 +184,19 @@ parallel_scan_channel_size = 32
# otlp_endpoint = "localhost:4317"
# The percentage of tracing will be sampled and exported. Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1. ratio > 1 are treated as 1. Fractions < 0 are treated as 0
# tracing_sample_ratio = 1.0
# The standalone instance exports the metrics generated by itself,
# encoded in the Prometheus remote-write format,
# and sends them to a Prometheus remote-write compatible receiver (e.g. `greptimedb` itself).
# This is only used for `greptimedb` to export its own metrics internally; it is different from a Prometheus scrape.
# [export_metrics]
# Whether to enable exporting of metrics; defaults to false.
# enable = false
# The URL of the metrics export endpoint; defaults to the `frontend`'s default HTTP endpoint.
# endpoint = "127.0.0.1:4000"
# The database that the exported metrics are stored in; the user needs to specify a valid database.
# db = ""
# The interval at which metrics are exported.
# write_interval = "30s"
# HTTP headers carried by the Prometheus remote-write requests.
# headers = {}

View File

@@ -249,6 +249,10 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
instance
.build_export_metrics_task(&opts.export_metrics)
.context(StartFrontendSnafu)?;
instance
.build_servers(opts)
.await

View File

@@ -44,6 +44,7 @@ use frontend::service_config::{
};
use mito2::config::MitoConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use servers::Mode;
@@ -112,6 +113,7 @@ pub struct StandaloneOptions {
pub user_provider: Option<String>,
/// Options for different store engines.
pub region_engine: Vec<RegionEngineConfig>,
pub export_metrics: ExportMetricsOption,
}
impl Default for StandaloneOptions {
@@ -131,6 +133,7 @@ impl Default for StandaloneOptions {
metadata_store: KvBackendConfig::default(),
procedure: ProcedureConfig::default(),
logging: LoggingOptions::default(),
export_metrics: ExportMetricsOption::default(),
user_provider: None,
region_engine: vec![
RegionEngineConfig::Mito(MitoConfig::default()),
@@ -154,6 +157,8 @@ impl StandaloneOptions {
meta_client: None,
logging: self.logging,
user_provider: self.user_provider,
// Handle the export metrics task run by standalone to frontend for execution
export_metrics: self.export_metrics,
..Default::default()
}
}
@@ -407,6 +412,10 @@ impl StartCommand {
.await
.context(StartFrontendSnafu)?;
frontend
.build_export_metrics_task(&opts.frontend.export_metrics)
.context(StartFrontendSnafu)?;
frontend
.build_servers(opts)
.await

View File

@@ -12,6 +12,7 @@ deadlock_detection = ["parking_lot/deadlock_detection"]
backtrace = "0.3"
common-error.workspace = true
console-subscriber = { version = "0.1", optional = true }
greptime-proto.workspace = true
lazy_static.workspace = true
once_cell.workspace = true
opentelemetry = { version = "0.21.0", default-features = false, features = [

View File

@@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// Metric utilities, inspired by Databend.
use std::sync::Arc;
use greptime_proto::prometheus::remote::{Sample, TimeSeries};
use greptime_proto::prometheus::*;
use prometheus::proto::{LabelPair, MetricFamily, MetricType};
use prometheus::{Encoder, TextEncoder};
pub fn dump_metrics() -> Result<String, String> {
@@ -25,3 +28,368 @@ pub fn dump_metrics() -> Result<String, String> {
.map_err(|_| "Encode metrics failed".to_string())?;
String::from_utf8(buffer).map_err(|e| e.to_string())
}
/// Pluggable predicate applied by the export-metrics task to every gathered
/// [`MetricFamily`]: returning `false` drops the family from the report,
/// `true` keeps it.
#[derive(Clone)]
pub struct MetricFilter {
    inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>,
}

impl MetricFilter {
    /// Wraps a user-supplied predicate into a cloneable filter handle.
    pub fn new(inner: Arc<dyn Fn(&MetricFamily) -> bool + Send + Sync>) -> Self {
        MetricFilter { inner }
    }

    /// Evaluates the wrapped predicate against `mf`.
    pub fn filter(&self, mf: &MetricFamily) -> bool {
        (*self.inner)(mf)
    }
}
/// Converts gathered Prometheus `metric_families` into a remote-write
/// `WriteRequest`.
///
/// Families for which `metric_filter` returns `false` are skipped (no filter
/// means everything is kept). `default_timestamp` (milliseconds) is used for
/// samples whose own timestamp is unset (0).
pub fn convert_metric_to_write_request(
    metric_families: Vec<MetricFamily>,
    metric_filter: Option<&MetricFilter>,
    default_timestamp: i64,
) -> remote::WriteRequest {
    let mut timeseries: Vec<TimeSeries> = Vec::with_capacity(metric_families.len());
    for mf in metric_families {
        if !metric_filter.map(|f| f.filter(&mf)).unwrap_or(true) {
            continue;
        }
        let mf_type = mf.get_field_type();
        let mf_name = mf.get_name();
        for m in mf.get_metric() {
            // A timestamp of 0 marks "unset" in the Prometheus client model.
            let timestamp = if m.get_timestamp_ms() == 0 {
                default_timestamp
            } else {
                m.get_timestamp_ms()
            };
            match mf_type {
                // Counters and gauges map to a single series with one sample.
                MetricType::COUNTER => timeseries.push(TimeSeries {
                    labels: convert_label(m.get_label(), mf_name, None),
                    samples: vec![Sample {
                        value: m.get_counter().get_value(),
                        timestamp,
                    }],
                    exemplars: vec![],
                }),
                MetricType::GAUGE => timeseries.push(TimeSeries {
                    labels: convert_label(m.get_label(), mf_name, None),
                    samples: vec![Sample {
                        value: m.get_gauge().get_value(),
                        timestamp,
                    }],
                    exemplars: vec![],
                }),
                // Histograms expand to one `<name>_bucket` series per `le`
                // bound, plus `<name>_sum` and `<name>_count`.
                MetricType::HISTOGRAM => {
                    let h = m.get_histogram();
                    let mut inf_seen = false;
                    let metric_name = format!("{}_bucket", mf_name);
                    for b in h.get_bucket() {
                        let upper_bound = b.get_upper_bound();
                        timeseries.push(TimeSeries {
                            labels: convert_label(
                                m.get_label(),
                                metric_name.as_str(),
                                Some(("le", upper_bound.to_string())),
                            ),
                            samples: vec![Sample {
                                value: b.get_cumulative_count() as f64,
                                timestamp,
                            }],
                            exemplars: vec![],
                        });
                        if upper_bound.is_sign_positive() && upper_bound.is_infinite() {
                            inf_seen = true;
                        }
                    }
                    // Remote-write expects a terminal `+Inf` bucket; synthesize
                    // one from the total sample count when it is missing.
                    if !inf_seen {
                        timeseries.push(TimeSeries {
                            labels: convert_label(
                                m.get_label(),
                                metric_name.as_str(),
                                Some(("le", "+Inf".to_string())),
                            ),
                            samples: vec![Sample {
                                value: h.get_sample_count() as f64,
                                timestamp,
                            }],
                            exemplars: vec![],
                        });
                    }
                    timeseries.push(TimeSeries {
                        labels: convert_label(
                            m.get_label(),
                            format!("{}_sum", mf_name).as_str(),
                            None,
                        ),
                        samples: vec![Sample {
                            value: h.get_sample_sum(),
                            timestamp,
                        }],
                        exemplars: vec![],
                    });
                    timeseries.push(TimeSeries {
                        labels: convert_label(
                            m.get_label(),
                            format!("{}_count", mf_name).as_str(),
                            None,
                        ),
                        samples: vec![Sample {
                            value: h.get_sample_count() as f64,
                            timestamp,
                        }],
                        exemplars: vec![],
                    });
                }
                // Summaries expand to one series per quantile, plus
                // `<name>_sum` and `<name>_count`.
                MetricType::SUMMARY => {
                    let s = m.get_summary();
                    for q in s.get_quantile() {
                        timeseries.push(TimeSeries {
                            labels: convert_label(
                                m.get_label(),
                                mf_name,
                                Some(("quantile", q.get_quantile().to_string())),
                            ),
                            samples: vec![Sample {
                                value: q.get_value(),
                                timestamp,
                            }],
                            exemplars: vec![],
                        });
                    }
                    timeseries.push(TimeSeries {
                        labels: convert_label(
                            m.get_label(),
                            format!("{}_sum", mf_name).as_str(),
                            None,
                        ),
                        samples: vec![Sample {
                            value: s.get_sample_sum(),
                            timestamp,
                        }],
                        exemplars: vec![],
                    });
                    timeseries.push(TimeSeries {
                        labels: convert_label(
                            m.get_label(),
                            format!("{}_count", mf_name).as_str(),
                            None,
                        ),
                        samples: vec![Sample {
                            value: s.get_sample_count() as f64,
                            timestamp,
                        }],
                        exemplars: vec![],
                    });
                }
                MetricType::UNTYPED => {
                    // `TextEncoder` `MetricType::UNTYPED` unimplemented
                    // To keep the implementation consistent and not cause unexpected panics, we do nothing here.
                }
            };
        }
    }
    remote::WriteRequest {
        timeseries,
        metadata: vec![],
    }
}
/// Builds the remote-write label set for one series: the original label
/// pairs, a `__name__` label carrying `name`, and an optional extra pair
/// (`addon`), sorted lexicographically by label name as the remote-write
/// protocol requires.
fn convert_label(
    pairs: &[LabelPair],
    name: &str,
    addon: Option<(&'static str, String)>,
) -> Vec<remote::Label> {
    let mut labels: Vec<remote::Label> = pairs
        .iter()
        .map(|pair| remote::Label {
            name: pair.get_name().to_string(),
            value: pair.get_value().to_string(),
        })
        .collect();
    labels.push(remote::Label {
        name: "__name__".to_string(),
        value: name.to_string(),
    });
    if let Some((addon_name, addon_value)) = addon {
        labels.push(remote::Label {
            name: addon_name.to_string(),
            value: addon_value,
        });
    }
    // Remote write protocol need label names sorted in lexicographical order.
    labels.sort_unstable_by(|a, b| a.name.cmp(&b.name));
    labels
}
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use prometheus::core::Collector;
    use prometheus::proto::{LabelPair, MetricFamily, MetricType};
    use prometheus::{Counter, Gauge, Histogram, HistogramOpts, Opts};

    use super::convert_label;
    use crate::metric::{convert_metric_to_write_request, MetricFilter};

    /// `convert_label` adds `__name__` (and the optional addon pair) and
    /// returns labels sorted lexicographically by name.
    #[test]
    fn test_convert_label() {
        let pairs = vec![
            {
                let mut pair = LabelPair::new();
                pair.set_name(String::from("a"));
                pair.set_value(String::from("b"));
                pair
            },
            {
                let mut pair = LabelPair::new();
                pair.set_name(String::from("e"));
                pair.set_value(String::from("g"));
                pair
            },
        ];
        let label1 = convert_label(&pairs, "label1", None);
        assert_eq!(
            format!("{:?}", label1),
            r#"[Label { name: "__name__", value: "label1" }, Label { name: "a", value: "b" }, Label { name: "e", value: "g" }]"#
        );
        let label2 = convert_label(&pairs, "label2", Some(("c", "c".to_string())));
        assert_eq!(
            format!("{:?}", label2),
            r#"[Label { name: "__name__", value: "label2" }, Label { name: "a", value: "b" }, Label { name: "c", value: "c" }, Label { name: "e", value: "g" }]"#
        );
    }

    /// Counter and gauge families each become exactly one time series.
    #[test]
    fn test_write_request_encoder() {
        let counter_opts = Opts::new("test_counter", "test help")
            .const_label("a", "1")
            .const_label("b", "2");
        let counter = Counter::with_opts(counter_opts).unwrap();
        counter.inc();

        let mf = counter.collect();
        let write_quest = convert_metric_to_write_request(mf, None, 0);
        assert_eq!(
            format!("{:?}", write_quest.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }]"#
        );

        let gauge_opts = Opts::new("test_gauge", "test help")
            .const_label("a", "1")
            .const_label("b", "2");
        let gauge = Gauge::with_opts(gauge_opts).unwrap();
        gauge.inc();
        gauge.set(42.0);

        let mf = gauge.collect();
        let write_quest = convert_metric_to_write_request(mf, None, 0);
        assert_eq!(
            format!("{:?}", write_quest.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_gauge" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 42.0, timestamp: 0 }], exemplars: [] }]"#
        );
    }

    /// A histogram expands into one `_bucket` series per bound (including the
    /// `+Inf` bucket) plus `_sum` and `_count`, in that order.
    #[test]
    fn test_write_request_histogram() {
        let opts = HistogramOpts::new("test_histogram", "test help").const_label("a", "1");
        let histogram = Histogram::with_opts(opts).unwrap();
        histogram.observe(0.25);

        let mf = histogram.collect();
        let write_quest = convert_metric_to_write_request(mf, None, 0);
        let write_quest_str: Vec<_> = write_quest
            .timeseries
            .iter()
            .map(|x| format!("{:?}", x))
            .collect();
        let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.005" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.01" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.025" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.05" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.1" }], samples: [Sample { value: 0.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.25" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "0.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "2.5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "5" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "10" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_bucket" }, Label { name: "a", value: "1" }, Label { name: "le", value: "+Inf" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_sum" }, Label { name: "a", value: "1" }], samples: [Sample { value: 0.25, timestamp: 0 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_histogram_count" }, Label { name: "a", value: "1" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }"#;
        assert_eq!(write_quest_str.join("\n"), ans);
    }

    /// A summary expands into one series per quantile plus `_sum` and
    /// `_count`; the explicit default timestamp (20) reaches every sample.
    #[test]
    fn test_write_request_summary() {
        use prometheus::proto::{Metric, Quantile, Summary};

        let mut metric_family = MetricFamily::default();
        metric_family.set_name("test_summary".to_string());
        metric_family.set_help("This is a test summary statistic".to_string());
        metric_family.set_field_type(MetricType::SUMMARY);

        let mut summary = Summary::default();
        // Integer literal instead of the needless `5.0 as u64` float cast.
        summary.set_sample_count(5);
        summary.set_sample_sum(15.0);

        let mut quantile1 = Quantile::default();
        quantile1.set_quantile(50.0);
        quantile1.set_value(3.0);

        let mut quantile2 = Quantile::default();
        quantile2.set_quantile(100.0);
        quantile2.set_value(5.0);

        summary.set_quantile(vec![quantile1, quantile2].into());

        let mut metric = Metric::default();
        metric.set_summary(summary);
        metric_family.set_metric(vec![metric].into());

        let write_quest = convert_metric_to_write_request(vec![metric_family], None, 20);
        let write_quest_str: Vec<_> = write_quest
            .timeseries
            .iter()
            .map(|x| format!("{:?}", x))
            .collect();
        let ans = r#"TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "50" }], samples: [Sample { value: 3.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary" }, Label { name: "quantile", value: "100" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_sum" }], samples: [Sample { value: 15.0, timestamp: 20 }], exemplars: [] }
TimeSeries { labels: [Label { name: "__name__", value: "test_summary_count" }], samples: [Sample { value: 5.0, timestamp: 20 }], exemplars: [] }"#;
        assert_eq!(write_quest_str.join("\n"), ans);
    }

    /// Families rejected by the `MetricFilter` are absent from the request.
    #[test]
    fn test_metric_filter() {
        let counter_opts = Opts::new("filter_counter", "test help")
            .const_label("a", "1")
            .const_label("b", "2");
        let counter_1 = Counter::with_opts(counter_opts).unwrap();
        counter_1.inc_by(1.0);
        let counter_opts = Opts::new("test_counter", "test help")
            .const_label("a", "1")
            .const_label("b", "2");
        let counter_2 = Counter::with_opts(counter_opts).unwrap();
        counter_2.inc_by(2.0);

        let mut mf = counter_1.collect();
        mf.append(&mut counter_2.collect());

        let filter = MetricFilter::new(Arc::new(|mf: &MetricFamily| {
            !mf.get_name().starts_with("filter")
        }));
        let write_quest1 = convert_metric_to_write_request(mf.clone(), None, 0);
        let write_quest2 = convert_metric_to_write_request(mf, Some(&filter), 0);
        assert_eq!(
            format!("{:?}", write_quest1.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "filter_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 1.0, timestamp: 0 }], exemplars: [] }, TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
        );
        assert_eq!(
            format!("{:?}", write_quest2.timeseries),
            r#"[TimeSeries { labels: [Label { name: "__name__", value: "test_counter" }, Label { name: "a", value: "1" }, Label { name: "b", value: "2" }], samples: [Sample { value: 2.0, timestamp: 0 }], exemplars: [] }]"#
        );
    }
}

View File

@@ -28,6 +28,7 @@ use meta_client::MetaClientOptions;
use mito2::config::MitoConfig;
use secrecy::SecretString;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
@@ -242,6 +243,7 @@ pub struct DatanodeOptions {
pub region_engine: Vec<RegionEngineConfig>,
pub logging: LoggingOptions,
pub enable_telemetry: bool,
pub export_metrics: ExportMetricsOption,
}
impl Default for DatanodeOptions {
@@ -267,6 +269,7 @@ impl Default for DatanodeOptions {
logging: LoggingOptions::default(),
heartbeat: HeartbeatOptions::datanode_default(),
enable_telemetry: true,
export_metrics: ExportMetricsOption::default(),
}
}
}

View File

@@ -43,6 +43,7 @@ use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::grpc::{GrpcServer, GrpcServerConfig};
use servers::http::HttpServerBuilder;
use servers::metrics_handler::MetricsHandler;
@@ -84,6 +85,7 @@ pub struct Datanode {
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
leases_notifier: Option<Arc<Notify>>,
plugins: Plugins,
export_metrics_task: Option<ExportMetricsTask>,
}
impl Datanode {
@@ -95,6 +97,10 @@ impl Datanode {
self.start_telemetry();
if let Some(t) = self.export_metrics_task.as_ref() {
t.start()
}
self.start_services().await
}
@@ -277,6 +283,10 @@ impl DatanodeBuilder {
None
};
let export_metrics_task =
ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
.context(StartServerSnafu)?;
Ok(Datanode {
services,
heartbeat_task,
@@ -285,6 +295,7 @@ impl DatanodeBuilder {
region_event_receiver,
leases_notifier,
plugins: self.plugins.clone(),
export_metrics_task,
})
}

View File

@@ -15,6 +15,7 @@
use common_telemetry::logging::LoggingOptions;
use meta_client::MetaClientOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions;
use servers::Mode;
@@ -44,6 +45,7 @@ pub struct FrontendOptions {
pub logging: LoggingOptions,
pub datanode: DatanodeOptions,
pub user_provider: Option<String>,
pub export_metrics: ExportMetricsOption,
}
impl Default for FrontendOptions {
@@ -64,6 +66,7 @@ impl Default for FrontendOptions {
logging: LoggingOptions::default(),
datanode: DatanodeOptions::default(),
user_provider: None,
export_metrics: ExportMetricsOption::default(),
}
}
}

View File

@@ -55,6 +55,7 @@ use query::QueryEngineRef;
use raft_engine::{Config, ReadableSize, RecoveryMode};
use servers::error as server_error;
use servers::error::{AuthSnafu, ExecuteQuerySnafu, ParsePromQLSnafu};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::interceptor::{
PromQueryInterceptor, PromQueryInterceptorRef, SqlQueryInterceptor, SqlQueryInterceptorRef,
};
@@ -77,7 +78,8 @@ pub use standalone::StandaloneDatanodeManager;
use crate::error::{
self, Error, ExecLogicalPlanSnafu, ExecutePromqlSnafu, ExternalSnafu, ParseSqlSnafu,
PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, TableOperationSnafu,
PermissionSnafu, PlanStatementSnafu, Result, SqlExecInterceptedSnafu, StartServerSnafu,
TableOperationSnafu,
};
use crate::frontend::{FrontendOptions, TomlSerializable};
use crate::heartbeat::HeartbeatTask;
@@ -116,6 +118,7 @@ pub struct Instance {
heartbeat_task: Option<HeartbeatTask>,
inserter: InserterRef,
deleter: DeleterRef,
export_metrics_task: Option<ExportMetricsTask>,
}
impl Instance {
@@ -193,6 +196,12 @@ impl Instance {
Ok(())
}
pub fn build_export_metrics_task(&mut self, opts: &ExportMetricsOption) -> Result<()> {
self.export_metrics_task =
ExportMetricsTask::try_new(opts, Some(&self.plugins)).context(StartServerSnafu)?;
Ok(())
}
pub fn catalog_manager(&self) -> &CatalogManagerRef {
&self.catalog_manager
}
@@ -222,6 +231,10 @@ impl FrontendInstance for Instance {
self.script_executor.start(self)?;
if let Some(t) = self.export_metrics_task.as_ref() {
t.start()
}
futures::future::try_join_all(self.servers.iter().map(|(name, handler)| async move {
info!("Starting service: {name}");
start_server(handler).await

View File

@@ -144,6 +144,7 @@ impl FrontendBuilder {
heartbeat_task: self.heartbeat_task,
inserter,
deleter,
export_metrics_task: None,
})
}
}

View File

@@ -26,6 +26,7 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
use common_telemetry::info;
use etcd_client::Client;
use servers::configurator::ConfiguratorRef;
use servers::export_metrics::ExportMetricsTask;
use servers::http::{HttpServer, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::server::Server;
@@ -36,6 +37,7 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use tonic::transport::server::{Router, TcpIncoming};
use crate::election::etcd::EtcdElection;
use crate::error::InitExportMetricsTaskSnafu;
use crate::lock::etcd::EtcdLock;
use crate::lock::memory::MemLock;
use crate::metasrv::builder::MetaSrvBuilder;
@@ -57,6 +59,8 @@ pub struct MetaSrvInstance {
signal_sender: Option<Sender<()>>,
plugins: Plugins,
export_metrics_task: Option<ExportMetricsTask>,
}
impl MetaSrvInstance {
@@ -73,18 +77,25 @@ impl MetaSrvInstance {
);
// put meta_srv into plugins for later use
plugins.insert::<Arc<MetaSrv>>(Arc::new(meta_srv.clone()));
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
.context(InitExportMetricsTaskSnafu)?;
Ok(MetaSrvInstance {
meta_srv,
http_srv,
opts,
signal_sender: None,
plugins,
export_metrics_task,
})
}
pub async fn start(&mut self) -> Result<()> {
self.meta_srv.try_start().await?;
if let Some(t) = self.export_metrics_task.as_ref() {
t.start()
}
let (tx, rx) = mpsc::channel::<()>(1);
self.signal_sender = Some(tx);

View File

@@ -196,6 +196,11 @@ pub enum Error {
location: Location,
source: servers::error::Error,
},
#[snafu(display("Failed to init export metrics task"))]
InitExportMetricsTask {
location: Location,
source: servers::error::Error,
},
#[snafu(display("Failed to parse address {}", addr))]
ParseAddr {
addr: String,
@@ -651,6 +656,7 @@ impl ErrorExt for Error {
| Error::ParseNum { .. }
| Error::UnsupportedSelectorType { .. }
| Error::InvalidArguments { .. }
| Error::InitExportMetricsTask { .. }
| Error::InvalidHeartbeatRequest { .. }
| Error::TooManyPartitions { .. } => StatusCode::InvalidArguments,
Error::LeaseKeyFromUtf8 { .. }

View File

@@ -32,6 +32,7 @@ use common_procedure::ProcedureManagerRef;
use common_telemetry::logging::LoggingOptions;
use common_telemetry::{error, info, warn};
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::http::HttpOptions;
use snafu::ResultExt;
use table::metadata::TableId;
@@ -72,6 +73,7 @@ pub struct MetaSrvOptions {
pub enable_telemetry: bool,
pub data_home: String,
pub wal: WalConfig,
pub export_metrics: ExportMetricsOption,
}
impl Default for MetaSrvOptions {
@@ -97,6 +99,7 @@ impl Default for MetaSrvOptions {
enable_telemetry: true,
data_home: METASRV_HOME.to_string(),
wal: WalConfig::default(),
export_metrics: ExportMetricsOption::default(),
}
}
}

View File

@@ -72,6 +72,7 @@ prost.workspace = true
query.workspace = true
rand.workspace = true
regex.workspace = true
reqwest.workspace = true
rust-embed = { version = "6.6", features = ["debug-embed"] }
rustls = "0.22"
rustls-pemfile = "2.0"

View File

@@ -214,6 +214,16 @@ pub enum Error {
error: snap::Error,
},
#[snafu(display("Failed to send prometheus remote request"))]
SendPromRemoteRequest {
location: Location,
#[snafu(source)]
error: reqwest::Error,
},
#[snafu(display("Invalid export metrics config, msg: {}", msg))]
InvalidExportMetricsConfig { msg: String, location: Location },
#[snafu(display("Failed to compress prometheus remote request"))]
CompressPromRemoteRequest {
location: Location,
@@ -427,6 +437,7 @@ impl ErrorExt for Error {
| AlreadyStarted { .. }
| InvalidPromRemoteReadQueryResult { .. }
| TcpBind { .. }
| SendPromRemoteRequest { .. }
| TcpIncoming { .. }
| CatalogError { .. }
| GrpcReflectionService { .. }
@@ -457,6 +468,7 @@ impl ErrorExt for Error {
| CompressPromRemoteRequest { .. }
| DecompressPromRemoteRequest { .. }
| InvalidPromRemoteRequest { .. }
| InvalidExportMetricsConfig { .. }
| InvalidFlightTicket { .. }
| InvalidPrepareStatement { .. }
| DataFrame { .. }

View File

@@ -0,0 +1,179 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::time::Duration;
use axum::http::HeaderValue;
use common_base::Plugins;
use common_telemetry::metric::{convert_metric_to_write_request, MetricFilter};
use common_telemetry::{error, info};
use common_time::Timestamp;
use hyper::HeaderMap;
use prost::Message;
use reqwest::header::HeaderName;
use reqwest::{Client, Response};
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use tokio::time;
use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu};
use crate::prom_store::snappy_compress;
/// Use to export the metrics generated by greptimedb, encoded to Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/),
/// and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself)
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
pub struct ExportMetricsOption {
    /// Whether the export task runs at all; `false` by default.
    pub enable: bool,
    /// `host:port` of the remote-write receiver; the task prepends `http://`
    /// and appends `/v1/prometheus/write?db=...` when building the URL.
    pub db: String,
    /// Target database name; validated to be non-empty when `enable` is true.
    pub endpoint: String,
    /// Interval between two consecutive exports (humantime, e.g. "30s");
    /// must be at least one second.
    #[serde(with = "humantime_serde")]
    pub write_interval: Duration,
    /// Extra HTTP headers attached to every remote-write request.
    pub headers: HashMap<String, String>,
}
impl Default for ExportMetricsOption {
fn default() -> Self {
Self {
enable: false,
endpoint: "127.0.0.1:4000".to_string(),
db: String::new(),
write_interval: Duration::from_secs(30),
headers: HashMap::new(),
}
}
}
/// Background task that periodically pushes this process's metrics to a
/// Prometheus remote-write compatible endpoint.
#[derive(Default, Clone)]
pub struct ExportMetricsTask {
    // Validated copy of the user-facing options.
    config: ExportMetricsOption,
    // Optional per-family filter pulled from the plugin registry.
    filter: Option<MetricFilter>,
    // HTTP headers pre-parsed from `config.headers` at construction time.
    headers: HeaderMap<HeaderValue>,
}
impl ExportMetricsTask {
pub fn try_new(
config: &ExportMetricsOption,
plugins: Option<&Plugins>,
) -> Result<Option<Self>> {
if !config.enable {
return Ok(None);
}
let filter = plugins.map(|p| p.get::<MetricFilter>()).unwrap_or(None);
ensure!(
config.write_interval.as_secs() != 0,
InvalidExportMetricsConfigSnafu {
msg: "Expected export metrics write_interval greater than zero"
}
);
ensure!(
!config.db.is_empty(),
InvalidExportMetricsConfigSnafu {
msg: "Expected export metrics db not empty"
}
);
// construct http header
let mut headers = reqwest::header::HeaderMap::with_capacity(config.headers.len());
config.headers.iter().try_for_each(|(k, v)| {
let header = match TryInto::<HeaderName>::try_into(k) {
Ok(header) => header,
Err(_) => {
return InvalidExportMetricsConfigSnafu {
msg: format!("Export metrics: invalid HTTP header name: {}", k),
}
.fail()
}
};
match TryInto::<HeaderValue>::try_into(v) {
Ok(value) => headers.insert(header, value),
Err(_) => {
return InvalidExportMetricsConfigSnafu {
msg: format!("Export metrics: invalid HTTP header value: {}", v),
}
.fail()
}
};
Ok(())
})?;
Ok(Some(Self {
config: config.clone(),
filter,
headers,
}))
}
pub fn start(&self) {
if !self.config.enable {
return;
}
let mut interval = time::interval(self.config.write_interval);
let sec = self.config.write_interval.as_secs();
let endpoint = format!(
"http://{}/v1/prometheus/write?db={}",
self.config.endpoint, self.config.db
);
let filter = self.filter.clone();
let headers = self.headers.clone();
let _handle = common_runtime::spawn_bg(async move {
info!(
"Start export metrics task to endpoint: {}, interval: {}s",
endpoint, sec
);
// Pass the first tick. Because the first tick completes immediately.
interval.tick().await;
let client = reqwest::Client::new();
loop {
interval.tick().await;
match write_system_metric(&client, &endpoint, filter.as_ref(), headers.clone())
.await
{
Ok(resp) => {
if !resp.status().is_success() {
error!("report export metrics error, msg: {:#?}", resp);
}
}
Err(e) => error!("report export metrics failed, error {}", e),
};
}
});
}
}
/// Export the collected metrics, encode metrics into [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/),
/// and send metrics to Prometheus remote-write compatible receiver (e.g. `greptimedb`) specified by `url`.
/// User could use `MetricFilter` to filter metric they don't want collect
pub async fn write_system_metric(
    client: &Client,
    url: &str,
    filter: Option<&MetricFilter>,
    headers: HeaderMap<HeaderValue>,
) -> Result<Response> {
    // Gather everything currently registered with the default registry.
    let families = prometheus::gather();
    let write_request =
        convert_metric_to_write_request(families, filter, Timestamp::current_millis().value());
    // RemoteWrite format require compress by snappy
    let body = snappy_compress(&write_request.encode_to_vec())?;
    client
        .post(url)
        .header("X-Prometheus-Remote-Write-Version", "0.1.0")
        .header("Content-Type", "application/x-protobuf")
        .headers(headers)
        .body(body)
        .send()
        .await
        .context(SendPromRemoteRequestSnafu)
}

View File

@@ -22,6 +22,7 @@ use serde::{Deserialize, Serialize};
pub mod configurator;
pub mod error;
pub mod export_metrics;
pub mod grpc;
pub mod heartbeat_options;
pub mod http;

View File

@@ -753,6 +753,13 @@ timeout = "10s"
connect_timeout = "1s"
tcp_nodelay = true
[frontend.export_metrics]
enable = false
db = ""
write_interval = "30s"
[frontend.export_metrics.headers]
[datanode]
mode = "standalone"
node_id = 0
@@ -809,6 +816,13 @@ parallel_scan_channel_size = 32
[datanode.logging]
enable_otlp_tracing = false
[datanode.export_metrics]
enable = false
db = ""
write_interval = "30s"
[datanode.export_metrics.headers]
[logging]
enable_otlp_tracing = false"#,
store_type,