refactor: remove redudant PromStoreProtocolHandler::write (#3553)

refactor: remove redudant PromStoreProtocolHandler::write API and rename PromStoreProtocolHandler::write_fast to write
This commit is contained in:
Lei, HUANG
2024-03-22 10:09:00 +08:00
committed by GitHub
parent 1f0fc40287
commit 86fb9d8ac7
9 changed files with 46 additions and 99 deletions

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
@@ -46,7 +46,6 @@ use crate::error::{
TableNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES;
const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;
@@ -163,43 +162,6 @@ impl Instance {
#[async_trait]
impl PromStoreProtocolHandler for Instance {
async fn write(
&self,
request: WriteRequest,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;
let interceptor_ref = self
.plugins
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;
let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?;
let output = if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
.to_string();
self.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
} else {
self.handle_row_inserts(requests, ctx.clone())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
};
PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok(output)
}
async fn write_fast(
&self,
request: RowInsertRequests,
ctx: QueryContextRef,
@@ -316,14 +278,13 @@ impl ExportMetricHandler {
impl PromStoreProtocolHandler for ExportMetricHandler {
async fn write(
&self,
request: WriteRequest,
request: RowInsertRequests,
ctx: QueryContextRef,
_: bool,
) -> ServerResult<Output> {
let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?;
self.inserter
.handle_metric_row_inserts(
requests,
request,
ctx,
&self.statement_executor,
GREPTIME_PHYSICAL_TABLE.to_string(),
@@ -333,15 +294,6 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
.context(error::ExecuteGrpcQuerySnafu)
}
async fn write_fast(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
_with_metric_engine: bool,
) -> ServerResult<Output> {
unimplemented!()
}
async fn read(
&self,
_request: ReadRequest,

View File

@@ -41,13 +41,6 @@ lazy_static! {
.with_label_values(&["insert"]);
pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED
.with_label_values(&["execute"]);
/// The samples count of Prometheus remote write.
pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!(
"greptime_frontend_prometheus_remote_write_samples",
"frontend prometheus remote write samples"
)
.unwrap();
pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!(
"greptime_frontend_otlp_metrics_rows",
"frontend otlp metrics rows"

View File

@@ -29,7 +29,7 @@ use snafu::{ensure, ResultExt};
use tokio::time::{self, Interval};
use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu};
use crate::prom_store::snappy_compress;
use crate::prom_store::{snappy_compress, to_grpc_row_insert_requests};
use crate::query_handler::PromStoreProtocolHandlerRef;
/// Use to export the metrics generated by greptimedb, encoded to Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/),
@@ -256,8 +256,19 @@ pub async fn write_system_metric_by_handler(
filter.as_ref(),
Timestamp::current_millis().value(),
);
if let Err(e) = handler.write(request, ctx.clone(), false).await {
let (requests, samples) = match to_grpc_row_insert_requests(&request) {
Ok((requests, samples)) => (requests, samples),
Err(e) => {
error!(e; "Failed to convert gathered metrics to RowInsertRequests");
continue;
}
};
if let Err(e) = handler.write(requests, ctx.clone(), false).await {
error!("report export metrics by handler failed, error {}", e);
} else {
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
}
}
}

View File

@@ -14,7 +14,7 @@
use std::sync::Arc;
use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::prom_store::remote::ReadRequest;
use api::v1::RowInsertRequests;
use axum::extract::{Query, RawBody, State};
use axum::http::{header, HeaderValue, StatusCode};
@@ -75,13 +75,14 @@ pub async fn route_write_without_metric_engine(
.with_label_values(&[db.as_str()])
.start_timer();
let request = decode_remote_write_request(body).await?;
let (request, samples) = decode_remote_write_request(body).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
}
let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
@@ -104,7 +105,7 @@ pub async fn remote_write(
.with_label_values(&[db.as_str()])
.start_timer();
let request = decode_remote_write_request_to_row_inserts(body).await?;
let (request, samples) = decode_remote_write_request_to_row_inserts(body).await?;
if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
@@ -112,7 +113,8 @@ pub async fn remote_write(
query_ctx = Arc::new(new_query_ctx);
}
let output = handler.write_fast(request, query_ctx, true).await?;
let output = handler.write(request, query_ctx, true).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
@@ -159,7 +161,9 @@ pub async fn remote_read(
handler.read(request, query_ctx).await
}
async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result<RowInsertRequests> {
async fn decode_remote_write_request_to_row_inserts(
body: Body,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
@@ -171,24 +175,22 @@ async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result<RowIns
request
.merge(buf)
.context(error::DecodePromRemoteRequestSnafu)?;
let (requests, samples) = request.as_row_insert_requests();
crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES.observe(samples as f64);
Ok(requests)
Ok(request.as_row_insert_requests())
}
async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
async fn decode_remote_write_request(body: Body) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;
let buf = snappy_decompress(&body[..])?;
let buf = Bytes::from(snappy_decompress(&body[..])?);
let request = WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)?;
crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES
.observe(request.timeseries.len() as f64);
Ok(request)
let mut request = PromWriteRequest::default();
request
.merge(buf)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(request.as_row_insert_requests())
}
async fn decode_remote_read_request(body: Body) -> Result<ReadRequest> {

View File

@@ -103,9 +103,10 @@ lazy_static! {
/// Duration to convert prometheus write request to gRPC request.
pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED
.with_label_values(&["convert"]);
pub static ref METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES: Histogram = register_histogram!(
"greptime_servers_http_prometheus_decode_num_series",
"servers http prometheus decode num series",
/// The samples count of Prometheus remote write.
pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!(
"greptime_servers_prometheus_remote_write_samples",
"frontend prometheus remote write samples"
)
.unwrap();
/// Http prometheus read duration per database.

View File

@@ -40,7 +40,10 @@ pub struct PromLabel {
}
impl Clear for PromLabel {
fn clear(&mut self) {}
fn clear(&mut self) {
self.name.clear();
self.value.clear();
}
}
impl PromLabel {

View File

@@ -28,7 +28,7 @@ pub mod sql;
use std::collections::HashMap;
use std::sync::Arc;
use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::prom_store::remote::ReadRequest;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_query::Output;
@@ -90,14 +90,6 @@ pub struct PromStoreResponse {
pub trait PromStoreProtocolHandler {
/// Handling prometheus remote write requests
async fn write(
&self,
request: WriteRequest,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> Result<Output>;
/// Handling prometheus remote write requests
async fn write_fast(
&self,
request: RowInsertRequests,
ctx: QueryContextRef,

View File

@@ -58,16 +58,7 @@ impl GrpcQueryHandler for DummyInstance {
#[async_trait]
impl PromStoreProtocolHandler for DummyInstance {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef, _: bool) -> Result<Output> {
let _ = self
.tx
.send((ctx.current_schema().to_owned(), request.encode_to_vec()))
.await;
Ok(Output::new_with_affected_rows(0))
}
async fn write_fast(
async fn write(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,

View File

@@ -25,6 +25,7 @@ mod tests {
use prost::Message;
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::prom_store;
use servers::prom_store::to_grpc_row_insert_requests;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::PromStoreProtocolHandler;
use session::context::QueryContext;
@@ -107,8 +108,9 @@ mod tests {
.unwrap()
.is_ok());
let (row_inserts, _) = to_grpc_row_insert_requests(&write_request).unwrap();
instance
.write(write_request, ctx.clone(), true)
.write(row_inserts, ctx.clone(), true)
.await
.unwrap();