From a1d0dcf2c35d0fa0342fcc4846540e41484c8465 Mon Sep 17 00:00:00 2001 From: evenyag Date: Tue, 5 Nov 2024 20:25:05 +0800 Subject: [PATCH] chore: more logs --- src/mito2/src/read/scan_util.rs | 12 ++++++++++-- src/mito2/src/read/seq_scan.rs | 10 ++++++++-- src/mito2/src/read/unordered_scan.rs | 6 ++++-- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index a952ec20c6..583796b9e1 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -88,6 +88,10 @@ impl PartitionMetrics { Self(Arc::new(Mutex::new(inner))) } + pub(crate) fn partition(&self) -> usize { + self.0.lock().unwrap().partition + } + pub(crate) fn on_first_poll(&self) { let mut inner = self.0.lock().unwrap(); inner.first_poll = inner.query_start.elapsed(); @@ -126,6 +130,7 @@ impl PartitionMetrics { /// Scans memtable ranges at `index`. pub(crate) fn scan_mem_ranges( + partition: usize, stream_ctx: Arc, part_metrics: PartitionMetrics, index: RowGroupIndex, @@ -140,9 +145,10 @@ pub(crate) fn scan_mem_ranges( let build_cost = build_reader_start.elapsed(); part_metrics.inc_build_reader_cost(build_cost); common_telemetry::debug!( - "Thread: {:?}, Scan mem range, region_id: {}, time_range: {:?}, index: {:?}, build_cost: {:?}", + "Thread: {:?}, Scan mem range, region_id: {}, partition: {}, time_range: {:?}, index: {:?}, build_cost: {:?}", std::thread::current().id(), stream_ctx.input.mapper.metadata().region_id, + partition, time_range, index, build_cost @@ -158,6 +164,7 @@ pub(crate) fn scan_mem_ranges( /// Scans file ranges at `index`. pub(crate) fn scan_file_ranges( + partition: usize, stream_ctx: Arc, part_metrics: PartitionMetrics, index: RowGroupIndex, @@ -176,9 +183,10 @@ pub(crate) fn scan_file_ranges( part_metrics.inc_build_reader_cost(build_cost); if read_type == "unordered_scan_files" { common_telemetry::debug!( - "Thread: {:?}, Scan file range, region_id: {}, file_id: {}, index: {:?}, build_cost: {:?}", + "Thread: {:?}, Scan file range, region_id: {}, partition: {}, file_id: {}, index: {:?}, build_cost: {:?}", std::thread::current().id(), stream_ctx.input.mapper.metadata().region_id, + partition, range.file_handle().file_id(), index, build_cost diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 9b7a71a36c..b32f5003d6 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -361,6 +361,7 @@ fn build_sources( for index in &range_meta.row_group_indices { let stream = if stream_ctx.is_mem_range_index(*index) { let stream = scan_mem_ranges( + part_metrics.partition(), stream_ctx.clone(), part_metrics.clone(), *index, @@ -373,8 +374,13 @@ fn build_sources( } else { "seq_scan_files" }; - let stream = - scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, read_type); + let stream = scan_file_ranges( + part_metrics.partition(), + stream_ctx.clone(), + part_metrics.clone(), + *index, + read_type, + ); Box::pin(stream) as _ }; sources.push(Source::Stream(stream)); diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 6f8af9befa..7bdea6c643 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -81,6 +81,7 @@ impl UnorderedScan { /// Scans a [PartitionRange] by its `identifier` and returns a stream. fn scan_partition_range( + partition: usize, stream_ctx: Arc, part_range_id: usize, part_metrics: PartitionMetrics, @@ -90,12 +91,12 @@ impl UnorderedScan { let range_meta = &stream_ctx.ranges[part_range_id]; for index in &range_meta.row_group_indices { if stream_ctx.is_mem_range_index(*index) { - let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range); + let stream = scan_mem_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range); for await batch in stream { yield batch; } } else { - let stream = scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files"); + let stream = scan_file_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files"); for await batch in stream { yield batch; } @@ -160,6 +161,7 @@ impl UnorderedScan { .with_end(Some(part_range.end)); let stream = Self::scan_partition_range( + partition, stream_ctx.clone(), part_range.identifier, part_metrics.clone(),