chore: more logs

This commit is contained in:
evenyag
2024-11-05 13:01:45 +08:00
parent a9f3c4b17c
commit 6bdac25f0a
3 changed files with 57 additions and 8 deletions

View File

@@ -137,7 +137,16 @@ pub(crate) fn scan_mem_ranges(
for range in ranges {
let build_reader_start = Instant::now();
let iter = range.build_iter(time_range)?;
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
common_telemetry::debug!(
"Thread: {:?}, Scan mem range, region_id: {}, time_range: {:?}, index: {:?}, build_cost: {:?}",
std::thread::current().id(),
stream_ctx.input.mapper.metadata().region_id,
time_range,
index,
build_cost
);
let mut source = Source::Iter(iter);
while let Some(batch) = source.next_batch().await? {
@@ -166,7 +175,14 @@ pub(crate) fn scan_file_ranges(
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
if read_type == "unordered_scan_files" {
common_telemetry::debug!("Scan file range, region_id: {}, file_id: {}, build_cost: {:?}", stream_ctx.input.mapper.metadata().region_id, range.file_handle().file_id(), build_cost);
common_telemetry::debug!(
"Thread: {:?}, Scan file range, region_id: {}, file_id: {}, index: {:?}, build_cost: {:?}",
std::thread::current().id(),
stream_ctx.input.mapper.metadata().region_id,
range.file_handle().file_id(),
index,
build_cost
);
}
let compat_batch = range.compat_batch();
let mut source = Source::PruneReader(reader);

View File

@@ -138,8 +138,13 @@ impl UnorderedScan {
let cache = stream_ctx.input.cache_manager.as_deref();
// Scans each part.
for part_range in part_ranges {
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
common_telemetry::debug!(
"Thread: {:?}, Unordered scan range start, region_id: {}, partition: {}, part_range: {:?}",
std::thread::current().id(),
stream_ctx.input.mapper.metadata().region_id,
partition,
part_range,
);
#[cfg(debug_assertions)]
let mut checker = crate::read::BatchChecker::default()
.with_start(Some(part_range.start))
@@ -150,6 +155,8 @@ impl UnorderedScan {
part_range.identifier,
part_metrics.clone(),
);
let mut metrics = ScannerMetrics::default();
let mut fetch_start = Instant::now();
for await batch in stream {
let batch = batch.map_err(BoxedError::new).context(ExternalSnafu)?;
metrics.scan_cost += fetch_start.elapsed();
@@ -189,7 +196,16 @@ impl UnorderedScan {
}
let scan_cost = fetch_start.elapsed();
common_telemetry::debug!("Unordered scan range, region_id: {}, scan_cost: {:?}", stream_ctx.input.mapper.metadata().region_id, scan_cost);
common_telemetry::debug!(
"Thread: {:?}, Unordered scan range end, region_id: {}, partition: {}, part_range: {:?}, scan_cost: {:?}, yieid_cost: {:?}, num_rows: {}",
std::thread::current().id(),
stream_ctx.input.mapper.metadata().region_id,
partition,
part_range,
scan_cost,
metrics.yield_cost,
metrics.num_rows,
);
metrics.scan_cost += scan_cost;
part_metrics.merge_metrics(&metrics);
}

View File

@@ -94,8 +94,13 @@ impl PartSortExec {
self.input.execute(partition, context.clone())?;
if partition >= self.partition_ranges.len() {
common_telemetry::error!(
"to_stream: Partition index out of range: {} >= {}",
partition,
self.partition_ranges.len()
);
internal_err!(
"Partition index out of range: {} >= {}",
"to_stream: Partition index out of range: {} >= {}",
partition,
self.partition_ranges.len()
)?;
@@ -275,8 +280,14 @@ impl PartSortStream {
min_max_idx: (usize, usize),
) -> datafusion_common::Result<()> {
if self.cur_part_idx >= self.partition_ranges.len() {
common_telemetry::error!(
"check_in_range: Partition index out of range: {} >= {}, ranges: {:?}",
self.cur_part_idx,
self.partition_ranges.len(),
self.partition_ranges,
);
internal_err!(
"Partition index out of range: {} >= {}",
"check_in_range: Partition index out of range: {} >= {}",
self.cur_part_idx,
self.partition_ranges.len()
)?;
@@ -308,8 +319,14 @@ impl PartSortStream {
// check if the current partition index is out of range
if self.cur_part_idx >= self.partition_ranges.len() {
common_telemetry::error!(
"try_find_next_range: Partition index out of range: {} >= {}, ranges: {:?}",
self.cur_part_idx,
self.partition_ranges.len(),
self.partition_ranges,
);
internal_err!(
"Partition index out of range: {} >= {}",
"try_find_next_range: Partition index out of range: {} >= {}",
self.cur_part_idx,
self.partition_ranges.len()
)?;