mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-27 18:30:38 +00:00
chore: http header with metrics (#3536)
* chore: bring write cost to output * chore: add write cost to greptimev1result * chore: add metrics to influxdb write resp header * chore: add metrics to prom store * chore: add metrics to otlp * chore: add debug log * fix: prom remote read with output * fix: prom queries don't output metrics header * chore: extract header value * chore: refactor code * chore: fix cr issue
This commit is contained in:
@@ -18,6 +18,7 @@ use async_trait::async_trait;
|
||||
use common_base::AffectedRows;
|
||||
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
|
||||
use common_query::error::Result;
|
||||
use common_query::Output;
|
||||
use session::context::QueryContextRef;
|
||||
use store_api::storage::RegionId;
|
||||
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
|
||||
@@ -26,7 +27,7 @@ use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, Ins
|
||||
#[async_trait]
|
||||
pub trait TableMutationHandler: Send + Sync {
|
||||
/// Inserts rows into the table.
|
||||
async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
|
||||
async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<Output>;
|
||||
|
||||
/// Delete rows from the table.
|
||||
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
|
||||
|
||||
@@ -35,6 +35,7 @@ impl FunctionState {
|
||||
use common_base::AffectedRows;
|
||||
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
|
||||
use common_query::error::Result;
|
||||
use common_query::Output;
|
||||
use session::context::QueryContextRef;
|
||||
use store_api::storage::RegionId;
|
||||
use table::requests::{
|
||||
@@ -70,8 +71,8 @@ impl FunctionState {
|
||||
&self,
|
||||
_request: InsertRequest,
|
||||
_ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
Ok(ROWS)
|
||||
) -> Result<Output> {
|
||||
Ok(Output::new_with_affected_rows(ROWS))
|
||||
}
|
||||
|
||||
async fn delete(
|
||||
|
||||
@@ -16,4 +16,5 @@
|
||||
pub const GREPTIME_EXEC_PREFIX: &str = "greptime_exec_";
|
||||
|
||||
/// Execution cost metrics key
|
||||
pub const GREPTIME_EXEC_COST: &str = "greptime_exec_cost";
|
||||
pub const GREPTIME_EXEC_READ_COST: &str = "greptime_exec_read_cost";
|
||||
pub const GREPTIME_EXEC_WRITE_COST: &str = "greptime_exec_write_cost";
|
||||
|
||||
@@ -17,4 +17,4 @@
|
||||
/// since `plugins` crate is at the top depending on crates like `frontend` and `datanode`
|
||||
mod consts;
|
||||
|
||||
pub use consts::{GREPTIME_EXEC_COST, GREPTIME_EXEC_PREFIX};
|
||||
pub use consts::{GREPTIME_EXEC_PREFIX, GREPTIME_EXEC_READ_COST, GREPTIME_EXEC_WRITE_COST};
|
||||
|
||||
@@ -40,7 +40,7 @@ pub struct Output {
|
||||
/// Original Output struct
|
||||
/// carrying result data to response/client/user interface
|
||||
pub enum OutputData {
|
||||
AffectedRows(usize),
|
||||
AffectedRows(OutputRows),
|
||||
RecordBatches(RecordBatches),
|
||||
Stream(SendableRecordBatchStream),
|
||||
}
|
||||
@@ -50,11 +50,11 @@ pub enum OutputData {
|
||||
pub struct OutputMeta {
|
||||
/// May exist for query output. One can retrieve execution metrics from this plan.
|
||||
pub plan: Option<Arc<dyn PhysicalPlan>>,
|
||||
pub cost: usize,
|
||||
pub cost: OutputCost,
|
||||
}
|
||||
|
||||
impl Output {
|
||||
pub fn new_with_affected_rows(affected_rows: usize) -> Self {
|
||||
pub fn new_with_affected_rows(affected_rows: OutputRows) -> Self {
|
||||
Self {
|
||||
data: OutputData::AffectedRows(affected_rows),
|
||||
meta: Default::default(),
|
||||
@@ -78,6 +78,13 @@ impl Output {
|
||||
pub fn new(data: OutputData, meta: OutputMeta) -> Self {
|
||||
Self { data, meta }
|
||||
}
|
||||
|
||||
pub fn extract_rows_and_cost(&self) -> (OutputRows, OutputCost) {
|
||||
match self.data {
|
||||
OutputData::AffectedRows(rows) => (rows, self.meta.cost),
|
||||
_ => (0, self.meta.cost),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for OutputData {
|
||||
@@ -133,3 +140,6 @@ impl From<&AddColumnLocation> for Location {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub type OutputRows = usize;
|
||||
pub type OutputCost = usize;
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use client::Output;
|
||||
use common_error::ext::BoxedError;
|
||||
use servers::error::{AuthSnafu, Error};
|
||||
use servers::influxdb::InfluxdbRequest;
|
||||
@@ -30,7 +31,7 @@ impl InfluxdbLineProtocolHandler for Instance {
|
||||
&self,
|
||||
request: InfluxdbRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> servers::error::Result<()> {
|
||||
) -> servers::error::Result<Output> {
|
||||
self.plugins
|
||||
.get::<PermissionCheckerRef>()
|
||||
.as_ref()
|
||||
@@ -41,11 +42,9 @@ impl InfluxdbLineProtocolHandler for Instance {
|
||||
interceptor_ref.pre_execute(&request.lines, ctx.clone())?;
|
||||
|
||||
let requests = request.try_into()?;
|
||||
let _ = self
|
||||
.handle_row_inserts(requests, ctx)
|
||||
self.handle_row_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(servers::error::ExecuteGrpcQuerySnafu)?;
|
||||
Ok(())
|
||||
.context(servers::error::ExecuteGrpcQuerySnafu)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,14 +14,11 @@
|
||||
|
||||
use async_trait::async_trait;
|
||||
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
|
||||
use client::Output;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_telemetry::tracing;
|
||||
use opentelemetry_proto::tonic::collector::metrics::v1::{
|
||||
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
|
||||
};
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::{
|
||||
ExportTraceServiceRequest, ExportTraceServiceResponse,
|
||||
};
|
||||
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use servers::error::{self, AuthSnafu, Result as ServerResult};
|
||||
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
|
||||
use servers::otlp;
|
||||
@@ -40,7 +37,7 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
&self,
|
||||
request: ExportMetricsServiceRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> ServerResult<ExportMetricsServiceResponse> {
|
||||
) -> ServerResult<Output> {
|
||||
self.plugins
|
||||
.get::<PermissionCheckerRef>()
|
||||
.as_ref()
|
||||
@@ -53,19 +50,12 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
interceptor_ref.pre_execute(ctx.clone())?;
|
||||
|
||||
let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
|
||||
let _ = self
|
||||
.handle_row_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
|
||||
OTLP_METRICS_ROWS.inc_by(rows as u64);
|
||||
|
||||
let resp = ExportMetricsServiceResponse {
|
||||
// TODO(sunng87): add support for partial_success in future patch
|
||||
partial_success: None,
|
||||
};
|
||||
Ok(resp)
|
||||
self.handle_row_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -73,7 +63,7 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
&self,
|
||||
request: ExportTraceServiceRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> ServerResult<ExportTraceServiceResponse> {
|
||||
) -> ServerResult<Output> {
|
||||
self.plugins
|
||||
.get::<PermissionCheckerRef>()
|
||||
.as_ref()
|
||||
@@ -95,18 +85,11 @@ impl OpenTelemetryProtocolHandler for Instance {
|
||||
|
||||
let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;
|
||||
|
||||
let _ = self
|
||||
.handle_row_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
|
||||
OTLP_TRACES_ROWS.inc_by(rows as u64);
|
||||
|
||||
let resp = ExportTraceServiceResponse {
|
||||
// TODO(fys): add support for partial_success in future patch
|
||||
partial_success: None,
|
||||
};
|
||||
Ok(resp)
|
||||
self.handle_row_inserts(requests, ctx)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use api::prom_store::remote::read_request::ResponseType;
|
||||
@@ -30,6 +31,7 @@ use operator::insert::InserterRef;
|
||||
use operator::statement::StatementExecutor;
|
||||
use prost::Message;
|
||||
use servers::error::{self, AuthSnafu, Result as ServerResult};
|
||||
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
|
||||
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
|
||||
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
|
||||
use servers::prom_store::{self, Metrics};
|
||||
@@ -165,7 +167,7 @@ impl PromStoreProtocolHandler for Instance {
|
||||
request: WriteRequest,
|
||||
ctx: QueryContextRef,
|
||||
with_metric_engine: bool,
|
||||
) -> ServerResult<()> {
|
||||
) -> ServerResult<Output> {
|
||||
self.plugins
|
||||
.get::<PermissionCheckerRef>()
|
||||
.as_ref()
|
||||
@@ -177,26 +179,24 @@ impl PromStoreProtocolHandler for Instance {
|
||||
interceptor_ref.pre_write(&request, ctx.clone())?;
|
||||
|
||||
let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?;
|
||||
if with_metric_engine {
|
||||
let output = if with_metric_engine {
|
||||
let physical_table = ctx
|
||||
.extension(PHYSICAL_TABLE_PARAM)
|
||||
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
|
||||
.to_string();
|
||||
let _ = self
|
||||
.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string())
|
||||
self.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
.context(error::ExecuteGrpcQuerySnafu)?
|
||||
} else {
|
||||
let _ = self
|
||||
.handle_row_inserts(requests, ctx.clone())
|
||||
self.handle_row_inserts(requests, ctx.clone())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
}
|
||||
.context(error::ExecuteGrpcQuerySnafu)?
|
||||
};
|
||||
|
||||
PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
|
||||
Ok(())
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
async fn write_fast(
|
||||
@@ -204,31 +204,30 @@ impl PromStoreProtocolHandler for Instance {
|
||||
request: RowInsertRequests,
|
||||
ctx: QueryContextRef,
|
||||
with_metric_engine: bool,
|
||||
) -> ServerResult<()> {
|
||||
) -> ServerResult<Output> {
|
||||
self.plugins
|
||||
.get::<PermissionCheckerRef>()
|
||||
.as_ref()
|
||||
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
|
||||
.context(AuthSnafu)?;
|
||||
|
||||
if with_metric_engine {
|
||||
let output = if with_metric_engine {
|
||||
let physical_table = ctx
|
||||
.extension(PHYSICAL_TABLE_PARAM)
|
||||
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
|
||||
.to_string();
|
||||
let _ = self
|
||||
.handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string())
|
||||
self.handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
.context(error::ExecuteGrpcQuerySnafu)?
|
||||
} else {
|
||||
let _ = self
|
||||
.handle_row_inserts(request, ctx.clone())
|
||||
self.handle_row_inserts(request, ctx.clone())
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
}
|
||||
Ok(())
|
||||
.context(error::ExecuteGrpcQuerySnafu)?
|
||||
};
|
||||
|
||||
Ok(output)
|
||||
}
|
||||
|
||||
async fn read(
|
||||
@@ -254,18 +253,29 @@ impl PromStoreProtocolHandler for Instance {
|
||||
match response_type {
|
||||
ResponseType::Samples => {
|
||||
let mut query_results = Vec::with_capacity(results.len());
|
||||
let mut map = HashMap::new();
|
||||
for (table_name, output) in results {
|
||||
let plan = output.meta.plan.clone();
|
||||
query_results.push(to_query_result(&table_name, output).await?);
|
||||
if let Some(ref plan) = plan {
|
||||
collect_plan_metrics(plan.clone(), &mut [&mut map]);
|
||||
}
|
||||
}
|
||||
|
||||
let response = ReadResponse {
|
||||
results: query_results,
|
||||
};
|
||||
|
||||
let resp_metrics = map
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, v.into()))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
// TODO(dennis): may consume too much memory, adds flow control
|
||||
Ok(PromStoreResponse {
|
||||
content_type: "application/x-protobuf".to_string(),
|
||||
content_encoding: "snappy".to_string(),
|
||||
content_type: CONTENT_TYPE_PROTOBUF.clone(),
|
||||
content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
|
||||
resp_metrics,
|
||||
body: prom_store::snappy_compress(&response.encode_to_vec())?,
|
||||
})
|
||||
}
|
||||
@@ -309,7 +319,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
|
||||
request: WriteRequest,
|
||||
ctx: QueryContextRef,
|
||||
_: bool,
|
||||
) -> ServerResult<()> {
|
||||
) -> ServerResult<Output> {
|
||||
let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?;
|
||||
self.inserter
|
||||
.handle_metric_row_inserts(
|
||||
@@ -320,8 +330,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
|
||||
)
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(error::ExecuteGrpcQuerySnafu)?;
|
||||
Ok(())
|
||||
.context(error::ExecuteGrpcQuerySnafu)
|
||||
}
|
||||
|
||||
async fn write_fast(
|
||||
@@ -329,7 +338,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
|
||||
_request: RowInsertRequests,
|
||||
_ctx: QueryContextRef,
|
||||
_with_metric_engine: bool,
|
||||
) -> ServerResult<()> {
|
||||
) -> ServerResult<Output> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@ use api::v1::{
|
||||
RowInsertRequests, SemanticType,
|
||||
};
|
||||
use catalog::CatalogManagerRef;
|
||||
use client::{OutputData, OutputMeta};
|
||||
use common_catalog::consts::default_engine;
|
||||
use common_grpc_expr::util::{extract_new_columns, ColumnExpr};
|
||||
use common_meta::datanode_manager::{AffectedRows, DatanodeManagerRef};
|
||||
@@ -110,8 +111,7 @@ impl Inserter {
|
||||
.convert(requests)
|
||||
.await?;
|
||||
|
||||
let affected_rows = self.do_request(inserts, &ctx).await?;
|
||||
Ok(Output::new_with_affected_rows(affected_rows))
|
||||
self.do_request(inserts, &ctx).await
|
||||
}
|
||||
|
||||
/// Handle row inserts request with metric engine.
|
||||
@@ -148,15 +148,14 @@ impl Inserter {
|
||||
.convert(requests)
|
||||
.await?;
|
||||
|
||||
let affected_rows = self.do_request(inserts, &ctx).await?;
|
||||
Ok(Output::new_with_affected_rows(affected_rows))
|
||||
self.do_request(inserts, &ctx).await
|
||||
}
|
||||
|
||||
pub async fn handle_table_insert(
|
||||
&self,
|
||||
request: TableInsertRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
) -> Result<Output> {
|
||||
let catalog = request.catalog_name.as_str();
|
||||
let schema = request.schema_name.as_str();
|
||||
let table_name = request.table_name.as_str();
|
||||
@@ -170,8 +169,7 @@ impl Inserter {
|
||||
.convert(request)
|
||||
.await?;
|
||||
|
||||
let affected_rows = self.do_request(inserts, &ctx).await?;
|
||||
Ok(affected_rows as _)
|
||||
self.do_request(inserts, &ctx).await
|
||||
}
|
||||
|
||||
pub async fn handle_statement_insert(
|
||||
@@ -184,8 +182,7 @@ impl Inserter {
|
||||
.convert(insert, ctx)
|
||||
.await?;
|
||||
|
||||
let affected_rows = self.do_request(inserts, ctx).await?;
|
||||
Ok(Output::new_with_affected_rows(affected_rows))
|
||||
self.do_request(inserts, ctx).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,8 +191,8 @@ impl Inserter {
|
||||
&self,
|
||||
requests: RegionInsertRequests,
|
||||
ctx: &QueryContextRef,
|
||||
) -> Result<AffectedRows> {
|
||||
write_meter!(ctx.current_catalog(), ctx.current_schema(), requests);
|
||||
) -> Result<Output> {
|
||||
let write_cost = write_meter!(ctx.current_catalog(), ctx.current_schema(), requests);
|
||||
let request_factory = RegionRequestFactory::new(RegionRequestHeader {
|
||||
tracing_context: TracingContext::from_current_span().to_w3c(),
|
||||
dbname: ctx.get_db_string(),
|
||||
@@ -221,7 +218,10 @@ impl Inserter {
|
||||
|
||||
let affected_rows = results.into_iter().sum::<Result<AffectedRows>>()?;
|
||||
crate::metrics::DIST_INGEST_ROW_COUNT.inc_by(affected_rows as u64);
|
||||
Ok(affected_rows)
|
||||
Ok(Output::new(
|
||||
OutputData::AffectedRows(affected_rows),
|
||||
OutputMeta::new_with_cost(write_cost as _),
|
||||
))
|
||||
}
|
||||
|
||||
async fn group_requests_by_peer(
|
||||
|
||||
@@ -124,10 +124,7 @@ impl StatementExecutor {
|
||||
.copy_table_to(req, query_ctx)
|
||||
.await
|
||||
.map(Output::new_with_affected_rows),
|
||||
CopyDirection::Import => self
|
||||
.copy_table_from(req, query_ctx)
|
||||
.await
|
||||
.map(Output::new_with_affected_rows),
|
||||
CopyDirection::Import => self.copy_table_from(req, query_ctx).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
|
||||
use client::Output;
|
||||
use client::{Output, OutputData, OutputMeta};
|
||||
use common_datasource::file_format::Format;
|
||||
use common_datasource::lister::{Lister, Source};
|
||||
use common_datasource::object_store::build_backend;
|
||||
@@ -129,7 +129,9 @@ impl StatementExecutor {
|
||||
.get(CONTINUE_ON_ERROR_KEY)
|
||||
.and_then(|v| bool::from_str(v).ok())
|
||||
.unwrap_or(false);
|
||||
|
||||
let mut rows_inserted = 0;
|
||||
let mut insert_cost = 0;
|
||||
|
||||
for e in entries {
|
||||
let table_name = match parse_file_name_to_copy(&e) {
|
||||
@@ -156,8 +158,10 @@ impl StatementExecutor {
|
||||
};
|
||||
debug!("Copy table, arg: {:?}", req);
|
||||
match self.copy_table_from(req, ctx.clone()).await {
|
||||
Ok(rows) => {
|
||||
Ok(o) => {
|
||||
let (rows, cost) = o.extract_rows_and_cost();
|
||||
rows_inserted += rows;
|
||||
insert_cost += cost;
|
||||
}
|
||||
Err(err) => {
|
||||
if continue_on_error {
|
||||
@@ -169,7 +173,10 @@ impl StatementExecutor {
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(Output::new_with_affected_rows(rows_inserted))
|
||||
Ok(Output::new(
|
||||
OutputData::AffectedRows(rows_inserted),
|
||||
OutputMeta::new_with_cost(insert_cost),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -15,7 +15,9 @@
|
||||
use std::collections::HashMap;
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::usize;
|
||||
|
||||
use client::{Output, OutputData, OutputMeta};
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_datasource::file_format::csv::{CsvConfigBuilder, CsvFormat, CsvOpener};
|
||||
use common_datasource::file_format::json::{JsonFormat, JsonOpener};
|
||||
@@ -24,6 +26,7 @@ use common_datasource::file_format::{FileFormat, Format};
|
||||
use common_datasource::lister::{Lister, Source};
|
||||
use common_datasource::object_store::{build_backend, parse_url};
|
||||
use common_datasource::util::find_dir_and_filename;
|
||||
use common_query::{OutputCost, OutputRows};
|
||||
use common_recordbatch::adapter::RecordBatchStreamTypeAdapter;
|
||||
use common_recordbatch::DfSendableRecordBatchStream;
|
||||
use common_telemetry::{debug, tracing};
|
||||
@@ -328,7 +331,7 @@ impl StatementExecutor {
|
||||
&self,
|
||||
req: CopyTableRequest,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<usize> {
|
||||
) -> Result<Output> {
|
||||
let table_ref = TableReference {
|
||||
catalog: &req.catalog_name,
|
||||
schema: &req.schema_name,
|
||||
@@ -373,6 +376,7 @@ impl StatementExecutor {
|
||||
}
|
||||
|
||||
let mut rows_inserted = 0;
|
||||
let mut insert_cost = 0;
|
||||
for (compat_schema, file_schema_projection, projected_table_schema, file_metadata) in files
|
||||
{
|
||||
let mut stream = self
|
||||
@@ -419,28 +423,40 @@ impl StatementExecutor {
|
||||
));
|
||||
|
||||
if pending_mem_size as u64 >= pending_mem_threshold {
|
||||
rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
|
||||
let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
|
||||
rows_inserted += rows;
|
||||
insert_cost += cost;
|
||||
}
|
||||
}
|
||||
|
||||
if !pending.is_empty() {
|
||||
rows_inserted += batch_insert(&mut pending, &mut pending_mem_size).await?;
|
||||
let (rows, cost) = batch_insert(&mut pending, &mut pending_mem_size).await?;
|
||||
rows_inserted += rows;
|
||||
insert_cost += cost;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(rows_inserted)
|
||||
Ok(Output::new(
|
||||
OutputData::AffectedRows(rows_inserted),
|
||||
OutputMeta::new_with_cost(insert_cost),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
/// Executes all pending inserts all at once, drain pending requests and reset pending bytes.
|
||||
async fn batch_insert(
|
||||
pending: &mut Vec<impl Future<Output = Result<usize>>>,
|
||||
pending: &mut Vec<impl Future<Output = Result<Output>>>,
|
||||
pending_bytes: &mut usize,
|
||||
) -> Result<usize> {
|
||||
) -> Result<(OutputRows, OutputCost)> {
|
||||
let batch = pending.drain(..);
|
||||
let res: usize = futures::future::try_join_all(batch).await?.iter().sum();
|
||||
let result = futures::future::try_join_all(batch)
|
||||
.await?
|
||||
.iter()
|
||||
.map(|o| o.extract_rows_and_cost())
|
||||
.reduce(|(a, b), (c, d)| (a + c, b + d))
|
||||
.unwrap_or((0, 0));
|
||||
*pending_bytes = 0;
|
||||
Ok(res)
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn ensure_schema_compatible(from: &SchemaRef, to: &SchemaRef) -> Result<()> {
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use async_trait::async_trait;
|
||||
use client::Output;
|
||||
use common_base::AffectedRows;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_function::handlers::TableMutationHandler;
|
||||
@@ -52,7 +53,7 @@ impl TableMutationHandler for TableMutationOperator {
|
||||
&self,
|
||||
request: TableInsertRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> QueryResult<AffectedRows> {
|
||||
) -> QueryResult<Output> {
|
||||
self.inserter
|
||||
.handle_table_insert(request, ctx)
|
||||
.await
|
||||
|
||||
@@ -128,6 +128,8 @@ impl DatafusionQueryEngine {
|
||||
};
|
||||
|
||||
let mut affected_rows = 0;
|
||||
let mut insert_cost = 0;
|
||||
|
||||
while let Some(batch) = stream.next().await {
|
||||
let batch = batch.context(CreateRecordBatchSnafu)?;
|
||||
let column_vectors = batch
|
||||
@@ -135,20 +137,27 @@ impl DatafusionQueryEngine {
|
||||
.map_err(BoxedError::new)
|
||||
.context(QueryExecutionSnafu)?;
|
||||
|
||||
let rows = match dml.op {
|
||||
match dml.op {
|
||||
WriteOp::InsertInto => {
|
||||
self.insert(&table_name, column_vectors, query_ctx.clone())
|
||||
.await?
|
||||
let output = self
|
||||
.insert(&table_name, column_vectors, query_ctx.clone())
|
||||
.await?;
|
||||
let (rows, cost) = output.extract_rows_and_cost();
|
||||
affected_rows += rows;
|
||||
insert_cost += cost;
|
||||
}
|
||||
WriteOp::Delete => {
|
||||
self.delete(&table_name, &table, column_vectors, query_ctx.clone())
|
||||
.await?
|
||||
affected_rows += self
|
||||
.delete(&table_name, &table, column_vectors, query_ctx.clone())
|
||||
.await?;
|
||||
}
|
||||
_ => unreachable!("guarded by the 'ensure!' at the beginning"),
|
||||
};
|
||||
affected_rows += rows;
|
||||
}
|
||||
}
|
||||
Ok(Output::new_with_affected_rows(affected_rows))
|
||||
Ok(Output::new(
|
||||
OutputData::AffectedRows(affected_rows),
|
||||
OutputMeta::new_with_cost(insert_cost),
|
||||
))
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip_all)]
|
||||
@@ -201,7 +210,7 @@ impl DatafusionQueryEngine {
|
||||
table_name: &ResolvedTableReference<'a>,
|
||||
column_vectors: HashMap<String, VectorRef>,
|
||||
query_ctx: QueryContextRef,
|
||||
) -> Result<usize> {
|
||||
) -> Result<Output> {
|
||||
let request = InsertRequest {
|
||||
catalog_name: table_name.catalog.to_string(),
|
||||
schema_name: table_name.schema.to_string(),
|
||||
|
||||
@@ -22,7 +22,7 @@ use common_base::bytes::Bytes;
|
||||
use common_catalog::parse_catalog_and_schema_from_db_string;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_meta::table_name::TableName;
|
||||
use common_plugins::GREPTIME_EXEC_COST;
|
||||
use common_plugins::GREPTIME_EXEC_READ_COST;
|
||||
use common_query::physical_plan::TaskContext;
|
||||
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
|
||||
use common_recordbatch::error::ExternalSnafu;
|
||||
@@ -338,7 +338,7 @@ impl MergeScanMetric {
|
||||
first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
|
||||
finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
|
||||
output_rows: MetricBuilder::new(metric).output_rows(1),
|
||||
greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_COST, 1),
|
||||
greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -52,9 +52,11 @@ impl TraceService for OtlpService {
|
||||
.cloned()
|
||||
.context(error::MissingQueryContextSnafu)?;
|
||||
|
||||
let res = self.handler.traces(req, ctx).await?;
|
||||
let _ = self.handler.traces(req, ctx).await?;
|
||||
|
||||
Ok(Response::new(res))
|
||||
Ok(Response::new(ExportTraceServiceResponse {
|
||||
partial_success: None,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,8 +73,10 @@ impl MetricsService for OtlpService {
|
||||
.cloned()
|
||||
.context(error::MissingQueryContextSnafu)?;
|
||||
|
||||
let res = self.handler.metrics(req, ctx).await?;
|
||||
let _ = self.handler.metrics(req, ctx).await?;
|
||||
|
||||
Ok(Response::new(res))
|
||||
Ok(Response::new(ExportMetricsServiceResponse {
|
||||
partial_success: None,
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::env;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use aide::transform::TransformOperation;
|
||||
@@ -23,18 +22,17 @@ use axum::response::{IntoResponse, Response};
|
||||
use axum::{Extension, Form};
|
||||
use common_error::ext::ErrorExt;
|
||||
use common_error::status_code::StatusCode;
|
||||
use common_plugins::GREPTIME_EXEC_PREFIX;
|
||||
use common_query::physical_plan::PhysicalPlan;
|
||||
use common_plugins::GREPTIME_EXEC_WRITE_COST;
|
||||
use common_query::{Output, OutputData};
|
||||
use common_recordbatch::util;
|
||||
use common_telemetry::tracing;
|
||||
use datafusion::physical_plan::metrics::MetricValue;
|
||||
use query::parser::PromQuery;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use session::context::QueryContextRef;
|
||||
|
||||
use super::header::collect_plan_metrics;
|
||||
use crate::http::arrow_result::ArrowResponse;
|
||||
use crate::http::csv_result::CsvResponse;
|
||||
use crate::http::error_result::ErrorResponse;
|
||||
@@ -143,6 +141,9 @@ pub async fn from_output(
|
||||
Ok(o) => match o.data {
|
||||
OutputData::AffectedRows(rows) => {
|
||||
results.push(GreptimeQueryOutput::AffectedRows(rows));
|
||||
if o.meta.cost > 0 {
|
||||
merge_map.insert(GREPTIME_EXEC_WRITE_COST.to_string(), o.meta.cost as u64);
|
||||
}
|
||||
}
|
||||
OutputData::Stream(stream) => {
|
||||
let schema = stream.schema().clone();
|
||||
@@ -166,8 +167,8 @@ pub async fn from_output(
|
||||
let re = result_map
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, Value::from(v)))
|
||||
.collect();
|
||||
http_record_output.metrics = re;
|
||||
.collect::<HashMap<String, Value>>();
|
||||
http_record_output.metrics.extend(re);
|
||||
}
|
||||
results.push(GreptimeQueryOutput::Records(http_record_output))
|
||||
}
|
||||
@@ -197,42 +198,6 @@ pub async fn from_output(
|
||||
Ok((results, merge_map))
|
||||
}
|
||||
|
||||
fn collect_into_maps(name: &str, value: u64, maps: &mut [&mut HashMap<String, u64>]) {
|
||||
if name.starts_with(GREPTIME_EXEC_PREFIX) && value > 0 {
|
||||
maps.iter_mut().for_each(|map| {
|
||||
map.entry(name.to_string())
|
||||
.and_modify(|v| *v += value)
|
||||
.or_insert(value);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub fn collect_plan_metrics(plan: Arc<dyn PhysicalPlan>, maps: &mut [&mut HashMap<String, u64>]) {
|
||||
if let Some(m) = plan.metrics() {
|
||||
m.iter().for_each(|m| match m.value() {
|
||||
MetricValue::Count { name, count } => {
|
||||
collect_into_maps(name, count.value() as u64, maps);
|
||||
}
|
||||
MetricValue::Gauge { name, gauge } => {
|
||||
collect_into_maps(name, gauge.value() as u64, maps);
|
||||
}
|
||||
MetricValue::Time { name, time } => {
|
||||
if name.starts_with(GREPTIME_EXEC_PREFIX) {
|
||||
// override
|
||||
maps.iter_mut().for_each(|map| {
|
||||
map.insert(name.to_string(), time.value() as u64);
|
||||
});
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
});
|
||||
}
|
||||
|
||||
for c in plan.children() {
|
||||
collect_plan_metrics(c, maps);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
|
||||
pub struct PromqlQuery {
|
||||
pub query: String,
|
||||
|
||||
@@ -12,7 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_plugins::GREPTIME_EXEC_PREFIX;
|
||||
use common_query::physical_plan::PhysicalPlan;
|
||||
use datafusion::physical_plan::metrics::MetricValue;
|
||||
use headers::{Header, HeaderName, HeaderValue};
|
||||
use hyper::HeaderMap;
|
||||
use serde_json::Value;
|
||||
|
||||
pub mod constants {
|
||||
// New HTTP headers would better distinguish use cases among:
|
||||
@@ -53,6 +61,9 @@ pub static GREPTIME_DB_HEADER_NAME: HeaderName =
|
||||
pub static GREPTIME_TIMEZONE_HEADER_NAME: HeaderName =
|
||||
HeaderName::from_static(constants::GREPTIME_TIMEZONE_HEADER_NAME);
|
||||
|
||||
pub static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static("application/x-protobuf");
|
||||
pub static CONTENT_ENCODING_SNAPPY: HeaderValue = HeaderValue::from_static("snappy");
|
||||
|
||||
pub struct GreptimeDbName(Option<String>);
|
||||
|
||||
impl Header for GreptimeDbName {
|
||||
@@ -87,3 +98,56 @@ impl GreptimeDbName {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
// collect write
|
||||
pub fn write_cost_header_map(cost: usize) -> HeaderMap {
|
||||
let mut header_map = HeaderMap::new();
|
||||
if cost > 0 {
|
||||
let mut map: HashMap<String, Value> = HashMap::new();
|
||||
map.insert(
|
||||
common_plugins::GREPTIME_EXEC_WRITE_COST.to_string(),
|
||||
Value::from(cost),
|
||||
);
|
||||
let _ = serde_json::to_string(&map)
|
||||
.ok()
|
||||
.and_then(|s| HeaderValue::from_str(&s).ok())
|
||||
.and_then(|v| header_map.insert(&GREPTIME_DB_HEADER_METRICS, v));
|
||||
}
|
||||
header_map
|
||||
}
|
||||
|
||||
fn collect_into_maps(name: &str, value: u64, maps: &mut [&mut HashMap<String, u64>]) {
|
||||
if name.starts_with(GREPTIME_EXEC_PREFIX) && value > 0 {
|
||||
maps.iter_mut().for_each(|map| {
|
||||
map.entry(name.to_string())
|
||||
.and_modify(|v| *v += value)
|
||||
.or_insert(value);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
pub fn collect_plan_metrics(plan: Arc<dyn PhysicalPlan>, maps: &mut [&mut HashMap<String, u64>]) {
|
||||
if let Some(m) = plan.metrics() {
|
||||
m.iter().for_each(|m| match m.value() {
|
||||
MetricValue::Count { name, count } => {
|
||||
collect_into_maps(name, count.value() as u64, maps);
|
||||
}
|
||||
MetricValue::Gauge { name, gauge } => {
|
||||
collect_into_maps(name, gauge.value() as u64, maps);
|
||||
}
|
||||
MetricValue::Time { name, time } => {
|
||||
if name.starts_with(GREPTIME_EXEC_PREFIX) {
|
||||
// override
|
||||
maps.iter_mut().for_each(|map| {
|
||||
map.insert(name.to_string(), time.value() as u64);
|
||||
});
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
});
|
||||
}
|
||||
|
||||
for c in plan.children() {
|
||||
collect_plan_metrics(c, maps);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ use common_grpc::writer::Precision;
|
||||
use common_telemetry::tracing;
|
||||
use session::context::QueryContextRef;
|
||||
|
||||
use super::header::write_cost_header_map;
|
||||
use crate::error::{Result, TimePrecisionSnafu};
|
||||
use crate::influxdb::InfluxdbRequest;
|
||||
use crate::query_handler::InfluxdbLineProtocolHandlerRef;
|
||||
@@ -93,9 +94,12 @@ pub async fn influxdb_write(
|
||||
.start_timer();
|
||||
|
||||
let request = InfluxdbRequest { precision, lines };
|
||||
handler.exec(request, ctx).await?;
|
||||
let output = handler.exec(request, ctx).await?;
|
||||
|
||||
Ok((StatusCode::NO_CONTENT, ()))
|
||||
Ok((
|
||||
StatusCode::NO_CONTENT,
|
||||
write_cost_header_map(output.meta.cost),
|
||||
))
|
||||
}
|
||||
|
||||
fn parse_time_precision(value: &str) -> Result<Precision> {
|
||||
|
||||
@@ -28,6 +28,7 @@ use prost::Message;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::prelude::*;
|
||||
|
||||
use super::header::{write_cost_header_map, CONTENT_TYPE_PROTOBUF};
|
||||
use crate::error::{self, Result};
|
||||
use crate::query_handler::OpenTelemetryProtocolHandlerRef;
|
||||
|
||||
@@ -43,10 +44,16 @@ pub async fn metrics(
|
||||
.with_label_values(&[db.as_str()])
|
||||
.start_timer();
|
||||
let request = parse_metrics_body(body).await?;
|
||||
|
||||
handler
|
||||
.metrics(request, query_ctx)
|
||||
.await
|
||||
.map(OtlpMetricsResponse)
|
||||
.map(|o| OtlpMetricsResponse {
|
||||
resp_body: ExportMetricsServiceResponse {
|
||||
partial_success: None,
|
||||
},
|
||||
write_cost: o.meta.cost,
|
||||
})
|
||||
}
|
||||
|
||||
async fn parse_metrics_body(body: Body) -> Result<ExportMetricsServiceRequest> {
|
||||
@@ -58,15 +65,17 @@ async fn parse_metrics_body(body: Body) -> Result<ExportMetricsServiceRequest> {
|
||||
})
|
||||
}
|
||||
|
||||
pub struct OtlpMetricsResponse(ExportMetricsServiceResponse);
|
||||
pub struct OtlpMetricsResponse {
|
||||
resp_body: ExportMetricsServiceResponse,
|
||||
write_cost: usize,
|
||||
}
|
||||
|
||||
impl IntoResponse for OtlpMetricsResponse {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
(
|
||||
[(header::CONTENT_TYPE, "application/x-protobuf")],
|
||||
self.0.encode_to_vec(),
|
||||
)
|
||||
.into_response()
|
||||
let mut header_map = write_cost_header_map(self.write_cost);
|
||||
header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
|
||||
|
||||
(header_map, self.resp_body.encode_to_vec()).into_response()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,7 +94,12 @@ pub async fn traces(
|
||||
handler
|
||||
.traces(request, query_ctx)
|
||||
.await
|
||||
.map(OtlpTracesResponse)
|
||||
.map(|o| OtlpTracesResponse {
|
||||
resp_body: ExportTraceServiceResponse {
|
||||
partial_success: None,
|
||||
},
|
||||
write_cost: o.meta.cost,
|
||||
})
|
||||
}
|
||||
|
||||
async fn parse_traces_body(body: Body) -> Result<ExportTraceServiceRequest> {
|
||||
@@ -97,14 +111,16 @@ async fn parse_traces_body(body: Body) -> Result<ExportTraceServiceRequest> {
|
||||
})
|
||||
}
|
||||
|
||||
pub struct OtlpTracesResponse(ExportTraceServiceResponse);
|
||||
pub struct OtlpTracesResponse {
|
||||
resp_body: ExportTraceServiceResponse,
|
||||
write_cost: usize,
|
||||
}
|
||||
|
||||
impl IntoResponse for OtlpTracesResponse {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
(
|
||||
[(header::CONTENT_TYPE, "application/x-protobuf")],
|
||||
self.0.encode_to_vec(),
|
||||
)
|
||||
.into_response()
|
||||
let mut header_map = write_cost_header_map(self.write_cost);
|
||||
header_map.insert(header::CONTENT_TYPE, CONTENT_TYPE_PROTOBUF.clone());
|
||||
|
||||
(header_map, self.resp_body.encode_to_vec()).into_response()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,14 +17,14 @@ use std::sync::Arc;
|
||||
use api::prom_store::remote::{ReadRequest, WriteRequest};
|
||||
use api::v1::RowInsertRequests;
|
||||
use axum::extract::{Query, RawBody, State};
|
||||
use axum::http::{header, StatusCode};
|
||||
use axum::http::{header, HeaderValue, StatusCode};
|
||||
use axum::response::IntoResponse;
|
||||
use axum::Extension;
|
||||
use bytes::Bytes;
|
||||
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
|
||||
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
|
||||
use common_telemetry::tracing;
|
||||
use hyper::Body;
|
||||
use hyper::{Body, HeaderMap};
|
||||
use lazy_static::lazy_static;
|
||||
use object_pool::Pool;
|
||||
use prost::Message;
|
||||
@@ -33,6 +33,7 @@ use serde::{Deserialize, Serialize};
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::prelude::*;
|
||||
|
||||
use super::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
|
||||
use crate::error::{self, Result, UnexpectedPhysicalTableSnafu};
|
||||
use crate::prom_store::snappy_decompress;
|
||||
use crate::proto::PromWriteRequest;
|
||||
@@ -68,7 +69,7 @@ pub async fn route_write_without_metric_engine(
|
||||
Query(params): Query<DatabaseQuery>,
|
||||
Extension(query_ctx): Extension<QueryContextRef>,
|
||||
RawBody(body): RawBody,
|
||||
) -> Result<(StatusCode, ())> {
|
||||
) -> Result<impl IntoResponse> {
|
||||
let db = params.db.clone().unwrap_or_default();
|
||||
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
|
||||
.with_label_values(&[db.as_str()])
|
||||
@@ -80,8 +81,11 @@ pub async fn route_write_without_metric_engine(
|
||||
return UnexpectedPhysicalTableSnafu {}.fail();
|
||||
}
|
||||
|
||||
handler.write(request, query_ctx, false).await?;
|
||||
Ok((StatusCode::NO_CONTENT, ()))
|
||||
let output = handler.write(request, query_ctx, false).await?;
|
||||
Ok((
|
||||
StatusCode::NO_CONTENT,
|
||||
write_cost_header_map(output.meta.cost),
|
||||
))
|
||||
}
|
||||
|
||||
#[axum_macros::debug_handler]
|
||||
@@ -94,7 +98,7 @@ pub async fn remote_write(
|
||||
Query(params): Query<DatabaseQuery>,
|
||||
Extension(mut query_ctx): Extension<QueryContextRef>,
|
||||
RawBody(body): RawBody,
|
||||
) -> Result<(StatusCode, ())> {
|
||||
) -> Result<impl IntoResponse> {
|
||||
let db = params.db.clone().unwrap_or_default();
|
||||
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
|
||||
.with_label_values(&[db.as_str()])
|
||||
@@ -108,20 +112,29 @@ pub async fn remote_write(
|
||||
query_ctx = Arc::new(new_query_ctx);
|
||||
}
|
||||
|
||||
handler.write_fast(request, query_ctx, true).await?;
|
||||
Ok((StatusCode::NO_CONTENT, ()))
|
||||
let output = handler.write_fast(request, query_ctx, true).await?;
|
||||
Ok((
|
||||
StatusCode::NO_CONTENT,
|
||||
write_cost_header_map(output.meta.cost),
|
||||
))
|
||||
}
|
||||
|
||||
impl IntoResponse for PromStoreResponse {
|
||||
fn into_response(self) -> axum::response::Response {
|
||||
(
|
||||
[
|
||||
(header::CONTENT_TYPE, self.content_type),
|
||||
(header::CONTENT_ENCODING, self.content_encoding),
|
||||
],
|
||||
self.body,
|
||||
)
|
||||
.into_response()
|
||||
let mut header_map = HeaderMap::new();
|
||||
header_map.insert(&header::CONTENT_TYPE, self.content_type);
|
||||
header_map.insert(&header::CONTENT_ENCODING, self.content_encoding);
|
||||
|
||||
let metrics = if self.resp_metrics.is_empty() {
|
||||
None
|
||||
} else {
|
||||
serde_json::to_string(&self.resp_metrics).ok()
|
||||
};
|
||||
if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
|
||||
header_map.insert(&GREPTIME_DB_HEADER_METRICS, m);
|
||||
}
|
||||
|
||||
(header_map, self.body).into_response()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -39,6 +39,7 @@ use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
|
||||
use schemars::JsonSchema;
|
||||
use serde::de::{self, MapAccess, Visitor};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use session::context::QueryContextRef;
|
||||
use snafu::{Location, ResultExt};
|
||||
|
||||
@@ -46,6 +47,7 @@ pub use super::prometheus_resp::PrometheusJsonResponse;
|
||||
use crate::error::{
|
||||
CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, UnexpectedResultSnafu,
|
||||
};
|
||||
use crate::http::header::collect_plan_metrics;
|
||||
use crate::prom_store::METRIC_NAME_LABEL;
|
||||
use crate::prometheus_handler::PrometheusHandlerRef;
|
||||
|
||||
@@ -275,6 +277,7 @@ pub async fn labels_query(
|
||||
let mut labels = HashSet::new();
|
||||
let _ = labels.insert(METRIC_NAME.to_string());
|
||||
|
||||
let mut merge_map = HashMap::new();
|
||||
for query in queries {
|
||||
let prom_query = PromQuery {
|
||||
query,
|
||||
@@ -284,10 +287,9 @@ pub async fn labels_query(
|
||||
};
|
||||
|
||||
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
|
||||
|
||||
let response = retrieve_labels_name_from_query_result(result, &mut labels).await;
|
||||
|
||||
if let Err(err) = response {
|
||||
if let Err(err) =
|
||||
retrieve_labels_name_from_query_result(result, &mut labels, &mut merge_map).await
|
||||
{
|
||||
// Prometheus won't report error if querying nonexist label and metric
|
||||
if err.status_code() != StatusCode::TableNotFound
|
||||
&& err.status_code() != StatusCode::TableColumnNotFound
|
||||
@@ -305,7 +307,13 @@ pub async fn labels_query(
|
||||
|
||||
let mut sorted_labels: Vec<String> = labels.into_iter().collect();
|
||||
sorted_labels.sort();
|
||||
PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels))
|
||||
let merge_map = merge_map
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, Value::from(v)))
|
||||
.collect();
|
||||
let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Labels(sorted_labels));
|
||||
resp.resp_metrics = merge_map;
|
||||
resp
|
||||
}
|
||||
|
||||
async fn get_all_column_names(
|
||||
@@ -335,48 +343,53 @@ async fn retrieve_series_from_query_result(
|
||||
result: Result<Output>,
|
||||
series: &mut Vec<HashMap<String, String>>,
|
||||
table_name: &str,
|
||||
metrics: &mut HashMap<String, u64>,
|
||||
) -> Result<()> {
|
||||
match result?.data {
|
||||
OutputData::RecordBatches(batches) => {
|
||||
record_batches_to_series(batches, series, table_name)?;
|
||||
Ok(())
|
||||
}
|
||||
let result = result?;
|
||||
match result.data {
|
||||
OutputData::RecordBatches(batches) => record_batches_to_series(batches, series, table_name),
|
||||
OutputData::Stream(stream) => {
|
||||
let batches = RecordBatches::try_collect(stream)
|
||||
.await
|
||||
.context(CollectRecordbatchSnafu)?;
|
||||
record_batches_to_series(batches, series, table_name)?;
|
||||
Ok(())
|
||||
record_batches_to_series(batches, series, table_name)
|
||||
}
|
||||
OutputData::AffectedRows(_) => Err(Error::UnexpectedResult {
|
||||
reason: "expected data result, but got affected rows".to_string(),
|
||||
location: Location::default(),
|
||||
}),
|
||||
}?;
|
||||
|
||||
if let Some(ref plan) = result.meta.plan {
|
||||
collect_plan_metrics(plan.clone(), &mut [metrics]);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieve labels name from query result
|
||||
async fn retrieve_labels_name_from_query_result(
|
||||
result: Result<Output>,
|
||||
labels: &mut HashSet<String>,
|
||||
metrics: &mut HashMap<String, u64>,
|
||||
) -> Result<()> {
|
||||
match result?.data {
|
||||
OutputData::RecordBatches(batches) => {
|
||||
record_batches_to_labels_name(batches, labels)?;
|
||||
Ok(())
|
||||
}
|
||||
let result = result?;
|
||||
match result.data {
|
||||
OutputData::RecordBatches(batches) => record_batches_to_labels_name(batches, labels),
|
||||
OutputData::Stream(stream) => {
|
||||
let batches = RecordBatches::try_collect(stream)
|
||||
.await
|
||||
.context(CollectRecordbatchSnafu)?;
|
||||
record_batches_to_labels_name(batches, labels)?;
|
||||
Ok(())
|
||||
record_batches_to_labels_name(batches, labels)
|
||||
}
|
||||
OutputData::AffectedRows(_) => UnexpectedResultSnafu {
|
||||
reason: "expected data result, but got affected rows".to_string(),
|
||||
}
|
||||
.fail(),
|
||||
}?;
|
||||
if let Some(ref plan) = result.meta.plan {
|
||||
collect_plan_metrics(plan.clone(), &mut [metrics]);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn record_batches_to_series(
|
||||
@@ -537,6 +550,7 @@ pub async fn label_values_query(
|
||||
|
||||
let mut label_values = HashSet::new();
|
||||
|
||||
let mut merge_map = HashMap::new();
|
||||
for query in queries {
|
||||
let prom_query = PromQuery {
|
||||
query,
|
||||
@@ -545,8 +559,9 @@ pub async fn label_values_query(
|
||||
step: DEFAULT_LOOKBACK_STRING.to_string(),
|
||||
};
|
||||
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
|
||||
let result = retrieve_label_values(result, &label_name, &mut label_values).await;
|
||||
if let Err(err) = result {
|
||||
if let Err(err) =
|
||||
retrieve_label_values(result, &label_name, &mut label_values, &mut merge_map).await
|
||||
{
|
||||
// Prometheus won't report error if querying nonexist label and metric
|
||||
if err.status_code() != StatusCode::TableNotFound
|
||||
&& err.status_code() != StatusCode::TableColumnNotFound
|
||||
@@ -559,17 +574,26 @@ pub async fn label_values_query(
|
||||
}
|
||||
}
|
||||
|
||||
let merge_map = merge_map
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, Value::from(v)))
|
||||
.collect();
|
||||
|
||||
let mut label_values: Vec<_> = label_values.into_iter().collect();
|
||||
label_values.sort();
|
||||
PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values))
|
||||
let mut resp = PrometheusJsonResponse::success(PrometheusResponse::LabelValues(label_values));
|
||||
resp.resp_metrics = merge_map;
|
||||
resp
|
||||
}
|
||||
|
||||
async fn retrieve_label_values(
|
||||
result: Result<Output>,
|
||||
label_name: &str,
|
||||
labels_values: &mut HashSet<String>,
|
||||
metrics: &mut HashMap<String, u64>,
|
||||
) -> Result<()> {
|
||||
match result?.data {
|
||||
let result = result?;
|
||||
match result.data {
|
||||
OutputData::RecordBatches(batches) => {
|
||||
retrieve_label_values_from_record_batch(batches, label_name, labels_values).await
|
||||
}
|
||||
@@ -583,7 +607,13 @@ async fn retrieve_label_values(
|
||||
reason: "expected data result, but got affected rows".to_string(),
|
||||
}
|
||||
.fail(),
|
||||
}?;
|
||||
|
||||
if let Some(ref plan) = result.meta.plan {
|
||||
collect_plan_metrics(plan.clone(), &mut [metrics]);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn retrieve_label_values_from_record_batch(
|
||||
@@ -658,6 +688,7 @@ pub async fn series_query(
|
||||
.unwrap_or_else(current_time_rfc3339);
|
||||
|
||||
let mut series = Vec::new();
|
||||
let mut merge_map = HashMap::new();
|
||||
for query in queries {
|
||||
let table_name = query.clone();
|
||||
let prom_query = PromQuery {
|
||||
@@ -668,10 +699,19 @@ pub async fn series_query(
|
||||
step: DEFAULT_LOOKBACK_STRING.to_string(),
|
||||
};
|
||||
let result = handler.do_query(&prom_query, query_ctx.clone()).await;
|
||||
if let Err(err) = retrieve_series_from_query_result(result, &mut series, &table_name).await
|
||||
|
||||
if let Err(err) =
|
||||
retrieve_series_from_query_result(result, &mut series, &table_name, &mut merge_map)
|
||||
.await
|
||||
{
|
||||
return PrometheusJsonResponse::error(err.status_code().to_string(), err.output_msg());
|
||||
}
|
||||
}
|
||||
PrometheusJsonResponse::success(PrometheusResponse::Series(series))
|
||||
let merge_map = merge_map
|
||||
.into_iter()
|
||||
.map(|(k, v)| (k, Value::from(v)))
|
||||
.collect();
|
||||
let mut resp = PrometheusJsonResponse::success(PrometheusResponse::Series(series));
|
||||
resp.resp_metrics = merge_map;
|
||||
resp
|
||||
}
|
||||
|
||||
@@ -32,8 +32,7 @@ use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
|
||||
use super::handler::collect_plan_metrics;
|
||||
use super::header::GREPTIME_DB_HEADER_METRICS;
|
||||
use super::header::{collect_plan_metrics, GREPTIME_DB_HEADER_METRICS};
|
||||
use super::prometheus::{PromData, PromSeries, PrometheusResponse};
|
||||
use crate::error::{CollectRecordbatchSnafu, InternalSnafu, Result};
|
||||
|
||||
|
||||
@@ -32,12 +32,10 @@ use api::prom_store::remote::{ReadRequest, WriteRequest};
|
||||
use api::v1::RowInsertRequests;
|
||||
use async_trait::async_trait;
|
||||
use common_query::Output;
|
||||
use opentelemetry_proto::tonic::collector::metrics::v1::{
|
||||
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
|
||||
};
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::{
|
||||
ExportTraceServiceRequest, ExportTraceServiceResponse,
|
||||
};
|
||||
use headers::HeaderValue;
|
||||
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
|
||||
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
|
||||
use serde_json::Value;
|
||||
use session::context::QueryContextRef;
|
||||
|
||||
use crate::error::Result;
|
||||
@@ -71,7 +69,7 @@ pub trait ScriptHandler {
|
||||
pub trait InfluxdbLineProtocolHandler {
|
||||
/// A successful request will not return a response.
|
||||
/// Only on error will the socket return a line of data.
|
||||
async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()>;
|
||||
async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<Output>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -82,8 +80,9 @@ pub trait OpentsdbProtocolHandler {
|
||||
}
|
||||
|
||||
pub struct PromStoreResponse {
|
||||
pub content_type: String,
|
||||
pub content_encoding: String,
|
||||
pub content_type: HeaderValue,
|
||||
pub content_encoding: HeaderValue,
|
||||
pub resp_metrics: HashMap<String, Value>,
|
||||
pub body: Vec<u8>,
|
||||
}
|
||||
|
||||
@@ -95,7 +94,7 @@ pub trait PromStoreProtocolHandler {
|
||||
request: WriteRequest,
|
||||
ctx: QueryContextRef,
|
||||
with_metric_engine: bool,
|
||||
) -> Result<()>;
|
||||
) -> Result<Output>;
|
||||
|
||||
/// Handling prometheus remote write requests
|
||||
async fn write_fast(
|
||||
@@ -103,7 +102,7 @@ pub trait PromStoreProtocolHandler {
|
||||
request: RowInsertRequests,
|
||||
ctx: QueryContextRef,
|
||||
with_metric_engine: bool,
|
||||
) -> Result<()>;
|
||||
) -> Result<Output>;
|
||||
|
||||
/// Handling prometheus remote read requests
|
||||
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse>;
|
||||
@@ -118,12 +117,12 @@ pub trait OpenTelemetryProtocolHandler {
|
||||
&self,
|
||||
request: ExportMetricsServiceRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<ExportMetricsServiceResponse>;
|
||||
) -> Result<Output>;
|
||||
|
||||
/// Handling opentelemetry traces request
|
||||
async fn traces(
|
||||
&self,
|
||||
request: ExportTraceServiceRequest,
|
||||
ctx: QueryContextRef,
|
||||
) -> Result<ExportTraceServiceResponse>;
|
||||
) -> Result<Output>;
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ impl GrpcQueryHandler for DummyInstance {
|
||||
|
||||
#[async_trait]
|
||||
impl InfluxdbLineProtocolHandler for DummyInstance {
|
||||
async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<()> {
|
||||
async fn exec(&self, request: InfluxdbRequest, ctx: QueryContextRef) -> Result<Output> {
|
||||
let requests: RowInsertRequests = request.try_into()?;
|
||||
for expr in requests.inserts {
|
||||
let _ = self
|
||||
@@ -64,7 +64,7 @@ impl InfluxdbLineProtocolHandler for DummyInstance {
|
||||
.await;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -29,6 +29,7 @@ use query::parser::PromQuery;
|
||||
use query::plan::LogicalPlan;
|
||||
use query::query_engine::DescribeResult;
|
||||
use servers::error::{Error, Result};
|
||||
use servers::http::header::{CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
|
||||
use servers::http::{HttpOptions, HttpServerBuilder};
|
||||
use servers::prom_store;
|
||||
use servers::prom_store::{snappy_compress, Metrics};
|
||||
@@ -57,13 +58,13 @@ impl GrpcQueryHandler for DummyInstance {
|
||||
|
||||
#[async_trait]
|
||||
impl PromStoreProtocolHandler for DummyInstance {
|
||||
async fn write(&self, request: WriteRequest, ctx: QueryContextRef, _: bool) -> Result<()> {
|
||||
async fn write(&self, request: WriteRequest, ctx: QueryContextRef, _: bool) -> Result<Output> {
|
||||
let _ = self
|
||||
.tx
|
||||
.send((ctx.current_schema().to_owned(), request.encode_to_vec()))
|
||||
.await;
|
||||
|
||||
Ok(())
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
|
||||
async fn write_fast(
|
||||
@@ -71,8 +72,8 @@ impl PromStoreProtocolHandler for DummyInstance {
|
||||
_request: RowInsertRequests,
|
||||
_ctx: QueryContextRef,
|
||||
_with_metric_engine: bool,
|
||||
) -> Result<()> {
|
||||
Ok(())
|
||||
) -> Result<Output> {
|
||||
Ok(Output::new_with_affected_rows(0))
|
||||
}
|
||||
|
||||
async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result<PromStoreResponse> {
|
||||
@@ -88,8 +89,9 @@ impl PromStoreProtocolHandler for DummyInstance {
|
||||
};
|
||||
|
||||
Ok(PromStoreResponse {
|
||||
content_type: "application/x-protobuf".to_string(),
|
||||
content_encoding: "snappy".to_string(),
|
||||
content_type: CONTENT_TYPE_PROTOBUF.clone(),
|
||||
content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
|
||||
resp_metrics: Default::default(),
|
||||
body: response.encode_to_vec(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -64,8 +64,8 @@ mod test {
|
||||
.unwrap()
|
||||
.is_ok());
|
||||
|
||||
let resp = instance.metrics(req, ctx.clone()).await.unwrap();
|
||||
assert!(resp.partial_success.is_none());
|
||||
let resp = instance.metrics(req, ctx.clone()).await;
|
||||
assert!(resp.is_ok());
|
||||
|
||||
let mut output = instance
|
||||
.do_query(
|
||||
|
||||
Reference in New Issue
Block a user