chore: more logs

This commit is contained in:
evenyag
2024-11-05 20:25:05 +08:00
parent c391171f99
commit a1d0dcf2c3
3 changed files with 22 additions and 6 deletions

View File

@@ -88,6 +88,10 @@ impl PartitionMetrics {
Self(Arc::new(Mutex::new(inner)))
}
pub(crate) fn partition(&self) -> usize {
self.0.lock().unwrap().partition
}
pub(crate) fn on_first_poll(&self) {
let mut inner = self.0.lock().unwrap();
inner.first_poll = inner.query_start.elapsed();
@@ -126,6 +130,7 @@ impl PartitionMetrics {
/// Scans memtable ranges at `index`.
pub(crate) fn scan_mem_ranges(
partition: usize,
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
@@ -140,9 +145,10 @@ pub(crate) fn scan_mem_ranges(
let build_cost = build_reader_start.elapsed();
part_metrics.inc_build_reader_cost(build_cost);
common_telemetry::debug!(
"Thread: {:?}, Scan mem range, region_id: {}, time_range: {:?}, index: {:?}, build_cost: {:?}",
"Thread: {:?}, Scan mem range, region_id: {}, partition: {}, time_range: {:?}, index: {:?}, build_cost: {:?}",
std::thread::current().id(),
stream_ctx.input.mapper.metadata().region_id,
partition,
time_range,
index,
build_cost
@@ -158,6 +164,7 @@ pub(crate) fn scan_mem_ranges(
/// Scans file ranges at `index`.
pub(crate) fn scan_file_ranges(
partition: usize,
stream_ctx: Arc<StreamContext>,
part_metrics: PartitionMetrics,
index: RowGroupIndex,
@@ -176,9 +183,10 @@ pub(crate) fn scan_file_ranges(
part_metrics.inc_build_reader_cost(build_cost);
if read_type == "unordered_scan_files" {
common_telemetry::debug!(
"Thread: {:?}, Scan file range, region_id: {}, file_id: {}, index: {:?}, build_cost: {:?}",
"Thread: {:?}, Scan file range, region_id: {}, partition: {}, file_id: {}, index: {:?}, build_cost: {:?}",
std::thread::current().id(),
stream_ctx.input.mapper.metadata().region_id,
partition,
range.file_handle().file_id(),
index,
build_cost

View File

@@ -361,6 +361,7 @@ fn build_sources(
for index in &range_meta.row_group_indices {
let stream = if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(
part_metrics.partition(),
stream_ctx.clone(),
part_metrics.clone(),
*index,
@@ -373,8 +374,13 @@ fn build_sources(
} else {
"seq_scan_files"
};
let stream =
scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, read_type);
let stream = scan_file_ranges(
part_metrics.partition(),
stream_ctx.clone(),
part_metrics.clone(),
*index,
read_type,
);
Box::pin(stream) as _
};
sources.push(Source::Stream(stream));

View File

@@ -81,6 +81,7 @@ impl UnorderedScan {
/// Scans a [PartitionRange] by its `identifier` and returns a stream.
fn scan_partition_range(
partition: usize,
stream_ctx: Arc<StreamContext>,
part_range_id: usize,
part_metrics: PartitionMetrics,
@@ -90,12 +91,12 @@ impl UnorderedScan {
let range_meta = &stream_ctx.ranges[part_range_id];
for index in &range_meta.row_group_indices {
if stream_ctx.is_mem_range_index(*index) {
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
let stream = scan_mem_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
for await batch in stream {
yield batch;
}
} else {
let stream = scan_file_ranges(stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files");
let stream = scan_file_ranges(partition, stream_ctx.clone(), part_metrics.clone(), *index, "unordered_scan_files");
for await batch in stream {
yield batch;
}
@@ -160,6 +161,7 @@ impl UnorderedScan {
.with_end(Some(part_range.end));
let stream = Self::scan_partition_range(
partition,
stream_ctx.clone(),
part_range.identifier,
part_metrics.clone(),