refactor: bring metrics to http output (#3247)

* refactor: bring metrics to http output

* chore: remove unwrap

* chore: make walk plan accumulate

* chore: change field name and comment

* chore: add metrics to http resp header

* chore: move PrometheusJsonResponse to a separate file and impl IntoResponse

* chore: put metrics in prometheus resp header too
Author: shuiyisong
Date: 2024-02-20 11:25:18 +08:00
Committed by: GitHub
Parent: 6628c41c36
Commit: bf5e1905cd
58 changed files with 592 additions and 395 deletions
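
The net effect for HTTP clients of this change is that plan-level execution metrics are serialized to JSON and attached to responses under the new `x-greptime-metrics` header (`GREPTIME_DB_HEADER_METRICS` in the diff below), in addition to the per-query `metrics` field in the greptimedb_v1 body. A minimal client-side sketch of reading that header, assuming the `reqwest` (blocking) and `serde_json` crates; the URL and SQL string are illustrative, only the header name and JSON shape come from this diff:

// Hypothetical client sketch: read the metrics header introduced by this change.
use std::collections::HashMap;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let resp = reqwest::blocking::Client::new()
        .get("http://127.0.0.1:4000/v1/sql")
        .query(&[("sql", "SELECT 1")])
        .send()?;

    // The header value is a JSON object, e.g. {"greptime_exec_cost": 12}.
    if let Some(value) = resp.headers().get("x-greptime-metrics") {
        let metrics: HashMap<String, serde_json::Value> = serde_json::from_str(value.to_str()?)?;
        println!("{metrics:?}");
    }
    Ok(())
}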

Cargo.lock generated
View File

@@ -1557,6 +1557,8 @@ dependencies = [
"prometheus",
"prost 0.12.3",
"rand",
"serde",
"serde_json",
"session",
"snafu",
"substrait 0.17.1",
@@ -1949,6 +1951,10 @@ dependencies = [
"uuid",
]
[[package]]
name = "common-plugins"
version = "0.6.0"
[[package]]
name = "common-procedure"
version = "0.6.0"
@@ -4700,9 +4706,9 @@ checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4"
[[package]]
name = "libgit2-sys"
version = "0.16.2+1.7.2"
version = "0.16.1+1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee4126d8b4ee5c9d9ea891dd875cfdc1e9d0950437179104b183d7d8a74d24e8"
checksum = "f2a2bb3680b094add03bb3732ec520ece34da31a8cd2d633d1389d0f0fb60d0c"
dependencies = [
"cc",
"libc",
@@ -5178,7 +5184,7 @@ dependencies = [
[[package]]
name = "meter-core"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d"
dependencies = [
"anymap",
"once_cell",
@@ -5188,7 +5194,7 @@ dependencies = [
[[package]]
name = "meter-macros"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f"
source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d"
dependencies = [
"meter-core",
]
@@ -7242,6 +7248,7 @@ dependencies = [
"common-function",
"common-macro",
"common-meta",
"common-plugins",
"common-query",
"common-recordbatch",
"common-telemetry",
@@ -8898,6 +8905,7 @@ dependencies = [
"common-macro",
"common-mem-prof",
"common-meta",
"common-plugins",
"common-query",
"common-recordbatch",
"common-runtime",

View File

@@ -18,6 +18,7 @@ members = [
"src/common/grpc-expr",
"src/common/mem-prof",
"src/common/meta",
"src/common/plugins",
"src/common/procedure",
"src/common/procedure-test",
"src/common/query",
@@ -100,7 +101,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" }
meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80b72716dcde47ec4161478416a5c6c21343364d" }
mockall = "0.11.4"
moka = "0.12"
num_cpus = "1.16"
@@ -164,6 +165,7 @@ common-grpc-expr = { path = "src/common/grpc-expr" }
common-macro = { path = "src/common/macro" }
common-mem-prof = { path = "src/common/mem-prof" }
common-meta = { path = "src/common/meta" }
common-plugins = { path = "src/common/plugins" }
common-procedure = { path = "src/common/procedure" }
common-procedure-test = { path = "src/common/procedure-test" }
common-query = { path = "src/common/query" }
@@ -201,7 +203,7 @@ table = { path = "src/table" }
[workspace.dependencies.meter-macros]
git = "https://github.com/GreptimeTeam/greptime-meter.git"
rev = "abbd357c1e193cd270ea65ee7652334a150b628f"
rev = "80b72716dcde47ec4161478416a5c6c21343364d"
[profile.release]
debug = 1

View File

@@ -504,7 +504,7 @@ async fn do_query(num_iter: usize, db: &Database, table_name: &str) {
let res = db.sql(&query).await.unwrap();
match res {
Output::AffectedRows(_) | Output::RecordBatches(_) => (),
Output::Stream(stream) => {
Output::Stream(stream, _) => {
stream.try_collect::<Vec<_>>().await.unwrap();
}
}

View File

@@ -34,6 +34,8 @@ parking_lot = "0.12"
prometheus.workspace = true
prost.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
session.workspace = true
snafu.workspace = true
tokio.workspace = true

View File

@@ -340,7 +340,7 @@ impl Database {
output_ordering: None,
metrics: Default::default(),
};
Ok(Output::Stream(Box::pin(record_batch_stream)))
Ok(Output::new_stream(Box::pin(record_batch_stream)))
}
}
}

View File

@@ -123,8 +123,8 @@ impl RegionRequester {
.fail();
};
let metrics_str = Arc::new(ArcSwapOption::from(None));
let ref_str = metrics_str.clone();
let metrics = Arc::new(ArcSwapOption::from(None));
let metrics_ref = metrics.clone();
let tracing_context = TracingContext::from_current_span();
@@ -140,7 +140,8 @@ impl RegionRequester {
match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::Metrics(s) => {
ref_str.swap(Some(Arc::new(s)));
let m = serde_json::from_str(&s).ok().map(Arc::new);
metrics_ref.swap(m);
break;
}
_ => {
@@ -159,7 +160,7 @@ impl RegionRequester {
schema,
stream,
output_ordering: None,
metrics: metrics_str,
metrics,
};
Ok(Box::pin(record_batch_stream))
}

View File

@@ -142,7 +142,7 @@ impl Export {
.with_context(|_| RequestDatabaseSnafu {
sql: "show databases".to_string(),
})?;
let Output::Stream(stream) = result else {
let Output::Stream(stream, _) = result else {
NotDataFromOutputSnafu.fail()?
};
let record_batch = collect(stream)
@@ -183,7 +183,7 @@ impl Export {
.sql(&sql)
.await
.with_context(|_| RequestDatabaseSnafu { sql })?;
let Output::Stream(stream) = result else {
let Output::Stream(stream, _) = result else {
NotDataFromOutputSnafu.fail()?
};
let Some(record_batch) = collect(stream)
@@ -235,7 +235,7 @@ impl Export {
.sql(&sql)
.await
.with_context(|_| RequestDatabaseSnafu { sql })?;
let Output::Stream(stream) = result else {
let Output::Stream(stream, _) = result else {
NotDataFromOutputSnafu.fail()?
};
let record_batch = collect(stream)

View File

@@ -185,7 +185,7 @@ impl Repl {
.context(RequestDatabaseSnafu { sql: &sql })?;
let either = match output {
Output::Stream(s) => {
Output::Stream(s, _) => {
let x = RecordBatches::try_collect(s)
.await
.context(CollectRecordBatchesSnafu)?;

View File

@@ -0,0 +1,9 @@
[package]
name = "common-plugins"
version.workspace = true
edition.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]

View File

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Greptime reserved metrics prefix
pub const GREPTIME_EXEC_PREFIX: &str = "greptime_exec_";
/// Execution cost metrics key
pub const GREPTIME_EXEC_COST: &str = "greptime_exec_cost";

View File

@@ -0,0 +1,20 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// This crate is designed to sit at the bottom of the dependency tree
/// to provide common and useful utils and consts to all plugin usage,
/// since the `plugins` crate is at the top, depending on crates like `frontend` and `datanode`.
mod consts;
pub use consts::{GREPTIME_EXEC_COST, GREPTIME_EXEC_PREFIX};
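
A small usage sketch for these constants (hypothetical downstream code, not part of this change): only metric names carrying the reserved prefix are surfaced to clients, which is what the metrics-collection code later in this diff relies on.

// Hypothetical sketch: filter plan metric names down to the Greptime-reserved ones.
use common_plugins::{GREPTIME_EXEC_COST, GREPTIME_EXEC_PREFIX};

fn is_exec_metric(name: &str) -> bool {
    name.starts_with(GREPTIME_EXEC_PREFIX)
}

fn main() {
    // "greptime_exec_cost" carries the "greptime_exec_" prefix, so it is kept.
    assert!(is_exec_metric(GREPTIME_EXEC_COST));
    // Ordinary DataFusion metrics such as "output_rows" are filtered out.
    assert!(!is_exec_metric("output_rows"));
}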

View File

@@ -13,10 +13,12 @@
// limitations under the License.
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use api::greptime_proto::v1::add_column_location::LocationType;
use api::greptime_proto::v1::AddColumnLocation as Location;
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use physical_plan::PhysicalPlan;
use serde::{Deserialize, Serialize};
pub mod columnar_value;
@@ -32,7 +34,14 @@ use sqlparser_derive::{Visit, VisitMut};
pub enum Output {
AffectedRows(usize),
RecordBatches(RecordBatches),
Stream(SendableRecordBatchStream),
Stream(SendableRecordBatchStream, Option<Arc<dyn PhysicalPlan>>),
}
impl Output {
// Helper function to build the original `Output::Stream` (without an attached physical plan)
pub fn new_stream(stream: SendableRecordBatchStream) -> Self {
Output::Stream(stream, None)
}
}
impl Debug for Output {
@@ -42,7 +51,13 @@ impl Debug for Output {
Output::RecordBatches(recordbatches) => {
write!(f, "Output::RecordBatches({recordbatches:?})")
}
Output::Stream(_) => write!(f, "Output::Stream(<stream>)"),
Output::Stream(_, df) => {
if df.is_some() {
write!(f, "Output::Stream(<stream>, Some<physical_plan>)")
} else {
write!(f, "Output::Stream(<stream>)")
}
}
}
}
}

View File

@@ -182,7 +182,7 @@ pub struct RecordBatchStreamAdapter {
enum Metrics {
Unavailable,
Unresolved(Arc<dyn ExecutionPlan>),
Resolved(String),
Resolved(RecordBatchMetrics),
}
impl RecordBatchStreamAdapter {
@@ -222,9 +222,9 @@ impl RecordBatchStream for RecordBatchStreamAdapter {
self.schema.clone()
}
fn metrics(&self) -> Option<String> {
fn metrics(&self) -> Option<RecordBatchMetrics> {
match &self.metrics_2 {
Metrics::Resolved(metrics) => Some(metrics.clone()),
Metrics::Resolved(metrics) => Some(*metrics),
Metrics::Unavailable | Metrics::Unresolved(_) => None,
}
}
@@ -254,8 +254,7 @@ impl Stream for RecordBatchStreamAdapter {
let mut metrics_holder = RecordBatchMetrics::default();
collect_metrics(df_plan, &mut metrics_holder);
if metrics_holder.elapsed_compute != 0 || metrics_holder.memory_usage != 0 {
self.metrics_2 =
Metrics::Resolved(serde_json::to_string(&metrics_holder).unwrap());
self.metrics_2 = Metrics::Resolved(metrics_holder);
}
}
Poll::Ready(None)
@@ -285,7 +284,7 @@ fn collect_metrics(df_plan: &Arc<dyn ExecutionPlan>, result: &mut RecordBatchMet
/// [`RecordBatchMetrics`] carries metric values
/// from datanode to frontend through gRPC
#[derive(serde::Serialize, serde::Deserialize, Default, Debug)]
#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone, Copy)]
pub struct RecordBatchMetrics {
// cpu consumption in nanoseconds
pub elapsed_compute: usize,

View File

@@ -21,6 +21,7 @@ pub mod util;
use std::pin::Pin;
use std::sync::Arc;
use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
use datafusion::physical_plan::memory::MemoryStream;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
@@ -42,7 +43,7 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
None
}
fn metrics(&self) -> Option<String> {
fn metrics(&self) -> Option<RecordBatchMetrics> {
None
}
}
@@ -212,7 +213,7 @@ pub struct RecordBatchStreamWrapper<S> {
pub schema: SchemaRef,
pub stream: S,
pub output_ordering: Option<Vec<OrderOption>>,
pub metrics: Arc<ArcSwapOption<String>>,
pub metrics: Arc<ArcSwapOption<RecordBatchMetrics>>,
}
impl<S> RecordBatchStreamWrapper<S> {
@@ -238,8 +239,8 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
self.output_ordering.as_deref()
}
fn metrics(&self) -> Option<String> {
self.metrics.load().as_ref().map(|s| s.as_ref().clone())
fn metrics(&self) -> Option<RecordBatchMetrics> {
self.metrics.load().as_ref().map(|s| *s.as_ref())
}
}

View File

@@ -28,7 +28,7 @@ pub async fn execute_and_check_output(db: &Database, sql: &str, expected: Expect
assert_eq!(*x, y, "actual: \n{}", x)
}
(Output::RecordBatches(_), ExpectedOutput::QueryResult(x))
| (Output::Stream(_), ExpectedOutput::QueryResult(x)) => {
| (Output::Stream(_, _), ExpectedOutput::QueryResult(x)) => {
check_output_stream(output, x).await
}
_ => panic!(),
@@ -37,7 +37,7 @@ pub async fn execute_and_check_output(db: &Database, sql: &str, expected: Expect
pub async fn check_output_stream(output: Output, expected: &str) {
let recordbatches = match output {
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
Output::Stream(stream, _) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};

View File

@@ -655,7 +655,7 @@ impl RegionServerInner {
Output::AffectedRows(_) | Output::RecordBatches(_) => {
UnsupportedOutputSnafu { expected: "stream" }.fail()
}
Output::Stream(stream) => Ok(stream),
Output::Stream(stream, _) => Ok(stream),
}
}

View File

@@ -75,7 +75,7 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult<Resp
}
async fn to_query_result(table_name: &str, output: Output) -> ServerResult<QueryResult> {
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream)

View File

@@ -135,7 +135,7 @@ impl StatementExecutor {
.await
.context(ExecLogicalPlanSnafu)?;
let stream = match output {
Output::Stream(stream) => stream,
Output::Stream(stream, _) => stream,
Output::RecordBatches(record_batches) => record_batches.as_stream(),
_ => unreachable!(),
};

View File

@@ -13,7 +13,6 @@
// limitations under the License.
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use catalog::kvbackend::MetaKvBackend;
@@ -29,10 +28,6 @@ use datafusion_expr::{lit, Operator};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::{ColumnSchema, SchemaBuilder};
use meta_client::client::MetaClient;
use meter_core::collect::Collect;
use meter_core::data::{ReadRecord, WriteRecord};
use meter_core::global::global_registry;
use meter_core::write_calc::WriteCalculator;
use partition::columns::RangeColumnsPartitionRule;
use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef};
use partition::partition::{PartitionBound, PartitionDef};
@@ -40,8 +35,6 @@ use partition::range::RangePartitionRule;
use partition::PartitionRuleRef;
use store_api::storage::RegionNumber;
use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder};
use table::meter_insert_request;
use table::requests::InsertRequest;
pub fn new_test_table_info(
table_id: u32,
@@ -421,47 +414,3 @@ async fn test_find_regions() {
partition::error::Error::FindRegions { .. }
));
}
#[derive(Default)]
struct MockCollector {
pub write_sum: AtomicU32,
}
impl Collect for MockCollector {
fn on_write(&self, record: WriteRecord) {
let _ = self
.write_sum
.fetch_add(record.byte_count, Ordering::Relaxed);
}
fn on_read(&self, _record: ReadRecord) {
todo!()
}
}
struct MockCalculator;
impl WriteCalculator<InsertRequest> for MockCalculator {
fn calc_byte(&self, _value: &InsertRequest) -> u32 {
1024 * 10
}
}
#[test]
#[ignore]
fn test_meter_insert_request() {
let collector = Arc::new(MockCollector::default());
global_registry().set_collector(collector.clone());
global_registry().register_calculator(Arc::new(MockCalculator));
let req = InsertRequest {
catalog_name: "greptime".to_string(),
schema_name: "public".to_string(),
table_name: "numbers".to_string(),
columns_values: Default::default(),
};
meter_insert_request!(req);
let re = collector.write_sum.load(Ordering::Relaxed);
assert_eq!(re, 1024 * 10);
}

View File

@@ -23,6 +23,7 @@ common-error.workspace = true
common-function.workspace = true
common-macro.workspace = true
common-meta.workspace = true
common-plugins.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true

View File

@@ -95,7 +95,10 @@ impl DatafusionQueryEngine {
optimized_physical_plan
};
Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?))
Ok(Output::Stream(
self.execute_stream(&ctx, &physical_plan)?,
Some(physical_plan),
))
}
#[tracing::instrument(skip_all)]
@@ -121,7 +124,7 @@ impl DatafusionQueryEngine {
.await?;
let mut stream = match output {
Output::RecordBatches(batches) => batches.as_stream(),
Output::Stream(stream) => stream,
Output::Stream(stream, _) => stream,
_ => unreachable!(),
};
@@ -601,7 +604,7 @@ mod tests {
let output = engine.execute(plan, QueryContext::arc()).await.unwrap();
match output {
Output::Stream(recordbatch) => {
Output::Stream(recordbatch, _) => {
let numbers = util::collect(recordbatch).await.unwrap();
assert_eq!(1, numbers.len());
assert_eq!(numbers[0].num_columns(), 1);

View File

@@ -22,15 +22,16 @@ use common_base::bytes::Bytes;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
use common_meta::table_name::TableName;
use common_plugins::GREPTIME_EXEC_COST;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion_common::{Result, Statistics};
@@ -39,6 +40,7 @@ use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use snafu::ResultExt;
use store_api::storage::RegionId;
@@ -212,18 +214,17 @@ impl MergeScanExec {
// reset poll timer
poll_timer = Instant::now();
}
if let Some(metrics) = stream
.metrics()
.and_then(|m| serde_json::from_str::<RecordBatchMetrics>(&m).ok())
{
if let Some(metrics) = stream.metrics() {
let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
read_meter!(
let value = read_meter!(
c,
s,
metrics.elapsed_compute as u64,
metrics.memory_usage as u64,
0
ReadItem {
cpu_time: metrics.elapsed_compute as u64,
table_scan: metrics.memory_usage as u64
}
);
metric.record_greptime_exec_cost(value as usize);
}
METRIC_MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
@@ -327,6 +328,9 @@ struct MergeScanMetric {
finish_time: Time,
/// Count of rows fetched from remote
output_rows: Count,
/// Gauge for greptime plan execution cost metrics for output
greptime_exec_cost: Gauge,
}
impl MergeScanMetric {
@@ -336,6 +340,7 @@ impl MergeScanMetric {
first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
output_rows: MetricBuilder::new(metric).output_rows(1),
greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_COST, 1),
}
}
@@ -354,4 +359,8 @@ impl MergeScanMetric {
pub fn record_output_batch_rows(&self, num_rows: usize) {
self.output_rows.add(num_rows);
}
pub fn record_greptime_exec_cost(&self, metrics: usize) {
self.greptime_exec_cost.add(metrics);
}
}

View File

@@ -43,7 +43,7 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec<RecordBatch> {
.plan(stmt, query_ctx.clone())
.await
.unwrap();
let Output::Stream(stream) = engine.execute(plan, query_ctx).await.unwrap() else {
let Output::Stream(stream, _) = engine.execute(plan, query_ctx).await.unwrap() else {
unreachable!()
};
util::collect(stream).await.unwrap()

View File

@@ -80,7 +80,7 @@ async fn test_datafusion_query_engine() -> Result<()> {
let output = engine.execute(plan, QueryContext::arc()).await?;
let recordbatch = match output {
Output::Stream(recordbatch) => recordbatch,
Output::Stream(recordbatch, _) => recordbatch,
_ => unreachable!(),
};

View File

@@ -69,7 +69,7 @@ async fn run_compiled(script: &PyScript) {
.await
.unwrap();
let _res = match output {
Output::Stream(s) => common_recordbatch::util::collect_batches(s).await.unwrap(),
Output::Stream(s, _) => common_recordbatch::util::collect_batches(s).await.unwrap(),
Output::RecordBatches(rbs) => rbs,
_ => unreachable!(),
};

View File

@@ -312,7 +312,7 @@ impl Script for PyScript {
.context(DatabaseQuerySnafu)?;
let copr = self.copr.clone();
match res {
Output::Stream(stream) => Ok(Output::Stream(Box::pin(CoprStream::try_new(
Output::Stream(stream, _) => Ok(Output::new_stream(Box::pin(CoprStream::try_new(
stream, copr, params, ctx,
)?))),
_ => unreachable!(),
@@ -411,7 +411,7 @@ def test(number) -> vector[u32]:
.await
.unwrap();
let res = common_recordbatch::util::collect_batches(match output {
Output::Stream(s) => s,
Output::Stream(s, _) => s,
_ => unreachable!(),
})
.await
@@ -472,7 +472,7 @@ def test(number) -> vector[u32]:
.await
.unwrap();
let res = common_recordbatch::util::collect_batches(match _output {
Output::Stream(s) => s,
Output::Stream(s, _) => s,
_ => todo!(),
})
.await
@@ -504,7 +504,7 @@ def test(a, b, c) -> vector[f64]:
.await
.unwrap();
match output {
Output::Stream(stream) => {
Output::Stream(stream, _) => {
let numbers = util::collect(stream).await.unwrap();
assert_eq!(1, numbers.len());
@@ -542,7 +542,7 @@ def test(a) -> vector[i64]:
.await
.unwrap();
match output {
Output::Stream(stream) => {
Output::Stream(stream, _) => {
let numbers = util::collect(stream).await.unwrap();
assert_eq!(1, numbers.len());

View File

@@ -403,7 +403,7 @@ impl PyQueryEngine {
Ok(Either::AffectedRows(cnt))
}
Ok(common_query::Output::RecordBatches(rbs)) => Ok(Either::Rb(rbs)),
Ok(common_query::Output::Stream(s)) => Ok(Either::Rb(
Ok(common_query::Output::Stream(s, _)) => Ok(Either::Rb(
common_recordbatch::util::collect_batches(s).await.unwrap(),
)),
Err(e) => Err(e),

View File

@@ -88,7 +88,7 @@ async fn integrated_py_copr_test() {
.await
.unwrap();
let res = match output {
Output::Stream(s) => common_recordbatch::util::collect_batches(s).await.unwrap(),
Output::Stream(s, _) => common_recordbatch::util::collect_batches(s).await.unwrap(),
Output::RecordBatches(rbs) => rbs,
_ => unreachable!(),
};

View File

@@ -231,7 +231,7 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptsTable<E> {
.await
.context(ExecuteInternalStatementSnafu)?;
let stream = match output {
Output::Stream(stream) => stream,
Output::Stream(stream, _) => stream,
Output::RecordBatches(record_batches) => record_batches.as_stream(),
_ => unreachable!(),
};
@@ -286,7 +286,7 @@ impl<E: ErrorExt + Send + Sync + 'static> ScriptsTable<E> {
.await
.context(ExecuteInternalStatementSnafu)?;
let stream = match output {
Output::Stream(stream) => stream,
Output::Stream(stream, _) => stream,
Output::RecordBatches(record_batches) => record_batches.as_stream(),
_ => unreachable!(),
};

View File

@@ -33,6 +33,7 @@ common-grpc-expr.workspace = true
common-macro.workspace = true
common-mem-prof = { workspace = true, optional = true }
common-meta.workspace = true
common-plugins.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-runtime.workspace = true

View File

@@ -52,7 +52,7 @@ impl GreptimeDatabase for DatabaseService {
}),
response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })),
},
Output::Stream(_) | Output::RecordBatches(_) => {
Output::Stream(_, _) | Output::RecordBatches(_) => {
return Err(Status::unimplemented("GreptimeDatabase::Handle for query"));
}
};
@@ -71,7 +71,7 @@ impl GreptimeDatabase for DatabaseService {
let output = self.handler.handle_request(request).await?;
match output {
Output::AffectedRows(rows) => affected_rows += rows,
Output::Stream(_) | Output::RecordBatches(_) => {
Output::Stream(_, _) | Output::RecordBatches(_) => {
return Err(Status::unimplemented(
"GreptimeDatabase::HandleRequests for query",
));

View File

@@ -165,7 +165,7 @@ fn to_flight_data_stream(
tracing_context: TracingContext,
) -> TonicStream<FlightData> {
match output {
Output::Stream(stream) => {
Output::Stream(stream, _) => {
let stream = FlightRecordBatchStream::new(stream, tracing_context);
Box::pin(stream) as _
}

View File

@@ -84,9 +84,11 @@ impl FlightRecordBatchStream {
}
}
// use the last message to pass the metrics
if let Some(m) = recordbatches.metrics() {
let metrics = FlightMessage::Metrics(m);
let _ = tx.send(Ok(metrics)).await;
if let Some(metrics_str) = recordbatches
.metrics()
.and_then(|m| serde_json::to_string(&m).ok())
{
let _ = tx.send(Ok(FlightMessage::Metrics(metrics_str))).await;
}
}
}

View File

@@ -126,7 +126,6 @@ impl PrometheusGatewayService {
err.status_code().to_string(),
err.output_msg(),
)
.0
}
};
// range query only returns matrix
@@ -134,8 +133,6 @@ impl PrometheusGatewayService {
result_type = ValueType::Matrix;
};
PrometheusJsonResponse::from_query_result(result, metric_name, result_type)
.await
.0
PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await
}
}

View File

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::fmt::Display;
use std::net::SocketAddr;
use std::sync::Mutex as StdMutex;
@@ -82,6 +83,7 @@ pub mod otlp;
pub mod pprof;
pub mod prom_store;
pub mod prometheus;
mod prometheus_resp;
pub mod script;
pub mod arrow_result;
@@ -182,6 +184,11 @@ impl From<SchemaRef> for OutputSchema {
pub struct HttpRecordsOutput {
schema: OutputSchema,
rows: Vec<Vec<Value>>,
// plan-level execution metrics
#[serde(skip_serializing_if = "HashMap::is_empty")]
#[serde(default)]
metrics: HashMap<String, Value>,
}
impl HttpRecordsOutput {
@@ -211,6 +218,7 @@ impl HttpRecordsOutput {
Ok(HttpRecordsOutput {
schema: OutputSchema::from(schema),
rows: vec![],
metrics: Default::default(),
})
} else {
let mut rows =
@@ -231,6 +239,7 @@ impl HttpRecordsOutput {
Ok(HttpRecordsOutput {
schema: OutputSchema::from(schema),
rows,
metrics: Default::default(),
})
}
}

View File

@@ -89,7 +89,7 @@ impl ArrowResponse {
}
}
Output::Stream(recordbatches) => {
Output::Stream(recordbatches, _) => {
let schema = recordbatches.schema();
match write_arrow_bytes(recordbatches, schema.arrow_schema()).await {
Ok(payload) => HttpResponse::Arrow(ArrowResponse {

View File

@@ -37,7 +37,7 @@ impl CsvResponse {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::Csv, outputs).await {
Err(err) => HttpResponse::Error(err),
Ok(output) => {
Ok((output, _)) => {
if output.len() > 1 {
HttpResponse::Error(ErrorResponse::from_error_message(
ResponseFormat::Csv,

View File

@@ -12,13 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_query::Output;
use reqwest::header::HeaderValue;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use super::header::GREPTIME_DB_HEADER_METRICS;
use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT};
use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat};
@@ -27,14 +31,20 @@ pub struct GreptimedbV1Response {
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub(crate) output: Vec<GreptimeQueryOutput>,
pub(crate) execution_time_ms: u64,
// placeholder for header value
#[serde(skip)]
#[serde(default)]
pub(crate) resp_metrics: HashMap<String, Value>,
}
impl GreptimedbV1Response {
pub async fn from_output(outputs: Vec<crate::error::Result<Output>>) -> HttpResponse {
match handler::from_output(ResponseFormat::GreptimedbV1, outputs).await {
Ok(output) => HttpResponse::GreptimedbV1(Self {
Ok((output, resp_metrics)) => HttpResponse::GreptimedbV1(Self {
output,
execution_time_ms: 0,
resp_metrics,
}),
Err(err) => HttpResponse::Error(err),
}
@@ -57,7 +67,14 @@ impl GreptimedbV1Response {
impl IntoResponse for GreptimedbV1Response {
fn into_response(self) -> Response {
let execution_time = self.execution_time_ms;
let metrics = if self.resp_metrics.is_empty() {
None
} else {
serde_json::to_string(&self.resp_metrics).ok()
};
let mut resp = Json(self).into_response();
resp.headers_mut().insert(
GREPTIME_DB_HEADER_FORMAT,
HeaderValue::from_static("greptimedb_v1"),
@@ -66,6 +83,10 @@ impl IntoResponse for GreptimedbV1Response {
GREPTIME_DB_HEADER_EXECUTION_TIME,
HeaderValue::from(execution_time),
);
if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
resp.headers_mut().insert(GREPTIME_DB_HEADER_METRICS, m);
}
resp
}
}

View File

@@ -14,6 +14,7 @@
use std::collections::HashMap;
use std::env;
use std::sync::Arc;
use std::time::Instant;
use aide::transform::TransformOperation;
@@ -22,11 +23,15 @@ use axum::response::{IntoResponse, Response};
use axum::{Extension, Form};
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_plugins::GREPTIME_EXEC_PREFIX;
use common_query::physical_plan::PhysicalPlan;
use common_query::Output;
use common_recordbatch::util;
use datafusion::physical_plan::metrics::MetricValue;
use query::parser::PromQuery;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::QueryContextRef;
use crate::http::arrow_result::ArrowResponse;
@@ -125,23 +130,23 @@ pub async fn sql(
pub async fn from_output(
ty: ResponseFormat,
outputs: Vec<crate::error::Result<Output>>,
) -> Result<Vec<GreptimeQueryOutput>, ErrorResponse> {
) -> Result<(Vec<GreptimeQueryOutput>, HashMap<String, Value>), ErrorResponse> {
// TODO(sunng87): this api response structure cannot represent error well.
// It hides successful execution results from error response
let mut results = Vec::with_capacity(outputs.len());
let mut merge_map = HashMap::new();
for out in outputs {
match out {
Ok(Output::AffectedRows(rows)) => {
results.push(GreptimeQueryOutput::AffectedRows(rows));
}
Ok(Output::Stream(stream)) => {
Ok(Output::Stream(stream, physical_plan)) => {
let schema = stream.schema().clone();
// TODO(sunng87): streaming response
match util::collect(stream).await {
let mut http_record_output = match util::collect(stream).await {
Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) {
Ok(rows) => {
results.push(GreptimeQueryOutput::Records(rows));
}
Ok(rows) => rows,
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
}
@@ -149,7 +154,19 @@ pub async fn from_output(
Err(err) => {
return Err(ErrorResponse::from_error(ty, err));
}
};
if let Some(physical_plan) = physical_plan {
let mut result_map = HashMap::new();
let mut tmp = vec![&mut merge_map, &mut result_map];
collect_plan_metrics(physical_plan, &mut tmp);
let re = result_map
.into_iter()
.map(|(k, v)| (k, Value::from(v)))
.collect();
http_record_output.metrics = re;
}
results.push(GreptimeQueryOutput::Records(http_record_output))
}
Ok(Output::RecordBatches(rbs)) => {
match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) {
@@ -167,7 +184,48 @@ pub async fn from_output(
}
}
Ok(results)
let merge_map = merge_map
.into_iter()
.map(|(k, v)| (k, Value::from(v)))
.collect();
Ok((results, merge_map))
}
fn collect_into_maps(name: &str, value: u64, maps: &mut [&mut HashMap<String, u64>]) {
if name.starts_with(GREPTIME_EXEC_PREFIX) && value > 0 {
maps.iter_mut().for_each(|map| {
map.entry(name.to_string())
.and_modify(|v| *v += value)
.or_insert(value);
});
}
}
pub fn collect_plan_metrics(plan: Arc<dyn PhysicalPlan>, maps: &mut [&mut HashMap<String, u64>]) {
if let Some(m) = plan.metrics() {
m.iter().for_each(|m| match m.value() {
MetricValue::Count { name, count } => {
collect_into_maps(name, count.value() as u64, maps);
}
MetricValue::Gauge { name, gauge } => {
collect_into_maps(name, gauge.value() as u64, maps);
}
MetricValue::Time { name, time } => {
if name.starts_with(GREPTIME_EXEC_PREFIX) {
// override
maps.iter_mut().for_each(|map| {
map.insert(name.to_string(), time.value() as u64);
});
}
}
_ => {}
});
}
for c in plan.children() {
collect_plan_metrics(c, maps);
}
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]

View File

@@ -16,6 +16,7 @@ use headers::{Header, HeaderName, HeaderValue};
pub const GREPTIME_DB_HEADER_FORMAT: &str = "x-greptime-format";
pub const GREPTIME_DB_HEADER_EXECUTION_TIME: &str = "x-greptime-execution-time";
pub const GREPTIME_DB_HEADER_METRICS: &str = "x-greptime-metrics";
/// Header key of `db-name`. Example format of the header value is `greptime-public`.
pub static GREPTIME_DB_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-db-name");

View File

@@ -159,7 +159,7 @@ impl InfluxdbV1Response {
series: vec![],
});
}
Ok(Output::Stream(stream)) => {
Ok(Output::Stream(stream, _)) => {
// TODO(sunng87): streaming response
match util::collect(stream).await {
Ok(rows) => match InfluxdbRecordsOutput::try_from((epoch, rows)) {

View File

@@ -13,10 +13,10 @@
// limitations under the License.
//! prom supplies Prometheus HTTP API server compliance
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use axum::extract::{Path, Query, State};
use axum::{Extension, Form, Json};
use axum::{Extension, Form};
use catalog::CatalogManagerRef;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_catalog::parse_catalog_and_schema_from_db_string;
@@ -28,7 +28,7 @@ use common_recordbatch::RecordBatches;
use common_time::util::{current_time_rfc3339, yesterday_rfc3339};
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use datatypes::vectors::{Float64Vector, StringVector};
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::{
AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr,
@@ -39,10 +39,11 @@ use schemars::JsonSchema;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Serialize};
use session::context::QueryContextRef;
use snafu::{Location, OptionExt, ResultExt};
use snafu::{Location, ResultExt};
pub use super::prometheus_resp::PrometheusJsonResponse;
use crate::error::{
CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedResultSnafu,
CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, UnexpectedResultSnafu,
};
use crate::prom_store::METRIC_NAME_LABEL;
use crate::prometheus_handler::PrometheusHandlerRef;
@@ -80,218 +81,6 @@ impl Default for PrometheusResponse {
}
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PrometheusJsonResponse {
pub status: String,
pub data: PrometheusResponse,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "errorType")]
pub error_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub warnings: Option<Vec<String>>,
}
impl PrometheusJsonResponse {
pub fn error<S1, S2>(error_type: S1, reason: S2) -> Json<Self>
where
S1: Into<String>,
S2: Into<String>,
{
Json(PrometheusJsonResponse {
status: "error".to_string(),
data: PrometheusResponse::default(),
error: Some(reason.into()),
error_type: Some(error_type.into()),
warnings: None,
})
}
pub fn success(data: PrometheusResponse) -> Json<Self> {
Json(PrometheusJsonResponse {
status: "success".to_string(),
data,
error: None,
error_type: None,
warnings: None,
})
}
/// Convert from `Result<Output>`
pub async fn from_query_result(
result: Result<Output>,
metric_name: String,
result_type: ValueType,
) -> Json<Self> {
let response: Result<Json<Self>> = try {
let json = match result? {
Output::RecordBatches(batches) => Self::success(Self::record_batches_to_data(
batches,
metric_name,
result_type,
)?),
Output::Stream(stream) => {
let record_batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
Self::success(Self::record_batches_to_data(
record_batches,
metric_name,
result_type,
)?)
}
Output::AffectedRows(_) => {
Self::error("Unexpected", "expected data result, but got affected rows")
}
};
json
};
let result_type_string = result_type.to_string();
match response {
Ok(resp) => resp,
Err(err) => {
// Prometheus won't report error if querying nonexist label and metric
if err.status_code() == StatusCode::TableNotFound
|| err.status_code() == StatusCode::TableColumnNotFound
{
Self::success(PrometheusResponse::PromData(PromData {
result_type: result_type_string,
..Default::default()
}))
} else {
Self::error(err.status_code().to_string(), err.output_msg())
}
}
}
}
/// Convert [RecordBatches] to [PromData]
fn record_batches_to_data(
batches: RecordBatches,
metric_name: String,
result_type: ValueType,
) -> Result<PrometheusResponse> {
// infer semantic type of each column from schema.
// TODO(ruihang): wish there is a better way to do this.
let mut timestamp_column_index = None;
let mut tag_column_indices = Vec::new();
let mut first_field_column_index = None;
for (i, column) in batches.schema().column_schemas().iter().enumerate() {
match column.data_type {
ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
if timestamp_column_index.is_none() {
timestamp_column_index = Some(i);
}
}
ConcreteDataType::Float64(_) => {
if first_field_column_index.is_none() {
first_field_column_index = Some(i);
}
}
ConcreteDataType::String(_) => {
tag_column_indices.push(i);
}
_ => {}
}
}
let timestamp_column_index = timestamp_column_index.context(InternalSnafu {
err_msg: "no timestamp column found".to_string(),
})?;
let first_field_column_index = first_field_column_index.context(InternalSnafu {
err_msg: "no value column found".to_string(),
})?;
let metric_name = (METRIC_NAME.to_string(), metric_name);
let mut buffer = BTreeMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
for batch in batches.iter() {
// prepare things...
let tag_columns = tag_column_indices
.iter()
.map(|i| {
batch
.column(*i)
.as_any()
.downcast_ref::<StringVector>()
.unwrap()
})
.collect::<Vec<_>>();
let tag_names = tag_column_indices
.iter()
.map(|c| batches.schema().column_name_by_index(*c).to_string())
.collect::<Vec<_>>();
let timestamp_column = batch
.column(timestamp_column_index)
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
let field_column = batch
.column(first_field_column_index)
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap();
// assemble rows
for row_index in 0..batch.num_rows() {
// retrieve tags
// TODO(ruihang): push table name `__metric__`
let mut tags = vec![metric_name.clone()];
for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
// TODO(ruihang): add test for NULL tag
if let Some(tag_value) = tag_column.get_data(row_index) {
tags.push((tag_name.to_string(), tag_value.to_string()));
}
}
// retrieve timestamp
let timestamp_millis: i64 = timestamp_column.get_data(row_index).unwrap().into();
let timestamp = timestamp_millis as f64 / 1000.0;
// retrieve value
if let Some(v) = field_column.get_data(row_index) {
buffer
.entry(tags)
.or_default()
.push((timestamp, Into::<f64>::into(v).to_string()));
};
}
}
let result = buffer
.into_iter()
.map(|(tags, mut values)| {
let metric = tags.into_iter().collect();
match result_type {
ValueType::Vector | ValueType::Scalar | ValueType::String => Ok(PromSeries {
metric,
value: values.pop(),
..Default::default()
}),
ValueType::Matrix => Ok(PromSeries {
metric,
values,
..Default::default()
}),
}
})
.collect::<Result<Vec<_>>>()?;
let result_type_string = result_type.to_string();
let data = PrometheusResponse::PromData(PromData {
result_type: result_type_string,
result,
});
Ok(data)
}
}
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct FormatQuery {
query: Option<String>,
@@ -303,7 +92,7 @@ pub async fn format_query(
Query(params): Query<InstantQuery>,
Extension(_query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<InstantQuery>,
) -> Json<PrometheusJsonResponse> {
) -> PrometheusJsonResponse {
let _timer = crate::metrics::METRIC_HTTP_PROMQL_FORMAT_QUERY_ELAPSED.start_timer();
let query = params.query.or(form_params.query).unwrap_or_default();
@@ -333,7 +122,7 @@ pub async fn instant_query(
Query(params): Query<InstantQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<InstantQuery>,
) -> Json<PrometheusJsonResponse> {
) -> PrometheusJsonResponse {
let _timer = crate::metrics::METRIC_HTTP_PROMQL_INSTANT_QUERY_ELAPSED.start_timer();
// Extract time from query string, or use current server time if not specified.
let time = params
@@ -373,7 +162,7 @@ pub async fn range_query(
Query(params): Query<RangeQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<RangeQuery>,
) -> Json<PrometheusJsonResponse> {
) -> PrometheusJsonResponse {
let _timer = crate::metrics::METRIC_HTTP_PROMQL_RANGE_QUERY_ELAPSED.start_timer();
let prom_query = PromQuery {
query: params.query.or(form_params.query).unwrap_or_default(),
@@ -442,7 +231,7 @@ pub async fn labels_query(
Query(params): Query<LabelsQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<LabelsQuery>,
) -> Json<PrometheusJsonResponse> {
) -> PrometheusJsonResponse {
let _timer = crate::metrics::METRIC_HTTP_PROMQL_LABEL_QUERY_ELAPSED.start_timer();
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
@@ -541,7 +330,7 @@ async fn retrieve_series_from_query_result(
record_batches_to_series(batches, series, table_name)?;
Ok(())
}
Output::Stream(stream) => {
Output::Stream(stream, _) => {
let batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
@@ -565,7 +354,7 @@ async fn retrieve_labels_name_from_query_result(
record_batches_to_labels_name(batches, labels)?;
Ok(())
}
Output::Stream(stream) => {
Output::Stream(stream, _) => {
let batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
@@ -708,7 +497,7 @@ pub async fn label_values_query(
Path(label_name): Path<String>,
Extension(query_ctx): Extension<QueryContextRef>,
Query(params): Query<LabelValueQuery>,
) -> Json<PrometheusJsonResponse> {
) -> PrometheusJsonResponse {
let _timer = crate::metrics::METRIC_HTTP_PROMQL_LABEL_VALUE_QUERY_ELAPSED.start_timer();
let db = &params.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string());
@@ -771,7 +560,7 @@ async fn retrieve_label_values(
Output::RecordBatches(batches) => {
retrieve_label_values_from_record_batch(batches, label_name, labels_values).await
}
Output::Stream(stream) => {
Output::Stream(stream, _) => {
let batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
@@ -834,7 +623,7 @@ pub async fn series_query(
Query(params): Query<SeriesQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
Form(form_params): Form<SeriesQuery>,
) -> Json<PrometheusJsonResponse> {
) -> PrometheusJsonResponse {
let _timer = crate::metrics::METRIC_HTTP_PROMQL_SERIES_QUERY_ELAPSED.start_timer();
let mut queries: Vec<String> = params.matches.0;
if queries.is_empty() {

View File

@@ -0,0 +1,289 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! prom supplies Prometheus HTTP API server compliance
use std::collections::{BTreeMap, HashMap};
use axum::http::HeaderValue;
use axum::response::{IntoResponse, Response};
use axum::Json;
use common_error::ext::ErrorExt;
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::RecordBatches;
use datatypes::prelude::ConcreteDataType;
use datatypes::scalars::ScalarVector;
use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
use promql_parser::label::METRIC_NAME;
use promql_parser::parser::ValueType;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use snafu::{OptionExt, ResultExt};
use super::handler::collect_plan_metrics;
use super::header::GREPTIME_DB_HEADER_METRICS;
use super::prometheus::{PromData, PromSeries, PrometheusResponse};
use crate::error::{CollectRecordbatchSnafu, InternalSnafu, Result};
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
pub struct PrometheusJsonResponse {
pub status: String,
pub data: PrometheusResponse,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(rename = "errorType")]
pub error_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub warnings: Option<Vec<String>>,
// placeholder for header value
#[serde(skip)]
#[serde(default)]
pub resp_metrics: HashMap<String, Value>,
}
impl IntoResponse for PrometheusJsonResponse {
fn into_response(self) -> Response {
let metrics = if self.resp_metrics.is_empty() {
None
} else {
serde_json::to_string(&self.resp_metrics).ok()
};
let mut resp = Json(self).into_response();
if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
resp.headers_mut().insert(GREPTIME_DB_HEADER_METRICS, m);
}
resp
}
}
impl PrometheusJsonResponse {
pub fn error<S1, S2>(error_type: S1, reason: S2) -> Self
where
S1: Into<String>,
S2: Into<String>,
{
PrometheusJsonResponse {
status: "error".to_string(),
data: PrometheusResponse::default(),
error: Some(reason.into()),
error_type: Some(error_type.into()),
warnings: None,
resp_metrics: Default::default(),
}
}
pub fn success(data: PrometheusResponse) -> Self {
PrometheusJsonResponse {
status: "success".to_string(),
data,
error: None,
error_type: None,
warnings: None,
resp_metrics: Default::default(),
}
}
/// Convert from `Result<Output>`
pub async fn from_query_result(
result: Result<Output>,
metric_name: String,
result_type: ValueType,
) -> Self {
let response: Result<Self> = try {
let resp = match result? {
Output::RecordBatches(batches) => Self::success(Self::record_batches_to_data(
batches,
metric_name,
result_type,
)?),
Output::Stream(stream, physical_plan) => {
let record_batches = RecordBatches::try_collect(stream)
.await
.context(CollectRecordbatchSnafu)?;
let mut resp = Self::success(Self::record_batches_to_data(
record_batches,
metric_name,
result_type,
)?);
if let Some(physical_plan) = physical_plan {
let mut result_map = HashMap::new();
let mut tmp = vec![&mut result_map];
collect_plan_metrics(physical_plan, &mut tmp);
let re = result_map
.into_iter()
.map(|(k, v)| (k, Value::from(v)))
.collect();
resp.resp_metrics = re;
}
resp
}
Output::AffectedRows(_) => {
Self::error("Unexpected", "expected data result, but got affected rows")
}
};
resp
};
let result_type_string = result_type.to_string();
match response {
Ok(resp) => resp,
Err(err) => {
// Prometheus won't report an error when querying a nonexistent label or metric
if err.status_code() == StatusCode::TableNotFound
|| err.status_code() == StatusCode::TableColumnNotFound
{
Self::success(PrometheusResponse::PromData(PromData {
result_type: result_type_string,
..Default::default()
}))
} else {
Self::error(err.status_code().to_string(), err.output_msg())
}
}
}
}
/// Convert [RecordBatches] to [PromData]
fn record_batches_to_data(
batches: RecordBatches,
metric_name: String,
result_type: ValueType,
) -> Result<PrometheusResponse> {
// infer semantic type of each column from schema.
// TODO(ruihang): wish there is a better way to do this.
let mut timestamp_column_index = None;
let mut tag_column_indices = Vec::new();
let mut first_field_column_index = None;
for (i, column) in batches.schema().column_schemas().iter().enumerate() {
match column.data_type {
ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
if timestamp_column_index.is_none() {
timestamp_column_index = Some(i);
}
}
ConcreteDataType::Float64(_) => {
if first_field_column_index.is_none() {
first_field_column_index = Some(i);
}
}
ConcreteDataType::String(_) => {
tag_column_indices.push(i);
}
_ => {}
}
}
let timestamp_column_index = timestamp_column_index.context(InternalSnafu {
err_msg: "no timestamp column found".to_string(),
})?;
let first_field_column_index = first_field_column_index.context(InternalSnafu {
err_msg: "no value column found".to_string(),
})?;
let metric_name = (METRIC_NAME.to_string(), metric_name);
let mut buffer = BTreeMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
for batch in batches.iter() {
// prepare things...
let tag_columns = tag_column_indices
.iter()
.map(|i| {
batch
.column(*i)
.as_any()
.downcast_ref::<StringVector>()
.unwrap()
})
.collect::<Vec<_>>();
let tag_names = tag_column_indices
.iter()
.map(|c| batches.schema().column_name_by_index(*c).to_string())
.collect::<Vec<_>>();
let timestamp_column = batch
.column(timestamp_column_index)
.as_any()
.downcast_ref::<TimestampMillisecondVector>()
.unwrap();
let field_column = batch
.column(first_field_column_index)
.as_any()
.downcast_ref::<Float64Vector>()
.unwrap();
// assemble rows
for row_index in 0..batch.num_rows() {
// retrieve tags
// TODO(ruihang): push table name `__metric__`
let mut tags = vec![metric_name.clone()];
for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
// TODO(ruihang): add test for NULL tag
if let Some(tag_value) = tag_column.get_data(row_index) {
tags.push((tag_name.to_string(), tag_value.to_string()));
}
}
// retrieve timestamp
let timestamp_millis: i64 = timestamp_column.get_data(row_index).unwrap().into();
let timestamp = timestamp_millis as f64 / 1000.0;
// retrieve value
if let Some(v) = field_column.get_data(row_index) {
buffer
.entry(tags)
.or_default()
.push((timestamp, Into::<f64>::into(v).to_string()));
};
}
}
let result = buffer
.into_iter()
.map(|(tags, mut values)| {
let metric = tags.into_iter().collect();
match result_type {
ValueType::Vector | ValueType::Scalar | ValueType::String => Ok(PromSeries {
metric,
value: values.pop(),
..Default::default()
}),
ValueType::Matrix => Ok(PromSeries {
metric,
values,
..Default::default()
}),
}
})
.collect::<Result<Vec<_>>>()?;
let result_type_string = result_type.to_string();
let data = PrometheusResponse::PromData(PromData {
result_type: result_type_string,
result,
});
Ok(data)
}
}

View File

@@ -81,7 +81,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
// a local variable.
match output {
Ok(output) => match output {
Output::Stream(stream) => {
Output::Stream(stream, _) => {
let query_result = QueryResult {
schema: stream.schema(),
stream,

View File

@@ -73,7 +73,7 @@ fn output_to_query_response<'a>(
) -> PgWireResult<Response<'a>> {
match output {
Ok(Output::AffectedRows(rows)) => Ok(Response::Execution(Tag::new("OK").with_rows(rows))),
Ok(Output::Stream(record_stream)) => {
Ok(Output::Stream(record_stream, _)) => {
let schema = record_stream.schema();
recordbatches_to_query_response(record_stream, schema, field_format)
}

View File

@@ -93,7 +93,7 @@ def hello() -> vector[str]:
common_query::Output::RecordBatches(_) => {
unreachable!()
}
common_query::Output::Stream(s) => {
common_query::Output::Stream(s, _) => {
let batches = common_recordbatch::util::collect_batches(s).await.unwrap();
let expected = "\
+---------+

View File

@@ -321,18 +321,6 @@ impl TruncateTableRequest {
}
}
#[macro_export]
macro_rules! meter_insert_request {
($req: expr) => {
meter_macros::write_meter!(
$req.catalog_name.to_string(),
$req.schema_name.to_string(),
$req.table_name.to_string(),
$req
);
};
}
pub fn valid_table_option(key: &str) -> bool {
matches!(
key,

View File

@@ -21,6 +21,7 @@ use std::task::{Context, Poll};
use common_query::error as query_error;
use common_query::error::Result as QueryResult;
use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef};
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::error::Result as RecordBatchResult;
use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_telemetry::tracing::Span;
@@ -153,7 +154,7 @@ impl RecordBatchStream for StreamWithMetricWrapper {
self.stream.schema()
}
fn metrics(&self) -> Option<String> {
fn metrics(&self) -> Option<RecordBatchMetrics> {
self.stream.metrics()
}
}

View File

@@ -147,7 +147,7 @@ mod test {
)),
});
let output = query(instance, request).await;
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
@@ -384,7 +384,7 @@ CREATE TABLE {table_name} (
))),
});
let output = query(instance, request.clone()).await;
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
@@ -477,7 +477,7 @@ CREATE TABLE {table_name} (
assert!(matches!(output, Output::AffectedRows(6)));
let output = query(instance, request).await;
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
@@ -651,7 +651,7 @@ CREATE TABLE {table_name} (
)),
});
let output = query(instance, request.clone()).await;
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
@@ -693,7 +693,7 @@ CREATE TABLE {table_name} (
assert!(matches!(output, Output::AffectedRows(2)));
let output = query(instance, request).await;
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
@@ -791,7 +791,7 @@ CREATE TABLE {table_name} (
})),
});
let output = query(instance, request).await;
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -80,7 +80,7 @@ monitor1,host=host2 memory=1027";
)
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
@@ -109,7 +109,7 @@ monitor1,host=host2 memory=1027 1663840496400340001";
)
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -173,7 +173,7 @@ mod tests {
let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as where condition
let output = query(instance, sql).await;
let Output::Stream(s) = output else {
let Output::Stream(s, _) = output else {
unreachable!()
};
let batches = common_recordbatch::util::collect_batches(s).await.unwrap();

View File

@@ -84,7 +84,7 @@ mod tests {
.remove(0)
.unwrap();
match output {
Output::Stream(stream) => {
Output::Stream(stream, _) => {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let pretty_print = recordbatches.pretty_print().unwrap();
let expected = vec![

View File

@@ -75,7 +75,7 @@ mod test {
)
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
@@ -97,7 +97,7 @@ mod test {
)
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
@@ -117,7 +117,7 @@ mod test {
.do_query("SELECT * FROM my_test_histo_sum", ctx.clone())
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
@@ -135,7 +135,7 @@ mod test {
.do_query("SELECT * FROM my_test_histo_count", ctx.clone())
.await;
let output = output.remove(0).unwrap();
let Output::Stream(stream) = output else {
let Output::Stream(stream, _) = output else {
unreachable!()
};
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();

View File

@@ -60,7 +60,7 @@ async fn test_create_database_and_insert_query(instance: Option<Box<dyn Rebuilda
let query_output = execute_sql(&instance, "select ts from test.demo order by ts limit 1").await;
match query_output {
Output::Stream(s) => {
Output::Stream(s, _) => {
let batches = util::collect(s).await.unwrap();
assert_eq!(1, batches[0].num_columns());
assert_eq!(

View File

@@ -68,7 +68,7 @@ async fn test_create_database_and_insert_query(instance: Arc<dyn MockInstance>)
let query_output = execute_sql(&instance, "select ts from test.demo order by ts limit 1").await;
match query_output {
Output::Stream(s) => {
Output::Stream(s, _) => {
let batches = util::collect(s).await.unwrap();
assert_eq!(1, batches[0].num_columns());
assert_eq!(
@@ -299,7 +299,7 @@ async fn test_issue477_same_table_name_in_different_databases(instance: Arc<dyn
async fn assert_query_result(instance: &Arc<Instance>, sql: &str, ts: i64, host: &str) {
let query_output = execute_sql(instance, sql).await;
match query_output {
Output::Stream(s) => {
Output::Stream(s, _) => {
let batches = util::collect(s).await.unwrap();
// let columns = batches[0].df_recordbatch.columns();
assert_eq!(2, batches[0].num_columns());
@@ -421,7 +421,7 @@ async fn test_execute_query(instance: Arc<dyn MockInstance>) {
let output = execute_sql(&instance, "select sum(number) from numbers limit 20").await;
match output {
Output::Stream(recordbatch) => {
Output::Stream(recordbatch, _) => {
let numbers = util::collect(recordbatch).await.unwrap();
assert_eq!(1, numbers[0].num_columns());
assert_eq!(numbers[0].column(0).len(), 1);

View File

@@ -330,7 +330,7 @@ pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str
};
let recordbatches = match output {
Output::Stream(stream) => util::collect_batches(stream).await.unwrap(),
Output::Stream(stream, _) => util::collect_batches(stream).await.unwrap(),
Output::RecordBatches(recordbatches) => recordbatches,
_ => unreachable!(),
};

View File

@@ -383,7 +383,7 @@ async fn insert_and_assert(db: &Database) {
let record_batches = match output {
Output::RecordBatches(record_batches) => record_batches,
Output::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(),
Output::Stream(stream, _) => RecordBatches::try_collect(stream).await.unwrap(),
Output::AffectedRows(_) => unreachable!(),
};
@@ -540,6 +540,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
error: None,
error_type: None,
warnings: None,
resp_metrics: Default::default(),
};
assert_eq!(instant_query_result, expected);
@@ -591,6 +592,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
error: None,
error_type: None,
warnings: None,
resp_metrics: Default::default(),
};
assert_eq!(range_query_result, expected);
@@ -621,6 +623,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) {
error: None,
error_type: None,
warnings: None,
resp_metrics: Default::default(),
};
assert_eq!(range_query_result, expected);
@@ -683,7 +686,7 @@ pub async fn test_grpc_timezone(store_type: StorageType) {
async fn to_batch(output: Output) -> String {
match output {
Output::RecordBatches(batch) => batch,
Output::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(),
Output::Stream(stream, _) => RecordBatches::try_collect(stream).await.unwrap(),
Output::AffectedRows(_) => unreachable!(),
}
.pretty_print()

View File

@@ -466,7 +466,7 @@ impl Database for GreptimeDB {
}) as _
} else {
let mut result = client.sql(&query).await;
if let Ok(Output::Stream(stream)) = result {
if let Ok(Output::Stream(stream, _)) = result {
match RecordBatches::try_collect(stream).await {
Ok(recordbatches) => result = Ok(Output::RecordBatches(recordbatches)),
Err(e) => {
@@ -577,7 +577,7 @@ impl Display for ResultDisplayer {
}
}
}
Output::Stream(_) => unreachable!(),
Output::Stream(_, _) => unreachable!(),
},
Err(e) => {
let status_code = e.status_code();