chore: more logs

This commit is contained in:
evenyag
2024-11-06 15:25:49 +08:00
parent 7c5cd2922a
commit 8137b8ff3d

View File

@@ -394,6 +394,15 @@ impl PartSortStream {
if self.buffer.is_empty() {
return Ok(DfRecordBatch::new_empty(self.schema.clone()));
}
common_telemetry::info!(
"[PartSortStream] Region {} Partition {} part index {} sort {} rows",
self.region_id,
self.partition,
self.cur_part_idx,
self.buffer.iter().map(|b| b.len()).sum::<usize>(),
);
let mut sort_columns = Vec::with_capacity(self.buffer.len());
let mut opt = None;
for batch in self.buffer.iter() {
@@ -492,6 +501,13 @@ impl PartSortStream {
let next_range_idx = self.try_find_next_range(&sort_column)?;
let Some(idx) = next_range_idx else {
common_telemetry::info!(
"[PartSortStream] Region {} Partition {} part index {} push {} rows",
self.region_id,
self.partition,
self.cur_part_idx,
batch.num_rows(),
);
self.buffer.push(batch);
// keep polling input for next batch
return Ok(None);
@@ -500,6 +516,13 @@ impl PartSortStream {
let this_range = batch.slice(0, idx);
let remaining_range = batch.slice(idx, batch.num_rows() - idx);
if this_range.num_rows() != 0 {
common_telemetry::info!(
"[PartSortStream] Region {} Partition {} part index {} push {} rows",
self.region_id,
self.partition,
self.cur_part_idx,
batch.num_rows(),
);
self.buffer.push(this_range);
}
// mark end of current PartitionRange
@@ -515,6 +538,13 @@ impl PartSortStream {
// remaining batch is within the current partition range
// push to the buffer and continue polling
if remaining_range.num_rows() != 0 {
common_telemetry::info!(
"[PartSortStream] Region {} Partition {} part index {} push {} rows",
self.region_id,
self.partition,
self.cur_part_idx,
batch.num_rows(),
);
self.buffer.push(remaining_range);
}
}