From 64ae32def0ea237d847bc0c101fc3571667f9dc4 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 28 Aug 2024 16:52:09 +0800 Subject: [PATCH] feat: remove some redundent clone/conversion on constructing MergeScan stream (#4632) Signed-off-by: Ruihang Xia --- src/query/src/dist_plan/merge_scan.rs | 34 +++++++++++---------------- src/session/src/context.rs | 6 +++++ 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/query/src/dist_plan/merge_scan.rs b/src/query/src/dist_plan/merge_scan.rs index 3bada2533a..c8a4ebcc77 100644 --- a/src/query/src/dist_plan/merge_scan.rs +++ b/src/query/src/dist_plan/merge_scan.rs @@ -41,7 +41,6 @@ use datafusion_physical_expr::EquivalenceProperties; use datatypes::schema::{Schema, SchemaRef}; use futures_util::StreamExt; use greptime_proto::v1::region::RegionRequestHeader; -use greptime_proto::v1::QueryContext; use meter_core::data::ReadItem; use meter_macros::read_meter; use session::context::QueryContextRef; @@ -185,24 +184,25 @@ impl MergeScanExec { context: Arc, partition: usize, ) -> Result { + // prepare states to move let regions = self.regions.clone(); let region_query_handler = self.region_query_handler.clone(); let metric = MergeScanMetric::new(&self.metric); - let schema = Self::arrow_schema_to_schema(self.schema())?; - - let dbname = context.task_id().unwrap_or_default(); - let tracing_context = TracingContext::from_json(context.session_id().as_str()); - let current_catalog = self.query_ctx.current_catalog().to_string(); - let current_schema = self.query_ctx.current_schema().to_string(); - let current_channel = self.query_ctx.channel(); - let timezone = self.query_ctx.timezone().to_string(); - let extensions = self.query_ctx.extensions(); - let target_partition = self.target_partition; - + let schema = self.schema.clone(); + let query_ctx = self.query_ctx.clone(); let sub_stage_metrics_moved = self.sub_stage_metrics.clone(); let plan = self.plan.clone(); + let target_partition = self.target_partition; + let dbname = context.task_id().unwrap_or_default(); + let tracing_context = TracingContext::from_json(context.session_id().as_str()); + let current_channel = self.query_ctx.channel(); + let stream = Box::pin(stream!({ - MERGE_SCAN_REGIONS.observe(regions.len() as f64); + // only report metrics once for each MergeScan + if partition == 0 { + MERGE_SCAN_REGIONS.observe(regions.len() as f64); + } + let _finish_timer = metric.finish_time().timer(); let mut ready_timer = metric.ready_time().timer(); let mut first_consume_timer = Some(metric.first_consume_time().timer()); @@ -217,13 +217,7 @@ impl MergeScanExec { header: Some(RegionRequestHeader { tracing_context: tracing_context.to_w3c(), dbname: dbname.clone(), - query_context: Some(QueryContext { - current_catalog: current_catalog.clone(), - current_schema: current_schema.clone(), - timezone: timezone.clone(), - extensions: extensions.clone(), - channel: current_channel as u32, - }), + query_context: Some(query_ctx.as_ref().into()), }), region_id, plan: plan.clone(), diff --git a/src/session/src/context.rs b/src/session/src/context.rs index 15ea0a8baf..28ecca6a3f 100644 --- a/src/session/src/context.rs +++ b/src/session/src/context.rs @@ -137,6 +137,12 @@ impl From for api::v1::QueryContext { } } +impl From<&QueryContext> for api::v1::QueryContext { + fn from(ctx: &QueryContext) -> Self { + ctx.clone().into() + } +} + impl QueryContext { pub fn arc() -> QueryContextRef { Arc::new(QueryContextBuilder::default().build())