mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 20:32:56 +00:00
feat: collect reader metrics from prune reader (#5152)
This commit is contained in:
@@ -27,7 +27,7 @@ use crate::cache::{
|
||||
use crate::error::Result;
|
||||
use crate::read::{Batch, BatchReader, BoxedBatchReader};
|
||||
use crate::sst::file::FileId;
|
||||
use crate::sst::parquet::reader::RowGroupReader;
|
||||
use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
|
||||
|
||||
/// Reader to keep the last row for each time series.
|
||||
/// It assumes that batches from the input reader are
|
||||
@@ -115,6 +115,14 @@ impl RowGroupLastRowCachedReader {
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the underlying reader metrics if uncached.
|
||||
pub(crate) fn metrics(&self) -> Option<&ReaderMetrics> {
|
||||
match self {
|
||||
RowGroupLastRowCachedReader::Hit(_) => None,
|
||||
RowGroupLastRowCachedReader::Miss(reader) => Some(reader.metrics()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates new Hit variant and updates metrics.
|
||||
fn new_hit(value: Arc<SelectorResultValue>) -> Self {
|
||||
selector_result_cache_hit();
|
||||
@@ -234,6 +242,10 @@ impl RowGroupLastRowReader {
|
||||
});
|
||||
cache.put_selector_result(self.key, value);
|
||||
}
|
||||
|
||||
fn metrics(&self) -> &ReaderMetrics {
|
||||
self.reader.metrics()
|
||||
}
|
||||
}
|
||||
|
||||
/// Push last row into `yielded_batches`.
|
||||
|
||||
@@ -72,11 +72,21 @@ impl PruneReader {
|
||||
self.source = source;
|
||||
}
|
||||
|
||||
pub(crate) fn metrics(&mut self) -> &ReaderMetrics {
|
||||
/// Merge metrics with the inner reader and return the merged metrics.
|
||||
pub(crate) fn metrics(&self) -> ReaderMetrics {
|
||||
let mut metrics = self.metrics.clone();
|
||||
match &self.source {
|
||||
Source::RowGroup(r) => r.metrics(),
|
||||
Source::LastRow(_) => &self.metrics,
|
||||
Source::RowGroup(r) => {
|
||||
metrics.merge_from(r.metrics());
|
||||
}
|
||||
Source::LastRow(r) => {
|
||||
if let Some(inner_metrics) = r.metrics() {
|
||||
metrics.merge_from(inner_metrics);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
metrics
|
||||
}
|
||||
|
||||
pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
|
||||
@@ -181,8 +181,9 @@ pub(crate) fn scan_file_ranges(
|
||||
}
|
||||
yield batch;
|
||||
}
|
||||
if let Source::PruneReader(mut reader) = source {
|
||||
reader_metrics.merge_from(reader.metrics());
|
||||
if let Source::PruneReader(reader) = source {
|
||||
let prune_metrics = reader.metrics();
|
||||
reader_metrics.merge_from(&prune_metrics);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -918,10 +918,10 @@ enum ReaderState {
|
||||
|
||||
impl ReaderState {
|
||||
/// Returns the metrics of the reader.
|
||||
fn metrics(&mut self) -> &ReaderMetrics {
|
||||
fn metrics(&self) -> ReaderMetrics {
|
||||
match self {
|
||||
ReaderState::Readable(reader) => reader.metrics(),
|
||||
ReaderState::Exhausted(m) => m,
|
||||
ReaderState::Exhausted(m) => m.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user