chore: logs to debug hang

This commit is contained in:
evenyag
2024-11-07 20:36:12 +08:00
parent fce8c968da
commit 8536a1ec6e
4 changed files with 38 additions and 6 deletions

View File

@@ -666,6 +666,11 @@ impl ScanInput {
reader_metrics: &mut ReaderMetrics,
) -> Result<FileRangeBuilder> {
let file = &self.files[file_index];
common_telemetry::info!(
"ScanInput prune file start, region_id: {}, file: {}",
file.region_id(),
file.file_id()
);
let res = self
.access_layer
.read_sst(file.clone())
@@ -701,6 +706,12 @@ impl ScanInput {
)?;
file_range_ctx.set_compat_batch(Some(compat));
}
common_telemetry::info!(
"ScanInput prune file end, region_id: {}, file: {}, row_groups_num: {}",
file.region_id(),
file.file_id(),
row_groups.len()
);
Ok(FileRangeBuilder {
context: Some(Arc::new(file_range_ctx)),
row_groups,

View File

@@ -181,6 +181,15 @@ pub(crate) fn scan_file_ranges(
) -> impl Stream<Item = Result<Batch>> {
try_stream! {
let mut reader_metrics = ReaderMetrics::default();
if read_type == "unordered_scan_files" {
common_telemetry::debug!(
"Thread: {:?}, Scan file ranges build ranges start, region_id: {}, partition: {}, index: {:?}",
std::thread::current().id(),
stream_ctx.input.mapper.metadata().region_id,
partition,
index,
);
}
let ranges = stream_ctx
.build_file_ranges(index, &mut reader_metrics)
.await?;

View File

@@ -134,10 +134,11 @@ impl UnorderedScan {
let distinguish_range = self.properties.distinguish_partition_range();
common_telemetry::info!(
"Thread: {:?}, Unordered scan start, region_id: {}, partition: {}, part_ranges: {:?}",
"Thread: {:?}, Unordered scan start, region_id: {}, partition: {}, num_ranges: {}, part_ranges: {:?}",
std::thread::current().id(),
stream_ctx.input.mapper.metadata().region_id,
partition,
part_ranges.len(),
part_ranges,
);
@@ -145,11 +146,14 @@ impl UnorderedScan {
part_metrics.on_first_poll();
let cache = stream_ctx.input.cache_manager.as_deref();
let ranges_len = part_ranges.len();
// Scans each part.
for part_range in part_ranges {
for (part_idx, part_range) in part_ranges.into_iter().enumerate() {
common_telemetry::debug!(
"Thread: {:?}, Unordered scan range start, region_id: {}, partition: {}, part_range: {:?}, range_meta: {:?}",
"Thread: {:?}, Unordered scan range start {}/{}, region_id: {}, partition: {}, part_range: {:?}, range_meta: {:?}",
std::thread::current().id(),
part_idx,
ranges_len,
stream_ctx.input.mapper.metadata().region_id,
partition,
part_range,
@@ -207,17 +211,19 @@ impl UnorderedScan {
}
let scan_cost = fetch_start.elapsed();
metrics.scan_cost += scan_cost;
common_telemetry::debug!(
"Thread: {:?}, Unordered scan range end, region_id: {}, partition: {}, part_range: {:?}, scan_cost: {:?}, yieid_cost: {:?}, num_rows: {}",
"Thread: {:?}, Unordered scan range end {}/{}, region_id: {}, partition: {}, part_range: {:?}, scan_cost: {:?}, yieid_cost: {:?}, num_rows: {}",
std::thread::current().id(),
part_idx,
ranges_len,
stream_ctx.input.mapper.metadata().region_id,
partition,
part_range,
scan_cost,
metrics.scan_cost,
metrics.yield_cost,
metrics.num_rows,
);
metrics.scan_cost += scan_cost;
part_metrics.merge_metrics(&metrics);
}

View File

@@ -591,6 +591,12 @@ impl PartSortStream {
// input stream end, mark and continue
Poll::Ready(None) => {
self.input_complete = true;
common_telemetry::info!(
"[PartSortStream] Region {} Partition {} part index {} input complete",
self.region_id,
self.partition,
self.cur_part_idx,
);
continue;
}
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),