mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-05 12:52:57 +00:00
feat: divide build_cost to build_part_cost and build_reader_cost
Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
@@ -1137,6 +1137,12 @@ impl ScanInput {
|
||||
self.files.len()
|
||||
}
|
||||
|
||||
/// Gets the file handle from a row group index.
|
||||
pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
|
||||
let file_index = index.index - self.num_memtables();
|
||||
&self.files[file_index]
|
||||
}
|
||||
|
||||
pub fn region_metadata(&self) -> &RegionMetadataRef {
|
||||
self.mapper.metadata()
|
||||
}
|
||||
|
||||
@@ -52,35 +52,53 @@ use crate::sst::parquet::flat_format::time_index_column_index;
|
||||
use crate::sst::parquet::reader::{MetadataCacheMetrics, ReaderFilterMetrics, ReaderMetrics};
|
||||
use crate::sst::parquet::row_group::ParquetFetchMetrics;
|
||||
|
||||
/// Per-file scan metrics tracked when explain_verbose is enabled.
|
||||
/// Per-file scan metrics.
|
||||
#[derive(Default, Clone)]
|
||||
pub(crate) struct FileMetrics {
|
||||
pub struct FileScanMetrics {
|
||||
/// Number of ranges (row groups) read from this file.
|
||||
pub(crate) num_ranges: usize,
|
||||
pub num_ranges: usize,
|
||||
/// Number of rows read from this file.
|
||||
pub(crate) num_rows: usize,
|
||||
pub num_rows: usize,
|
||||
/// Time spent building file ranges/parts (file-level preparation).
|
||||
pub build_part_cost: Duration,
|
||||
/// Time spent building readers for this file (accumulated across all ranges).
|
||||
pub(crate) build_cost: Duration,
|
||||
pub build_reader_cost: Duration,
|
||||
/// Time spent scanning this file (accumulated across all ranges).
|
||||
pub(crate) scan_cost: Duration,
|
||||
pub scan_cost: Duration,
|
||||
}
|
||||
|
||||
impl fmt::Debug for FileMetrics {
|
||||
impl fmt::Debug for FileScanMetrics {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{{\"num_ranges\":{}, \"num_rows\":{}, \"build_cost\":\"{:?}\", \"scan_cost\":\"{:?}\"}}",
|
||||
self.num_ranges, self.num_rows, self.build_cost, self.scan_cost
|
||||
)
|
||||
write!(f, "{{\"build_part_cost\":\"{:?}\"", self.build_part_cost)?;
|
||||
|
||||
if self.num_ranges > 0 {
|
||||
write!(f, ", \"num_ranges\":{}", self.num_ranges)?;
|
||||
}
|
||||
if self.num_rows > 0 {
|
||||
write!(f, ", \"num_rows\":{}", self.num_rows)?;
|
||||
}
|
||||
if !self.build_reader_cost.is_zero() {
|
||||
write!(
|
||||
f,
|
||||
", \"build_reader_cost\":\"{:?}\"",
|
||||
self.build_reader_cost
|
||||
)?;
|
||||
}
|
||||
if !self.scan_cost.is_zero() {
|
||||
write!(f, ", \"scan_cost\":\"{:?}\"", self.scan_cost)?;
|
||||
}
|
||||
|
||||
write!(f, "}}")
|
||||
}
|
||||
}
|
||||
|
||||
impl FileMetrics {
|
||||
impl FileScanMetrics {
|
||||
/// Merges another FileMetrics into this one.
|
||||
pub(crate) fn merge_from(&mut self, other: &FileMetrics) {
|
||||
pub(crate) fn merge_from(&mut self, other: &FileScanMetrics) {
|
||||
self.num_ranges += other.num_ranges;
|
||||
self.num_rows += other.num_rows;
|
||||
self.build_cost += other.build_cost;
|
||||
self.build_part_cost += other.build_part_cost;
|
||||
self.build_reader_cost += other.build_reader_cost;
|
||||
self.scan_cost += other.scan_cost;
|
||||
}
|
||||
}
|
||||
@@ -185,7 +203,7 @@ pub(crate) struct ScanMetricsSet {
|
||||
/// Metadata cache metrics.
|
||||
metadata_cache_metrics: Option<MetadataCacheMetrics>,
|
||||
/// Per-file scan metrics, only populated when explain_verbose is true.
|
||||
per_file_metrics: Option<HashMap<RegionFileId, FileMetrics>>,
|
||||
per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
|
||||
}
|
||||
|
||||
impl fmt::Debug for ScanMetricsSet {
|
||||
@@ -483,12 +501,12 @@ impl ScanMetricsSet {
|
||||
}
|
||||
|
||||
/// Merges per-file metrics.
|
||||
fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileMetrics>) {
|
||||
fn merge_per_file_metrics(&mut self, other: &HashMap<RegionFileId, FileScanMetrics>) {
|
||||
let self_file_metrics = self.per_file_metrics.get_or_insert_with(HashMap::new);
|
||||
for (file_id, metrics) in other {
|
||||
self_file_metrics
|
||||
.entry(*file_id)
|
||||
.or_insert_with(FileMetrics::default)
|
||||
.or_default()
|
||||
.merge_from(metrics);
|
||||
}
|
||||
}
|
||||
@@ -786,7 +804,7 @@ impl PartitionMetrics {
|
||||
pub fn merge_reader_metrics(
|
||||
&self,
|
||||
metrics: &ReaderMetrics,
|
||||
per_file_metrics: Option<&HashMap<RegionFileId, FileMetrics>>,
|
||||
per_file_metrics: Option<&HashMap<RegionFileId, FileScanMetrics>>,
|
||||
) {
|
||||
self.0.build_parts_cost.add_duration(metrics.build_cost);
|
||||
|
||||
@@ -1010,11 +1028,30 @@ pub(crate) async fn scan_file_ranges(
|
||||
part_metrics.inc_num_file_ranges(ranges.len());
|
||||
part_metrics.merge_reader_metrics(&reader_metrics, None);
|
||||
|
||||
// Creates initial per-file metrics with build_part_cost.
|
||||
let init_per_file_metrics = if part_metrics.explain_verbose() {
|
||||
let file = stream_ctx.input.file_from_index(index);
|
||||
let file_id = file.file_id();
|
||||
|
||||
let mut map = HashMap::new();
|
||||
map.insert(
|
||||
file_id,
|
||||
FileScanMetrics {
|
||||
build_part_cost: reader_metrics.build_cost,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
Some(map)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(build_file_range_scan_stream(
|
||||
stream_ctx,
|
||||
part_metrics,
|
||||
read_type,
|
||||
ranges,
|
||||
init_per_file_metrics,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -1036,11 +1073,30 @@ pub(crate) async fn scan_flat_file_ranges(
|
||||
part_metrics.inc_num_file_ranges(ranges.len());
|
||||
part_metrics.merge_reader_metrics(&reader_metrics, None);
|
||||
|
||||
// Creates initial per-file metrics with build_part_cost.
|
||||
let init_per_file_metrics = if part_metrics.explain_verbose() {
|
||||
let file = stream_ctx.input.file_from_index(index);
|
||||
let file_id = file.file_id();
|
||||
|
||||
let mut map = HashMap::new();
|
||||
map.insert(
|
||||
file_id,
|
||||
FileScanMetrics {
|
||||
build_part_cost: reader_metrics.build_cost,
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
Some(map)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Ok(build_flat_file_range_scan_stream(
|
||||
stream_ctx,
|
||||
part_metrics,
|
||||
read_type,
|
||||
ranges,
|
||||
init_per_file_metrics,
|
||||
))
|
||||
}
|
||||
|
||||
@@ -1050,6 +1106,7 @@ pub fn build_file_range_scan_stream(
|
||||
part_metrics: PartitionMetrics,
|
||||
read_type: &'static str,
|
||||
ranges: SmallVec<[FileRange; 2]>,
|
||||
mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
|
||||
) -> impl Stream<Item = Result<Batch>> {
|
||||
try_stream! {
|
||||
let fetch_metrics = if part_metrics.explain_verbose() {
|
||||
@@ -1057,11 +1114,6 @@ pub fn build_file_range_scan_stream(
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mut per_file_metrics = if part_metrics.explain_verbose() {
|
||||
Some(HashMap::<RegionFileId, FileMetrics>::new())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let reader_metrics = &mut ReaderMetrics {
|
||||
fetch_metrics: fetch_metrics.clone(),
|
||||
..Default::default()
|
||||
@@ -1087,11 +1139,11 @@ pub fn build_file_range_scan_stream(
|
||||
let file_id = range.file_handle().file_id();
|
||||
let file_metrics = file_metrics_map
|
||||
.entry(file_id)
|
||||
.or_insert_with(FileMetrics::default);
|
||||
.or_insert_with(FileScanMetrics::default);
|
||||
|
||||
file_metrics.num_ranges += 1;
|
||||
file_metrics.num_rows += prune_metrics.num_rows;
|
||||
file_metrics.build_cost += build_cost;
|
||||
file_metrics.build_reader_cost += build_cost;
|
||||
file_metrics.scan_cost += prune_metrics.scan_cost;
|
||||
}
|
||||
|
||||
@@ -1112,6 +1164,7 @@ pub fn build_flat_file_range_scan_stream(
|
||||
part_metrics: PartitionMetrics,
|
||||
read_type: &'static str,
|
||||
ranges: SmallVec<[FileRange; 2]>,
|
||||
mut per_file_metrics: Option<HashMap<RegionFileId, FileScanMetrics>>,
|
||||
) -> impl Stream<Item = Result<RecordBatch>> {
|
||||
try_stream! {
|
||||
let fetch_metrics = if part_metrics.explain_verbose() {
|
||||
@@ -1119,11 +1172,6 @@ pub fn build_flat_file_range_scan_stream(
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let mut per_file_metrics = if part_metrics.explain_verbose() {
|
||||
Some(HashMap::<RegionFileId, FileMetrics>::new())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let reader_metrics = &mut ReaderMetrics {
|
||||
fetch_metrics: fetch_metrics.clone(),
|
||||
..Default::default()
|
||||
@@ -1158,11 +1206,11 @@ pub fn build_flat_file_range_scan_stream(
|
||||
let file_id = range.file_handle().file_id();
|
||||
let file_metrics = file_metrics_map
|
||||
.entry(file_id)
|
||||
.or_insert_with(FileMetrics::default);
|
||||
.or_insert_with(FileScanMetrics::default);
|
||||
|
||||
file_metrics.num_ranges += 1;
|
||||
file_metrics.num_rows += prune_metrics.num_rows;
|
||||
file_metrics.build_cost += build_cost;
|
||||
file_metrics.build_reader_cost += build_cost;
|
||||
file_metrics.scan_cost += prune_metrics.scan_cost;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user