From 86fb9d8ac7513eb03a4a27bf112e7f736a2ee58b Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Fri, 22 Mar 2024 10:09:00 +0800 Subject: [PATCH] refactor: remove redudant PromStoreProtocolHandler::write (#3553) refactor: remove redudant PromStoreProtocolHandler::write API and rename PromStoreProtocolHandler::write_fast to write --- src/frontend/src/instance/prom_store.rs | 54 ++--------------------- src/frontend/src/metrics.rs | 7 --- src/servers/src/export_metrics.rs | 15 ++++++- src/servers/src/http/prom_store.rs | 32 +++++++------- src/servers/src/metrics.rs | 7 +-- src/servers/src/proto.rs | 5 ++- src/servers/src/query_handler.rs | 10 +---- src/servers/tests/http/prom_store_test.rs | 11 +---- tests-integration/src/prom_store.rs | 4 +- 9 files changed, 46 insertions(+), 99 deletions(-) diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 1b5aa14c5f..104573bf86 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::Arc; use api::prom_store::remote::read_request::ResponseType; -use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; +use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse}; use api::v1::RowInsertRequests; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; @@ -46,7 +46,6 @@ use crate::error::{ TableNotFoundSnafu, }; use crate::instance::Instance; -use crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES; const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32; @@ -163,43 +162,6 @@ impl Instance { #[async_trait] impl PromStoreProtocolHandler for Instance { async fn write( - &self, - request: WriteRequest, - ctx: QueryContextRef, - with_metric_engine: bool, - ) -> ServerResult { - self.plugins - .get::() - .as_ref() - .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite) - .context(AuthSnafu)?; - let interceptor_ref = self - .plugins - .get::>(); - interceptor_ref.pre_write(&request, ctx.clone())?; - - let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?; - let output = if with_metric_engine { - let physical_table = ctx - .extension(PHYSICAL_TABLE_PARAM) - .unwrap_or(GREPTIME_PHYSICAL_TABLE) - .to_string(); - self.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string()) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)? - } else { - self.handle_row_inserts(requests, ctx.clone()) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)? - }; - - PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); - Ok(output) - } - - async fn write_fast( &self, request: RowInsertRequests, ctx: QueryContextRef, @@ -316,14 +278,13 @@ impl ExportMetricHandler { impl PromStoreProtocolHandler for ExportMetricHandler { async fn write( &self, - request: WriteRequest, + request: RowInsertRequests, ctx: QueryContextRef, _: bool, ) -> ServerResult { - let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?; self.inserter .handle_metric_row_inserts( - requests, + request, ctx, &self.statement_executor, GREPTIME_PHYSICAL_TABLE.to_string(), @@ -333,15 +294,6 @@ impl PromStoreProtocolHandler for ExportMetricHandler { .context(error::ExecuteGrpcQuerySnafu) } - async fn write_fast( - &self, - _request: RowInsertRequests, - _ctx: QueryContextRef, - _with_metric_engine: bool, - ) -> ServerResult { - unimplemented!() - } - async fn read( &self, _request: ReadRequest, diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index 5c3c6122e4..db9d53ac19 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -41,13 +41,6 @@ lazy_static! { .with_label_values(&["insert"]); pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED .with_label_values(&["execute"]); - - /// The samples count of Prometheus remote write. - pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!( - "greptime_frontend_prometheus_remote_write_samples", - "frontend prometheus remote write samples" - ) - .unwrap(); pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!( "greptime_frontend_otlp_metrics_rows", "frontend otlp metrics rows" diff --git a/src/servers/src/export_metrics.rs b/src/servers/src/export_metrics.rs index 014f58e526..3ec089db41 100644 --- a/src/servers/src/export_metrics.rs +++ b/src/servers/src/export_metrics.rs @@ -29,7 +29,7 @@ use snafu::{ensure, ResultExt}; use tokio::time::{self, Interval}; use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu}; -use crate::prom_store::snappy_compress; +use crate::prom_store::{snappy_compress, to_grpc_row_insert_requests}; use crate::query_handler::PromStoreProtocolHandlerRef; /// Use to export the metrics generated by greptimedb, encoded to Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/), @@ -256,8 +256,19 @@ pub async fn write_system_metric_by_handler( filter.as_ref(), Timestamp::current_millis().value(), ); - if let Err(e) = handler.write(request, ctx.clone(), false).await { + + let (requests, samples) = match to_grpc_row_insert_requests(&request) { + Ok((requests, samples)) => (requests, samples), + Err(e) => { + error!(e; "Failed to convert gathered metrics to RowInsertRequests"); + continue; + } + }; + + if let Err(e) = handler.write(requests, ctx.clone(), false).await { error!("report export metrics by handler failed, error {}", e); + } else { + crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); } } } diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 514bf6d22c..bf3f248eb0 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use api::prom_store::remote::{ReadRequest, WriteRequest}; +use api::prom_store::remote::ReadRequest; use api::v1::RowInsertRequests; use axum::extract::{Query, RawBody, State}; use axum::http::{header, HeaderValue, StatusCode}; @@ -75,13 +75,14 @@ pub async fn route_write_without_metric_engine( .with_label_values(&[db.as_str()]) .start_timer(); - let request = decode_remote_write_request(body).await?; + let (request, samples) = decode_remote_write_request(body).await?; // reject if physical table is specified when metric engine is disabled if params.physical_table.is_some() { return UnexpectedPhysicalTableSnafu {}.fail(); } let output = handler.write(request, query_ctx, false).await?; + crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); Ok(( StatusCode::NO_CONTENT, write_cost_header_map(output.meta.cost), @@ -104,7 +105,7 @@ pub async fn remote_write( .with_label_values(&[db.as_str()]) .start_timer(); - let request = decode_remote_write_request_to_row_inserts(body).await?; + let (request, samples) = decode_remote_write_request_to_row_inserts(body).await?; if let Some(physical_table) = params.physical_table { let mut new_query_ctx = query_ctx.as_ref().clone(); @@ -112,7 +113,8 @@ pub async fn remote_write( query_ctx = Arc::new(new_query_ctx); } - let output = handler.write_fast(request, query_ctx, true).await?; + let output = handler.write(request, query_ctx, true).await?; + crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); Ok(( StatusCode::NO_CONTENT, write_cost_header_map(output.meta.cost), @@ -159,7 +161,9 @@ pub async fn remote_read( handler.read(request, query_ctx).await } -async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result { +async fn decode_remote_write_request_to_row_inserts( + body: Body, +) -> Result<(RowInsertRequests, usize)> { let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); let body = hyper::body::to_bytes(body) .await @@ -171,24 +175,22 @@ async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result Result { +async fn decode_remote_write_request(body: Body) -> Result<(RowInsertRequests, usize)> { let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); let body = hyper::body::to_bytes(body) .await .context(error::HyperSnafu)?; - let buf = snappy_decompress(&body[..])?; + let buf = Bytes::from(snappy_decompress(&body[..])?); - let request = WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)?; - crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES - .observe(request.timeseries.len() as f64); - - Ok(request) + let mut request = PromWriteRequest::default(); + request + .merge(buf) + .context(error::DecodePromRemoteRequestSnafu)?; + Ok(request.as_row_insert_requests()) } async fn decode_remote_read_request(body: Body) -> Result { diff --git a/src/servers/src/metrics.rs b/src/servers/src/metrics.rs index 41d56702bc..5fc63ba7d8 100644 --- a/src/servers/src/metrics.rs +++ b/src/servers/src/metrics.rs @@ -103,9 +103,10 @@ lazy_static! { /// Duration to convert prometheus write request to gRPC request. pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED .with_label_values(&["convert"]); - pub static ref METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES: Histogram = register_histogram!( - "greptime_servers_http_prometheus_decode_num_series", - "servers http prometheus decode num series", + /// The samples count of Prometheus remote write. + pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!( + "greptime_servers_prometheus_remote_write_samples", + "frontend prometheus remote write samples" ) .unwrap(); /// Http prometheus read duration per database. diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs index 9ea907306c..51b8fb05b6 100644 --- a/src/servers/src/proto.rs +++ b/src/servers/src/proto.rs @@ -40,7 +40,10 @@ pub struct PromLabel { } impl Clear for PromLabel { - fn clear(&mut self) {} + fn clear(&mut self) { + self.name.clear(); + self.value.clear(); + } } impl PromLabel { diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index f68dc1d02e..0430005aed 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -28,7 +28,7 @@ pub mod sql; use std::collections::HashMap; use std::sync::Arc; -use api::prom_store::remote::{ReadRequest, WriteRequest}; +use api::prom_store::remote::ReadRequest; use api::v1::RowInsertRequests; use async_trait::async_trait; use common_query::Output; @@ -90,14 +90,6 @@ pub struct PromStoreResponse { pub trait PromStoreProtocolHandler { /// Handling prometheus remote write requests async fn write( - &self, - request: WriteRequest, - ctx: QueryContextRef, - with_metric_engine: bool, - ) -> Result; - - /// Handling prometheus remote write requests - async fn write_fast( &self, request: RowInsertRequests, ctx: QueryContextRef, diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index 5d6b12bc9d..1a005fc0bd 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -58,16 +58,7 @@ impl GrpcQueryHandler for DummyInstance { #[async_trait] impl PromStoreProtocolHandler for DummyInstance { - async fn write(&self, request: WriteRequest, ctx: QueryContextRef, _: bool) -> Result { - let _ = self - .tx - .send((ctx.current_schema().to_owned(), request.encode_to_vec())) - .await; - - Ok(Output::new_with_affected_rows(0)) - } - - async fn write_fast( + async fn write( &self, _request: RowInsertRequests, _ctx: QueryContextRef, diff --git a/tests-integration/src/prom_store.rs b/tests-integration/src/prom_store.rs index 4c0c7a4a53..73502c4f74 100644 --- a/tests-integration/src/prom_store.rs +++ b/tests-integration/src/prom_store.rs @@ -25,6 +25,7 @@ mod tests { use prost::Message; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::prom_store; + use servers::prom_store::to_grpc_row_insert_requests; use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::PromStoreProtocolHandler; use session::context::QueryContext; @@ -107,8 +108,9 @@ mod tests { .unwrap() .is_ok()); + let (row_inserts, _) = to_grpc_row_insert_requests(&write_request).unwrap(); instance - .write(write_request, ctx.clone(), true) + .write(row_inserts, ctx.clone(), true) .await .unwrap();