mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
refactor: remove export_metrics and related configuration (#7236)
Signed-off-by: WaterWhisperer <waterwhisperer24@qq.com>
This commit is contained in:
@@ -210,14 +210,6 @@
|
||||
| `slow_query.record_type` | String | Unset | The record type of slow queries. It can be `system_table` or `log`. |
|
||||
| `slow_query.threshold` | String | Unset | The threshold of slow query. |
|
||||
| `slow_query.sample_ratio` | Float | Unset | The sampling ratio of slow query log. The value should be in the range of (0, 1]. |
|
||||
| `export_metrics` | -- | -- | The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.self_import` | -- | -- | For `standalone` mode, `self_import` is recommended to collect metrics generated by itself<br/>You must create the database before enabling it. |
|
||||
| `export_metrics.self_import.db` | String | Unset | -- |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
| `memory` | -- | -- | The memory options. |
|
||||
@@ -335,12 +327,6 @@
|
||||
| `slow_query.threshold` | String | `30s` | The threshold of slow query. It can be human readable time string, for example: `10s`, `100ms`, `1s`. |
|
||||
| `slow_query.sample_ratio` | Float | `1.0` | The sampling ratio of slow query log. The value should be in the range of (0, 1]. For example, `0.1` means 10% of the slow queries will be logged and `1.0` means all slow queries will be logged. |
|
||||
| `slow_query.ttl` | String | `90d` | The TTL of the `slow_queries` system table. Default is `90d` when `record_type` is `system_table`. |
|
||||
| `export_metrics` | -- | -- | The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
| `memory` | -- | -- | The memory options. |
|
||||
@@ -430,12 +416,6 @@
|
||||
| `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http |
|
||||
| `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `export_metrics` | -- | -- | The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
| `memory` | -- | -- | The memory options. |
|
||||
@@ -608,12 +588,6 @@
|
||||
| `logging.otlp_headers` | -- | -- | Additional OTLP headers, only valid when using OTLP http |
|
||||
| `logging.tracing_sample_ratio` | -- | Unset | The percentage of tracing will be sampled and exported.<br/>Valid range `[0, 1]`, 1 means all traces are sampled, 0 means all traces are not sampled, the default value is 1.<br/>ratio > 1 are treated as 1. Fractions < 0 are treated as 0 |
|
||||
| `logging.tracing_sample_ratio.default_ratio` | Float | `1.0` | -- |
|
||||
| `export_metrics` | -- | -- | The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.<br/>This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape. |
|
||||
| `export_metrics.enable` | Bool | `false` | whether enable export metrics. |
|
||||
| `export_metrics.write_interval` | String | `30s` | The interval of export metrics. |
|
||||
| `export_metrics.remote_write` | -- | -- | -- |
|
||||
| `export_metrics.remote_write.url` | String | `""` | The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`. |
|
||||
| `export_metrics.remote_write.headers` | InlineTable | -- | HTTP headers of Prometheus remote-write carry. |
|
||||
| `tracing` | -- | -- | The tracing options. Only effect when compiled with `tokio-console` feature. |
|
||||
| `tracing.tokio_console_addr` | String | Unset | The tokio console address. |
|
||||
| `memory` | -- | -- | The memory options. |
|
||||
|
||||
@@ -712,21 +712,6 @@ otlp_export_protocol = "http"
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The datanode can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
headers = { }
|
||||
|
||||
## The tracing options. Only effect when compiled with `tokio-console` feature.
|
||||
#+ [tracing]
|
||||
## The tokio console address.
|
||||
|
||||
@@ -329,21 +329,6 @@ sample_ratio = 1.0
|
||||
## The TTL of the `slow_queries` system table. Default is `90d` when `record_type` is `system_table`.
|
||||
ttl = "90d"
|
||||
|
||||
## The frontend can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
headers = { }
|
||||
|
||||
## The tracing options. Only effect when compiled with `tokio-console` feature.
|
||||
#+ [tracing]
|
||||
## The tokio console address.
|
||||
|
||||
@@ -323,21 +323,6 @@ otlp_export_protocol = "http"
|
||||
[logging.tracing_sample_ratio]
|
||||
default_ratio = 1.0
|
||||
|
||||
## The metasrv can export its metrics and send to Prometheus compatible service (e.g. `greptimedb` itself) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
headers = { }
|
||||
|
||||
## The tracing options. Only effect when compiled with `tokio-console` feature.
|
||||
#+ [tracing]
|
||||
## The tokio console address.
|
||||
|
||||
@@ -820,27 +820,6 @@ default_ratio = 1.0
|
||||
## @toml2docs:none-default
|
||||
#+ sample_ratio = 1.0
|
||||
|
||||
## The standalone can export its metrics and send to Prometheus compatible service (e.g. `greptimedb`) from remote-write API.
|
||||
## This is only used for `greptimedb` to export its own metrics internally. It's different from prometheus scrape.
|
||||
[export_metrics]
|
||||
## whether enable export metrics.
|
||||
enable = false
|
||||
## The interval of export metrics.
|
||||
write_interval = "30s"
|
||||
|
||||
## For `standalone` mode, `self_import` is recommended to collect metrics generated by itself
|
||||
## You must create the database before enabling it.
|
||||
[export_metrics.self_import]
|
||||
## @toml2docs:none-default
|
||||
db = "greptime_metrics"
|
||||
|
||||
[export_metrics.remote_write]
|
||||
## The prometheus remote write endpoint that the metrics send to. The url example can be: `http://127.0.0.1:4000/v1/prometheus/write?db=greptime_metrics`.
|
||||
url = ""
|
||||
|
||||
## HTTP headers of Prometheus remote-write carry.
|
||||
headers = { }
|
||||
|
||||
## The tracing options. Only effect when compiled with `tokio-console` feature.
|
||||
#+ [tracing]
|
||||
## The tokio console address.
|
||||
|
||||
@@ -22,7 +22,6 @@ mod procedure_info;
|
||||
pub mod process_list;
|
||||
pub mod region_peers;
|
||||
mod region_statistics;
|
||||
mod runtime_metrics;
|
||||
pub mod schemata;
|
||||
mod ssts;
|
||||
mod table_constraints;
|
||||
@@ -65,7 +64,6 @@ use crate::system_schema::information_schema::information_memory_table::get_sche
|
||||
use crate::system_schema::information_schema::key_column_usage::InformationSchemaKeyColumnUsage;
|
||||
use crate::system_schema::information_schema::partitions::InformationSchemaPartitions;
|
||||
use crate::system_schema::information_schema::region_peers::InformationSchemaRegionPeers;
|
||||
use crate::system_schema::information_schema::runtime_metrics::InformationSchemaMetrics;
|
||||
use crate::system_schema::information_schema::schemata::InformationSchemaSchemata;
|
||||
use crate::system_schema::information_schema::ssts::{
|
||||
InformationSchemaSstsIndexMeta, InformationSchemaSstsManifest, InformationSchemaSstsStorage,
|
||||
@@ -216,7 +214,6 @@ impl SystemSchemaProviderInner for InformationSchemaProvider {
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
)) as _),
|
||||
RUNTIME_METRICS => Some(Arc::new(InformationSchemaMetrics::new())),
|
||||
PARTITIONS => Some(Arc::new(InformationSchemaPartitions::new(
|
||||
self.catalog_name.clone(),
|
||||
self.catalog_manager.clone(),
|
||||
@@ -311,10 +308,6 @@ impl InformationSchemaProvider {
|
||||
// authentication details, and other critical information.
|
||||
// Only put these tables under `greptime` catalog to prevent info leak.
|
||||
if self.catalog_name == DEFAULT_CATALOG_NAME {
|
||||
tables.insert(
|
||||
RUNTIME_METRICS.to_string(),
|
||||
self.build_table(RUNTIME_METRICS).unwrap(),
|
||||
);
|
||||
tables.insert(
|
||||
BUILD_INFO.to_string(),
|
||||
self.build_table(BUILD_INFO).unwrap(),
|
||||
|
||||
@@ -1,265 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use arrow_schema::SchemaRef as ArrowSchemaRef;
|
||||
use common_catalog::consts::INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::adapter::RecordBatchStreamAdapter;
|
||||
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
|
||||
use common_time::util::current_time_millis;
|
||||
use datafusion::execution::TaskContext;
|
||||
use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
|
||||
use datafusion::physical_plan::stream::RecordBatchStreamAdapter as DfRecordBatchStreamAdapter;
|
||||
use datafusion::physical_plan::streaming::PartitionStream as DfPartitionStream;
|
||||
use datatypes::prelude::{ConcreteDataType, MutableVector};
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
|
||||
use datatypes::vectors::{
|
||||
ConstantVector, Float64VectorBuilder, StringVectorBuilder, TimestampMillisecondVector,
|
||||
VectorRef,
|
||||
};
|
||||
use itertools::Itertools;
|
||||
use snafu::ResultExt;
|
||||
use store_api::storage::{ScanRequest, TableId};
|
||||
|
||||
use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
|
||||
use crate::system_schema::information_schema::{InformationTable, RUNTIME_METRICS};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub(super) struct InformationSchemaMetrics {
|
||||
schema: SchemaRef,
|
||||
}
|
||||
|
||||
const METRIC_NAME: &str = "metric_name";
|
||||
const METRIC_VALUE: &str = "value";
|
||||
const METRIC_LABELS: &str = "labels";
|
||||
const PEER_ADDR: &str = "peer_addr";
|
||||
const PEER_TYPE: &str = "peer_type";
|
||||
const TIMESTAMP: &str = "timestamp";
|
||||
|
||||
/// The `information_schema.runtime_metrics` virtual table.
|
||||
/// It provides the GreptimeDB runtime metrics for the users by SQL.
|
||||
impl InformationSchemaMetrics {
|
||||
pub(super) fn new() -> Self {
|
||||
Self {
|
||||
schema: Self::schema(),
|
||||
}
|
||||
}
|
||||
|
||||
fn schema() -> SchemaRef {
|
||||
Arc::new(Schema::new(vec![
|
||||
ColumnSchema::new(METRIC_NAME, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(METRIC_VALUE, ConcreteDataType::float64_datatype(), false),
|
||||
ColumnSchema::new(METRIC_LABELS, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(PEER_ADDR, ConcreteDataType::string_datatype(), true),
|
||||
ColumnSchema::new(PEER_TYPE, ConcreteDataType::string_datatype(), false),
|
||||
ColumnSchema::new(
|
||||
TIMESTAMP,
|
||||
ConcreteDataType::timestamp_millisecond_datatype(),
|
||||
false,
|
||||
),
|
||||
]))
|
||||
}
|
||||
|
||||
fn builder(&self) -> InformationSchemaMetricsBuilder {
|
||||
InformationSchemaMetricsBuilder::new(self.schema.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl InformationTable for InformationSchemaMetrics {
|
||||
fn table_id(&self) -> TableId {
|
||||
INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID
|
||||
}
|
||||
|
||||
fn table_name(&self) -> &'static str {
|
||||
RUNTIME_METRICS
|
||||
}
|
||||
|
||||
fn schema(&self) -> SchemaRef {
|
||||
self.schema.clone()
|
||||
}
|
||||
|
||||
fn to_stream(&self, request: ScanRequest) -> Result<SendableRecordBatchStream> {
|
||||
let schema = self.schema.arrow_schema().clone();
|
||||
let mut builder = self.builder();
|
||||
let stream = Box::pin(DfRecordBatchStreamAdapter::new(
|
||||
schema,
|
||||
futures::stream::once(async move {
|
||||
builder
|
||||
.make_metrics(Some(request))
|
||||
.await
|
||||
.map(|x| x.into_df_record_batch())
|
||||
.map_err(Into::into)
|
||||
}),
|
||||
));
|
||||
|
||||
Ok(Box::pin(
|
||||
RecordBatchStreamAdapter::try_new(stream)
|
||||
.map_err(BoxedError::new)
|
||||
.context(InternalSnafu)?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
struct InformationSchemaMetricsBuilder {
|
||||
schema: SchemaRef,
|
||||
|
||||
metric_names: StringVectorBuilder,
|
||||
metric_values: Float64VectorBuilder,
|
||||
metric_labels: StringVectorBuilder,
|
||||
peer_addrs: StringVectorBuilder,
|
||||
peer_types: StringVectorBuilder,
|
||||
}
|
||||
|
||||
impl InformationSchemaMetricsBuilder {
|
||||
fn new(schema: SchemaRef) -> Self {
|
||||
Self {
|
||||
schema,
|
||||
metric_names: StringVectorBuilder::with_capacity(42),
|
||||
metric_values: Float64VectorBuilder::with_capacity(42),
|
||||
metric_labels: StringVectorBuilder::with_capacity(42),
|
||||
peer_addrs: StringVectorBuilder::with_capacity(42),
|
||||
peer_types: StringVectorBuilder::with_capacity(42),
|
||||
}
|
||||
}
|
||||
|
||||
fn add_metric(
|
||||
&mut self,
|
||||
metric_name: &str,
|
||||
labels: String,
|
||||
metric_value: f64,
|
||||
peer: Option<&str>,
|
||||
peer_type: &str,
|
||||
) {
|
||||
self.metric_names.push(Some(metric_name));
|
||||
self.metric_values.push(Some(metric_value));
|
||||
self.metric_labels.push(Some(&labels));
|
||||
self.peer_addrs.push(peer);
|
||||
self.peer_types.push(Some(peer_type));
|
||||
}
|
||||
|
||||
async fn make_metrics(&mut self, _request: Option<ScanRequest>) -> Result<RecordBatch> {
|
||||
let metric_families = prometheus::gather();
|
||||
|
||||
let write_request =
|
||||
common_telemetry::metric::convert_metric_to_write_request(metric_families, None, 0);
|
||||
|
||||
for ts in write_request.timeseries {
|
||||
//Safety: always has `__name__` label
|
||||
let metric_name = ts
|
||||
.labels
|
||||
.iter()
|
||||
.find_map(|label| {
|
||||
if label.name == "__name__" {
|
||||
Some(label.value.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
self.add_metric(
|
||||
&metric_name,
|
||||
ts.labels
|
||||
.into_iter()
|
||||
.filter_map(|label| {
|
||||
if label.name == "__name__" {
|
||||
None
|
||||
} else {
|
||||
Some(format!("{}={}", label.name, label.value))
|
||||
}
|
||||
})
|
||||
.join(", "),
|
||||
// Safety: always has a sample
|
||||
ts.samples[0].value,
|
||||
// The peer column is always `None` for standalone
|
||||
None,
|
||||
"STANDALONE",
|
||||
);
|
||||
}
|
||||
|
||||
// FIXME(dennis): fetching other peers metrics
|
||||
self.finish()
|
||||
}
|
||||
|
||||
fn finish(&mut self) -> Result<RecordBatch> {
|
||||
let rows_num = self.metric_names.len();
|
||||
|
||||
let timestamps = Arc::new(ConstantVector::new(
|
||||
Arc::new(TimestampMillisecondVector::from_slice([
|
||||
current_time_millis(),
|
||||
])),
|
||||
rows_num,
|
||||
));
|
||||
|
||||
let columns: Vec<VectorRef> = vec![
|
||||
Arc::new(self.metric_names.finish()),
|
||||
Arc::new(self.metric_values.finish()),
|
||||
Arc::new(self.metric_labels.finish()),
|
||||
Arc::new(self.peer_addrs.finish()),
|
||||
Arc::new(self.peer_types.finish()),
|
||||
timestamps,
|
||||
];
|
||||
|
||||
RecordBatch::new(self.schema.clone(), columns).context(CreateRecordBatchSnafu)
|
||||
}
|
||||
}
|
||||
|
||||
impl DfPartitionStream for InformationSchemaMetrics {
|
||||
fn schema(&self) -> &ArrowSchemaRef {
|
||||
self.schema.arrow_schema()
|
||||
}
|
||||
|
||||
fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
|
||||
let schema = self.schema.arrow_schema().clone();
|
||||
let mut builder = self.builder();
|
||||
Box::pin(DfRecordBatchStreamAdapter::new(
|
||||
schema,
|
||||
futures::stream::once(async move {
|
||||
builder
|
||||
.make_metrics(None)
|
||||
.await
|
||||
.map(|x| x.into_df_record_batch())
|
||||
.map_err(Into::into)
|
||||
}),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use common_recordbatch::RecordBatches;
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_make_metrics() {
|
||||
let metrics = InformationSchemaMetrics::new();
|
||||
|
||||
let stream = metrics.to_stream(ScanRequest::default()).unwrap();
|
||||
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
|
||||
let result_literal = batches.pretty_print().unwrap();
|
||||
|
||||
assert!(result_literal.contains(METRIC_NAME));
|
||||
assert!(result_literal.contains(METRIC_VALUE));
|
||||
assert!(result_literal.contains(METRIC_LABELS));
|
||||
assert!(result_literal.contains(PEER_ADDR));
|
||||
assert!(result_literal.contains(PEER_TYPE));
|
||||
assert!(result_literal.contains(TIMESTAMP));
|
||||
}
|
||||
}
|
||||
@@ -38,7 +38,6 @@ pub const TABLE_PRIVILEGES: &str = "table_privileges";
|
||||
pub const TRIGGERS: &str = "triggers";
|
||||
pub const GLOBAL_STATUS: &str = "global_status";
|
||||
pub const SESSION_STATUS: &str = "session_status";
|
||||
pub const RUNTIME_METRICS: &str = "runtime_metrics";
|
||||
pub const PARTITIONS: &str = "partitions";
|
||||
pub const REGION_PEERS: &str = "region_peers";
|
||||
pub const TABLE_CONSTRAINTS: &str = "table_constraints";
|
||||
|
||||
@@ -99,13 +99,6 @@ pub enum Error {
|
||||
source: flow::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Servers error"))]
|
||||
Servers {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: servers::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to start frontend"))]
|
||||
StartFrontend {
|
||||
#[snafu(implicit)]
|
||||
@@ -336,7 +329,6 @@ impl ErrorExt for Error {
|
||||
Error::ShutdownFrontend { source, .. } => source.status_code(),
|
||||
Error::StartMetaServer { source, .. } => source.status_code(),
|
||||
Error::ShutdownMetaServer { source, .. } => source.status_code(),
|
||||
Error::Servers { source, .. } => source.status_code(),
|
||||
Error::BuildMetaServer { source, .. } => source.status_code(),
|
||||
Error::UnsupportedSelectorType { source, .. } => source.status_code(),
|
||||
Error::BuildCli { source, .. } => source.status_code(),
|
||||
|
||||
@@ -43,7 +43,6 @@ use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::server::Services;
|
||||
use meta_client::{MetaClientOptions, MetaClientType};
|
||||
use servers::addrs;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
@@ -458,9 +457,6 @@ impl StartCommand {
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
let instance = Arc::new(instance);
|
||||
|
||||
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
|
||||
.context(error::ServersSnafu)?;
|
||||
|
||||
let servers = Services::new(opts, instance.clone(), plugins)
|
||||
.build()
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
@@ -469,7 +465,6 @@ impl StartCommand {
|
||||
instance,
|
||||
servers,
|
||||
heartbeat_task,
|
||||
export_metrics_task,
|
||||
};
|
||||
|
||||
Ok(Instance::new(frontend, guard))
|
||||
|
||||
@@ -57,7 +57,6 @@ use frontend::instance::StandaloneDatanodeManager;
|
||||
use frontend::instance::builder::FrontendBuilder;
|
||||
use frontend::server::Services;
|
||||
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
use snafu::ResultExt;
|
||||
use standalone::StandaloneInformationExtension;
|
||||
@@ -565,9 +564,6 @@ impl StartCommand {
|
||||
.context(StartFlownodeSnafu)?;
|
||||
flow_streaming_engine.set_frontend_invoker(invoker).await;
|
||||
|
||||
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
|
||||
.context(error::ServersSnafu)?;
|
||||
|
||||
let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
|
||||
.build()
|
||||
.context(error::StartFrontendSnafu)?;
|
||||
@@ -576,7 +572,6 @@ impl StartCommand {
|
||||
instance: fe_instance,
|
||||
servers,
|
||||
heartbeat_task: None,
|
||||
export_metrics_task,
|
||||
};
|
||||
|
||||
#[cfg(feature = "enterprise")]
|
||||
|
||||
@@ -31,7 +31,6 @@ use meta_srv::selector::SelectorType;
|
||||
use metric_engine::config::EngineConfig as MetricEngineConfig;
|
||||
use mito2::config::MitoConfig;
|
||||
use query::options::QueryOptions;
|
||||
use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use servers::tls::{TlsMode, TlsOption};
|
||||
@@ -95,11 +94,6 @@ fn test_load_datanode_example_config() {
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
grpc: GrpcOptions::default()
|
||||
.with_bind_addr("127.0.0.1:3001")
|
||||
.with_server_addr("127.0.0.1:3001"),
|
||||
@@ -146,11 +140,6 @@ fn test_load_frontend_example_config() {
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
grpc: GrpcOptions {
|
||||
bind_addr: "127.0.0.1:4001".to_string(),
|
||||
server_addr: "127.0.0.1:4001".to_string(),
|
||||
@@ -201,11 +190,6 @@ fn test_load_metasrv_example_config() {
|
||||
tcp_nodelay: true,
|
||||
},
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: None,
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
backend_tls: Some(TlsOption {
|
||||
mode: TlsMode::Prefer,
|
||||
cert_path: String::new(),
|
||||
@@ -317,11 +301,6 @@ fn test_load_standalone_example_config() {
|
||||
tracing_sample_ratio: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
export_metrics: ExportMetricsOption {
|
||||
self_import: Some(Default::default()),
|
||||
remote_write: Some(Default::default()),
|
||||
..Default::default()
|
||||
},
|
||||
http: HttpOptions {
|
||||
cors_allowed_origins: vec!["https://example.com".to_string()],
|
||||
..Default::default()
|
||||
|
||||
@@ -86,8 +86,6 @@ pub const INFORMATION_SCHEMA_TRIGGERS_TABLE_ID: u32 = 24;
|
||||
pub const INFORMATION_SCHEMA_GLOBAL_STATUS_TABLE_ID: u32 = 25;
|
||||
/// id for information_schema.SESSION_STATUS
|
||||
pub const INFORMATION_SCHEMA_SESSION_STATUS_TABLE_ID: u32 = 26;
|
||||
/// id for information_schema.RUNTIME_METRICS
|
||||
pub const INFORMATION_SCHEMA_RUNTIME_METRICS_TABLE_ID: u32 = 27;
|
||||
/// id for information_schema.PARTITIONS
|
||||
pub const INFORMATION_SCHEMA_PARTITIONS_TABLE_ID: u32 = 28;
|
||||
/// id for information_schema.REGION_PEERS
|
||||
|
||||
@@ -28,7 +28,6 @@ use mito2::config::MitoConfig;
|
||||
pub(crate) use object_store::config::ObjectStoreConfig;
|
||||
use query::options::QueryOptions;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
@@ -82,7 +81,6 @@ pub struct DatanodeOptions {
|
||||
pub region_engine: Vec<RegionEngineConfig>,
|
||||
pub logging: LoggingOptions,
|
||||
pub enable_telemetry: bool,
|
||||
pub export_metrics: ExportMetricsOption,
|
||||
pub tracing: TracingOptions,
|
||||
pub query: QueryOptions,
|
||||
pub memory: MemoryOptions,
|
||||
@@ -138,7 +136,6 @@ impl Default for DatanodeOptions {
|
||||
logging: LoggingOptions::default(),
|
||||
heartbeat: HeartbeatOptions::datanode_default(),
|
||||
enable_telemetry: true,
|
||||
export_metrics: ExportMetricsOption::default(),
|
||||
tracing: TracingOptions::default(),
|
||||
query: QueryOptions::default(),
|
||||
memory: MemoryOptions::default(),
|
||||
|
||||
@@ -48,7 +48,6 @@ use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
|
||||
use object_store::util::normalize_dir;
|
||||
use query::QueryEngineFactory;
|
||||
use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::server::ServerHandlers;
|
||||
use snafu::{OptionExt, ResultExt, ensure};
|
||||
use store_api::path_utils::WAL_DIR;
|
||||
@@ -84,7 +83,6 @@ pub struct Datanode {
|
||||
greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
|
||||
leases_notifier: Option<Arc<Notify>>,
|
||||
plugins: Plugins,
|
||||
export_metrics_task: Option<ExportMetricsTask>,
|
||||
}
|
||||
|
||||
impl Datanode {
|
||||
@@ -96,10 +94,6 @@ impl Datanode {
|
||||
|
||||
self.start_telemetry();
|
||||
|
||||
if let Some(t) = self.export_metrics_task.as_ref() {
|
||||
t.start(None).context(StartServerSnafu)?
|
||||
}
|
||||
|
||||
self.services.start_all().await.context(StartServerSnafu)
|
||||
}
|
||||
|
||||
@@ -319,10 +313,6 @@ impl DatanodeBuilder {
|
||||
None
|
||||
};
|
||||
|
||||
let export_metrics_task =
|
||||
ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
|
||||
.context(StartServerSnafu)?;
|
||||
|
||||
Ok(Datanode {
|
||||
services: ServerHandlers::default(),
|
||||
heartbeat_task,
|
||||
@@ -331,7 +321,6 @@ impl DatanodeBuilder {
|
||||
region_event_receiver,
|
||||
leases_notifier,
|
||||
plugins: self.plugins.clone(),
|
||||
export_metrics_task,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ use common_telemetry::logging::{LoggingOptions, SlowQueryOptions, TracingOptions
|
||||
use meta_client::MetaClientOptions;
|
||||
use query::options::QueryOptions;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
@@ -34,7 +33,6 @@ use crate::error;
|
||||
use crate::error::Result;
|
||||
use crate::heartbeat::HeartbeatTask;
|
||||
use crate::instance::Instance;
|
||||
use crate::instance::prom_store::ExportMetricHandler;
|
||||
use crate::service_config::{
|
||||
InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, OtlpOptions, PostgresOptions,
|
||||
PromStoreOptions,
|
||||
@@ -63,7 +61,6 @@ pub struct FrontendOptions {
|
||||
pub logging: LoggingOptions,
|
||||
pub datanode: DatanodeClientOptions,
|
||||
pub user_provider: Option<String>,
|
||||
pub export_metrics: ExportMetricsOption,
|
||||
pub tracing: TracingOptions,
|
||||
pub query: QueryOptions,
|
||||
pub max_in_flight_write_bytes: Option<ReadableSize>,
|
||||
@@ -94,7 +91,6 @@ impl Default for FrontendOptions {
|
||||
logging: LoggingOptions::default(),
|
||||
datanode: DatanodeClientOptions::default(),
|
||||
user_provider: None,
|
||||
export_metrics: ExportMetricsOption::default(),
|
||||
tracing: TracingOptions::default(),
|
||||
query: QueryOptions::default(),
|
||||
max_in_flight_write_bytes: None,
|
||||
@@ -117,7 +113,6 @@ pub struct Frontend {
|
||||
pub instance: Arc<Instance>,
|
||||
pub servers: ServerHandlers,
|
||||
pub heartbeat_task: Option<HeartbeatTask>,
|
||||
pub export_metrics_task: Option<ExportMetricsTask>,
|
||||
}
|
||||
|
||||
impl Frontend {
|
||||
@@ -126,17 +121,6 @@ impl Frontend {
|
||||
t.start().await?;
|
||||
}
|
||||
|
||||
if let Some(t) = self.export_metrics_task.as_ref() {
|
||||
if t.send_by_handler {
|
||||
let inserter = self.instance.inserter().clone();
|
||||
let statement_executor = self.instance.statement_executor().clone();
|
||||
let handler = ExportMetricHandler::new_handler(inserter, statement_executor);
|
||||
t.start(Some(handler)).context(error::StartServerSnafu)?
|
||||
} else {
|
||||
t.start(None).context(error::StartServerSnafu)?;
|
||||
}
|
||||
}
|
||||
|
||||
self.servers
|
||||
.start_all()
|
||||
.await
|
||||
|
||||
@@ -30,7 +30,6 @@ use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
|
||||
use common_telemetry::info;
|
||||
use either::Either;
|
||||
use servers::configurator::ConfiguratorRef;
|
||||
use servers::export_metrics::ExportMetricsTask;
|
||||
use servers::http::{HttpServer, HttpServerBuilder};
|
||||
use servers::metrics_handler::MetricsHandler;
|
||||
use servers::server::Server;
|
||||
@@ -70,8 +69,6 @@ pub struct MetasrvInstance {
|
||||
|
||||
plugins: Plugins,
|
||||
|
||||
export_metrics_task: Option<ExportMetricsTask>,
|
||||
|
||||
/// gRPC serving state receiver. Only present if the gRPC server is started.
|
||||
serve_state: Arc<Mutex<Option<oneshot::Receiver<Result<()>>>>>,
|
||||
|
||||
@@ -95,15 +92,12 @@ impl MetasrvInstance {
|
||||
|
||||
// put metasrv into plugins for later use
|
||||
plugins.insert::<Arc<Metasrv>>(metasrv.clone());
|
||||
let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
|
||||
.context(error::InitExportMetricsTaskSnafu)?;
|
||||
Ok(MetasrvInstance {
|
||||
metasrv,
|
||||
http_server: Either::Left(Some(builder)),
|
||||
opts,
|
||||
signal_sender: None,
|
||||
plugins,
|
||||
export_metrics_task,
|
||||
serve_state: Default::default(),
|
||||
bind_addr: None,
|
||||
})
|
||||
@@ -131,10 +125,6 @@ impl MetasrvInstance {
|
||||
|
||||
self.metasrv.try_start().await?;
|
||||
|
||||
if let Some(t) = self.export_metrics_task.as_ref() {
|
||||
t.start(None).context(error::InitExportMetricsTaskSnafu)?
|
||||
}
|
||||
|
||||
let (tx, rx) = mpsc::channel::<()>(1);
|
||||
|
||||
self.signal_sender = Some(tx);
|
||||
|
||||
@@ -304,13 +304,6 @@ pub enum Error {
|
||||
source: servers::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to init export metrics task"))]
|
||||
InitExportMetricsTask {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
source: servers::error::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to parse address {}", addr))]
|
||||
ParseAddr {
|
||||
addr: String,
|
||||
@@ -1061,7 +1054,6 @@ impl ErrorExt for Error {
|
||||
| Error::ParseAddr { .. }
|
||||
| Error::UnsupportedSelectorType { .. }
|
||||
| Error::InvalidArguments { .. }
|
||||
| Error::InitExportMetricsTask { .. }
|
||||
| Error::ProcedureNotFound { .. }
|
||||
| Error::TooManyPartitions { .. }
|
||||
| Error::TomlFormat { .. }
|
||||
|
||||
@@ -52,7 +52,6 @@ use common_telemetry::{error, info, warn};
|
||||
use common_time::util::DefaultSystemTimer;
|
||||
use common_wal::config::MetasrvWalConfig;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
use servers::tls::TlsOption;
|
||||
@@ -169,8 +168,6 @@ pub struct MetasrvOptions {
|
||||
pub data_home: String,
|
||||
/// The WAL options.
|
||||
pub wal: MetasrvWalConfig,
|
||||
/// The metrics export options.
|
||||
pub export_metrics: ExportMetricsOption,
|
||||
/// The store key prefix. If it is not empty, all keys in the store will be prefixed with it.
|
||||
/// This is useful when multiple metasrv clusters share the same store.
|
||||
pub store_key_prefix: String,
|
||||
@@ -234,7 +231,6 @@ impl fmt::Debug for MetasrvOptions {
|
||||
.field("enable_telemetry", &self.enable_telemetry)
|
||||
.field("data_home", &self.data_home)
|
||||
.field("wal", &self.wal)
|
||||
.field("export_metrics", &self.export_metrics)
|
||||
.field("store_key_prefix", &self.store_key_prefix)
|
||||
.field("max_txn_ops", &self.max_txn_ops)
|
||||
.field("flush_stats_factor", &self.flush_stats_factor)
|
||||
@@ -292,7 +288,6 @@ impl Default for MetasrvOptions {
|
||||
enable_telemetry: true,
|
||||
data_home: DEFAULT_DATA_HOME.to_string(),
|
||||
wal: MetasrvWalConfig::default(),
|
||||
export_metrics: ExportMetricsOption::default(),
|
||||
store_key_prefix: String::new(),
|
||||
max_txn_ops: 128,
|
||||
flush_stats_factor: 3,
|
||||
|
||||
@@ -269,21 +269,6 @@ pub enum Error {
|
||||
error: std::io::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to send prometheus remote request"))]
|
||||
SendPromRemoteRequest {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
#[snafu(source)]
|
||||
error: reqwest::Error,
|
||||
},
|
||||
|
||||
#[snafu(display("Invalid export metrics config, msg: {}", msg))]
|
||||
InvalidExportMetricsConfig {
|
||||
msg: String,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Failed to compress prometheus remote request"))]
|
||||
CompressPromRemoteRequest {
|
||||
#[snafu(implicit)]
|
||||
@@ -661,7 +646,6 @@ impl ErrorExt for Error {
|
||||
| StartHttp { .. }
|
||||
| StartGrpc { .. }
|
||||
| TcpBind { .. }
|
||||
| SendPromRemoteRequest { .. }
|
||||
| BuildHttpResponse { .. }
|
||||
| Arrow { .. }
|
||||
| FileWatch { .. } => StatusCode::Internal,
|
||||
@@ -698,7 +682,6 @@ impl ErrorExt for Error {
|
||||
| DecompressSnappyPromRemoteRequest { .. }
|
||||
| DecompressZstdPromRemoteRequest { .. }
|
||||
| InvalidPromRemoteRequest { .. }
|
||||
| InvalidExportMetricsConfig { .. }
|
||||
| InvalidFlightTicket { .. }
|
||||
| InvalidPrepareStatement { .. }
|
||||
| DataFrame { .. }
|
||||
|
||||
@@ -1,369 +0,0 @@
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_base::Plugins;
|
||||
use common_telemetry::metric::{MetricFilter, convert_metric_to_write_request};
|
||||
use common_telemetry::{error, info};
|
||||
use common_time::Timestamp;
|
||||
use prost::Message;
|
||||
use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContextBuilder;
|
||||
use snafu::{ResultExt, ensure};
|
||||
use tokio::time::{self, Interval};
|
||||
|
||||
use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu};
|
||||
use crate::prom_store::{snappy_compress, to_grpc_row_insert_requests};
|
||||
use crate::query_handler::PromStoreProtocolHandlerRef;
|
||||
|
||||
/// Use to export the metrics generated by greptimedb.
|
||||
///
|
||||
/// Encoded to Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/),
|
||||
/// and send to Prometheus remote-write compatible receiver (e.g. send to `greptimedb` itself)
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct ExportMetricsOption {
|
||||
pub enable: bool,
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub write_interval: Duration,
|
||||
pub self_import: Option<SelfImportOption>,
|
||||
pub remote_write: Option<RemoteWriteOption>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Default)]
|
||||
#[serde(default)]
|
||||
pub struct RemoteWriteOption {
|
||||
pub url: String,
|
||||
pub headers: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
|
||||
#[serde(default)]
|
||||
pub struct SelfImportOption {
|
||||
pub db: String,
|
||||
}
|
||||
|
||||
impl Default for SelfImportOption {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
db: "greptime_metrics".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for ExportMetricsOption {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enable: false,
|
||||
write_interval: Duration::from_secs(30),
|
||||
self_import: None,
|
||||
remote_write: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default, Clone)]
|
||||
pub struct ExportMetricsTask {
|
||||
config: ExportMetricsOption,
|
||||
filter: Option<MetricFilter>,
|
||||
headers: HeaderMap<HeaderValue>,
|
||||
pub send_by_handler: bool,
|
||||
}
|
||||
|
||||
impl ExportMetricsTask {
|
||||
pub fn try_new(
|
||||
config: &ExportMetricsOption,
|
||||
plugins: Option<&Plugins>,
|
||||
) -> Result<Option<Self>> {
|
||||
if !config.enable {
|
||||
return Ok(None);
|
||||
}
|
||||
let filter = plugins.map(|p| p.get::<MetricFilter>()).unwrap_or(None);
|
||||
ensure!(
|
||||
config.write_interval.as_secs() != 0,
|
||||
InvalidExportMetricsConfigSnafu {
|
||||
msg: "Expected export metrics write_interval greater than zero"
|
||||
}
|
||||
);
|
||||
ensure!(
|
||||
(config.remote_write.is_none() && config.self_import.is_some())
|
||||
|| (config.remote_write.is_some() && config.self_import.is_none()),
|
||||
InvalidExportMetricsConfigSnafu {
|
||||
msg: "Only one of `self_import` or `remote_write` can be used as the export method"
|
||||
}
|
||||
);
|
||||
if let Some(self_import) = &config.self_import {
|
||||
ensure!(
|
||||
!self_import.db.is_empty(),
|
||||
InvalidExportMetricsConfigSnafu {
|
||||
msg: "Expected `self_import` metrics `db` not empty"
|
||||
}
|
||||
);
|
||||
}
|
||||
let mut headers = HeaderMap::new();
|
||||
if let Some(remote_write) = &config.remote_write {
|
||||
ensure!(
|
||||
!remote_write.url.is_empty(),
|
||||
InvalidExportMetricsConfigSnafu {
|
||||
msg: "Expected `remote_write` metrics `url` not empty"
|
||||
}
|
||||
);
|
||||
// construct http header
|
||||
remote_write.headers.iter().try_for_each(|(k, v)| {
|
||||
let header = match TryInto::<HeaderName>::try_into(k) {
|
||||
Ok(header) => header,
|
||||
Err(_) => {
|
||||
return InvalidExportMetricsConfigSnafu {
|
||||
msg: format!("Export metrics: invalid HTTP header name: {}", k),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
match TryInto::<HeaderValue>::try_into(v) {
|
||||
Ok(value) => headers.insert(header, value),
|
||||
Err(_) => {
|
||||
return InvalidExportMetricsConfigSnafu {
|
||||
msg: format!("Export metrics: invalid HTTP header value: {}", v),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
})?;
|
||||
}
|
||||
Ok(Some(Self {
|
||||
config: config.clone(),
|
||||
filter,
|
||||
headers,
|
||||
send_by_handler: config.self_import.is_some(),
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn start(&self, handler: Option<PromStoreProtocolHandlerRef>) -> Result<()> {
|
||||
if !self.config.enable {
|
||||
return Ok(());
|
||||
}
|
||||
let interval = time::interval(self.config.write_interval);
|
||||
let filter = self.filter.clone();
|
||||
let _handle = if let Some(self_import) = &self.config.self_import {
|
||||
ensure!(
|
||||
handler.is_some(),
|
||||
InvalidExportMetricsConfigSnafu {
|
||||
msg: "Only `frontend` or `standalone` can use `self_import` as export method."
|
||||
}
|
||||
);
|
||||
common_runtime::spawn_global(write_system_metric_by_handler(
|
||||
self_import.db.clone(),
|
||||
handler.unwrap(),
|
||||
filter,
|
||||
interval,
|
||||
))
|
||||
} else if let Some(remote_write) = &self.config.remote_write {
|
||||
common_runtime::spawn_global(write_system_metric_by_network(
|
||||
self.headers.clone(),
|
||||
remote_write.url.clone(),
|
||||
filter,
|
||||
interval,
|
||||
))
|
||||
} else {
|
||||
unreachable!()
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Send metrics collected by standard Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/)
|
||||
pub async fn write_system_metric_by_network(
|
||||
headers: HeaderMap,
|
||||
endpoint: String,
|
||||
filter: Option<MetricFilter>,
|
||||
mut interval: Interval,
|
||||
) {
|
||||
info!(
|
||||
"Start export metrics task to endpoint: {}, interval: {}s",
|
||||
endpoint,
|
||||
interval.period().as_secs()
|
||||
);
|
||||
// Pass the first tick. Because the first tick completes immediately.
|
||||
interval.tick().await;
|
||||
let client = reqwest::Client::new();
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let metric_families = prometheus::gather();
|
||||
let request = convert_metric_to_write_request(
|
||||
metric_families,
|
||||
filter.as_ref(),
|
||||
Timestamp::current_millis().value(),
|
||||
);
|
||||
let resp = match snappy_compress(&request.encode_to_vec()) {
|
||||
Ok(body) => client
|
||||
.post(endpoint.as_str())
|
||||
.header("X-Prometheus-Remote-Write-Version", "0.1.0")
|
||||
.header("Content-Type", "application/x-protobuf")
|
||||
.headers(headers.clone())
|
||||
.body(body)
|
||||
.send()
|
||||
.await
|
||||
.context(SendPromRemoteRequestSnafu),
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
match resp {
|
||||
Ok(resp) => {
|
||||
if !resp.status().is_success() {
|
||||
error!("report export metrics error, msg: {:#?}", resp);
|
||||
}
|
||||
}
|
||||
Err(e) => error!(e; "report export metrics failed"),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/// Send metrics collected by our internal handler
|
||||
/// for case `frontend` and `standalone` dispose it's own metrics,
|
||||
/// reducing compression and network transmission overhead.
|
||||
pub async fn write_system_metric_by_handler(
|
||||
db: String,
|
||||
handler: PromStoreProtocolHandlerRef,
|
||||
filter: Option<MetricFilter>,
|
||||
mut interval: Interval,
|
||||
) {
|
||||
info!(
|
||||
"Start export metrics task by handler, interval: {}s",
|
||||
interval.period().as_secs()
|
||||
);
|
||||
// Pass the first tick. Because the first tick completes immediately.
|
||||
interval.tick().await;
|
||||
let ctx = Arc::new(QueryContextBuilder::default().current_schema(db).build());
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let metric_families = prometheus::gather();
|
||||
let request = convert_metric_to_write_request(
|
||||
metric_families,
|
||||
filter.as_ref(),
|
||||
Timestamp::current_millis().value(),
|
||||
);
|
||||
|
||||
let (requests, samples) = match to_grpc_row_insert_requests(&request) {
|
||||
Ok((requests, samples)) => (requests, samples),
|
||||
Err(e) => {
|
||||
error!(e; "Failed to convert gathered metrics to RowInsertRequests");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = handler.write(requests, ctx.clone(), false).await {
|
||||
error!(e; "report export metrics by handler failed");
|
||||
} else {
|
||||
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES
|
||||
.with_label_values(&[ctx.get_db_string().as_str()])
|
||||
.inc_by(samples as u64);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::export_metrics::{
|
||||
ExportMetricsOption, ExportMetricsTask, RemoteWriteOption, SelfImportOption,
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_config() {
|
||||
// zero write_interval
|
||||
assert!(
|
||||
ExportMetricsTask::try_new(
|
||||
&ExportMetricsOption {
|
||||
enable: true,
|
||||
write_interval: Duration::from_secs(0),
|
||||
..Default::default()
|
||||
},
|
||||
None
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
// none self_import and remote_write
|
||||
assert!(
|
||||
ExportMetricsTask::try_new(
|
||||
&ExportMetricsOption {
|
||||
enable: true,
|
||||
..Default::default()
|
||||
},
|
||||
None
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
// both self_import and remote_write
|
||||
assert!(
|
||||
ExportMetricsTask::try_new(
|
||||
&ExportMetricsOption {
|
||||
enable: true,
|
||||
self_import: Some(SelfImportOption::default()),
|
||||
remote_write: Some(RemoteWriteOption::default()),
|
||||
..Default::default()
|
||||
},
|
||||
None
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
// empty db
|
||||
assert!(
|
||||
ExportMetricsTask::try_new(
|
||||
&ExportMetricsOption {
|
||||
enable: true,
|
||||
self_import: Some(SelfImportOption {
|
||||
db: String::default()
|
||||
}),
|
||||
remote_write: None,
|
||||
..Default::default()
|
||||
},
|
||||
None
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
// empty url
|
||||
assert!(
|
||||
ExportMetricsTask::try_new(
|
||||
&ExportMetricsOption {
|
||||
enable: true,
|
||||
self_import: None,
|
||||
remote_write: Some(RemoteWriteOption {
|
||||
url: String::default(),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
None
|
||||
)
|
||||
.is_err()
|
||||
);
|
||||
// self import but no handle
|
||||
let s = ExportMetricsTask::try_new(
|
||||
&ExportMetricsOption {
|
||||
enable: true,
|
||||
self_import: Some(SelfImportOption::default()),
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
)
|
||||
.unwrap()
|
||||
.unwrap();
|
||||
assert!(s.start(None).is_err());
|
||||
}
|
||||
}
|
||||
@@ -28,7 +28,6 @@ pub mod addrs;
|
||||
pub mod configurator;
|
||||
pub(crate) mod elasticsearch;
|
||||
pub mod error;
|
||||
pub mod export_metrics;
|
||||
pub mod grpc;
|
||||
pub mod heartbeat_options;
|
||||
mod hint_headers;
|
||||
|
||||
@@ -28,7 +28,6 @@ use frontend::service_config::{
|
||||
use mito2::config::MitoConfig;
|
||||
use query::options::QueryOptions;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use servers::export_metrics::ExportMetricsOption;
|
||||
use servers::grpc::GrpcOptions;
|
||||
use servers::http::HttpOptions;
|
||||
|
||||
@@ -55,7 +54,6 @@ pub struct StandaloneOptions {
|
||||
pub user_provider: Option<String>,
|
||||
/// Options for different store engines.
|
||||
pub region_engine: Vec<RegionEngineConfig>,
|
||||
pub export_metrics: ExportMetricsOption,
|
||||
pub tracing: TracingOptions,
|
||||
pub init_regions_in_background: bool,
|
||||
pub init_regions_parallelism: usize,
|
||||
@@ -85,7 +83,6 @@ impl Default for StandaloneOptions {
|
||||
procedure: ProcedureConfig::default(),
|
||||
flow: FlowConfig::default(),
|
||||
logging: LoggingOptions::default(),
|
||||
export_metrics: ExportMetricsOption::default(),
|
||||
user_provider: None,
|
||||
region_engine: vec![
|
||||
RegionEngineConfig::Mito(MitoConfig::default()),
|
||||
@@ -134,8 +131,6 @@ impl StandaloneOptions {
|
||||
meta_client: None,
|
||||
logging: cloned_opts.logging,
|
||||
user_provider: cloned_opts.user_provider,
|
||||
// Handle the export metrics task run by standalone to frontend for execution
|
||||
export_metrics: cloned_opts.export_metrics,
|
||||
max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
|
||||
slow_query: cloned_opts.slow_query,
|
||||
..Default::default()
|
||||
|
||||
@@ -454,7 +454,6 @@ impl GreptimeDbClusterBuilder {
|
||||
instance,
|
||||
servers,
|
||||
heartbeat_task: Some(heartbeat_task),
|
||||
export_metrics_task: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -287,7 +287,6 @@ impl GreptimeDbStandaloneBuilder {
|
||||
instance,
|
||||
servers: ServerHandlers::default(),
|
||||
heartbeat_task: None,
|
||||
export_metrics_task: None,
|
||||
};
|
||||
|
||||
frontend.start().await.unwrap();
|
||||
|
||||
@@ -1547,10 +1547,6 @@ max_concurrent_gc_job = 4
|
||||
|
||||
[region_engine.file]
|
||||
|
||||
[export_metrics]
|
||||
enable = false
|
||||
write_interval = "30s"
|
||||
|
||||
[tracing]
|
||||
|
||||
[slow_query]
|
||||
|
||||
@@ -52,7 +52,6 @@ SHOW TABLES;
|
||||
| region_peers |
|
||||
| region_statistics |
|
||||
| routines |
|
||||
| runtime_metrics |
|
||||
| schema_privileges |
|
||||
| schemata |
|
||||
| session_status |
|
||||
@@ -103,7 +102,6 @@ SHOW FULL TABLES;
|
||||
| region_peers | LOCAL TEMPORARY |
|
||||
| region_statistics | LOCAL TEMPORARY |
|
||||
| routines | LOCAL TEMPORARY |
|
||||
| runtime_metrics | LOCAL TEMPORARY |
|
||||
| schema_privileges | LOCAL TEMPORARY |
|
||||
| schemata | LOCAL TEMPORARY |
|
||||
| session_status | LOCAL TEMPORARY |
|
||||
@@ -148,7 +146,6 @@ SHOW TABLE STATUS;
|
||||
|region_peers||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|
||||
|region_statistics||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|
||||
|routines||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|
||||
|runtime_metrics||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|
||||
|schema_privileges||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|
||||
|schemata||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|
||||
|session_status||11|Fixed|0|0|0|0|0|0|0|DATETIME|DATETIME||utf8_bin|0|||
|
||||
|
||||
@@ -39,7 +39,6 @@ order by table_schema, table_name;
|
||||
|greptime|information_schema|region_peers|LOCALTEMPORARY|29|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|
||||
|greptime|information_schema|region_statistics|LOCALTEMPORARY|35|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|
||||
|greptime|information_schema|routines|LOCALTEMPORARY|21|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|
||||
|greptime|information_schema|runtime_metrics|LOCALTEMPORARY|27|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|
||||
|greptime|information_schema|schema_privileges|LOCALTEMPORARY|22|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|
||||
|greptime|information_schema|schemata|LOCALTEMPORARY|15|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|
||||
|greptime|information_schema|session_status|LOCALTEMPORARY|26|0|0|0|0|0||11|Fixed|0|0|0|DATETIME|DATETIME||utf8_bin|0|||Y|
|
||||
@@ -369,12 +368,6 @@ select * from information_schema.columns order by table_schema, table_name, colu
|
||||
| greptime | information_schema | routines | sql_data_access | 21 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | routines | sql_mode | 26 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | routines | sql_path | 22 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | runtime_metrics | labels | 3 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | runtime_metrics | metric_name | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | runtime_metrics | peer_addr | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | Yes | string | | |
|
||||
| greptime | information_schema | runtime_metrics | peer_type | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | runtime_metrics | timestamp | 6 | | | | | 3 | | | | | select,insert | | TimestampMillisecond | timestamp(3) | FIELD | | No | timestamp(3) | | |
|
||||
| greptime | information_schema | runtime_metrics | value | 2 | | | 22 | | | | | | | select,insert | | Float64 | double | FIELD | | No | double | | |
|
||||
| greptime | information_schema | schema_privileges | grantee | 1 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | schema_privileges | is_grantable | 5 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
| greptime | information_schema | schema_privileges | privilege_type | 4 | 2147483647 | 2147483647 | | | | utf8 | utf8_bin | | | select,insert | | String | string | FIELD | | No | string | | |
|
||||
@@ -832,19 +825,6 @@ SELECT * FROM CHECK_CONSTRAINTS;
|
||||
+--------------------+-------------------+-----------------+--------------+
|
||||
+--------------------+-------------------+-----------------+--------------+
|
||||
|
||||
DESC TABLE RUNTIME_METRICS;
|
||||
|
||||
+-------------+----------------------+-----+------+---------+---------------+
|
||||
| Column | Type | Key | Null | Default | Semantic Type |
|
||||
+-------------+----------------------+-----+------+---------+---------------+
|
||||
| metric_name | String | | NO | | FIELD |
|
||||
| value | Float64 | | NO | | FIELD |
|
||||
| labels | String | | YES | | FIELD |
|
||||
| peer_addr | String | | YES | | FIELD |
|
||||
| peer_type | String | | NO | | FIELD |
|
||||
| timestamp | TimestampMillisecond | | NO | | FIELD |
|
||||
+-------------+----------------------+-----+------+---------+---------------+
|
||||
|
||||
DESC TABLE REGION_PEERS;
|
||||
|
||||
+---------------+--------+-----+------+---------+---------------+
|
||||
|
||||
@@ -130,8 +130,6 @@ DESC TABLE CHECK_CONSTRAINTS;
|
||||
|
||||
SELECT * FROM CHECK_CONSTRAINTS;
|
||||
|
||||
DESC TABLE RUNTIME_METRICS;
|
||||
|
||||
DESC TABLE REGION_PEERS;
|
||||
|
||||
USE INFORMATION_SCHEMA;
|
||||
|
||||
@@ -113,7 +113,6 @@ SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE;
|
||||
|greptime|information_schema|region_peers|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|
||||
|greptime|information_schema|region_statistics|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|
||||
|greptime|information_schema|routines|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|
||||
|greptime|information_schema|runtime_metrics|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|
||||
|greptime|information_schema|schema_privileges|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|
||||
|greptime|information_schema|schemata|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|
||||
|greptime|information_schema|session_status|LOCALTEMPORARY|ID|ID|ID|ID|ID|ID||ID|Fixed|ID|ID|ID|DATETIME|DATETIME||utf8_bin|ID|||Y|
|
||||
|
||||
Reference in New Issue
Block a user