diff --git a/src/mito2/src/read/scan_util.rs b/src/mito2/src/read/scan_util.rs index 2adea77b0e..a952ec20c6 100644 --- a/src/mito2/src/read/scan_util.rs +++ b/src/mito2/src/read/scan_util.rs @@ -137,7 +137,16 @@ pub(crate) fn scan_mem_ranges( for range in ranges { let build_reader_start = Instant::now(); let iter = range.build_iter(time_range)?; - part_metrics.inc_build_reader_cost(build_reader_start.elapsed()); + let build_cost = build_reader_start.elapsed(); + part_metrics.inc_build_reader_cost(build_cost); + common_telemetry::debug!( + "Thread: {:?}, Scan mem range, region_id: {}, time_range: {:?}, index: {:?}, build_cost: {:?}", + std::thread::current().id(), + stream_ctx.input.mapper.metadata().region_id, + time_range, + index, + build_cost + ); let mut source = Source::Iter(iter); while let Some(batch) = source.next_batch().await? { @@ -166,7 +175,14 @@ pub(crate) fn scan_file_ranges( let build_cost = build_reader_start.elapsed(); part_metrics.inc_build_reader_cost(build_cost); if read_type == "unordered_scan_files" { - common_telemetry::debug!("Scan file range, region_id: {}, file_id: {}, build_cost: {:?}", stream_ctx.input.mapper.metadata().region_id, range.file_handle().file_id(), build_cost); + common_telemetry::debug!( + "Thread: {:?}, Scan file range, region_id: {}, file_id: {}, index: {:?}, build_cost: {:?}", + std::thread::current().id(), + stream_ctx.input.mapper.metadata().region_id, + range.file_handle().file_id(), + index, + build_cost + ); } let compat_batch = range.compat_batch(); let mut source = Source::PruneReader(reader); diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index b7d49c4316..d7663349e0 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -138,8 +138,13 @@ impl UnorderedScan { let cache = stream_ctx.input.cache_manager.as_deref(); // Scans each part. for part_range in part_ranges { - let mut metrics = ScannerMetrics::default(); - let mut fetch_start = Instant::now(); + common_telemetry::debug!( + "Thread: {:?}, Unordered scan range start, region_id: {}, partition: {}, part_range: {:?}", + std::thread::current().id(), + stream_ctx.input.mapper.metadata().region_id, + partition, + part_range, + ); #[cfg(debug_assertions)] let mut checker = crate::read::BatchChecker::default() .with_start(Some(part_range.start)) @@ -150,6 +155,8 @@ impl UnorderedScan { part_range.identifier, part_metrics.clone(), ); + let mut metrics = ScannerMetrics::default(); + let mut fetch_start = Instant::now(); for await batch in stream { let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?; metrics.scan_cost += fetch_start.elapsed(); @@ -189,7 +196,16 @@ impl UnorderedScan { } let scan_cost = fetch_start.elapsed(); - common_telemetry::debug!("Unordered scan range, region_id: {}, scan_cost: {:?}", stream_ctx.input.mapper.metadata().region_id, scan_cost); + common_telemetry::debug!( + "Thread: {:?}, Unordered scan range end, region_id: {}, partition: {}, part_range: {:?}, scan_cost: {:?}, yieid_cost: {:?}, num_rows: {}", + std::thread::current().id(), + stream_ctx.input.mapper.metadata().region_id, + partition, + part_range, + scan_cost, + metrics.yield_cost, + metrics.num_rows, + ); metrics.scan_cost += scan_cost; part_metrics.merge_metrics(&metrics); } diff --git a/src/query/src/part_sort.rs b/src/query/src/part_sort.rs index 3df31a64e5..e01f49f222 100644 --- a/src/query/src/part_sort.rs +++ b/src/query/src/part_sort.rs @@ -94,8 +94,13 @@ impl PartSortExec { self.input.execute(partition, context.clone())?; if partition >= self.partition_ranges.len() { + common_telemetry::error!( + "to_stream: Partition index out of range: {} >= {}", + partition, + self.partition_ranges.len() + ); internal_err!( - "Partition index out of range: {} >= {}", + "to_stream: Partition index out of range: {} >= {}", partition, self.partition_ranges.len() )?; @@ -275,8 +280,14 @@ impl PartSortStream { min_max_idx: (usize, usize), ) -> datafusion_common::Result<()> { if self.cur_part_idx >= self.partition_ranges.len() { + common_telemetry::error!( + "check_in_range: Partition index out of range: {} >= {}, ranges: {:?}", + self.cur_part_idx, + self.partition_ranges.len(), + self.partition_ranges, + ); internal_err!( - "Partition index out of range: {} >= {}", + "check_in_range: Partition index out of range: {} >= {}", self.cur_part_idx, self.partition_ranges.len() )?; @@ -308,8 +319,14 @@ impl PartSortStream { // check if the current partition index is out of range if self.cur_part_idx >= self.partition_ranges.len() { + common_telemetry::error!( + "try_find_next_range: Partition index out of range: {} >= {}, ranges: {:?}", + self.cur_part_idx, + self.partition_ranges.len(), + self.partition_ranges, + ); internal_err!( - "Partition index out of range: {} >= {}", + "try_find_next_range: Partition index out of range: {} >= {}", self.cur_part_idx, self.partition_ranges.len() )?;