From 3acd5bfad0ab5f0786ce9206e9cb8dd1a7cc5890 Mon Sep 17 00:00:00 2001
From: shuiyisong <113876041+shuiyisong@users.noreply.github.com>
Date: Mon, 18 Mar 2024 19:21:19 +0800
Subject: [PATCH] chore: http header with metrics (#3536)

* chore: bring write cost to output
* chore: add write cost to greptimev1result
* chore: add metrics to influxdb write resp header
* chore: add metrics to prom store
* chore: add metrics to otlp
* chore: add debug log
* fix: prom remote read with output
* fix: prom queries don't output metrics header
* chore: extract header value
* chore: refactor code
* chore: fix cr issue
---
 src/common/function/src/handlers.rs           |  3 +-
 src/common/function/src/state.rs              |  5 +-
 src/common/plugins/src/consts.rs              |  3 +-
 src/common/plugins/src/lib.rs                 |  2 +-
 src/common/query/src/lib.rs                   | 16 +++-
 src/frontend/src/instance/influxdb.rs         |  9 +-
 src/frontend/src/instance/otlp.rs             | 43 +++------
 src/frontend/src/instance/prom_store.rs       | 61 +++++++------
 src/operator/src/insert.rs                    | 24 ++---
 src/operator/src/statement.rs                 |  5 +-
 src/operator/src/statement/copy_database.rs   | 13 ++-
 src/operator/src/statement/copy_table_from.rs | 32 +++++--
 src/operator/src/table.rs                     |  3 +-
 src/query/src/datafusion.rs                   | 27 ++++--
 src/query/src/dist_plan/merge_scan.rs         |  4 +-
 src/servers/src/grpc/otlp.rs                  | 12 ++-
 src/servers/src/http/handler.rs               | 49 ++--------
 src/servers/src/http/header.rs                | 64 +++++++++++++
 src/servers/src/http/influxdb.rs              |  8 +-
 src/servers/src/http/otlp.rs                  | 44 ++++++---
 src/servers/src/http/prom_store.rs            | 45 ++++++----
 src/servers/src/http/prometheus.rs            | 90 +++++++++++++------
 src/servers/src/http/prometheus_resp.rs       |  3 +-
 src/servers/src/query_handler.rs              | 25 +++---
 src/servers/tests/http/influxdb_test.rs       |  4 +-
 src/servers/tests/http/prom_store_test.rs     | 14 +--
 tests-integration/src/otlp.rs                 |  4 +-
 27 files changed, 376 insertions(+), 236 deletions(-)

diff --git a/src/common/function/src/handlers.rs b/src/common/function/src/handlers.rs
index 11175a87bd..a685b0dbc0 100644
--- a/src/common/function/src/handlers.rs
+++ b/src/common/function/src/handlers.rs
@@ -18,6 +18,7 @@ use async_trait::async_trait;
 use common_base::AffectedRows;
 use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
 use common_query::error::Result;
+use common_query::Output;
 use session::context::QueryContextRef;
 use store_api::storage::RegionId;
 use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
@@ -26,7 +27,7 @@ use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, Ins
 #[async_trait]
 pub trait TableMutationHandler: Send + Sync {
     /// Inserts rows into the table.
-    async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
+    async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<Output>;
 
     /// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result; diff --git a/src/common/function/src/state.rs b/src/common/function/src/state.rs index 469f1ffa5a..0a740ddb4a 100644 --- a/src/common/function/src/state.rs +++ b/src/common/function/src/state.rs @@ -35,6 +35,7 @@ impl FunctionState { use common_base::AffectedRows; use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse}; use common_query::error::Result; + use common_query::Output; use session::context::QueryContextRef; use store_api::storage::RegionId; use table::requests::{ @@ -70,8 +71,8 @@ impl FunctionState { &self, _request: InsertRequest, _ctx: QueryContextRef, - ) -> Result { - Ok(ROWS) + ) -> Result { + Ok(Output::new_with_affected_rows(ROWS)) } async fn delete( diff --git a/src/common/plugins/src/consts.rs b/src/common/plugins/src/consts.rs index f2150cc24c..ff5d3e1b71 100644 --- a/src/common/plugins/src/consts.rs +++ b/src/common/plugins/src/consts.rs @@ -16,4 +16,5 @@ pub const GREPTIME_EXEC_PREFIX: &str = "greptime_exec_"; /// Execution cost metrics key -pub const GREPTIME_EXEC_COST: &str = "greptime_exec_cost"; +pub const GREPTIME_EXEC_READ_COST: &str = "greptime_exec_read_cost"; +pub const GREPTIME_EXEC_WRITE_COST: &str = "greptime_exec_write_cost"; diff --git a/src/common/plugins/src/lib.rs b/src/common/plugins/src/lib.rs index bd815d794d..9c2096ef61 100644 --- a/src/common/plugins/src/lib.rs +++ b/src/common/plugins/src/lib.rs @@ -17,4 +17,4 @@ /// since `plugins` crate is at the top depending on crates like `frontend` and `datanode` mod consts; -pub use consts::{GREPTIME_EXEC_COST, GREPTIME_EXEC_PREFIX}; +pub use consts::{GREPTIME_EXEC_PREFIX, GREPTIME_EXEC_READ_COST, GREPTIME_EXEC_WRITE_COST}; diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 1f00bfa22c..cb39d37fb4 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -40,7 +40,7 @@ pub struct Output { /// Original Output struct /// carrying result data to response/client/user interface pub enum OutputData { - AffectedRows(usize), + AffectedRows(OutputRows), RecordBatches(RecordBatches), Stream(SendableRecordBatchStream), } @@ -50,11 +50,11 @@ pub enum OutputData { pub struct OutputMeta { /// May exist for query output. One can retrieve execution metrics from this plan. 
pub plan: Option>, - pub cost: usize, + pub cost: OutputCost, } impl Output { - pub fn new_with_affected_rows(affected_rows: usize) -> Self { + pub fn new_with_affected_rows(affected_rows: OutputRows) -> Self { Self { data: OutputData::AffectedRows(affected_rows), meta: Default::default(), @@ -78,6 +78,13 @@ impl Output { pub fn new(data: OutputData, meta: OutputMeta) -> Self { Self { data, meta } } + + pub fn extract_rows_and_cost(&self) -> (OutputRows, OutputCost) { + match self.data { + OutputData::AffectedRows(rows) => (rows, self.meta.cost), + _ => (0, self.meta.cost), + } + } } impl Debug for OutputData { @@ -133,3 +140,6 @@ impl From<&AddColumnLocation> for Location { } } } + +pub type OutputRows = usize; +pub type OutputCost = usize; diff --git a/src/frontend/src/instance/influxdb.rs b/src/frontend/src/instance/influxdb.rs index 72fe0d92b4..23eecd11e7 100644 --- a/src/frontend/src/instance/influxdb.rs +++ b/src/frontend/src/instance/influxdb.rs @@ -14,6 +14,7 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; +use client::Output; use common_error::ext::BoxedError; use servers::error::{AuthSnafu, Error}; use servers::influxdb::InfluxdbRequest; @@ -30,7 +31,7 @@ impl InfluxdbLineProtocolHandler for Instance { &self, request: InfluxdbRequest, ctx: QueryContextRef, - ) -> servers::error::Result<()> { + ) -> servers::error::Result { self.plugins .get::() .as_ref() @@ -41,11 +42,9 @@ impl InfluxdbLineProtocolHandler for Instance { interceptor_ref.pre_execute(&request.lines, ctx.clone())?; let requests = request.try_into()?; - let _ = self - .handle_row_inserts(requests, ctx) + self.handle_row_inserts(requests, ctx) .await .map_err(BoxedError::new) - .context(servers::error::ExecuteGrpcQuerySnafu)?; - Ok(()) + .context(servers::error::ExecuteGrpcQuerySnafu) } } diff --git a/src/frontend/src/instance/otlp.rs b/src/frontend/src/instance/otlp.rs index 7817bb69f2..8d707d12bd 100644 --- a/src/frontend/src/instance/otlp.rs +++ b/src/frontend/src/instance/otlp.rs @@ -14,14 +14,11 @@ use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; +use client::Output; use common_error::ext::BoxedError; use common_telemetry::tracing; -use opentelemetry_proto::tonic::collector::metrics::v1::{ - ExportMetricsServiceRequest, ExportMetricsServiceResponse, -}; -use opentelemetry_proto::tonic::collector::trace::v1::{ - ExportTraceServiceRequest, ExportTraceServiceResponse, -}; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; use servers::error::{self, AuthSnafu, Result as ServerResult}; use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef}; use servers::otlp; @@ -40,7 +37,7 @@ impl OpenTelemetryProtocolHandler for Instance { &self, request: ExportMetricsServiceRequest, ctx: QueryContextRef, - ) -> ServerResult { + ) -> ServerResult { self.plugins .get::() .as_ref() @@ -53,19 +50,12 @@ impl OpenTelemetryProtocolHandler for Instance { interceptor_ref.pre_execute(ctx.clone())?; let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?; - let _ = self - .handle_row_inserts(requests, ctx) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)?; - OTLP_METRICS_ROWS.inc_by(rows as u64); - let resp = ExportMetricsServiceResponse { - // TODO(sunng87): add support for partial_success in future patch - partial_success: None, - }; - 
Ok(resp) + self.handle_row_inserts(requests, ctx) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu) } #[tracing::instrument(skip_all)] @@ -73,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance { &self, request: ExportTraceServiceRequest, ctx: QueryContextRef, - ) -> ServerResult { + ) -> ServerResult { self.plugins .get::() .as_ref() @@ -95,18 +85,11 @@ impl OpenTelemetryProtocolHandler for Instance { let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?; - let _ = self - .handle_row_inserts(requests, ctx) - .await - .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)?; - OTLP_TRACES_ROWS.inc_by(rows as u64); - let resp = ExportTraceServiceResponse { - // TODO(fys): add support for partial_success in future patch - partial_success: None, - }; - Ok(resp) + self.handle_row_inserts(requests, ctx) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu) } } diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 5382cf9682..1b5aa14c5f 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use api::prom_store::remote::read_request::ResponseType; @@ -30,6 +31,7 @@ use operator::insert::InserterRef; use operator::statement::StatementExecutor; use prost::Message; use servers::error::{self, AuthSnafu, Result as ServerResult}; +use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF}; use servers::http::prom_store::PHYSICAL_TABLE_PARAM; use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef}; use servers::prom_store::{self, Metrics}; @@ -165,7 +167,7 @@ impl PromStoreProtocolHandler for Instance { request: WriteRequest, ctx: QueryContextRef, with_metric_engine: bool, - ) -> ServerResult<()> { + ) -> ServerResult { self.plugins .get::() .as_ref() @@ -177,26 +179,24 @@ impl PromStoreProtocolHandler for Instance { interceptor_ref.pre_write(&request, ctx.clone())?; let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?; - if with_metric_engine { + let output = if with_metric_engine { let physical_table = ctx .extension(PHYSICAL_TABLE_PARAM) .unwrap_or(GREPTIME_PHYSICAL_TABLE) .to_string(); - let _ = self - .handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string()) + self.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string()) .await .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)?; + .context(error::ExecuteGrpcQuerySnafu)? } else { - let _ = self - .handle_row_inserts(requests, ctx.clone()) + self.handle_row_inserts(requests, ctx.clone()) .await .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)?; - } + .context(error::ExecuteGrpcQuerySnafu)? 
+ }; PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64); - Ok(()) + Ok(output) } async fn write_fast( @@ -204,31 +204,30 @@ impl PromStoreProtocolHandler for Instance { request: RowInsertRequests, ctx: QueryContextRef, with_metric_engine: bool, - ) -> ServerResult<()> { + ) -> ServerResult { self.plugins .get::() .as_ref() .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite) .context(AuthSnafu)?; - if with_metric_engine { + let output = if with_metric_engine { let physical_table = ctx .extension(PHYSICAL_TABLE_PARAM) .unwrap_or(GREPTIME_PHYSICAL_TABLE) .to_string(); - let _ = self - .handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string()) + self.handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string()) .await .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)?; + .context(error::ExecuteGrpcQuerySnafu)? } else { - let _ = self - .handle_row_inserts(request, ctx.clone()) + self.handle_row_inserts(request, ctx.clone()) .await .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)?; - } - Ok(()) + .context(error::ExecuteGrpcQuerySnafu)? + }; + + Ok(output) } async fn read( @@ -254,18 +253,29 @@ impl PromStoreProtocolHandler for Instance { match response_type { ResponseType::Samples => { let mut query_results = Vec::with_capacity(results.len()); + let mut map = HashMap::new(); for (table_name, output) in results { + let plan = output.meta.plan.clone(); query_results.push(to_query_result(&table_name, output).await?); + if let Some(ref plan) = plan { + collect_plan_metrics(plan.clone(), &mut [&mut map]); + } } let response = ReadResponse { results: query_results, }; + let resp_metrics = map + .into_iter() + .map(|(k, v)| (k, v.into())) + .collect::>(); + // TODO(dennis): may consume too much memory, adds flow control Ok(PromStoreResponse { - content_type: "application/x-protobuf".to_string(), - content_encoding: "snappy".to_string(), + content_type: CONTENT_TYPE_PROTOBUF.clone(), + content_encoding: CONTENT_ENCODING_SNAPPY.clone(), + resp_metrics, body: prom_store::snappy_compress(&response.encode_to_vec())?, }) } @@ -309,7 +319,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler { request: WriteRequest, ctx: QueryContextRef, _: bool, - ) -> ServerResult<()> { + ) -> ServerResult { let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?; self.inserter .handle_metric_row_inserts( @@ -320,8 +330,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler { ) .await .map_err(BoxedError::new) - .context(error::ExecuteGrpcQuerySnafu)?; - Ok(()) + .context(error::ExecuteGrpcQuerySnafu) } async fn write_fast( @@ -329,7 +338,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler { _request: RowInsertRequests, _ctx: QueryContextRef, _with_metric_engine: bool, - ) -> ServerResult<()> { + ) -> ServerResult { unimplemented!() } diff --git a/src/operator/src/insert.rs b/src/operator/src/insert.rs index 91abfdf5c7..46fe574c38 100644 --- a/src/operator/src/insert.rs +++ b/src/operator/src/insert.rs @@ -22,6 +22,7 @@ use api::v1::{ RowInsertRequests, SemanticType, }; use catalog::CatalogManagerRef; +use client::{OutputData, OutputMeta}; use common_catalog::consts::default_engine; use common_grpc_expr::util::{extract_new_columns, ColumnExpr}; use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef}; @@ -110,8 +111,7 @@ impl Inserter { .convert(requests) .await?; - let affected_rows = self.do_request(inserts, &ctx).await?; - Ok(Output::new_with_affected_rows(affected_rows)) + 
self.do_request(inserts, &ctx).await } /// Handle row inserts request with metric engine. @@ -148,15 +148,14 @@ impl Inserter { .convert(requests) .await?; - let affected_rows = self.do_request(inserts, &ctx).await?; - Ok(Output::new_with_affected_rows(affected_rows)) + self.do_request(inserts, &ctx).await } pub async fn handle_table_insert( &self, request: TableInsertRequest, ctx: QueryContextRef, - ) -> Result { + ) -> Result { let catalog = request.catalog_name.as_str(); let schema = request.schema_name.as_str(); let table_name = request.table_name.as_str(); @@ -170,8 +169,7 @@ impl Inserter { .convert(request) .await?; - let affected_rows = self.do_request(inserts, &ctx).await?; - Ok(affected_rows as _) + self.do_request(inserts, &ctx).await } pub async fn handle_statement_insert( @@ -184,8 +182,7 @@ impl Inserter { .convert(insert, ctx) .await?; - let affected_rows = self.do_request(inserts, ctx).await?; - Ok(Output::new_with_affected_rows(affected_rows)) + self.do_request(inserts, ctx).await } } @@ -194,8 +191,8 @@ impl Inserter { &self, requests: RegionInsertRequests, ctx: &QueryContextRef, - ) -> Result { - write_meter!(ctx.current_catalog(), ctx.current_schema(), requests); + ) -> Result { + let write_cost = write_meter!(ctx.current_catalog(), ctx.current_schema(), requests); let request_factory = RegionRequestFactory::new(RegionRequestHeader { tracing_context: TracingContext::from_current_span().to_w3c(), dbname: ctx.get_db_string(), @@ -221,7 +218,10 @@ impl Inserter { let affected_rows = results.into_iter().sum::>()?; crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64); - Ok(affected_rows) + Ok(Output::new( + OutputData::AffectedRows(affected_rows), + OutputMeta::new_with_cost(write_cost as _), + )) } async fn group_requests_by_peer( diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index fc194e1496..974f02c52a 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -124,10 +124,7 @@ impl StatementExecutor { .copy_table_to(req, query_ctx) .await .map(Output::new_with_affected_rows), - CopyDirection::Import => self - .copy_table_from(req, query_ctx) - .await - .map(Output::new_with_affected_rows), + CopyDirection::Import => self.copy_table_from(req, query_ctx).await, } } diff --git a/src/operator/src/statement/copy_database.rs b/src/operator/src/statement/copy_database.rs index ca62fd97cd..fac6d77a6c 100644 --- a/src/operator/src/statement/copy_database.rs +++ b/src/operator/src/statement/copy_database.rs @@ -15,7 +15,7 @@ use std::path::Path; use std::str::FromStr; -use client::Output; +use client::{Output, OutputData, OutputMeta}; use common_datasource::file_format::Format; use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::build_backend; @@ -129,7 +129,9 @@ impl StatementExecutor { .get(CONTINUE_ON_ERROR_KEY) .and_then(|v| bool::from_str(v).ok()) .unwrap_or(false); + let mut rows_inserted = 0; + let mut insert_cost = 0; for e in entries { let table_name = match parse_file_name_to_copy(&e) { @@ -156,8 +158,10 @@ impl StatementExecutor { }; debug!("Copy table, arg: {:?}", req); match self.copy_table_from(req, ctx.clone()).await { - Ok(rows) => { + Ok(o) => { + let (rows, cost) = o.extract_rows_and_cost(); rows_inserted += rows; + insert_cost += cost; } Err(err) => { if continue_on_error { @@ -169,7 +173,10 @@ impl StatementExecutor { } } } - Ok(Output::new_with_affected_rows(rows_inserted)) + Ok(Output::new( + OutputData::AffectedRows(rows_inserted), + 
OutputMeta::new_with_cost(insert_cost), + )) } } diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 7e7b748cb7..c8c1ae3688 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -15,7 +15,9 @@ use std::collections::HashMap; use std::future::Future; use std::sync::Arc; +use std::usize; +use client::{Output, OutputData, OutputMeta}; use common_base::readable_size::ReadableSize; use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener}; use common_datasource::file_format::json::{JsonFormat, JsonOpener}; @@ -24,6 +26,7 @@ use common_datasource::file_format::{FileFormat, Format}; use common_datasource::lister::{Lister, Source}; use common_datasource::object_store::{build_backend, parse_url}; use common_datasource::util::find_dir_and_filename; +use common_query::{OutputCost, OutputRows}; use common_recordbatch::adapter::RecordBatchStreamTypeAdapter; use common_recordbatch::DfSendableRecordBatchStream; use common_telemetry::{debug, tracing}; @@ -328,7 +331,7 @@ impl StatementExecutor { &self, req: CopyTableRequest, query_ctx: QueryContextRef, - ) -> Result { + ) -> Result { let table_ref = TableReference { catalog: &req.catalog_name, schema: &req.schema_name, @@ -373,6 +376,7 @@ impl StatementExecutor { } let mut rows_inserted = 0; + let mut insert_cost = 0; for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files { let mut stream = self @@ -419,28 +423,40 @@ impl StatementExecutor { )); if pending_mem_size as u64 >= pending_mem_threshold { - rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?; + let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?; + rows_inserted += rows; + insert_cost += cost; } } if !pending.is_empty() { - rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?; + let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?; + rows_inserted += rows; + insert_cost += cost; } } - Ok(rows_inserted) + Ok(Output::new( + OutputData::AffectedRows(rows_inserted), + OutputMeta::new_with_cost(insert_cost), + )) } } /// Executes all pending inserts all at once, drain pending requests and reset pending bytes. async fn batch_insert( - pending: &mut Vec>>, + pending: &mut Vec>>, pending_bytes: &mut usize, -) -> Result { +) -> Result<(OutputRows, OutputCost)> { let batch = pending.drain(..); - let res: usize = futures::future::try_join_all(batch).await?.iter().sum(); + let result = futures::future::try_join_all(batch) + .await? + .iter() + .map(|o| o.extract_rows_and_cost()) + .reduce(|(a, b), (c, d)| (a + c, b + d)) + .unwrap_or((0, 0)); *pending_bytes = 0; - Ok(res) + Ok(result) } fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> { diff --git a/src/operator/src/table.rs b/src/operator/src/table.rs index b516597e0b..52c37bb401 100644 --- a/src/operator/src/table.rs +++ b/src/operator/src/table.rs @@ -13,6 +13,7 @@ // limitations under the License. 
use async_trait::async_trait; +use client::Output; use common_base::AffectedRows; use common_error::ext::BoxedError; use common_function::handlers::TableMutationHandler; @@ -52,7 +53,7 @@ impl TableMutationHandler for TableMutationOperator { &self, request: TableInsertRequest, ctx: QueryContextRef, - ) -> QueryResult { + ) -> QueryResult { self.inserter .handle_table_insert(request, ctx) .await diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 0343f0b5fd..b6f0e9543f 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -128,6 +128,8 @@ impl DatafusionQueryEngine { }; let mut affected_rows = 0; + let mut insert_cost = 0; + while let Some(batch) = stream.next().await { let batch = batch.context(CreateRecordBatchSnafu)?; let column_vectors = batch @@ -135,20 +137,27 @@ impl DatafusionQueryEngine { .map_err(BoxedError::new) .context(QueryExecutionSnafu)?; - let rows = match dml.op { + match dml.op { WriteOp::InsertInto => { - self.insert(&table_name, column_vectors, query_ctx.clone()) - .await? + let output = self + .insert(&table_name, column_vectors, query_ctx.clone()) + .await?; + let (rows, cost) = output.extract_rows_and_cost(); + affected_rows += rows; + insert_cost += cost; } WriteOp::Delete => { - self.delete(&table_name, &table, column_vectors, query_ctx.clone()) - .await? + affected_rows += self + .delete(&table_name, &table, column_vectors, query_ctx.clone()) + .await?; } _ => unreachable!("guarded by the 'ensure!' at the beginning"), - }; - affected_rows += rows; + } } - Ok(Output::new_with_affected_rows(affected_rows)) + Ok(Output::new( + OutputData::AffectedRows(affected_rows), + OutputMeta::new_with_cost(insert_cost), + )) } #[tracing::instrument(skip_all)] @@ -201,7 +210,7 @@ impl DatafusionQueryEngine { table_name: &ResolvedTableReference<'a>, column_vectors: HashMap, query_ctx: QueryContextRef, - ) -> Result { + ) -> Result { let request = InsertRequest { catalog_name: table_name.catalog.to_string(), schema_name: table_name.schema.to_string(), diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index cca0081d4d..ed31ecb0c8 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -22,7 +22,7 @@ use common_base::bytes::Bytes; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::BoxedError; use common_meta::table_name::TableName; -use common_plugins::GREPTIME_EXEC_COST; +use common_plugins::GREPTIME_EXEC_READ_COST; use common_query::physical_plan::TaskContext; use common_recordbatch::adapter::DfRecordBatchStreamAdapter; use common_recordbatch::error::ExternalSnafu; @@ -338,7 +338,7 @@ impl MergeScanMetric { first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1), finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1), output_rows: MetricBuilder::new(metric).output_rows(1), - greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_COST, 1), + greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1), } } diff --git a/src/servers/src/grpc/otlp.rs b/src/servers/src/grpc/otlp.rs index 9edc191e8f..c96aed7af1 100644 --- a/src/servers/src/grpc/otlp.rs +++ b/src/servers/src/grpc/otlp.rs @@ -52,9 +52,11 @@ impl TraceService for OtlpService { .cloned() .context(error::MissingQueryContextSnafu)?; - let res = self.handler.traces(req, ctx).await?; + let _ = self.handler.traces(req, ctx).await?; - Ok(Response::new(res)) + 
Ok(Response::new(ExportTraceServiceResponse { + partial_success: None, + })) } } @@ -71,8 +73,10 @@ impl MetricsService for OtlpService { .cloned() .context(error::MissingQueryContextSnafu)?; - let res = self.handler.metrics(req, ctx).await?; + let _ = self.handler.metrics(req, ctx).await?; - Ok(Response::new(res)) + Ok(Response::new(ExportMetricsServiceResponse { + partial_success: None, + })) } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index 7207f591e5..7506ebbddb 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; use std::env; -use std::sync::Arc; use std::time::Instant; use aide::transform::TransformOperation; @@ -23,18 +22,17 @@ use axum::response::{IntoResponse, Response}; use axum::{Extension, Form}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; -use common_plugins::GREPTIME_EXEC_PREFIX; -use common_query::physical_plan::PhysicalPlan; +use common_plugins::GREPTIME_EXEC_WRITE_COST; use common_query::{Output, OutputData}; use common_recordbatch::util; use common_telemetry::tracing; -use datafusion::physical_plan::metrics::MetricValue; use query::parser::PromQuery; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use serde_json::Value; use session::context::QueryContextRef; +use super::header::collect_plan_metrics; use crate::http::arrow_result::ArrowResponse; use crate::http::csv_result::CsvResponse; use crate::http::error_result::ErrorResponse; @@ -143,6 +141,9 @@ pub async fn from_output( Ok(o) => match o.data { OutputData::AffectedRows(rows) => { results.push(GreptimeQueryOutput::AffectedRows(rows)); + if o.meta.cost > 0 { + merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64); + } } OutputData::Stream(stream) => { let schema = stream.schema().clone(); @@ -166,8 +167,8 @@ pub async fn from_output( let re = result_map .into_iter() .map(|(k, v)| (k, Value::from(v))) - .collect(); - http_record_output.metrics = re; + .collect::>(); + http_record_output.metrics.extend(re); } results.push(GreptimeQueryOutput::Records(http_record_output)) } @@ -197,42 +198,6 @@ pub async fn from_output( Ok((results, merge_map)) } -fn collect_into_maps(name: &str, value: u64, maps: &mut [&mut HashMap]) { - if name.starts_with(GREPTIME_EXEC_PREFIX) && value > 0 { - maps.iter_mut().for_each(|map| { - map.entry(name.to_string()) - .and_modify(|v| *v += value) - .or_insert(value); - }); - } -} - -pub fn collect_plan_metrics(plan: Arc, maps: &mut [&mut HashMap]) { - if let Some(m) = plan.metrics() { - m.iter().for_each(|m| match m.value() { - MetricValue::Count { name, count } => { - collect_into_maps(name, count.value() as u64, maps); - } - MetricValue::Gauge { name, gauge } => { - collect_into_maps(name, gauge.value() as u64, maps); - } - MetricValue::Time { name, time } => { - if name.starts_with(GREPTIME_EXEC_PREFIX) { - // override - maps.iter_mut().for_each(|map| { - map.insert(name.to_string(), time.value() as u64); - }); - } - } - _ => {} - }); - } - - for c in plan.children() { - collect_plan_metrics(c, maps); - } -} - #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct PromqlQuery { pub query: String, diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index ec28f6a446..bd0eebff9d 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -12,7 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the 
License. +use std::collections::HashMap; +use std::sync::Arc; + +use common_plugins::GREPTIME_EXEC_PREFIX; +use common_query::physical_plan::PhysicalPlan; +use datafusion::physical_plan::metrics::MetricValue; use headers::{Header, HeaderName, HeaderValue}; +use hyper::HeaderMap; +use serde_json::Value; pub mod constants { // New HTTP headers would better distinguish use cases among: @@ -53,6 +61,9 @@ pub static GREPTIME_DB_HEADER_NAME: HeaderName = pub static GREPTIME_TIMEZONE_HEADER_NAME: HeaderName = HeaderName::from_static(constants::GREPTIME_TIMEZONE_HEADER_NAME); +pub static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf"); +pub static CONTENT_ENCODING_SNAPPY: HeaderValue = HeaderValue::from_static("snappy"); + pub struct GreptimeDbName(Option); impl Header for GreptimeDbName { @@ -87,3 +98,56 @@ impl GreptimeDbName { self.0.as_ref() } } + +// collect write +pub fn write_cost_header_map(cost: usize) -> HeaderMap { + let mut header_map = HeaderMap::new(); + if cost > 0 { + let mut map: HashMap = HashMap::new(); + map.insert( + common_plugins::GREPTIME_EXEC_WRITE_COST.to_string(), + Value::from(cost), + ); + let _ = serde_json::to_string(&map) + .ok() + .and_then(|s| HeaderValue::from_str(&s).ok()) + .and_then(|v| header_map.insert(&GREPTIME_DB_HEADER_METRICS, v)); + } + header_map +} + +fn collect_into_maps(name: &str, value: u64, maps: &mut [&mut HashMap]) { + if name.starts_with(GREPTIME_EXEC_PREFIX) && value > 0 { + maps.iter_mut().for_each(|map| { + map.entry(name.to_string()) + .and_modify(|v| *v += value) + .or_insert(value); + }); + } +} + +pub fn collect_plan_metrics(plan: Arc, maps: &mut [&mut HashMap]) { + if let Some(m) = plan.metrics() { + m.iter().for_each(|m| match m.value() { + MetricValue::Count { name, count } => { + collect_into_maps(name, count.value() as u64, maps); + } + MetricValue::Gauge { name, gauge } => { + collect_into_maps(name, gauge.value() as u64, maps); + } + MetricValue::Time { name, time } => { + if name.starts_with(GREPTIME_EXEC_PREFIX) { + // override + maps.iter_mut().for_each(|map| { + map.insert(name.to_string(), time.value() as u64); + }); + } + } + _ => {} + }); + } + + for c in plan.children() { + collect_plan_metrics(c, maps); + } +} diff --git a/src/servers/src/http/influxdb.rs b/src/servers/src/http/influxdb.rs index 4b8a606b68..4ebce72fb8 100644 --- a/src/servers/src/http/influxdb.rs +++ b/src/servers/src/http/influxdb.rs @@ -23,6 +23,7 @@ use common_grpc::writer::Precision; use common_telemetry::tracing; use session::context::QueryContextRef; +use super::header::write_cost_header_map; use crate::error::{Result, TimePrecisionSnafu}; use crate::influxdb::InfluxdbRequest; use crate::query_handler::InfluxdbLineProtocolHandlerRef; @@ -93,9 +94,12 @@ pub async fn influxdb_write( .start_timer(); let request = InfluxdbRequest { precision, lines }; - handler.exec(request, ctx).await?; + let output = handler.exec(request, ctx).await?; - Ok((StatusCode::NO_CONTENT, ())) + Ok(( + StatusCode::NO_CONTENT, + write_cost_header_map(output.meta.cost), + )) } fn parse_time_precision(value: &str) -> Result { diff --git a/src/servers/src/http/otlp.rs b/src/servers/src/http/otlp.rs index b113ae951c..17f98a3915 100644 --- a/src/servers/src/http/otlp.rs +++ b/src/servers/src/http/otlp.rs @@ -28,6 +28,7 @@ use prost::Message; use session::context::QueryContextRef; use snafu::prelude::*; +use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF}; use crate::error::{self, Result}; use 
crate::query_handler::OpenTelemetryProtocolHandlerRef; @@ -43,10 +44,16 @@ pub async fn metrics( .with_label_values(&[db.as_str()]) .start_timer(); let request = parse_metrics_body(body).await?; + handler .metrics(request, query_ctx) .await - .map(OtlpMetricsResponse) + .map(|o| OtlpMetricsResponse { + resp_body: ExportMetricsServiceResponse { + partial_success: None, + }, + write_cost: o.meta.cost, + }) } async fn parse_metrics_body(body: Body) -> Result { @@ -58,15 +65,17 @@ async fn parse_metrics_body(body: Body) -> Result { }) } -pub struct OtlpMetricsResponse(ExportMetricsServiceResponse); +pub struct OtlpMetricsResponse { + resp_body: ExportMetricsServiceResponse, + write_cost: usize, +} impl IntoResponse for OtlpMetricsResponse { fn into_response(self) -> axum::response::Response { - ( - [(header::CONTENT_TYPE, "application/x-protobuf")], - self.0.encode_to_vec(), - ) - .into_response() + let mut header_map = write_cost_header_map(self.write_cost); + header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone()); + + (header_map, self.resp_body.encode_to_vec()).into_response() } } @@ -85,7 +94,12 @@ pub async fn traces( handler .traces(request, query_ctx) .await - .map(OtlpTracesResponse) + .map(|o| OtlpTracesResponse { + resp_body: ExportTraceServiceResponse { + partial_success: None, + }, + write_cost: o.meta.cost, + }) } async fn parse_traces_body(body: Body) -> Result { @@ -97,14 +111,16 @@ async fn parse_traces_body(body: Body) -> Result { }) } -pub struct OtlpTracesResponse(ExportTraceServiceResponse); +pub struct OtlpTracesResponse { + resp_body: ExportTraceServiceResponse, + write_cost: usize, +} impl IntoResponse for OtlpTracesResponse { fn into_response(self) -> axum::response::Response { - ( - [(header::CONTENT_TYPE, "application/x-protobuf")], - self.0.encode_to_vec(), - ) - .into_response() + let mut header_map = write_cost_header_map(self.write_cost); + header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone()); + + (header_map, self.resp_body.encode_to_vec()).into_response() } } diff --git a/src/servers/src/http/prom_store.rs b/src/servers/src/http/prom_store.rs index 5e30b47fc0..514bf6d22c 100644 --- a/src/servers/src/http/prom_store.rs +++ b/src/servers/src/http/prom_store.rs @@ -17,14 +17,14 @@ use std::sync::Arc; use api::prom_store::remote::{ReadRequest, WriteRequest}; use api::v1::RowInsertRequests; use axum::extract::{Query, RawBody, State}; -use axum::http::{header, StatusCode}; +use axum::http::{header, HeaderValue, StatusCode}; use axum::response::IntoResponse; use axum::Extension; use bytes::Bytes; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_query::prelude::GREPTIME_PHYSICAL_TABLE; use common_telemetry::tracing; -use hyper::Body; +use hyper::{Body, HeaderMap}; use lazy_static::lazy_static; use object_pool::Pool; use prost::Message; @@ -33,6 +33,7 @@ use serde::{Deserialize, Serialize}; use session::context::QueryContextRef; use snafu::prelude::*; +use super::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS}; use crate::error::{self, Result, UnexpectedPhysicalTableSnafu}; use crate::prom_store::snappy_decompress; use crate::proto::PromWriteRequest; @@ -68,7 +69,7 @@ pub async fn route_write_without_metric_engine( Query(params): Query, Extension(query_ctx): Extension, RawBody(body): RawBody, -) -> Result<(StatusCode, ())> { +) -> Result { let db = params.db.clone().unwrap_or_default(); let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED .with_label_values(&[db.as_str()]) @@ -80,8 +81,11 @@ pub async 
fn route_write_without_metric_engine( return UnexpectedPhysicalTableSnafu {}.fail(); } - handler.write(request, query_ctx, false).await?; - Ok((StatusCode::NO_CONTENT, ())) + let output = handler.write(request, query_ctx, false).await?; + Ok(( + StatusCode::NO_CONTENT, + write_cost_header_map(output.meta.cost), + )) } #[axum_macros::debug_handler] @@ -94,7 +98,7 @@ pub async fn remote_write( Query(params): Query, Extension(mut query_ctx): Extension, RawBody(body): RawBody, -) -> Result<(StatusCode, ())> { +) -> Result { let db = params.db.clone().unwrap_or_default(); let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED .with_label_values(&[db.as_str()]) @@ -108,20 +112,29 @@ pub async fn remote_write( query_ctx = Arc::new(new_query_ctx); } - handler.write_fast(request, query_ctx, true).await?; - Ok((StatusCode::NO_CONTENT, ())) + let output = handler.write_fast(request, query_ctx, true).await?; + Ok(( + StatusCode::NO_CONTENT, + write_cost_header_map(output.meta.cost), + )) } impl IntoResponse for PromStoreResponse { fn into_response(self) -> axum::response::Response { - ( - [ - (header::CONTENT_TYPE, self.content_type), - (header::CONTENT_ENCODING, self.content_encoding), - ], - self.body, - ) - .into_response() + let mut header_map = HeaderMap::new(); + header_map.insert(&header::CONTENT_TYPE, self.content_type); + header_map.insert(&header::CONTENT_ENCODING, self.content_encoding); + + let metrics = if self.resp_metrics.is_empty() { + None + } else { + serde_json::to_string(&self.resp_metrics).ok() + }; + if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) { + header_map.insert(&GREPTIME_DB_HEADER_METRICS, m); + } + + (header_map, self.body).into_response() } } diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index bfece7a907..af5567993f 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -39,6 +39,7 @@ use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING}; use schemars::JsonSchema; use serde::de::{self, MapAccess, Visitor}; use serde::{Deserialize, Serialize}; +use serde_json::Value; use session::context::QueryContextRef; use snafu::{Location, ResultExt}; @@ -46,6 +47,7 @@ pub use super::prometheus_resp::PrometheusJsonResponse; use crate::error::{ CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, UnexpectedResultSnafu, }; +use crate::http::header::collect_plan_metrics; use crate::prom_store::METRIC_NAME_LABEL; use crate::prometheus_handler::PrometheusHandlerRef; @@ -275,6 +277,7 @@ pub async fn labels_query( let mut labels = HashSet::new(); let _ = labels.insert(METRIC_NAME.to_string()); + let mut merge_map = HashMap::new(); for query in queries { let prom_query = PromQuery { query, @@ -284,10 +287,9 @@ pub async fn labels_query( }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; - - let response = retrieve_labels_name_from_query_result(result, &mut labels).await; - - if let Err(err) = response { + if let Err(err) = + retrieve_labels_name_from_query_result(result, &mut labels, &mut merge_map).await + { // Prometheus won't report error if querying nonexist label and metric if err.status_code() != StatusCode::TableNotFound && err.status_code() != StatusCode::TableColumnNotFound @@ -305,7 +307,13 @@ pub async fn labels_query( let mut sorted_labels: Vec = labels.into_iter().collect(); sorted_labels.sort(); - PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels)) + let merge_map = merge_map + .into_iter() + .map(|(k, v)| (k, 
Value::from(v))) + .collect(); + let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels)); + resp.resp_metrics = merge_map; + resp } async fn get_all_column_names( @@ -335,48 +343,53 @@ async fn retrieve_series_from_query_result( result: Result, series: &mut Vec>, table_name: &str, + metrics: &mut HashMap, ) -> Result<()> { - match result?.data { - OutputData::RecordBatches(batches) => { - record_batches_to_series(batches, series, table_name)?; - Ok(()) - } + let result = result?; + match result.data { + OutputData::RecordBatches(batches) => record_batches_to_series(batches, series, table_name), OutputData::Stream(stream) => { let batches = RecordBatches::try_collect(stream) .await .context(CollectRecordbatchSnafu)?; - record_batches_to_series(batches, series, table_name)?; - Ok(()) + record_batches_to_series(batches, series, table_name) } OutputData::AffectedRows(_) => Err(Error::UnexpectedResult { reason: "expected data result, but got affected rows".to_string(), location: Location::default(), }), + }?; + + if let Some(ref plan) = result.meta.plan { + collect_plan_metrics(plan.clone(), &mut [metrics]); } + Ok(()) } /// Retrieve labels name from query result async fn retrieve_labels_name_from_query_result( result: Result, labels: &mut HashSet, + metrics: &mut HashMap, ) -> Result<()> { - match result?.data { - OutputData::RecordBatches(batches) => { - record_batches_to_labels_name(batches, labels)?; - Ok(()) - } + let result = result?; + match result.data { + OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels), OutputData::Stream(stream) => { let batches = RecordBatches::try_collect(stream) .await .context(CollectRecordbatchSnafu)?; - record_batches_to_labels_name(batches, labels)?; - Ok(()) + record_batches_to_labels_name(batches, labels) } OutputData::AffectedRows(_) => UnexpectedResultSnafu { reason: "expected data result, but got affected rows".to_string(), } .fail(), + }?; + if let Some(ref plan) = result.meta.plan { + collect_plan_metrics(plan.clone(), &mut [metrics]); } + Ok(()) } fn record_batches_to_series( @@ -537,6 +550,7 @@ pub async fn label_values_query( let mut label_values = HashSet::new(); + let mut merge_map = HashMap::new(); for query in queries { let prom_query = PromQuery { query, @@ -545,8 +559,9 @@ pub async fn label_values_query( step: DEFAULT_LOOKBACK_STRING.to_string(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; - let result = retrieve_label_values(result, &label_name, &mut label_values).await; - if let Err(err) = result { + if let Err(err) = + retrieve_label_values(result, &label_name, &mut label_values, &mut merge_map).await + { // Prometheus won't report error if querying nonexist label and metric if err.status_code() != StatusCode::TableNotFound && err.status_code() != StatusCode::TableColumnNotFound @@ -559,17 +574,26 @@ pub async fn label_values_query( } } + let merge_map = merge_map + .into_iter() + .map(|(k, v)| (k, Value::from(v))) + .collect(); + let mut label_values: Vec<_> = label_values.into_iter().collect(); label_values.sort(); - PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values)) + let mut resp = PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values)); + resp.resp_metrics = merge_map; + resp } async fn retrieve_label_values( result: Result, label_name: &str, labels_values: &mut HashSet, + metrics: &mut HashMap, ) -> Result<()> { - match result?.data { + let result = result?; + match result.data { 
OutputData::RecordBatches(batches) => { retrieve_label_values_from_record_batch(batches, label_name, labels_values).await } @@ -583,7 +607,13 @@ async fn retrieve_label_values( reason: "expected data result, but got affected rows".to_string(), } .fail(), + }?; + + if let Some(ref plan) = result.meta.plan { + collect_plan_metrics(plan.clone(), &mut [metrics]); } + + Ok(()) } async fn retrieve_label_values_from_record_batch( @@ -658,6 +688,7 @@ pub async fn series_query( .unwrap_or_else(current_time_rfc3339); let mut series = Vec::new(); + let mut merge_map = HashMap::new(); for query in queries { let table_name = query.clone(); let prom_query = PromQuery { @@ -668,10 +699,19 @@ pub async fn series_query( step: DEFAULT_LOOKBACK_STRING.to_string(), }; let result = handler.do_query(&prom_query, query_ctx.clone()).await; - if let Err(err) = retrieve_series_from_query_result(result, &mut series, &table_name).await + + if let Err(err) = + retrieve_series_from_query_result(result, &mut series, &table_name, &mut merge_map) + .await { return PrometheusJsonResponse::error(err.status_code().to_string(), err.output_msg()); } } - PrometheusJsonResponse::success(PrometheusResponse::Series(series)) + let merge_map = merge_map + .into_iter() + .map(|(k, v)| (k, Value::from(v))) + .collect(); + let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series)); + resp.resp_metrics = merge_map; + resp } diff --git a/src/servers/src/http/prometheus_resp.rs b/src/servers/src/http/prometheus_resp.rs index 775d7e3c11..975fdc3bfc 100644 --- a/src/servers/src/http/prometheus_resp.rs +++ b/src/servers/src/http/prometheus_resp.rs @@ -32,8 +32,7 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use snafu::{OptionExt, ResultExt}; -use super::handler::collect_plan_metrics; -use super::header::GREPTIME_DB_HEADER_METRICS; +use super::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS}; use super::prometheus::{PromData, PromSeries, PrometheusResponse}; use crate::error::{CollectRecordbatchSnafu, InternalSnafu, Result}; diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index 347ec52456..f68dc1d02e 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -32,12 +32,10 @@ use api::prom_store::remote::{ReadRequest, WriteRequest}; use api::v1::RowInsertRequests; use async_trait::async_trait; use common_query::Output; -use opentelemetry_proto::tonic::collector::metrics::v1::{ - ExportMetricsServiceRequest, ExportMetricsServiceResponse, -}; -use opentelemetry_proto::tonic::collector::trace::v1::{ - ExportTraceServiceRequest, ExportTraceServiceResponse, -}; +use headers::HeaderValue; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; +use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; +use serde_json::Value; use session::context::QueryContextRef; use crate::error::Result; @@ -71,7 +69,7 @@ pub trait ScriptHandler { pub trait InfluxdbLineProtocolHandler { /// A successful request will not return a response. /// Only on error will the socket return a line of data. 
-    async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()>;
+    async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<Output>;
 }
 
 #[async_trait]
@@ -82,8 +80,9 @@ pub trait OpentsdbProtocolHandler {
 }
 
 pub struct PromStoreResponse {
-    pub content_type: String,
-    pub content_encoding: String,
+    pub content_type: HeaderValue,
+    pub content_encoding: HeaderValue,
+    pub resp_metrics: HashMap<String, Value>,
     pub body: Vec<u8>,
 }
 
@@ -95,7 +94,7 @@ pub trait PromStoreProtocolHandler {
         request: WriteRequest,
         ctx: QueryContextRef,
         with_metric_engine: bool,
-    ) -> Result<()>;
+    ) -> Result<Output>;
 
     /// Handling prometheus remote write requests
     async fn write_fast(
@@ -103,7 +102,7 @@ pub trait PromStoreProtocolHandler {
         request: RowInsertRequests,
         ctx: QueryContextRef,
         with_metric_engine: bool,
-    ) -> Result<()>;
+    ) -> Result<Output>;
 
     /// Handling prometheus remote read requests
     async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse>;
@@ -118,12 +117,12 @@ pub trait OpenTelemetryProtocolHandler {
         &self,
         request: ExportMetricsServiceRequest,
         ctx: QueryContextRef,
-    ) -> Result<ExportMetricsServiceResponse>;
+    ) -> Result<Output>;
 
     /// Handling opentelemetry traces request
     async fn traces(
         &self,
         request: ExportTraceServiceRequest,
         ctx: QueryContextRef,
-    ) -> Result<ExportTraceServiceResponse>;
+    ) -> Result<Output>;
 }
diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs
index ff5f4c85af..fb000ec02e 100644
--- a/src/servers/tests/http/influxdb_test.rs
+++ b/src/servers/tests/http/influxdb_test.rs
@@ -55,7 +55,7 @@ impl GrpcQueryHandler for DummyInstance {
 
 #[async_trait]
 impl InfluxdbLineProtocolHandler for DummyInstance {
-    async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()> {
+    async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<Output> {
         let requests: RowInsertRequests = request.try_into()?;
         for expr in requests.inserts {
             let _ = self
@@ -64,7 +64,7 @@ impl InfluxdbLineProtocolHandler for DummyInstance {
                 .await;
         }
 
-        Ok(())
+        Ok(Output::new_with_affected_rows(0))
     }
 }
 
diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs
index f77c7d7cb2..5d6b12bc9d 100644
--- a/src/servers/tests/http/prom_store_test.rs
+++ b/src/servers/tests/http/prom_store_test.rs
@@ -29,6 +29,7 @@ use query::parser::PromQuery;
 use query::plan::LogicalPlan;
 use query::query_engine::DescribeResult;
 use servers::error::{Error, Result};
+use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
 use servers::http::{HttpOptions, HttpServerBuilder};
 use servers::prom_store;
 use servers::prom_store::{snappy_compress, Metrics};
@@ -57,13 +58,13 @@ impl GrpcQueryHandler for DummyInstance {
 
 #[async_trait]
 impl PromStoreProtocolHandler for DummyInstance {
-    async fn write(&self, request: WriteRequest, ctx: QueryContextRef, _: bool) -> Result<()> {
+    async fn write(&self, request: WriteRequest, ctx: QueryContextRef, _: bool) -> Result<Output> {
         let _ = self
             .tx
             .send((ctx.current_schema().to_owned(), request.encode_to_vec()))
             .await;
 
-        Ok(())
+        Ok(Output::new_with_affected_rows(0))
     }
 
     async fn write_fast(
@@ -71,8 +72,8 @@ impl PromStoreProtocolHandler for DummyInstance {
         _request: RowInsertRequests,
         _ctx: QueryContextRef,
         _with_metric_engine: bool,
-    ) -> Result<()> {
-        Ok(())
+    ) -> Result<Output> {
+        Ok(Output::new_with_affected_rows(0))
     }
 
     async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse> {
         let _ = self
             .tx
             .send((ctx.current_schema().to_owned(), request.encode_to_vec()))
             .await;
 
         let response = ReadResponse {
             results: vec![QueryResult {
                 timeseries: mock_timeseries(),
             }],
         };
 
         Ok(PromStoreResponse {
-            content_type: "application/x-protobuf".to_string(),
-            content_encoding: "snappy".to_string(),
+            content_type: CONTENT_TYPE_PROTOBUF.clone(),
+            content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
+            resp_metrics: Default::default(),
             body: response.encode_to_vec(),
         })
     }
diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs
index b961bc2cf0..f72a7bae00 100644
--- a/tests-integration/src/otlp.rs
+++ b/tests-integration/src/otlp.rs
@@ -64,8 +64,8 @@ mod test {
             .unwrap()
             .is_ok());
 
-        let resp = instance.metrics(req, ctx.clone()).await.unwrap();
-        assert!(resp.partial_success.is_none());
+        let resp = instance.metrics(req, ctx.clone()).await;
+        assert!(resp.is_ok());
 
         let mut output = instance
             .do_query(