chore: carry metrics in flight metadata from datanode to frontend (#3113)

* chore: carry metrics in flight metadata from datanode to frontend

* chore: fix typo

* fix: ignore metric flight message on client

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* chore: add cr comment

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: add cr comment

Co-authored-by: Ruihang Xia <waynestxia@gmail.com>

* chore: update proto

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
shuiyisong
2024-01-17 19:38:03 +08:00
committed by GitHub
parent ae160c2def
commit a29b9f71be
16 changed files with 178 additions and 28 deletions

6
Cargo.lock generated
View File

@@ -1455,6 +1455,7 @@ name = "client"
version = "0.6.0"
dependencies = [
"api",
"arc-swap",
"arrow-flight",
"async-stream",
"async-trait",
@@ -1915,6 +1916,7 @@ dependencies = [
name = "common-recordbatch"
version = "0.6.0"
dependencies = [
"arc-swap",
"common-error",
"common-macro",
"datafusion",
@@ -3635,7 +3637,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "greptime-proto"
version = "0.1.0"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=2c1f17dce7af748c9a1255e82d6ceb7959f8919b#2c1f17dce7af748c9a1255e82d6ceb7959f8919b"
source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=65b008f018395f8fa917a7d3c7883b82f309cb74#65b008f018395f8fa917a7d3c7883b82f309cb74"
dependencies = [
"prost 0.12.3",
"serde",
@@ -6914,6 +6916,8 @@ dependencies = [
"greptime-proto",
"humantime",
"lazy_static",
"meter-core",
"meter-macros",
"num",
"num-traits",
"object-store",

View File

@@ -92,7 +92,7 @@ etcd-client = "0.12"
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "2c1f17dce7af748c9a1255e82d6ceb7959f8919b" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "65b008f018395f8fa917a7d3c7883b82f309cb74" }
humantime-serde = "1.1"
itertools = "0.10"
lazy_static = "1.4"

View File

@@ -312,6 +312,7 @@ impl DataSource for InformationTableDataSource {
schema: projected_schema,
stream: Box::pin(stream),
output_ordering: None,
metrics: Default::default(),
};
Ok(Box::pin(stream))

View File

@@ -9,6 +9,7 @@ testing = []
[dependencies]
api.workspace = true
arc-swap = "1.6"
arrow-flight.workspace = true
async-stream.workspace = true
async-trait.workspace = true

View File

@@ -309,30 +309,36 @@ impl Database {
);
Ok(Output::AffectedRows(rows))
}
FlightMessage::Recordbatch(_) => IllegalFlightMessagesSnafu {
reason: "The first flight message cannot be a RecordBatch message",
FlightMessage::Recordbatch(_) | FlightMessage::Metrics(_) => {
IllegalFlightMessagesSnafu {
reason: "The first flight message cannot be a RecordBatch or Metrics message",
}
.fail()
}
.fail(),
FlightMessage::Schema(schema) => {
let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let FlightMessage::Recordbatch(record_batch) = flight_message else {
yield IllegalFlightMessagesSnafu {reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"}
match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::Metrics(_) => {}
FlightMessage::AffectedRows(_) | FlightMessage::Schema(_) => {
yield IllegalFlightMessagesSnafu {reason: format!("A Schema message must be succeeded exclusively by a set of RecordBatch messages, flight_message: {:?}", flight_message)}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
};
yield Ok(record_batch);
break;
}
}
}
}));
let record_batch_stream = RecordBatchStreamWrapper {
schema,
stream,
output_ordering: None,
metrics: Default::default(),
};
Ok(Output::Stream(Box::pin(record_batch_stream)))
}

View File

@@ -12,8 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use api::v1::region::{QueryRequest, RegionRequest, RegionResponse};
use api::v1::ResponseHeader;
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
use async_stream::stream;
use async_trait::async_trait;
@@ -119,27 +122,38 @@ impl RegionRequester {
.fail();
};
let metrics_str = Arc::new(ArcSwapOption::from(None));
let ref_str = metrics_str.clone();
let stream = Box::pin(stream!({
while let Some(flight_message) = flight_message_stream.next().await {
let flight_message = flight_message
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
let FlightMessage::Recordbatch(record_batch) = flight_message else {
yield IllegalFlightMessagesSnafu {
match flight_message {
FlightMessage::Recordbatch(record_batch) => yield Ok(record_batch),
FlightMessage::Metrics(s) => {
ref_str.swap(Some(Arc::new(s)));
break;
}
_ => {
yield IllegalFlightMessagesSnafu {
reason: "A Schema message must be succeeded exclusively by a set of RecordBatch messages"
}
.fail()
.map_err(BoxedError::new)
.context(ExternalSnafu);
break;
};
yield Ok(record_batch);
break;
}
}
}
}));
let record_batch_stream = RecordBatchStreamWrapper {
schema,
stream,
output_ordering: None,
metrics: metrics_str,
};
Ok(Box::pin(record_batch_stream))
}

View File

@@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::{AffectedRows, FlightMetadata};
use api::v1::{AffectedRows, FlightMetadata, Metrics};
use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::{FlightData, SchemaAsIpc};
use common_base::bytes::Bytes;
@@ -39,6 +39,7 @@ pub enum FlightMessage {
Schema(SchemaRef),
Recordbatch(RecordBatch),
AffectedRows(usize),
Metrics(String),
}
pub struct FlightEncoder {
@@ -85,6 +86,22 @@ impl FlightEncoder {
FlightMessage::AffectedRows(rows) => {
let metadata = FlightMetadata {
affected_rows: Some(AffectedRows { value: rows as _ }),
metrics: None,
}
.encode_to_vec();
FlightData {
flight_descriptor: None,
data_header: build_none_flight_msg().into(),
app_metadata: metadata.into(),
data_body: ProstBytes::default(),
}
}
FlightMessage::Metrics(s) => {
let metadata = FlightMetadata {
affected_rows: None,
metrics: Some(Metrics {
metrics: s.as_bytes().to_vec(),
}),
}
.encode_to_vec();
FlightData {
@@ -119,6 +136,11 @@ impl FlightDecoder {
if let Some(AffectedRows { value }) = metadata.affected_rows {
return Ok(FlightMessage::AffectedRows(value as _));
}
if let Some(Metrics { metrics }) = metadata.metrics {
return Ok(FlightMessage::Metrics(
String::from_utf8_lossy(&metrics).to_string(),
));
}
InvalidFlightDataSnafu {
reason: "Expecting FlightMetadata have some meaningful content.",
}

View File

@@ -143,8 +143,12 @@ impl PhysicalPlan for PhysicalPlanAdapter {
let stream = df_plan
.execute(partition, context)
.context(error::GeneralDataFusionSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new_with_metrics(stream, baseline_metric)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
let adapter = RecordBatchStreamAdapter::try_new_with_metrics_and_df_plan(
stream,
baseline_metric,
df_plan,
)
.context(error::ConvertDfRecordBatchStreamSnafu)?;
Ok(Box::pin(adapter))
}

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
arc-swap = "1.6"
common-error.workspace = true
common-macro.workspace = true
datafusion-common.workspace = true
@@ -13,8 +14,8 @@ datatypes.workspace = true
futures.workspace = true
paste = "1.0"
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
[dev-dependencies]
serde_json = "1.0"
tokio.workspace = true

View File

@@ -21,8 +21,8 @@ use datafusion::arrow::compute::cast;
use datafusion::arrow::datatypes::SchemaRef as DfSchemaRef;
use datafusion::error::Result as DfResult;
use datafusion::parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStream};
use datafusion::physical_plan::metrics::BaselineMetrics;
use datafusion::physical_plan::RecordBatchStream as DfRecordBatchStream;
use datafusion::physical_plan::metrics::{BaselineMetrics, MetricValue};
use datafusion::physical_plan::{ExecutionPlan, RecordBatchStream as DfRecordBatchStream};
use datafusion_common::DataFusionError;
use datatypes::schema::{Schema, SchemaRef};
use futures::ready;
@@ -154,6 +154,15 @@ pub struct RecordBatchStreamAdapter {
schema: SchemaRef,
stream: DfSendableRecordBatchStream,
metrics: Option<BaselineMetrics>,
/// Aggregated plan-level metrics. Resolved after an [ExecutionPlan] is finished.
metrics_2: Metrics,
}
/// Json encoded metrics. Contains metric from a whole plan tree.
enum Metrics {
Unavailable,
Unresolved(Arc<dyn ExecutionPlan>),
Resolved(String),
}
impl RecordBatchStreamAdapter {
@@ -164,12 +173,14 @@ impl RecordBatchStreamAdapter {
schema,
stream,
metrics: None,
metrics_2: Metrics::Unavailable,
})
}
pub fn try_new_with_metrics(
pub fn try_new_with_metrics_and_df_plan(
stream: DfSendableRecordBatchStream,
metrics: BaselineMetrics,
df_plan: Arc<dyn ExecutionPlan>,
) -> Result<Self> {
let schema =
Arc::new(Schema::try_from(stream.schema()).context(error::SchemaConversionSnafu)?);
@@ -177,14 +188,26 @@ impl RecordBatchStreamAdapter {
schema,
stream,
metrics: Some(metrics),
metrics_2: Metrics::Unresolved(df_plan),
})
}
pub fn set_metrics2(&mut self, plan: Arc<dyn ExecutionPlan>) {
self.metrics_2 = Metrics::Unresolved(plan)
}
}
impl RecordBatchStream for RecordBatchStreamAdapter {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
fn metrics(&self) -> Option<String> {
match &self.metrics_2 {
Metrics::Resolved(metrics) => Some(metrics.clone()),
Metrics::Unavailable | Metrics::Unresolved(_) => None,
}
}
}
impl Stream for RecordBatchStreamAdapter {
@@ -206,7 +229,17 @@ impl Stream for RecordBatchStreamAdapter {
df_record_batch,
)))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(None) => {
if let Metrics::Unresolved(df_plan) = &self.metrics_2 {
let mut metrics_holder = RecordBatchMetrics::default();
collect_metrics(df_plan, &mut metrics_holder);
if metrics_holder.elapsed_compute != 0 || metrics_holder.memory_usage != 0 {
self.metrics_2 =
Metrics::Resolved(serde_json::to_string(&metrics_holder).unwrap());
}
}
Poll::Ready(None)
}
}
}
@@ -216,6 +249,30 @@ impl Stream for RecordBatchStreamAdapter {
}
}
fn collect_metrics(df_plan: &Arc<dyn ExecutionPlan>, result: &mut RecordBatchMetrics) {
if let Some(metrics) = df_plan.metrics() {
metrics.iter().for_each(|m| match m.value() {
MetricValue::ElapsedCompute(ec) => result.elapsed_compute += ec.value(),
MetricValue::CurrentMemoryUsage(m) => result.memory_usage += m.value(),
_ => {}
});
}
for child in df_plan.children() {
collect_metrics(&child, result);
}
}
/// [`RecordBatchMetrics`] carrys metrics value
/// from datanode to frontend through gRPC
#[derive(serde::Serialize, serde::Deserialize, Default, Debug)]
pub struct RecordBatchMetrics {
// cpu consumption in nanoseconds
pub elapsed_compute: usize,
// memory used by the plan in bytes
pub memory_usage: usize,
}
enum AsyncRecordBatchStreamAdapterState {
Uninit(FutureStream),
Ready(SendableRecordBatchStream),

View File

@@ -20,6 +20,7 @@ pub mod util;
use std::pin::Pin;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
use datafusion::physical_plan::memory::MemoryStream;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::compute::SortOptions;
@@ -39,6 +40,10 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
fn output_ordering(&self) -> Option<&[OrderOption]> {
None
}
fn metrics(&self) -> Option<String> {
None
}
}
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
@@ -206,6 +211,7 @@ pub struct RecordBatchStreamWrapper<S> {
pub schema: SchemaRef,
pub stream: S,
pub output_ordering: Option<Vec<OrderOption>>,
pub metrics: Arc<ArcSwapOption<String>>,
}
impl<S> RecordBatchStreamWrapper<S> {
@@ -215,6 +221,7 @@ impl<S> RecordBatchStreamWrapper<S> {
schema,
stream,
output_ordering: None,
metrics: Default::default(),
}
}
}
@@ -229,6 +236,10 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
fn output_ordering(&self) -> Option<&[OrderOption]> {
self.output_ordering.as_deref()
}
fn metrics(&self) -> Option<String> {
self.metrics.load().as_ref().map(|s| s.as_ref().clone())
}
}
impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {

View File

@@ -39,6 +39,8 @@ futures-util.workspace = true
greptime-proto.workspace = true
humantime = "2.1"
lazy_static.workspace = true
meter-core.workspace = true
meter-macros.workspace = true
object-store.workspace = true
once_cell.workspace = true
partition.workspace = true

View File

@@ -410,9 +410,9 @@ impl QueryExecutor for DatafusionQueryEngine {
.map_err(BoxedError::new)
.context(QueryExecutionSnafu))?,
_ => {
let df_plan = Arc::new(DfPhysicalPlanAdapter(plan.clone()));
// merge into a single partition
let plan =
CoalescePartitionsExec::new(Arc::new(DfPhysicalPlanAdapter(plan.clone())));
let plan = CoalescePartitionsExec::new(df_plan.clone());
// CoalescePartitionsExec must produce a single partition
assert_eq!(1, plan.output_partitioning().partition_count());
let df_stream = plan
@@ -420,10 +420,11 @@ impl QueryExecutor for DatafusionQueryEngine {
.context(error::DatafusionSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
let stream = RecordBatchStreamAdapter::try_new(df_stream)
let mut stream = RecordBatchStreamAdapter::try_new(df_stream)
.context(error::ConvertDfRecordBatchStreamSnafu)
.map_err(BoxedError::new)
.context(QueryExecutionSnafu)?;
stream.set_metrics2(df_plan);
Ok(Box::pin(stream))
}
}

View File

@@ -19,16 +19,17 @@ use std::time::Duration;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use async_stream::stream;
use common_base::bytes::Bytes;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
use common_meta::table_name::TableName;
use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{tracing, warn};
use datafusion::physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet, Time,
};
@@ -39,6 +40,7 @@ use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::{QueryRequest, RegionRequestHeader};
use meter_macros::read_meter;
use snafu::ResultExt;
use store_api::storage::RegionId;
use tokio::time::Instant;
@@ -213,6 +215,20 @@ impl MergeScanExec {
// reset poll timer
poll_timer = Instant::now();
}
if let Some(metrics) = stream
.metrics()
.and_then(|m| serde_json::from_str::<RecordBatchMetrics>(&m).ok())
{
let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
read_meter!(
c,
s,
metrics.elapsed_compute as u64,
metrics.memory_usage as u64,
0
);
}
METRIC_MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
}
}));
@@ -221,6 +237,7 @@ impl MergeScanExec {
schema: self.schema.clone(),
stream,
output_ordering: None,
metrics: Default::default(),
}))
}

View File

@@ -83,6 +83,11 @@ impl FlightRecordBatchStream {
}
}
}
// make last package to pass metrics
if let Some(m) = recordbatches.metrics() {
let metrics = FlightMessage::Metrics(m);
let _ = tx.send(Ok(metrics)).await;
}
}
}

View File

@@ -144,6 +144,10 @@ impl RecordBatchStream for StreamWithMetricWrapper {
fn schema(&self) -> SchemaRef {
self.stream.schema()
}
fn metrics(&self) -> Option<String> {
self.stream.metrics()
}
}
#[cfg(test)]