From bf5e1905cd7f4987a504d650a00ca6501cd5203c Mon Sep 17 00:00:00 2001 From: shuiyisong <113876041+shuiyisong@users.noreply.github.com> Date: Tue, 20 Feb 2024 11:25:18 +0800 Subject: [PATCH] refactor: bring metrics to http output (#3247) * refactor: bring metrics to http output * chore: remove unwrap * chore: make walk plan accumulate * chore: change field name and comment * chore: add metrics to http resp header * chore: move PrometheusJsonResponse to a separate file and impl IntoResponse * chore: put metrics in prometheus resp header too --- Cargo.lock | 16 +- Cargo.toml | 6 +- benchmarks/src/bin/nyc-taxi.rs | 2 +- src/client/Cargo.toml | 2 + src/client/src/database.rs | 2 +- src/client/src/region.rs | 9 +- src/cmd/src/cli/export.rs | 6 +- src/cmd/src/cli/repl.rs | 2 +- src/common/plugins/Cargo.toml | 9 + src/common/plugins/src/consts.rs | 19 ++ src/common/plugins/src/lib.rs | 20 ++ src/common/query/src/lib.rs | 19 +- src/common/recordbatch/src/adapter.rs | 11 +- src/common/recordbatch/src/lib.rs | 9 +- src/common/test-util/src/recordbatch.rs | 4 +- src/datanode/src/region_server.rs | 2 +- src/frontend/src/instance/prom_store.rs | 2 +- src/operator/src/statement/copy_table_to.rs | 2 +- src/operator/src/tests/partition_manager.rs | 51 ---- src/query/Cargo.toml | 1 + src/query/src/datafusion.rs | 9 +- src/query/src/dist_plan/merge_scan.rs | 29 +- src/query/src/tests.rs | 2 +- src/query/src/tests/query_engine_test.rs | 2 +- src/script/benches/py_benchmark.rs | 2 +- src/script/src/python/engine.rs | 10 +- src/script/src/python/ffi_types/copr.rs | 2 +- src/script/src/python/ffi_types/pair_tests.rs | 2 +- src/script/src/table.rs | 4 +- src/servers/Cargo.toml | 1 + src/servers/src/grpc/database.rs | 4 +- src/servers/src/grpc/flight.rs | 2 +- src/servers/src/grpc/flight/stream.rs | 8 +- src/servers/src/grpc/prom_query_gateway.rs | 5 +- src/servers/src/http.rs | 9 + src/servers/src/http/arrow_result.rs | 2 +- src/servers/src/http/csv_result.rs | 2 +- src/servers/src/http/greptime_result_v1.rs | 23 +- src/servers/src/http/handler.rs | 72 ++++- src/servers/src/http/header.rs | 1 + src/servers/src/http/influxdb_result_v1.rs | 2 +- src/servers/src/http/prometheus.rs | 241 +-------------- src/servers/src/http/prometheus_resp.rs | 289 ++++++++++++++++++ src/servers/src/mysql/writer.rs | 2 +- src/servers/src/postgres/handler.rs | 2 +- src/servers/tests/py_script/mod.rs | 2 +- src/table/src/requests.rs | 12 - src/table/src/table/scan.rs | 3 +- tests-integration/src/grpc.rs | 12 +- tests-integration/src/influxdb.rs | 4 +- tests-integration/src/instance.rs | 2 +- tests-integration/src/opentsdb.rs | 2 +- tests-integration/src/otlp.rs | 8 +- .../src/tests/instance_kafka_wal_test.rs | 2 +- tests-integration/src/tests/instance_test.rs | 6 +- tests-integration/src/tests/test_util.rs | 2 +- tests-integration/tests/grpc.rs | 7 +- tests/runner/src/env.rs | 4 +- 58 files changed, 592 insertions(+), 395 deletions(-) create mode 100644 src/common/plugins/Cargo.toml create mode 100644 src/common/plugins/src/consts.rs create mode 100644 src/common/plugins/src/lib.rs create mode 100644 src/servers/src/http/prometheus_resp.rs diff --git a/Cargo.lock b/Cargo.lock index 3cc6ead1d7..f49970b58f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1557,6 +1557,8 @@ dependencies = [ "prometheus", "prost 0.12.3", "rand", + "serde", + "serde_json", "session", "snafu", "substrait 0.17.1", @@ -1949,6 +1951,10 @@ dependencies = [ "uuid", ] +[[package]] +name = "common-plugins" +version = "0.6.0" + [[package]] name = "common-procedure" 
version = "0.6.0" @@ -4700,9 +4706,9 @@ checksum = "302d7ab3130588088d277783b1e2d2e10c9e9e4a16dd9050e6ec93fb3e7048f4" [[package]] name = "libgit2-sys" -version = "0.16.2+1.7.2" +version = "0.16.1+1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee4126d8b4ee5c9d9ea891dd875cfdc1e9d0950437179104b183d7d8a74d24e8" +checksum = "f2a2bb3680b094add03bb3732ec520ece34da31a8cd2d633d1389d0f0fb60d0c" dependencies = [ "cc", "libc", @@ -5178,7 +5184,7 @@ dependencies = [ [[package]] name = "meter-core" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f" +source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d" dependencies = [ "anymap", "once_cell", @@ -5188,7 +5194,7 @@ dependencies = [ [[package]] name = "meter-macros" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=abbd357c1e193cd270ea65ee7652334a150b628f#abbd357c1e193cd270ea65ee7652334a150b628f" +source = "git+https://github.com/GreptimeTeam/greptime-meter.git?rev=80b72716dcde47ec4161478416a5c6c21343364d#80b72716dcde47ec4161478416a5c6c21343364d" dependencies = [ "meter-core", ] @@ -7242,6 +7248,7 @@ dependencies = [ "common-function", "common-macro", "common-meta", + "common-plugins", "common-query", "common-recordbatch", "common-telemetry", @@ -8898,6 +8905,7 @@ dependencies = [ "common-macro", "common-mem-prof", "common-meta", + "common-plugins", "common-query", "common-recordbatch", "common-runtime", diff --git a/Cargo.toml b/Cargo.toml index 7dfd452e32..9f4e54d137 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ members = [ "src/common/grpc-expr", "src/common/mem-prof", "src/common/meta", + "src/common/plugins", "src/common/procedure", "src/common/procedure-test", "src/common/query", @@ -100,7 +101,7 @@ greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", r humantime-serde = "1.1" itertools = "0.10" lazy_static = "1.4" -meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "abbd357c1e193cd270ea65ee7652334a150b628f" } +meter-core = { git = "https://github.com/GreptimeTeam/greptime-meter.git", rev = "80b72716dcde47ec4161478416a5c6c21343364d" } mockall = "0.11.4" moka = "0.12" num_cpus = "1.16" @@ -164,6 +165,7 @@ common-grpc-expr = { path = "src/common/grpc-expr" } common-macro = { path = "src/common/macro" } common-mem-prof = { path = "src/common/mem-prof" } common-meta = { path = "src/common/meta" } +common-plugins = { path = "src/common/plugins" } common-procedure = { path = "src/common/procedure" } common-procedure-test = { path = "src/common/procedure-test" } common-query = { path = "src/common/query" } @@ -201,7 +203,7 @@ table = { path = "src/table" } [workspace.dependencies.meter-macros] git = "https://github.com/GreptimeTeam/greptime-meter.git" -rev = "abbd357c1e193cd270ea65ee7652334a150b628f" +rev = "80b72716dcde47ec4161478416a5c6c21343364d" [profile.release] debug = 1 diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index f357ba5d88..53e44c9688 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -504,7 +504,7 @@ async fn do_query(num_iter: usize, db: &Database, table_name: &str) { let res = db.sql(&query).await.unwrap(); match res { Output::AffectedRows(_) | Output::RecordBatches(_) => (), - Output::Stream(stream) => { + 
Output::Stream(stream, _) => { stream.try_collect::>().await.unwrap(); } } diff --git a/src/client/Cargo.toml b/src/client/Cargo.toml index c8c827661f..ddd13b887d 100644 --- a/src/client/Cargo.toml +++ b/src/client/Cargo.toml @@ -34,6 +34,8 @@ parking_lot = "0.12" prometheus.workspace = true prost.workspace = true rand.workspace = true +serde.workspace = true +serde_json.workspace = true session.workspace = true snafu.workspace = true tokio.workspace = true diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 6d6ad9d609..fa665a4967 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -340,7 +340,7 @@ impl Database { output_ordering: None, metrics: Default::default(), }; - Ok(Output::Stream(Box::pin(record_batch_stream))) + Ok(Output::new_stream(Box::pin(record_batch_stream))) } } } diff --git a/src/client/src/region.rs b/src/client/src/region.rs index 7f1181ee59..a9a337808e 100644 --- a/src/client/src/region.rs +++ b/src/client/src/region.rs @@ -123,8 +123,8 @@ impl RegionRequester { .fail(); }; - let metrics_str = Arc::new(ArcSwapOption::from(None)); - let ref_str = metrics_str.clone(); + let metrics = Arc::new(ArcSwapOption::from(None)); + let metrics_ref = metrics.clone(); let tracing_context = TracingContext::from_current_span(); @@ -140,7 +140,8 @@ impl RegionRequester { match flight_message { FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch), FlightMessage::Metrics(s) => { - ref_str.swap(Some(Arc::new(s))); + let m = serde_json::from_str(&s).ok().map(Arc::new); + metrics_ref.swap(m); break; } _ => { @@ -159,7 +160,7 @@ impl RegionRequester { schema, stream, output_ordering: None, - metrics: metrics_str, + metrics, }; Ok(Box::pin(record_batch_stream)) } diff --git a/src/cmd/src/cli/export.rs b/src/cmd/src/cli/export.rs index 956460435d..cf43a9dec1 100644 --- a/src/cmd/src/cli/export.rs +++ b/src/cmd/src/cli/export.rs @@ -142,7 +142,7 @@ impl Export { .with_context(|_| RequestDatabaseSnafu { sql: "show databases".to_string(), })?; - let Output::Stream(stream) = result else { + let Output::Stream(stream, _) = result else { NotDataFromOutputSnafu.fail()? }; let record_batch = collect(stream) @@ -183,7 +183,7 @@ impl Export { .sql(&sql) .await .with_context(|_| RequestDatabaseSnafu { sql })?; - let Output::Stream(stream) = result else { + let Output::Stream(stream, _) = result else { NotDataFromOutputSnafu.fail()? }; let Some(record_batch) = collect(stream) @@ -235,7 +235,7 @@ impl Export { .sql(&sql) .await .with_context(|_| RequestDatabaseSnafu { sql })?; - let Output::Stream(stream) = result else { + let Output::Stream(stream, _) = result else { NotDataFromOutputSnafu.fail()? 
}; let record_batch = collect(stream) diff --git a/src/cmd/src/cli/repl.rs b/src/cmd/src/cli/repl.rs index ccc9d0a272..6eba505951 100644 --- a/src/cmd/src/cli/repl.rs +++ b/src/cmd/src/cli/repl.rs @@ -185,7 +185,7 @@ impl Repl { .context(RequestDatabaseSnafu { sql: &sql })?; let either = match output { - Output::Stream(s) => { + Output::Stream(s, _) => { let x = RecordBatches::try_collect(s) .await .context(CollectRecordBatchesSnafu)?; diff --git a/src/common/plugins/Cargo.toml b/src/common/plugins/Cargo.toml new file mode 100644 index 0000000000..8fa1069158 --- /dev/null +++ b/src/common/plugins/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "common-plugins" +version.workspace = true +edition.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] diff --git a/src/common/plugins/src/consts.rs b/src/common/plugins/src/consts.rs new file mode 100644 index 0000000000..f2150cc24c --- /dev/null +++ b/src/common/plugins/src/consts.rs @@ -0,0 +1,19 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// Greptime reserved metrics prefix +pub const GREPTIME_EXEC_PREFIX: &str = "greptime_exec_"; + +/// Execution cost metrics key +pub const GREPTIME_EXEC_COST: &str = "greptime_exec_cost"; diff --git a/src/common/plugins/src/lib.rs b/src/common/plugins/src/lib.rs new file mode 100644 index 0000000000..bd815d794d --- /dev/null +++ b/src/common/plugins/src/lib.rs @@ -0,0 +1,20 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// This crate is designed to be at the bottom of the dependency tree +/// to provide common and useful utils and consts to all plugin usage, +/// since the `plugins` crate is at the top, depending on crates like `frontend` and `datanode` +mod consts; + +pub use consts::{GREPTIME_EXEC_COST, GREPTIME_EXEC_PREFIX}; diff --git a/src/common/query/src/lib.rs b/src/common/query/src/lib.rs index 679fd53a98..4cb7282e53 100644 --- a/src/common/query/src/lib.rs +++ b/src/common/query/src/lib.rs @@ -13,10 +13,12 @@ // limitations under the License.
use std::fmt::{Debug, Formatter}; +use std::sync::Arc; use api::greptime_proto::v1::add_column_location::LocationType; use api::greptime_proto::v1::AddColumnLocation as Location; use common_recordbatch::{RecordBatches, SendableRecordBatchStream}; +use physical_plan::PhysicalPlan; use serde::{Deserialize, Serialize}; pub mod columnar_value; @@ -32,7 +34,14 @@ use sqlparser_derive::{Visit, VisitMut}; pub enum Output { AffectedRows(usize), RecordBatches(RecordBatches), - Stream(SendableRecordBatchStream), + Stream(SendableRecordBatchStream, Option>), +} + +impl Output { + // helper function to build original `Output::Stream` + pub fn new_stream(stream: SendableRecordBatchStream) -> Self { + Output::Stream(stream, None) + } } impl Debug for Output { @@ -42,7 +51,13 @@ impl Debug for Output { Output::RecordBatches(recordbatches) => { write!(f, "Output::RecordBatches({recordbatches:?})") } - Output::Stream(_) => write!(f, "Output::Stream()"), + Output::Stream(_, df) => { + if df.is_some() { + write!(f, "Output::Stream(, Some)") + } else { + write!(f, "Output::Stream()") + } + } } } } diff --git a/src/common/recordbatch/src/adapter.rs b/src/common/recordbatch/src/adapter.rs index a2dfd4ebaa..d26296d449 100644 --- a/src/common/recordbatch/src/adapter.rs +++ b/src/common/recordbatch/src/adapter.rs @@ -182,7 +182,7 @@ pub struct RecordBatchStreamAdapter { enum Metrics { Unavailable, Unresolved(Arc), - Resolved(String), + Resolved(RecordBatchMetrics), } impl RecordBatchStreamAdapter { @@ -222,9 +222,9 @@ impl RecordBatchStream for RecordBatchStreamAdapter { self.schema.clone() } - fn metrics(&self) -> Option { + fn metrics(&self) -> Option { match &self.metrics_2 { - Metrics::Resolved(metrics) => Some(metrics.clone()), + Metrics::Resolved(metrics) => Some(*metrics), Metrics::Unavailable | Metrics::Unresolved(_) => None, } } @@ -254,8 +254,7 @@ impl Stream for RecordBatchStreamAdapter { let mut metrics_holder = RecordBatchMetrics::default(); collect_metrics(df_plan, &mut metrics_holder); if metrics_holder.elapsed_compute != 0 || metrics_holder.memory_usage != 0 { - self.metrics_2 = - Metrics::Resolved(serde_json::to_string(&metrics_holder).unwrap()); + self.metrics_2 = Metrics::Resolved(metrics_holder); } } Poll::Ready(None) @@ -285,7 +284,7 @@ fn collect_metrics(df_plan: &Arc, result: &mut RecordBatchMet /// [`RecordBatchMetrics`] carrys metrics value /// from datanode to frontend through gRPC -#[derive(serde::Serialize, serde::Deserialize, Default, Debug)] +#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone, Copy)] pub struct RecordBatchMetrics { // cpu consumption in nanoseconds pub elapsed_compute: usize, diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs index 6e87d3f6c2..93ba03a333 100644 --- a/src/common/recordbatch/src/lib.rs +++ b/src/common/recordbatch/src/lib.rs @@ -21,6 +21,7 @@ pub mod util; use std::pin::Pin; use std::sync::Arc; +use adapter::RecordBatchMetrics; use arc_swap::ArcSwapOption; use datafusion::physical_plan::memory::MemoryStream; pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream; @@ -42,7 +43,7 @@ pub trait RecordBatchStream: Stream> { None } - fn metrics(&self) -> Option { + fn metrics(&self) -> Option { None } } @@ -212,7 +213,7 @@ pub struct RecordBatchStreamWrapper { pub schema: SchemaRef, pub stream: S, pub output_ordering: Option>, - pub metrics: Arc>, + pub metrics: Arc>, } impl RecordBatchStreamWrapper { @@ -238,8 +239,8 @@ impl> + Unpin> RecordBatchStream 
self.output_ordering.as_deref() } - fn metrics(&self) -> Option { - self.metrics.load().as_ref().map(|s| s.as_ref().clone()) + fn metrics(&self) -> Option { + self.metrics.load().as_ref().map(|s| *s.as_ref()) } } diff --git a/src/common/test-util/src/recordbatch.rs b/src/common/test-util/src/recordbatch.rs index ad05965dc5..6a10df0633 100644 --- a/src/common/test-util/src/recordbatch.rs +++ b/src/common/test-util/src/recordbatch.rs @@ -28,7 +28,7 @@ pub async fn execute_and_check_output(db: &Database, sql: &str, expected: Expect assert_eq!(*x, y, "actual: \n{}", x) } (Output::RecordBatches(_), ExpectedOutput::QueryResult(x)) - | (Output::Stream(_), ExpectedOutput::QueryResult(x)) => { + | (Output::Stream(_, _), ExpectedOutput::QueryResult(x)) => { check_output_stream(output, x).await } _ => panic!(), @@ -37,7 +37,7 @@ pub async fn execute_and_check_output(db: &Database, sql: &str, expected: Expect pub async fn check_output_stream(output: Output, expected: &str) { let recordbatches = match output { - Output::Stream(stream) => util::collect_batches(stream).await.unwrap(), + Output::Stream(stream, _) => util::collect_batches(stream).await.unwrap(), Output::RecordBatches(recordbatches) => recordbatches, _ => unreachable!(), }; diff --git a/src/datanode/src/region_server.rs b/src/datanode/src/region_server.rs index ad5b6e13e4..773833408e 100644 --- a/src/datanode/src/region_server.rs +++ b/src/datanode/src/region_server.rs @@ -655,7 +655,7 @@ impl RegionServerInner { Output::AffectedRows(_) | Output::RecordBatches(_) => { UnsupportedOutputSnafu { expected: "stream" }.fail() } - Output::Stream(stream) => Ok(stream), + Output::Stream(stream, _) => Ok(stream), } } diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 0f08337850..7b4ac281a0 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -75,7 +75,7 @@ fn negotiate_response_type(accepted_response_types: &[i32]) -> ServerResult ServerResult { - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream) diff --git a/src/operator/src/statement/copy_table_to.rs b/src/operator/src/statement/copy_table_to.rs index f0498a88e2..58def6af54 100644 --- a/src/operator/src/statement/copy_table_to.rs +++ b/src/operator/src/statement/copy_table_to.rs @@ -135,7 +135,7 @@ impl StatementExecutor { .await .context(ExecLogicalPlanSnafu)?; let stream = match output { - Output::Stream(stream) => stream, + Output::Stream(stream, _) => stream, Output::RecordBatches(record_batches) => record_batches.as_stream(), _ => unreachable!(), }; diff --git a/src/operator/src/tests/partition_manager.rs b/src/operator/src/tests/partition_manager.rs index 859090a753..3b9d5ff87e 100644 --- a/src/operator/src/tests/partition_manager.rs +++ b/src/operator/src/tests/partition_manager.rs @@ -13,7 +13,6 @@ // limitations under the License. 
use std::collections::{BTreeMap, HashMap}; -use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use catalog::kvbackend::MetaKvBackend; @@ -29,10 +28,6 @@ use datafusion_expr::{lit, Operator}; use datatypes::prelude::ConcreteDataType; use datatypes::schema::{ColumnSchema, SchemaBuilder}; use meta_client::client::MetaClient; -use meter_core::collect::Collect; -use meter_core::data::{ReadRecord, WriteRecord}; -use meter_core::global::global_registry; -use meter_core::write_calc::WriteCalculator; use partition::columns::RangeColumnsPartitionRule; use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use partition::partition::{PartitionBound, PartitionDef}; @@ -40,8 +35,6 @@ use partition::range::RangePartitionRule; use partition::PartitionRuleRef; use store_api::storage::RegionNumber; use table::metadata::{TableInfo, TableInfoBuilder, TableMetaBuilder}; -use table::meter_insert_request; -use table::requests::InsertRequest; pub fn new_test_table_info( table_id: u32, @@ -421,47 +414,3 @@ async fn test_find_regions() { partition::error::Error::FindRegions { .. } )); } - -#[derive(Default)] -struct MockCollector { - pub write_sum: AtomicU32, -} - -impl Collect for MockCollector { - fn on_write(&self, record: WriteRecord) { - let _ = self - .write_sum - .fetch_add(record.byte_count, Ordering::Relaxed); - } - - fn on_read(&self, _record: ReadRecord) { - todo!() - } -} - -struct MockCalculator; - -impl WriteCalculator for MockCalculator { - fn calc_byte(&self, _value: &InsertRequest) -> u32 { - 1024 * 10 - } -} - -#[test] -#[ignore] -fn test_meter_insert_request() { - let collector = Arc::new(MockCollector::default()); - global_registry().set_collector(collector.clone()); - global_registry().register_calculator(Arc::new(MockCalculator)); - - let req = InsertRequest { - catalog_name: "greptime".to_string(), - schema_name: "public".to_string(), - table_name: "numbers".to_string(), - columns_values: Default::default(), - }; - meter_insert_request!(req); - - let re = collector.write_sum.load(Ordering::Relaxed); - assert_eq!(re, 1024 * 10); -} diff --git a/src/query/Cargo.toml b/src/query/Cargo.toml index a09eb7a9d2..8976a6967e 100644 --- a/src/query/Cargo.toml +++ b/src/query/Cargo.toml @@ -23,6 +23,7 @@ common-error.workspace = true common-function.workspace = true common-macro.workspace = true common-meta.workspace = true +common-plugins.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-telemetry.workspace = true diff --git a/src/query/src/datafusion.rs b/src/query/src/datafusion.rs index 121db1535b..458d2e8708 100644 --- a/src/query/src/datafusion.rs +++ b/src/query/src/datafusion.rs @@ -95,7 +95,10 @@ impl DatafusionQueryEngine { optimized_physical_plan }; - Ok(Output::Stream(self.execute_stream(&ctx, &physical_plan)?)) + Ok(Output::Stream( + self.execute_stream(&ctx, &physical_plan)?, + Some(physical_plan), + )) } #[tracing::instrument(skip_all)] @@ -121,7 +124,7 @@ impl DatafusionQueryEngine { .await?; let mut stream = match output { Output::RecordBatches(batches) => batches.as_stream(), - Output::Stream(stream) => stream, + Output::Stream(stream, _) => stream, _ => unreachable!(), }; @@ -601,7 +604,7 @@ mod tests { let output = engine.execute(plan, QueryContext::arc()).await.unwrap(); match output { - Output::Stream(recordbatch) => { + Output::Stream(recordbatch, _) => { let numbers = util::collect(recordbatch).await.unwrap(); assert_eq!(1, numbers.len()); assert_eq!(numbers[0].num_columns(), 1); diff --git 
a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index d4e4516d1b..d761923dea 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -22,15 +22,16 @@ use common_base::bytes::Bytes; use common_catalog::parse_catalog_and_schema_from_db_string; use common_error::ext::BoxedError; use common_meta::table_name::TableName; +use common_plugins::GREPTIME_EXEC_COST; use common_query::physical_plan::TaskContext; -use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics}; +use common_recordbatch::adapter::DfRecordBatchStreamAdapter; use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{ DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream, }; use common_telemetry::tracing_context::TracingContext; use datafusion::physical_plan::metrics::{ - Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time, + Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time, }; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning}; use datafusion_common::{Result, Statistics}; @@ -39,6 +40,7 @@ use datafusion_physical_expr::PhysicalSortExpr; use datatypes::schema::{Schema, SchemaRef}; use futures_util::StreamExt; use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader}; +use meter_core::data::ReadItem; use meter_macros::read_meter; use snafu::ResultExt; use store_api::storage::RegionId; @@ -212,18 +214,17 @@ impl MergeScanExec { // reset poll timer poll_timer = Instant::now(); } - if let Some(metrics) = stream - .metrics() - .and_then(|m| serde_json::from_str::(&m).ok()) - { + if let Some(metrics) = stream.metrics() { let (c, s) = parse_catalog_and_schema_from_db_string(&dbname); - read_meter!( + let value = read_meter!( c, s, - metrics.elapsed_compute as u64, - metrics.memory_usage as u64, - 0 + ReadItem { + cpu_time: metrics.elapsed_compute as u64, + table_scan: metrics.memory_usage as u64 + } ); + metric.record_greptime_exec_cost(value as usize); } METRIC_MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64()); @@ -327,6 +328,9 @@ struct MergeScanMetric { finish_time: Time, /// Count of rows fetched from remote output_rows: Count, + + /// Gauge for greptime plan execution cost metrics for output + greptime_exec_cost: Gauge, } impl MergeScanMetric { @@ -336,6 +340,7 @@ impl MergeScanMetric { first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1), finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1), output_rows: MetricBuilder::new(metric).output_rows(1), + greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_COST, 1), } } @@ -354,4 +359,8 @@ impl MergeScanMetric { pub fn record_output_batch_rows(&self, num_rows: usize) { self.output_rows.add(num_rows); } + + pub fn record_greptime_exec_cost(&self, metrics: usize) { + self.greptime_exec_cost.add(metrics); + } } diff --git a/src/query/src/tests.rs b/src/query/src/tests.rs index 8496a648b6..ebf135d06e 100644 --- a/src/query/src/tests.rs +++ b/src/query/src/tests.rs @@ -43,7 +43,7 @@ async fn exec_selection(engine: QueryEngineRef, sql: &str) -> Vec { .plan(stmt, query_ctx.clone()) .await .unwrap(); - let Output::Stream(stream) = engine.execute(plan, query_ctx).await.unwrap() else { + let Output::Stream(stream, _) = engine.execute(plan, query_ctx).await.unwrap() else { unreachable!() }; util::collect(stream).await.unwrap() diff --git a/src/query/src/tests/query_engine_test.rs 
b/src/query/src/tests/query_engine_test.rs index c7e8246945..067747f263 100644 --- a/src/query/src/tests/query_engine_test.rs +++ b/src/query/src/tests/query_engine_test.rs @@ -80,7 +80,7 @@ async fn test_datafusion_query_engine() -> Result<()> { let output = engine.execute(plan, QueryContext::arc()).await?; let recordbatch = match output { - Output::Stream(recordbatch) => recordbatch, + Output::Stream(recordbatch, _) => recordbatch, _ => unreachable!(), }; diff --git a/src/script/benches/py_benchmark.rs b/src/script/benches/py_benchmark.rs index 27d8ef07ce..a4ead66f16 100644 --- a/src/script/benches/py_benchmark.rs +++ b/src/script/benches/py_benchmark.rs @@ -69,7 +69,7 @@ async fn run_compiled(script: &PyScript) { .await .unwrap(); let _res = match output { - Output::Stream(s) => common_recordbatch::util::collect_batches(s).await.unwrap(), + Output::Stream(s, _) => common_recordbatch::util::collect_batches(s).await.unwrap(), Output::RecordBatches(rbs) => rbs, _ => unreachable!(), }; diff --git a/src/script/src/python/engine.rs b/src/script/src/python/engine.rs index 70ff108271..c2e8eccb45 100644 --- a/src/script/src/python/engine.rs +++ b/src/script/src/python/engine.rs @@ -312,7 +312,7 @@ impl Script for PyScript { .context(DatabaseQuerySnafu)?; let copr = self.copr.clone(); match res { - Output::Stream(stream) => Ok(Output::Stream(Box::pin(CoprStream::try_new( + Output::Stream(stream, _) => Ok(Output::new_stream(Box::pin(CoprStream::try_new( stream, copr, params, ctx, )?))), _ => unreachable!(), @@ -411,7 +411,7 @@ def test(number) -> vector[u32]: .await .unwrap(); let res = common_recordbatch::util::collect_batches(match output { - Output::Stream(s) => s, + Output::Stream(s, _) => s, _ => unreachable!(), }) .await @@ -472,7 +472,7 @@ def test(number) -> vector[u32]: .await .unwrap(); let res = common_recordbatch::util::collect_batches(match _output { - Output::Stream(s) => s, + Output::Stream(s, _) => s, _ => todo!(), }) .await @@ -504,7 +504,7 @@ def test(a, b, c) -> vector[f64]: .await .unwrap(); match output { - Output::Stream(stream) => { + Output::Stream(stream, _) => { let numbers = util::collect(stream).await.unwrap(); assert_eq!(1, numbers.len()); @@ -542,7 +542,7 @@ def test(a) -> vector[i64]: .await .unwrap(); match output { - Output::Stream(stream) => { + Output::Stream(stream, _) => { let numbers = util::collect(stream).await.unwrap(); assert_eq!(1, numbers.len()); diff --git a/src/script/src/python/ffi_types/copr.rs b/src/script/src/python/ffi_types/copr.rs index cb90eca8a7..a93b22c5bb 100644 --- a/src/script/src/python/ffi_types/copr.rs +++ b/src/script/src/python/ffi_types/copr.rs @@ -403,7 +403,7 @@ impl PyQueryEngine { Ok(Either::AffectedRows(cnt)) } Ok(common_query::Output::RecordBatches(rbs)) => Ok(Either::Rb(rbs)), - Ok(common_query::Output::Stream(s)) => Ok(Either::Rb( + Ok(common_query::Output::Stream(s, _)) => Ok(Either::Rb( common_recordbatch::util::collect_batches(s).await.unwrap(), )), Err(e) => Err(e), diff --git a/src/script/src/python/ffi_types/pair_tests.rs b/src/script/src/python/ffi_types/pair_tests.rs index 6ae4ca5df1..ec01355357 100644 --- a/src/script/src/python/ffi_types/pair_tests.rs +++ b/src/script/src/python/ffi_types/pair_tests.rs @@ -88,7 +88,7 @@ async fn integrated_py_copr_test() { .await .unwrap(); let res = match output { - Output::Stream(s) => common_recordbatch::util::collect_batches(s).await.unwrap(), + Output::Stream(s, _) => common_recordbatch::util::collect_batches(s).await.unwrap(), Output::RecordBatches(rbs) => rbs, _ => 
unreachable!(), }; diff --git a/src/script/src/table.rs b/src/script/src/table.rs index 6ecbaa4e33..67e561bc67 100644 --- a/src/script/src/table.rs +++ b/src/script/src/table.rs @@ -231,7 +231,7 @@ impl ScriptsTable { .await .context(ExecuteInternalStatementSnafu)?; let stream = match output { - Output::Stream(stream) => stream, + Output::Stream(stream, _) => stream, Output::RecordBatches(record_batches) => record_batches.as_stream(), _ => unreachable!(), }; @@ -286,7 +286,7 @@ impl ScriptsTable { .await .context(ExecuteInternalStatementSnafu)?; let stream = match output { - Output::Stream(stream) => stream, + Output::Stream(stream, _) => stream, Output::RecordBatches(record_batches) => record_batches.as_stream(), _ => unreachable!(), }; diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 9a5ded1274..521c3023b6 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -33,6 +33,7 @@ common-grpc-expr.workspace = true common-macro.workspace = true common-mem-prof = { workspace = true, optional = true } common-meta.workspace = true +common-plugins.workspace = true common-query.workspace = true common-recordbatch.workspace = true common-runtime.workspace = true diff --git a/src/servers/src/grpc/database.rs b/src/servers/src/grpc/database.rs index 5be5d76254..133f3db6f9 100644 --- a/src/servers/src/grpc/database.rs +++ b/src/servers/src/grpc/database.rs @@ -52,7 +52,7 @@ impl GreptimeDatabase for DatabaseService { }), response: Some(RawResponse::AffectedRows(AffectedRows { value: rows as _ })), }, - Output::Stream(_) | Output::RecordBatches(_) => { + Output::Stream(_, _) | Output::RecordBatches(_) => { return Err(Status::unimplemented("GreptimeDatabase::Handle for query")); } }; @@ -71,7 +71,7 @@ impl GreptimeDatabase for DatabaseService { let output = self.handler.handle_request(request).await?; match output { Output::AffectedRows(rows) => affected_rows += rows, - Output::Stream(_) | Output::RecordBatches(_) => { + Output::Stream(_, _) | Output::RecordBatches(_) => { return Err(Status::unimplemented( "GreptimeDatabase::HandleRequests for query", )); diff --git a/src/servers/src/grpc/flight.rs b/src/servers/src/grpc/flight.rs index dd576ee6c7..63c06d5eed 100644 --- a/src/servers/src/grpc/flight.rs +++ b/src/servers/src/grpc/flight.rs @@ -165,7 +165,7 @@ fn to_flight_data_stream( tracing_context: TracingContext, ) -> TonicStream { match output { - Output::Stream(stream) => { + Output::Stream(stream, _) => { let stream = FlightRecordBatchStream::new(stream, tracing_context); Box::pin(stream) as _ } diff --git a/src/servers/src/grpc/flight/stream.rs b/src/servers/src/grpc/flight/stream.rs index c2d066a73b..25813f6b0e 100644 --- a/src/servers/src/grpc/flight/stream.rs +++ b/src/servers/src/grpc/flight/stream.rs @@ -84,9 +84,11 @@ impl FlightRecordBatchStream { } } // make last package to pass metrics - if let Some(m) = recordbatches.metrics() { - let metrics = FlightMessage::Metrics(m); - let _ = tx.send(Ok(metrics)).await; + if let Some(metrics_str) = recordbatches + .metrics() + .and_then(|m| serde_json::to_string(&m).ok()) + { + let _ = tx.send(Ok(FlightMessage::Metrics(metrics_str))).await; } } } diff --git a/src/servers/src/grpc/prom_query_gateway.rs b/src/servers/src/grpc/prom_query_gateway.rs index e91c1316db..e7e20c6533 100644 --- a/src/servers/src/grpc/prom_query_gateway.rs +++ b/src/servers/src/grpc/prom_query_gateway.rs @@ -126,7 +126,6 @@ impl PrometheusGatewayService { err.status_code().to_string(), err.output_msg(), ) - .0 } }; // range query only returns 
matrix @@ -134,8 +133,6 @@ impl PrometheusGatewayService { result_type = ValueType::Matrix; }; - PrometheusJsonResponse::from_query_result(result, metric_name, result_type) - .await - .0 + PrometheusJsonResponse::from_query_result(result, metric_name, result_type).await } } diff --git a/src/servers/src/http.rs b/src/servers/src/http.rs index 8a58d04548..3a5a9af9f4 100644 --- a/src/servers/src/http.rs +++ b/src/servers/src/http.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::fmt::Display; use std::net::SocketAddr; use std::sync::Mutex as StdMutex; @@ -82,6 +83,7 @@ pub mod otlp; pub mod pprof; pub mod prom_store; pub mod prometheus; +mod prometheus_resp; pub mod script; pub mod arrow_result; @@ -182,6 +184,11 @@ impl From for OutputSchema { pub struct HttpRecordsOutput { schema: OutputSchema, rows: Vec>, + + // plan level execution metrics + #[serde(skip_serializing_if = "HashMap::is_empty")] + #[serde(default)] + metrics: HashMap, } impl HttpRecordsOutput { @@ -211,6 +218,7 @@ impl HttpRecordsOutput { Ok(HttpRecordsOutput { schema: OutputSchema::from(schema), rows: vec![], + metrics: Default::default(), }) } else { let mut rows = @@ -231,6 +239,7 @@ impl HttpRecordsOutput { Ok(HttpRecordsOutput { schema: OutputSchema::from(schema), rows, + metrics: Default::default(), }) } } diff --git a/src/servers/src/http/arrow_result.rs b/src/servers/src/http/arrow_result.rs index 78d22b20c5..b47912b161 100644 --- a/src/servers/src/http/arrow_result.rs +++ b/src/servers/src/http/arrow_result.rs @@ -89,7 +89,7 @@ impl ArrowResponse { } } - Output::Stream(recordbatches) => { + Output::Stream(recordbatches, _) => { let schema = recordbatches.schema(); match write_arrow_bytes(recordbatches, schema.arrow_schema()).await { Ok(payload) => HttpResponse::Arrow(ArrowResponse { diff --git a/src/servers/src/http/csv_result.rs b/src/servers/src/http/csv_result.rs index 28b4c3b44f..30c5d4a026 100644 --- a/src/servers/src/http/csv_result.rs +++ b/src/servers/src/http/csv_result.rs @@ -37,7 +37,7 @@ impl CsvResponse { pub async fn from_output(outputs: Vec>) -> HttpResponse { match handler::from_output(ResponseFormat::Csv, outputs).await { Err(err) => HttpResponse::Error(err), - Ok(output) => { + Ok((output, _)) => { if output.len() > 1 { HttpResponse::Error(ErrorResponse::from_error_message( ResponseFormat::Csv, diff --git a/src/servers/src/http/greptime_result_v1.rs b/src/servers/src/http/greptime_result_v1.rs index 53e16948b7..596f1bcfdd 100644 --- a/src/servers/src/http/greptime_result_v1.rs +++ b/src/servers/src/http/greptime_result_v1.rs @@ -12,13 +12,17 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::collections::HashMap; + use axum::response::{IntoResponse, Response}; use axum::Json; use common_query::Output; use reqwest::header::HeaderValue; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use serde_json::Value; +use super::header::GREPTIME_DB_HEADER_METRICS; use crate::http::header::{GREPTIME_DB_HEADER_EXECUTION_TIME, GREPTIME_DB_HEADER_FORMAT}; use crate::http::{handler, GreptimeQueryOutput, HttpResponse, ResponseFormat}; @@ -27,14 +31,20 @@ pub struct GreptimedbV1Response { #[serde(skip_serializing_if = "Vec::is_empty", default)] pub(crate) output: Vec, pub(crate) execution_time_ms: u64, + + // placeholder for header value + #[serde(skip)] + #[serde(default)] + pub(crate) resp_metrics: HashMap, } impl GreptimedbV1Response { pub async fn from_output(outputs: Vec>) -> HttpResponse { match handler::from_output(ResponseFormat::GreptimedbV1, outputs).await { - Ok(output) => HttpResponse::GreptimedbV1(Self { + Ok((output, resp_metrics)) => HttpResponse::GreptimedbV1(Self { output, execution_time_ms: 0, + resp_metrics, }), Err(err) => HttpResponse::Error(err), } @@ -57,7 +67,14 @@ impl GreptimedbV1Response { impl IntoResponse for GreptimedbV1Response { fn into_response(self) -> Response { let execution_time = self.execution_time_ms; + let metrics = if self.resp_metrics.is_empty() { + None + } else { + serde_json::to_string(&self.resp_metrics).ok() + }; + let mut resp = Json(self).into_response(); + resp.headers_mut().insert( GREPTIME_DB_HEADER_FORMAT, HeaderValue::from_static("greptimedb_v1"), @@ -66,6 +83,10 @@ impl IntoResponse for GreptimedbV1Response { GREPTIME_DB_HEADER_EXECUTION_TIME, HeaderValue::from(execution_time), ); + if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) { + resp.headers_mut().insert(GREPTIME_DB_HEADER_METRICS, m); + } + resp } } diff --git a/src/servers/src/http/handler.rs b/src/servers/src/http/handler.rs index a1f5baab49..edac1e98fb 100644 --- a/src/servers/src/http/handler.rs +++ b/src/servers/src/http/handler.rs @@ -14,6 +14,7 @@ use std::collections::HashMap; use std::env; +use std::sync::Arc; use std::time::Instant; use aide::transform::TransformOperation; @@ -22,11 +23,15 @@ use axum::response::{IntoResponse, Response}; use axum::{Extension, Form}; use common_error::ext::ErrorExt; use common_error::status_code::StatusCode; +use common_plugins::GREPTIME_EXEC_PREFIX; +use common_query::physical_plan::PhysicalPlan; use common_query::Output; use common_recordbatch::util; +use datafusion::physical_plan::metrics::MetricValue; use query::parser::PromQuery; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use serde_json::Value; use session::context::QueryContextRef; use crate::http::arrow_result::ArrowResponse; @@ -125,23 +130,23 @@ pub async fn sql( pub async fn from_output( ty: ResponseFormat, outputs: Vec>, -) -> Result, ErrorResponse> { +) -> Result<(Vec, HashMap), ErrorResponse> { // TODO(sunng87): this api response structure cannot represent error well. 
// It hides successful execution results from error response let mut results = Vec::with_capacity(outputs.len()); + let mut merge_map = HashMap::new(); + for out in outputs { match out { Ok(Output::AffectedRows(rows)) => { results.push(GreptimeQueryOutput::AffectedRows(rows)); } - Ok(Output::Stream(stream)) => { + Ok(Output::Stream(stream, physical_plan)) => { let schema = stream.schema().clone(); // TODO(sunng87): streaming response - match util::collect(stream).await { + let mut http_record_output = match util::collect(stream).await { Ok(rows) => match HttpRecordsOutput::try_new(schema, rows) { - Ok(rows) => { - results.push(GreptimeQueryOutput::Records(rows)); - } + Ok(rows) => rows, Err(err) => { return Err(ErrorResponse::from_error(ty, err)); } @@ -149,7 +154,19 @@ pub async fn from_output( Err(err) => { return Err(ErrorResponse::from_error(ty, err)); } + }; + if let Some(physical_plan) = physical_plan { + let mut result_map = HashMap::new(); + + let mut tmp = vec![&mut merge_map, &mut result_map]; + collect_plan_metrics(physical_plan, &mut tmp); + let re = result_map + .into_iter() + .map(|(k, v)| (k, Value::from(v))) + .collect(); + http_record_output.metrics = re; } + results.push(GreptimeQueryOutput::Records(http_record_output)) } Ok(Output::RecordBatches(rbs)) => { match HttpRecordsOutput::try_new(rbs.schema(), rbs.take()) { @@ -167,7 +184,48 @@ } } - Ok(results) + let merge_map = merge_map + .into_iter() + .map(|(k, v)| (k, Value::from(v))) + .collect(); + + Ok((results, merge_map)) +} + +fn collect_into_maps(name: &str, value: u64, maps: &mut [&mut HashMap<String, u64>]) { + if name.starts_with(GREPTIME_EXEC_PREFIX) && value > 0 { + maps.iter_mut().for_each(|map| { + map.entry(name.to_string()) + .and_modify(|v| *v += value) + .or_insert(value); + }); + } +} + +pub fn collect_plan_metrics(plan: Arc<dyn PhysicalPlan>, maps: &mut [&mut HashMap<String, u64>]) { + if let Some(m) = plan.metrics() { + m.iter().for_each(|m| match m.value() { + MetricValue::Count { name, count } => { + collect_into_maps(name, count.value() as u64, maps); + } + MetricValue::Gauge { name, gauge } => { + collect_into_maps(name, gauge.value() as u64, maps); + } + MetricValue::Time { name, time } => { + if name.starts_with(GREPTIME_EXEC_PREFIX) { + // override + maps.iter_mut().for_each(|map| { + map.insert(name.to_string(), time.value() as u64); + }); + } + } + _ => {} + }); + } + + for c in plan.children() { + collect_plan_metrics(c, maps); + } } #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] diff --git a/src/servers/src/http/header.rs b/src/servers/src/http/header.rs index 7be168d545..aa0970dbdb 100644 --- a/src/servers/src/http/header.rs +++ b/src/servers/src/http/header.rs @@ -16,6 +16,7 @@ use headers::{Header, HeaderName, HeaderValue}; pub const GREPTIME_DB_HEADER_FORMAT: &str = "x-greptime-format"; pub const GREPTIME_DB_HEADER_EXECUTION_TIME: &str = "x-greptime-execution-time"; +pub const GREPTIME_DB_HEADER_METRICS: &str = "x-greptime-metrics"; /// Header key of `db-name`. Example format of the header value is `greptime-public`.
pub static GREPTIME_DB_HEADER_NAME: HeaderName = HeaderName::from_static("x-greptime-db-name"); diff --git a/src/servers/src/http/influxdb_result_v1.rs b/src/servers/src/http/influxdb_result_v1.rs index aac76d253d..a4e8206058 100644 --- a/src/servers/src/http/influxdb_result_v1.rs +++ b/src/servers/src/http/influxdb_result_v1.rs @@ -159,7 +159,7 @@ impl InfluxdbV1Response { series: vec![], }); } - Ok(Output::Stream(stream)) => { + Ok(Output::Stream(stream, _)) => { // TODO(sunng87): streaming response match util::collect(stream).await { Ok(rows) => match InfluxdbRecordsOutput::try_from((epoch, rows)) { diff --git a/src/servers/src/http/prometheus.rs b/src/servers/src/http/prometheus.rs index ee8939744f..246b71f0f0 100644 --- a/src/servers/src/http/prometheus.rs +++ b/src/servers/src/http/prometheus.rs @@ -13,10 +13,10 @@ // limitations under the License. //! prom supply the prometheus HTTP API Server compliance -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{HashMap, HashSet}; use axum::extract::{Path, Query, State}; -use axum::{Extension, Form, Json}; +use axum::{Extension, Form}; use catalog::CatalogManagerRef; use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_catalog::parse_catalog_and_schema_from_db_string; @@ -28,7 +28,7 @@ use common_recordbatch::RecordBatches; use common_time::util::{current_time_rfc3339, yesterday_rfc3339}; use datatypes::prelude::ConcreteDataType; use datatypes::scalars::ScalarVector; -use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector}; +use datatypes::vectors::{Float64Vector, StringVector}; use promql_parser::label::METRIC_NAME; use promql_parser::parser::{ AggregateExpr, BinaryExpr, Call, Expr as PromqlExpr, MatrixSelector, ParenExpr, SubqueryExpr, @@ -39,10 +39,11 @@ use schemars::JsonSchema; use serde::de::{self, MapAccess, Visitor}; use serde::{Deserialize, Serialize}; use session::context::QueryContextRef; -use snafu::{Location, OptionExt, ResultExt}; +use snafu::{Location, ResultExt}; +pub use super::prometheus_resp::PrometheusJsonResponse; use crate::error::{ - CollectRecordbatchSnafu, Error, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedResultSnafu, + CollectRecordbatchSnafu, Error, InvalidQuerySnafu, Result, UnexpectedResultSnafu, }; use crate::prom_store::METRIC_NAME_LABEL; use crate::prometheus_handler::PrometheusHandlerRef; @@ -80,218 +81,6 @@ impl Default for PrometheusResponse { } } -#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)] -pub struct PrometheusJsonResponse { - pub status: String, - pub data: PrometheusResponse, - #[serde(skip_serializing_if = "Option::is_none")] - pub error: Option, - #[serde(skip_serializing_if = "Option::is_none")] - #[serde(rename = "errorType")] - pub error_type: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub warnings: Option>, -} - -impl PrometheusJsonResponse { - pub fn error(error_type: S1, reason: S2) -> Json - where - S1: Into, - S2: Into, - { - Json(PrometheusJsonResponse { - status: "error".to_string(), - data: PrometheusResponse::default(), - error: Some(reason.into()), - error_type: Some(error_type.into()), - warnings: None, - }) - } - - pub fn success(data: PrometheusResponse) -> Json { - Json(PrometheusJsonResponse { - status: "success".to_string(), - data, - error: None, - error_type: None, - warnings: None, - }) - } - - /// Convert from `Result` - pub async fn from_query_result( - result: Result, - metric_name: String, - result_type: ValueType, - ) -> Json { - let response: Result> = try { 
- let json = match result? { - Output::RecordBatches(batches) => Self::success(Self::record_batches_to_data( - batches, - metric_name, - result_type, - )?), - Output::Stream(stream) => { - let record_batches = RecordBatches::try_collect(stream) - .await - .context(CollectRecordbatchSnafu)?; - Self::success(Self::record_batches_to_data( - record_batches, - metric_name, - result_type, - )?) - } - Output::AffectedRows(_) => { - Self::error("Unexpected", "expected data result, but got affected rows") - } - }; - - json - }; - - let result_type_string = result_type.to_string(); - - match response { - Ok(resp) => resp, - Err(err) => { - // Prometheus won't report error if querying nonexist label and metric - if err.status_code() == StatusCode::TableNotFound - || err.status_code() == StatusCode::TableColumnNotFound - { - Self::success(PrometheusResponse::PromData(PromData { - result_type: result_type_string, - ..Default::default() - })) - } else { - Self::error(err.status_code().to_string(), err.output_msg()) - } - } - } - } - - /// Convert [RecordBatches] to [PromData] - fn record_batches_to_data( - batches: RecordBatches, - metric_name: String, - result_type: ValueType, - ) -> Result { - // infer semantic type of each column from schema. - // TODO(ruihang): wish there is a better way to do this. - let mut timestamp_column_index = None; - let mut tag_column_indices = Vec::new(); - let mut first_field_column_index = None; - - for (i, column) in batches.schema().column_schemas().iter().enumerate() { - match column.data_type { - ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => { - if timestamp_column_index.is_none() { - timestamp_column_index = Some(i); - } - } - ConcreteDataType::Float64(_) => { - if first_field_column_index.is_none() { - first_field_column_index = Some(i); - } - } - ConcreteDataType::String(_) => { - tag_column_indices.push(i); - } - _ => {} - } - } - - let timestamp_column_index = timestamp_column_index.context(InternalSnafu { - err_msg: "no timestamp column found".to_string(), - })?; - let first_field_column_index = first_field_column_index.context(InternalSnafu { - err_msg: "no value column found".to_string(), - })?; - - let metric_name = (METRIC_NAME.to_string(), metric_name); - let mut buffer = BTreeMap::, Vec<(f64, String)>>::new(); - - for batch in batches.iter() { - // prepare things... 
- let tag_columns = tag_column_indices - .iter() - .map(|i| { - batch - .column(*i) - .as_any() - .downcast_ref::() - .unwrap() - }) - .collect::>(); - let tag_names = tag_column_indices - .iter() - .map(|c| batches.schema().column_name_by_index(*c).to_string()) - .collect::>(); - let timestamp_column = batch - .column(timestamp_column_index) - .as_any() - .downcast_ref::() - .unwrap(); - let field_column = batch - .column(first_field_column_index) - .as_any() - .downcast_ref::() - .unwrap(); - - // assemble rows - for row_index in 0..batch.num_rows() { - // retrieve tags - // TODO(ruihang): push table name `__metric__` - let mut tags = vec![metric_name.clone()]; - for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) { - // TODO(ruihang): add test for NULL tag - if let Some(tag_value) = tag_column.get_data(row_index) { - tags.push((tag_name.to_string(), tag_value.to_string())); - } - } - - // retrieve timestamp - let timestamp_millis: i64 = timestamp_column.get_data(row_index).unwrap().into(); - let timestamp = timestamp_millis as f64 / 1000.0; - - // retrieve value - if let Some(v) = field_column.get_data(row_index) { - buffer - .entry(tags) - .or_default() - .push((timestamp, Into::::into(v).to_string())); - }; - } - } - - let result = buffer - .into_iter() - .map(|(tags, mut values)| { - let metric = tags.into_iter().collect(); - match result_type { - ValueType::Vector | ValueType::Scalar | ValueType::String => Ok(PromSeries { - metric, - value: values.pop(), - ..Default::default() - }), - ValueType::Matrix => Ok(PromSeries { - metric, - values, - ..Default::default() - }), - } - }) - .collect::>>()?; - - let result_type_string = result_type.to_string(); - let data = PrometheusResponse::PromData(PromData { - result_type: result_type_string, - result, - }); - - Ok(data) - } -} - #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)] pub struct FormatQuery { query: Option, @@ -303,7 +92,7 @@ pub async fn format_query( Query(params): Query, Extension(_query_ctx): Extension, Form(form_params): Form, -) -> Json { +) -> PrometheusJsonResponse { let _timer = crate::metrics::METRIC_HTTP_PROMQL_FORMAT_QUERY_ELAPSED.start_timer(); let query = params.query.or(form_params.query).unwrap_or_default(); @@ -333,7 +122,7 @@ pub async fn instant_query( Query(params): Query, Extension(query_ctx): Extension, Form(form_params): Form, -) -> Json { +) -> PrometheusJsonResponse { let _timer = crate::metrics::METRIC_HTTP_PROMQL_INSTANT_QUERY_ELAPSED.start_timer(); // Extract time from query string, or use current server time if not specified. 
let time = params @@ -373,7 +162,7 @@ pub async fn range_query( Query(params): Query, Extension(query_ctx): Extension, Form(form_params): Form, -) -> Json { +) -> PrometheusJsonResponse { let _timer = crate::metrics::METRIC_HTTP_PROMQL_RANGE_QUERY_ELAPSED.start_timer(); let prom_query = PromQuery { query: params.query.or(form_params.query).unwrap_or_default(), @@ -442,7 +231,7 @@ pub async fn labels_query( Query(params): Query, Extension(query_ctx): Extension, Form(form_params): Form, -) -> Json { +) -> PrometheusJsonResponse { let _timer = crate::metrics::METRIC_HTTP_PROMQL_LABEL_QUERY_ELAPSED.start_timer(); let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); @@ -541,7 +330,7 @@ async fn retrieve_series_from_query_result( record_batches_to_series(batches, series, table_name)?; Ok(()) } - Output::Stream(stream) => { + Output::Stream(stream, _) => { let batches = RecordBatches::try_collect(stream) .await .context(CollectRecordbatchSnafu)?; @@ -565,7 +354,7 @@ async fn retrieve_labels_name_from_query_result( record_batches_to_labels_name(batches, labels)?; Ok(()) } - Output::Stream(stream) => { + Output::Stream(stream, _) => { let batches = RecordBatches::try_collect(stream) .await .context(CollectRecordbatchSnafu)?; @@ -708,7 +497,7 @@ pub async fn label_values_query( Path(label_name): Path, Extension(query_ctx): Extension, Query(params): Query, -) -> Json { +) -> PrometheusJsonResponse { let _timer = crate::metrics::METRIC_HTTP_PROMQL_LABEL_VALUE_QUERY_ELAPSED.start_timer(); let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); @@ -771,7 +560,7 @@ async fn retrieve_label_values( Output::RecordBatches(batches) => { retrieve_label_values_from_record_batch(batches, label_name, labels_values).await } - Output::Stream(stream) => { + Output::Stream(stream, _) => { let batches = RecordBatches::try_collect(stream) .await .context(CollectRecordbatchSnafu)?; @@ -834,7 +623,7 @@ pub async fn series_query( Query(params): Query, Extension(query_ctx): Extension, Form(form_params): Form, -) -> Json { +) -> PrometheusJsonResponse { let _timer = crate::metrics::METRIC_HTTP_PROMQL_SERIES_QUERY_ELAPSED.start_timer(); let mut queries: Vec = params.matches.0; if queries.is_empty() { diff --git a/src/servers/src/http/prometheus_resp.rs b/src/servers/src/http/prometheus_resp.rs new file mode 100644 index 0000000000..3deb0b1091 --- /dev/null +++ b/src/servers/src/http/prometheus_resp.rs @@ -0,0 +1,289 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
prom supply the prometheus HTTP API Server compliance
+use std::collections::{BTreeMap, HashMap};
+
+use axum::http::HeaderValue;
+use axum::response::{IntoResponse, Response};
+use axum::Json;
+use common_error::ext::ErrorExt;
+use common_error::status_code::StatusCode;
+use common_query::Output;
+use common_recordbatch::RecordBatches;
+use datatypes::prelude::ConcreteDataType;
+use datatypes::scalars::ScalarVector;
+use datatypes::vectors::{Float64Vector, StringVector, TimestampMillisecondVector};
+use promql_parser::label::METRIC_NAME;
+use promql_parser::parser::ValueType;
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+use snafu::{OptionExt, ResultExt};
+
+use super::handler::collect_plan_metrics;
+use super::header::GREPTIME_DB_HEADER_METRICS;
+use super::prometheus::{PromData, PromSeries, PrometheusResponse};
+use crate::error::{CollectRecordbatchSnafu, InternalSnafu, Result};
+
+#[derive(Debug, Default, Serialize, Deserialize, JsonSchema, PartialEq)]
+pub struct PrometheusJsonResponse {
+    pub status: String,
+    pub data: PrometheusResponse,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub error: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    #[serde(rename = "errorType")]
+    pub error_type: Option<String>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub warnings: Option<Vec<String>>,
+
+    // placeholder for header value
+    #[serde(skip)]
+    #[serde(default)]
+    pub resp_metrics: HashMap<String, Value>,
+}
+
+impl IntoResponse for PrometheusJsonResponse {
+    fn into_response(self) -> Response {
+        let metrics = if self.resp_metrics.is_empty() {
+            None
+        } else {
+            serde_json::to_string(&self.resp_metrics).ok()
+        };
+
+        let mut resp = Json(self).into_response();
+
+        if let Some(m) = metrics.and_then(|m| HeaderValue::from_str(&m).ok()) {
+            resp.headers_mut().insert(GREPTIME_DB_HEADER_METRICS, m);
+        }
+
+        resp
+    }
+}
+
+impl PrometheusJsonResponse {
+    pub fn error<S1, S2>(error_type: S1, reason: S2) -> Self
+    where
+        S1: Into<String>,
+        S2: Into<String>,
+    {
+        PrometheusJsonResponse {
+            status: "error".to_string(),
+            data: PrometheusResponse::default(),
+            error: Some(reason.into()),
+            error_type: Some(error_type.into()),
+            warnings: None,
+            resp_metrics: Default::default(),
+        }
+    }
+
+    pub fn success(data: PrometheusResponse) -> Self {
+        PrometheusJsonResponse {
+            status: "success".to_string(),
+            data,
+            error: None,
+            error_type: None,
+            warnings: None,
+            resp_metrics: Default::default(),
+        }
+    }
+
+    /// Convert from `Result<Output>`
+    pub async fn from_query_result(
+        result: Result<Output>,
+        metric_name: String,
+        result_type: ValueType,
+    ) -> Self {
+        let response: Result<Self> = try {
+            let resp = match result? {
+                Output::RecordBatches(batches) => Self::success(Self::record_batches_to_data(
+                    batches,
+                    metric_name,
+                    result_type,
+                )?),
+                Output::Stream(stream, physical_plan) => {
+                    let record_batches = RecordBatches::try_collect(stream)
+                        .await
+                        .context(CollectRecordbatchSnafu)?;
+                    let mut resp = Self::success(Self::record_batches_to_data(
+                        record_batches,
+                        metric_name,
+                        result_type,
+                    )?);
+
+                    if let Some(physical_plan) = physical_plan {
+                        let mut result_map = HashMap::new();
+                        let mut tmp = vec![&mut result_map];
+                        collect_plan_metrics(physical_plan, &mut tmp);
+
+                        let re = result_map
+                            .into_iter()
+                            .map(|(k, v)| (k, Value::from(v)))
+                            .collect();
+                        resp.resp_metrics = re;
+                    }
+
+                    resp
+                }
+                Output::AffectedRows(_) => {
+                    Self::error("Unexpected", "expected data result, but got affected rows")
+                }
+            };
+
+            resp
+        };
+
+        let result_type_string = result_type.to_string();
+
+        match response {
+            Ok(resp) => resp,
+            Err(err) => {
+                // Prometheus won't report error if querying nonexist label and metric
+                if err.status_code() == StatusCode::TableNotFound
+                    || err.status_code() == StatusCode::TableColumnNotFound
+                {
+                    Self::success(PrometheusResponse::PromData(PromData {
+                        result_type: result_type_string,
+                        ..Default::default()
+                    }))
+                } else {
+                    Self::error(err.status_code().to_string(), err.output_msg())
+                }
+            }
+        }
+    }
+
+    /// Convert [RecordBatches] to [PromData]
+    fn record_batches_to_data(
+        batches: RecordBatches,
+        metric_name: String,
+        result_type: ValueType,
+    ) -> Result<PrometheusResponse> {
+        // infer semantic type of each column from schema.
+        // TODO(ruihang): wish there is a better way to do this.
+        let mut timestamp_column_index = None;
+        let mut tag_column_indices = Vec::new();
+        let mut first_field_column_index = None;
+
+        for (i, column) in batches.schema().column_schemas().iter().enumerate() {
+            match column.data_type {
+                ConcreteDataType::Timestamp(datatypes::types::TimestampType::Millisecond(_)) => {
+                    if timestamp_column_index.is_none() {
+                        timestamp_column_index = Some(i);
+                    }
+                }
+                ConcreteDataType::Float64(_) => {
+                    if first_field_column_index.is_none() {
+                        first_field_column_index = Some(i);
+                    }
+                }
+                ConcreteDataType::String(_) => {
+                    tag_column_indices.push(i);
+                }
+                _ => {}
+            }
+        }
+
+        let timestamp_column_index = timestamp_column_index.context(InternalSnafu {
+            err_msg: "no timestamp column found".to_string(),
+        })?;
+        let first_field_column_index = first_field_column_index.context(InternalSnafu {
+            err_msg: "no value column found".to_string(),
+        })?;
+
+        let metric_name = (METRIC_NAME.to_string(), metric_name);
+        let mut buffer = BTreeMap::<Vec<(String, String)>, Vec<(f64, String)>>::new();
+
+        for batch in batches.iter() {
+            // prepare things...
+            let tag_columns = tag_column_indices
+                .iter()
+                .map(|i| {
+                    batch
+                        .column(*i)
+                        .as_any()
+                        .downcast_ref::<StringVector>()
+                        .unwrap()
+                })
+                .collect::<Vec<_>>();
+            let tag_names = tag_column_indices
+                .iter()
+                .map(|c| batches.schema().column_name_by_index(*c).to_string())
+                .collect::<Vec<_>>();
+            let timestamp_column = batch
+                .column(timestamp_column_index)
+                .as_any()
+                .downcast_ref::<TimestampMillisecondVector>()
+                .unwrap();
+            let field_column = batch
+                .column(first_field_column_index)
+                .as_any()
+                .downcast_ref::<Float64Vector>()
+                .unwrap();
+
+            // assemble rows
+            for row_index in 0..batch.num_rows() {
+                // retrieve tags
+                // TODO(ruihang): push table name `__metric__`
+                let mut tags = vec![metric_name.clone()];
+                for (tag_column, tag_name) in tag_columns.iter().zip(tag_names.iter()) {
+                    // TODO(ruihang): add test for NULL tag
+                    if let Some(tag_value) = tag_column.get_data(row_index) {
+                        tags.push((tag_name.to_string(), tag_value.to_string()));
+                    }
+                }
+
+                // retrieve timestamp
+                let timestamp_millis: i64 = timestamp_column.get_data(row_index).unwrap().into();
+                let timestamp = timestamp_millis as f64 / 1000.0;
+
+                // retrieve value
+                if let Some(v) = field_column.get_data(row_index) {
+                    buffer
+                        .entry(tags)
+                        .or_default()
+                        .push((timestamp, Into::<f64>::into(v).to_string()));
+                };
+            }
+        }
+
+        let result = buffer
+            .into_iter()
+            .map(|(tags, mut values)| {
+                let metric = tags.into_iter().collect();
+                match result_type {
+                    ValueType::Vector | ValueType::Scalar | ValueType::String => Ok(PromSeries {
+                        metric,
+                        value: values.pop(),
+                        ..Default::default()
+                    }),
+                    ValueType::Matrix => Ok(PromSeries {
+                        metric,
+                        values,
+                        ..Default::default()
+                    }),
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        let result_type_string = result_type.to_string();
+        let data = PrometheusResponse::PromData(PromData {
+            result_type: result_type_string,
+            result,
+        });
+
+        Ok(data)
+    }
+}
diff --git a/src/servers/src/mysql/writer.rs b/src/servers/src/mysql/writer.rs
index fd5304ec3e..b01adf374c 100644
--- a/src/servers/src/mysql/writer.rs
+++ b/src/servers/src/mysql/writer.rs
@@ -81,7 +81,7 @@ impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
         // a local variable.
match output { Ok(output) => match output { - Output::Stream(stream) => { + Output::Stream(stream, _) => { let query_result = QueryResult { schema: stream.schema(), stream, diff --git a/src/servers/src/postgres/handler.rs b/src/servers/src/postgres/handler.rs index 11a8f7cdfc..12a05531eb 100644 --- a/src/servers/src/postgres/handler.rs +++ b/src/servers/src/postgres/handler.rs @@ -73,7 +73,7 @@ fn output_to_query_response<'a>( ) -> PgWireResult<Response<'a>> { match output { Ok(Output::AffectedRows(rows)) => Ok(Response::Execution(Tag::new("OK").with_rows(rows))), - Ok(Output::Stream(record_stream)) => { + Ok(Output::Stream(record_stream, _)) => { let schema = record_stream.schema(); recordbatches_to_query_response(record_stream, schema, field_format) } diff --git a/src/servers/tests/py_script/mod.rs b/src/servers/tests/py_script/mod.rs index bde643cb84..5b63a91d1b 100644 --- a/src/servers/tests/py_script/mod.rs +++ b/src/servers/tests/py_script/mod.rs @@ -93,7 +93,7 @@ def hello() -> vector[str]: common_query::Output::RecordBatches(_) => { unreachable!() } - common_query::Output::Stream(s) => { + common_query::Output::Stream(s, _) => { let batches = common_recordbatch::util::collect_batches(s).await.unwrap(); let expected = "\ +---------+ diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 8e6b60fa99..7b0e8a625f 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -321,18 +321,6 @@ impl TruncateTableRequest { } } -#[macro_export] -macro_rules! meter_insert_request { - ($req: expr) => { - meter_macros::write_meter!( - $req.catalog_name.to_string(), - $req.schema_name.to_string(), - $req.table_name.to_string(), - $req - ); - }; -} - pub fn valid_table_option(key: &str) -> bool { matches!( key, diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 45e2782315..e1f19c5730 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -21,6 +21,7 @@ use std::task::{Context, Poll}; use common_query::error as query_error; use common_query::error::Result as QueryResult; use common_query::physical_plan::{Partitioning, PhysicalPlan, PhysicalPlanRef}; +use common_recordbatch::adapter::RecordBatchMetrics; use common_recordbatch::error::Result as RecordBatchResult; use common_recordbatch::{RecordBatch, RecordBatchStream, SendableRecordBatchStream}; use common_telemetry::tracing::Span; @@ -153,7 +154,7 @@ impl RecordBatchStream for StreamWithMetricWrapper { self.stream.schema() } - fn metrics(&self) -> Option<String> { + fn metrics(&self) -> Option<RecordBatchMetrics> { self.stream.metrics() } } diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index e464d90a14..45ca65dd33 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -147,7 +147,7 @@ mod test { )), }); let output = query(instance, request).await; - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); @@ -384,7 +384,7 @@ CREATE TABLE {table_name} ( ))), }); let output = query(instance, request.clone()).await; - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); @@ -477,7 +477,7 @@ CREATE TABLE {table_name} ( assert!(matches!(output, Output::AffectedRows(6))); let output = query(instance, request).await; - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else {
unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); @@ -651,7 +651,7 @@ CREATE TABLE {table_name} ( )), }); let output = query(instance, request.clone()).await; - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); @@ -693,7 +693,7 @@ CREATE TABLE {table_name} ( assert!(matches!(output, Output::AffectedRows(2))); let output = query(instance, request).await; - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); @@ -791,7 +791,7 @@ CREATE TABLE {table_name} ( })), }); let output = query(instance, request).await; - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/tests-integration/src/influxdb.rs b/tests-integration/src/influxdb.rs index 3dc647bd51..305bf55ab8 100644 --- a/tests-integration/src/influxdb.rs +++ b/tests-integration/src/influxdb.rs @@ -80,7 +80,7 @@ monitor1,host=host2 memory=1027"; ) .await; let output = output.remove(0).unwrap(); - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; @@ -109,7 +109,7 @@ monitor1,host=host2 memory=1027 1663840496400340001"; ) .await; let output = output.remove(0).unwrap(); - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/tests-integration/src/instance.rs b/tests-integration/src/instance.rs index aafed6106d..d3e4c4d80f 100644 --- a/tests-integration/src/instance.rs +++ b/tests-integration/src/instance.rs @@ -173,7 +173,7 @@ mod tests { let sql = "SELECT * FROM demo WHERE ts > cast(1000000000 as timestamp) ORDER BY host"; // use nanoseconds as where condition let output = query(instance, sql).await; - let Output::Stream(s) = output else { + let Output::Stream(s, _) = output else { unreachable!() }; let batches = common_recordbatch::util::collect_batches(s).await.unwrap(); diff --git a/tests-integration/src/opentsdb.rs b/tests-integration/src/opentsdb.rs index c5474ea4e2..497c214017 100644 --- a/tests-integration/src/opentsdb.rs +++ b/tests-integration/src/opentsdb.rs @@ -84,7 +84,7 @@ mod tests { .remove(0) .unwrap(); match output { - Output::Stream(stream) => { + Output::Stream(stream, _) => { let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); let pretty_print = recordbatches.pretty_print().unwrap(); let expected = vec![ diff --git a/tests-integration/src/otlp.rs b/tests-integration/src/otlp.rs index 9afbe1b9c3..8a3b0cee25 100644 --- a/tests-integration/src/otlp.rs +++ b/tests-integration/src/otlp.rs @@ -75,7 +75,7 @@ mod test { ) .await; let output = output.remove(0).unwrap(); - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); @@ -97,7 +97,7 @@ mod test { ) .await; let output = output.remove(0).unwrap(); - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); @@ -117,7 +117,7 @@ mod test { .do_query("SELECT * FROM my_test_histo_sum", ctx.clone()) 
.await; let output = output.remove(0).unwrap(); - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); @@ -135,7 +135,7 @@ mod test { .do_query("SELECT * FROM my_test_histo_count", ctx.clone()) .await; let output = output.remove(0).unwrap(); - let Output::Stream(stream) = output else { + let Output::Stream(stream, _) = output else { unreachable!() }; let recordbatches = RecordBatches::try_collect(stream).await.unwrap(); diff --git a/tests-integration/src/tests/instance_kafka_wal_test.rs b/tests-integration/src/tests/instance_kafka_wal_test.rs index c882f17f95..f6f05b53d0 100644 --- a/tests-integration/src/tests/instance_kafka_wal_test.rs +++ b/tests-integration/src/tests/instance_kafka_wal_test.rs @@ -60,7 +60,7 @@ async fn test_create_database_and_insert_query(instance: Option - Output::Stream(s) => { + Output::Stream(s, _) => { let batches = util::collect(s).await.unwrap(); assert_eq!(1, batches[0].num_columns()); assert_eq!( diff --git a/tests-integration/src/tests/instance_test.rs b/tests-integration/src/tests/instance_test.rs index b1e796b796..88fba46d15 100644 --- a/tests-integration/src/tests/instance_test.rs +++ b/tests-integration/src/tests/instance_test.rs @@ -68,7 +68,7 @@ async fn test_create_database_and_insert_query(instance: Arc<dyn MockInstance>) let query_output = execute_sql(&instance, "select ts from test.demo order by ts limit 1").await; match query_output { - Output::Stream(s) => { + Output::Stream(s, _) => { let batches = util::collect(s).await.unwrap(); assert_eq!(1, batches[0].num_columns()); assert_eq!( @@ -299,7 +299,7 @@ async fn test_issue477_same_table_name_in_different_databases(instance: Arc<dyn async fn assert_query_result(instance: &Arc<dyn MockInstance>, sql: &str, ts: i64, host: &str) { let query_output = execute_sql(instance, sql).await; match query_output { - Output::Stream(s) => { + Output::Stream(s, _) => { let batches = util::collect(s).await.unwrap(); // let columns = batches[0].df_recordbatch.columns(); assert_eq!(2, batches[0].num_columns()); @@ -421,7 +421,7 @@ async fn test_execute_query(instance: Arc<dyn MockInstance>) { let output = execute_sql(&instance, "select sum(number) from numbers limit 20").await; match output { - Output::Stream(recordbatch) => { + Output::Stream(recordbatch, _) => { let numbers = util::collect(recordbatch).await.unwrap(); assert_eq!(1, numbers[0].num_columns()); assert_eq!(numbers[0].column(0).len(), 1); diff --git a/tests-integration/src/tests/test_util.rs b/tests-integration/src/tests/test_util.rs index c6f782502b..c582844946 100644 --- a/tests-integration/src/tests/test_util.rs +++ b/tests-integration/src/tests/test_util.rs @@ -330,7 +330,7 @@ pub(crate) async fn check_unordered_output_stream(output: Output, expected: &str }; let recordbatches = match output { - Output::Stream(stream) => util::collect_batches(stream).await.unwrap(), + Output::Stream(stream, _) => util::collect_batches(stream).await.unwrap(), Output::RecordBatches(recordbatches) => recordbatches, _ => unreachable!(), }; diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 12625182ed..e54450589f 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -383,7 +383,7 @@ async fn insert_and_assert(db: &Database) { let record_batches = match output { Output::RecordBatches(record_batches) => record_batches, - Output::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(), + Output::Stream(stream, _) => RecordBatches::try_collect(stream).await.unwrap(), Output::AffectedRows(_) =>
unreachable!(), }; @@ -540,6 +540,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { error: None, error_type: None, warnings: None, + resp_metrics: Default::default(), }; assert_eq!(instant_query_result, expected); @@ -591,6 +592,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { error: None, error_type: None, warnings: None, + resp_metrics: Default::default(), }; assert_eq!(range_query_result, expected); @@ -621,6 +623,7 @@ pub async fn test_prom_gateway_query(store_type: StorageType) { error: None, error_type: None, warnings: None, + resp_metrics: Default::default(), }; assert_eq!(range_query_result, expected); @@ -683,7 +686,7 @@ pub async fn test_grpc_timezone(store_type: StorageType) { async fn to_batch(output: Output) -> String { match output { Output::RecordBatches(batch) => batch, - Output::Stream(stream) => RecordBatches::try_collect(stream).await.unwrap(), + Output::Stream(stream, _) => RecordBatches::try_collect(stream).await.unwrap(), Output::AffectedRows(_) => unreachable!(), } .pretty_print() diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 0b9ebe7050..c3c2db593f 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -466,7 +466,7 @@ impl Database for GreptimeDB { }) as _ } else { let mut result = client.sql(&query).await; - if let Ok(Output::Stream(stream)) = result { + if let Ok(Output::Stream(stream, _)) = result { match RecordBatches::try_collect(stream).await { Ok(recordbatches) => result = Ok(Output::RecordBatches(recordbatches)), Err(e) => { @@ -577,7 +577,7 @@ impl Display for ResultDisplayer { } } } - Output::Stream(_) => unreachable!(), + Output::Stream(_, _) => unreachable!(), }, Err(e) => { let status_code = e.status_code();