feat: add per-partition convert, result cache metrics (#7539)

* fix: show convert cost in explain analyze verbose

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: increase puffin metadata cache metric

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: add result cache hit/miss to filter metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: print flat format in debug

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update sqlness test

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: make scan cost contain part/reader build cost

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect divider cost

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove unused field in ScannerMetrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: collect metadata read bytes

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: collect read metrics in get_parquet_meta_data

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
Author: Yingwen
Committed: 2026-01-13 17:17:09 +08:00 (via GitHub)
Parent: 279908984d
Commit: 4b3bd7317b
17 changed files with 234 additions and 42 deletions

View File

@@ -350,7 +350,10 @@ impl CacheManager {
         // Try to get metadata from write cache
         let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
         if let Some(write_cache) = &self.write_cache
-            && let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
+            && let Some(metadata) = write_cache
+                .file_cache()
+                .get_parquet_meta_data(key, metrics)
+                .await
         {
             metrics.file_cache_hit += 1;
             let metadata = Arc::new(metadata);

View File

@@ -43,6 +43,7 @@ use crate::metrics::{
 use crate::region::opener::RegionLoadCacheTask;
 use crate::sst::parquet::helper::fetch_byte_ranges;
 use crate::sst::parquet::metadata::MetadataLoader;
+use crate::sst::parquet::reader::MetadataCacheMetrics;

 /// Subdirectory of cached files for write.
 ///
@@ -566,7 +567,11 @@ impl FileCache {
     /// Get the parquet metadata in file cache.
     /// If the file is not in the cache or fail to load metadata, return None.
-    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
+    pub(crate) async fn get_parquet_meta_data(
+        &self,
+        key: IndexKey,
+        cache_metrics: &mut MetadataCacheMetrics,
+    ) -> Option<ParquetMetaData> {
         // Check if file cache contains the key
         if let Some(index_value) = self.inner.parquet_index.get(&key).await {
             // Load metadata from file cache
@@ -575,7 +580,7 @@ impl FileCache {
             let file_size = index_value.file_size as u64;
             let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
-            match metadata_loader.load().await {
+            match metadata_loader.load(cache_metrics).await {
                 Ok(metadata) => {
                     CACHE_HIT
                         .with_label_values(&[key.file_type.metric_label()])
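
The signature change above threads a mutable metrics borrow through every metadata lookup. A minimal sketch of this out-parameter pattern, with `FetchMetrics` and `load_meta` as hypothetical stand-ins for the mito2 types (assumes a tokio runtime):

use std::time::{Duration, Instant};

#[derive(Debug, Default)]
struct FetchMetrics {
    num_reads: usize,
    bytes_read: u64,
    load_cost: Duration,
}

// The callee records how much work the lookup cost; the caller keeps
// ownership of the counters and can merge them into a larger set later.
async fn load_meta(path: &str, metrics: &mut FetchMetrics) -> Option<Vec<u8>> {
    let start = Instant::now();
    let bytes = tokio::fs::read(path).await.ok()?;
    metrics.num_reads += 1;
    metrics.bytes_read += bytes.len() as u64;
    metrics.load_cost += start.elapsed();
    Some(bytes)
}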

View File

@@ -1058,10 +1058,6 @@ impl<T: BatchReader + ?Sized> BatchReader for Box<T> {
 /// Local metrics for scanners.
 #[derive(Debug, Default)]
 pub(crate) struct ScannerMetrics {
-    /// Duration to prepare the scan task.
-    prepare_scan_cost: Duration,
-    /// Duration to build the (merge) reader.
-    build_reader_cost: Duration,
     /// Duration to scan data.
     scan_cost: Duration,
     /// Duration while waiting for `yield`.
@@ -1070,10 +1066,6 @@ pub(crate) struct ScannerMetrics {
     num_batches: usize,
     /// Number of rows returned.
     num_rows: usize,
-    /// Number of mem ranges scanned.
-    num_mem_ranges: usize,
-    /// Number of file ranges scanned.
-    num_file_ranges: usize,
 }

 #[cfg(test)]

View File

@@ -1498,6 +1498,7 @@ impl StreamContext {
                 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                 .finish()?;
         }
+        write!(f, ", \"flat_format\": {}", self.input.flat_format)?;
         #[cfg(feature = "enterprise")]
         self.format_extension_ranges(f)?;

View File

@@ -115,6 +115,8 @@ pub(crate) struct ScanMetricsSet {
     scan_cost: Duration,
     /// Duration while waiting for `yield`.
     yield_cost: Duration,
+    /// Duration to convert [`Batch`]es.
+    convert_cost: Option<Time>,
     /// Duration of the scan.
     total_cost: Duration,
     /// Number of rows returned.
@@ -165,6 +167,18 @@ pub(crate) struct ScanMetricsSet {
     rows_vector_filtered: usize,
     /// Number of rows filtered by precise filter.
     rows_precise_filtered: usize,
+    /// Number of index result cache hits for fulltext index.
+    fulltext_index_cache_hit: usize,
+    /// Number of index result cache misses for fulltext index.
+    fulltext_index_cache_miss: usize,
+    /// Number of index result cache hits for inverted index.
+    inverted_index_cache_hit: usize,
+    /// Number of index result cache misses for inverted index.
+    inverted_index_cache_miss: usize,
+    /// Number of index result cache hits for bloom filter index.
+    bloom_filter_cache_hit: usize,
+    /// Number of index result cache misses for bloom filter index.
+    bloom_filter_cache_miss: usize,
     /// Number of record batches read from SST.
     num_sst_record_batches: usize,
     /// Number of batches decoded from SST.
@@ -187,6 +201,8 @@ pub(crate) struct ScanMetricsSet {
     distributor_scan_cost: Duration,
     /// Duration of the series distributor to yield.
     distributor_yield_cost: Duration,
+    /// Duration spent in divider operations.
+    distributor_divider_cost: Duration,
     /// Merge metrics.
     merge_metrics: MergeMetrics,
@@ -247,6 +263,7 @@ impl fmt::Debug for ScanMetricsSet {
             build_reader_cost,
             scan_cost,
             yield_cost,
+            convert_cost,
             total_cost,
             num_rows,
             num_batches,
@@ -266,6 +283,12 @@ impl fmt::Debug for ScanMetricsSet {
             rows_bloom_filtered,
             rows_vector_filtered,
             rows_precise_filtered,
+            fulltext_index_cache_hit,
+            fulltext_index_cache_miss,
+            inverted_index_cache_hit,
+            inverted_index_cache_miss,
+            bloom_filter_cache_hit,
+            bloom_filter_cache_miss,
             num_sst_record_batches,
             num_sst_batches,
             num_sst_rows,
@@ -276,6 +299,7 @@ impl fmt::Debug for ScanMetricsSet {
             num_distributor_batches,
             distributor_scan_cost,
             distributor_yield_cost,
+            distributor_divider_cost,
             merge_metrics,
             dedup_metrics,
             stream_eof,
@@ -313,6 +337,12 @@ impl fmt::Debug for ScanMetricsSet {
             \"first_poll\":\"{first_poll:?}\""
         )?;
+        // Write convert_cost if present
+        if let Some(time) = convert_cost {
+            let duration = Duration::from_nanos(time.value() as u64);
+            write!(f, ", \"convert_cost\":\"{duration:?}\"")?;
+        }

         // Write non-zero filter counters
         if *rg_fulltext_filtered > 0 {
             write!(f, ", \"rg_fulltext_filtered\":{rg_fulltext_filtered}")?;
@@ -344,6 +374,36 @@ impl fmt::Debug for ScanMetricsSet {
         if *rows_precise_filtered > 0 {
             write!(f, ", \"rows_precise_filtered\":{rows_precise_filtered}")?;
         }
+        if *fulltext_index_cache_hit > 0 {
+            write!(
+                f,
+                ", \"fulltext_index_cache_hit\":{fulltext_index_cache_hit}"
+            )?;
+        }
+        if *fulltext_index_cache_miss > 0 {
+            write!(
+                f,
+                ", \"fulltext_index_cache_miss\":{fulltext_index_cache_miss}"
+            )?;
+        }
+        if *inverted_index_cache_hit > 0 {
+            write!(
+                f,
+                ", \"inverted_index_cache_hit\":{inverted_index_cache_hit}"
+            )?;
+        }
+        if *inverted_index_cache_miss > 0 {
+            write!(
+                f,
+                ", \"inverted_index_cache_miss\":{inverted_index_cache_miss}"
+            )?;
+        }
+        if *bloom_filter_cache_hit > 0 {
+            write!(f, ", \"bloom_filter_cache_hit\":{bloom_filter_cache_hit}")?;
+        }
+        if *bloom_filter_cache_miss > 0 {
+            write!(f, ", \"bloom_filter_cache_miss\":{bloom_filter_cache_miss}")?;
+        }

         // Write non-zero distributor metrics
         if *num_series_send_timeout > 0 {
@@ -370,6 +430,12 @@ impl fmt::Debug for ScanMetricsSet {
                 ", \"distributor_yield_cost\":\"{distributor_yield_cost:?}\""
             )?;
         }
+        if !distributor_divider_cost.is_zero() {
+            write!(
+                f,
+                ", \"distributor_divider_cost\":\"{distributor_divider_cost:?}\""
+            )?;
+        }

         // Write non-zero memtable metrics
         if *mem_rows > 0 {
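
The Debug impl above only prints counters that are non-zero, which keeps `EXPLAIN ANALYZE VERBOSE` output compact. A simplified sketch of that style, using an illustrative struct rather than the real ScanMetricsSet:

use std::fmt;

struct FilterCounters {
    cache_hit: usize,
    cache_miss: usize,
}

impl fmt::Debug for FilterCounters {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Always open the JSON-ish object, then append fields lazily.
        write!(f, "{{\"metrics\":\"filter\"")?;
        if self.cache_hit > 0 {
            write!(f, ", \"cache_hit\":{}", self.cache_hit)?;
        }
        if self.cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", self.cache_miss)?;
        }
        write!(f, "}}")
    }
}
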
@@ -478,27 +544,25 @@ impl ScanMetricsSet {
         self
     }

+    /// Attaches the `convert_cost` to the metrics set.
+    fn with_convert_cost(mut self, time: Time) -> Self {
+        self.convert_cost = Some(time);
+        self
+    }
+
     /// Merges the local scanner metrics.
     fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
         let ScannerMetrics {
-            prepare_scan_cost,
-            build_reader_cost,
             scan_cost,
             yield_cost,
             num_batches,
             num_rows,
-            num_mem_ranges,
-            num_file_ranges,
         } = other;

-        self.prepare_scan_cost += *prepare_scan_cost;
-        self.build_reader_cost += *build_reader_cost;
         self.scan_cost += *scan_cost;
         self.yield_cost += *yield_cost;
         self.num_rows += *num_rows;
         self.num_batches += *num_batches;
-        self.num_mem_ranges += *num_mem_ranges;
-        self.num_file_ranges += *num_file_ranges;
     }

     /// Merges the local reader metrics.
@@ -519,6 +583,12 @@ impl ScanMetricsSet {
             rows_bloom_filtered,
             rows_vector_filtered,
             rows_precise_filtered,
+            fulltext_index_cache_hit,
+            fulltext_index_cache_miss,
+            inverted_index_cache_hit,
+            inverted_index_cache_miss,
+            bloom_filter_cache_hit,
+            bloom_filter_cache_miss,
             inverted_index_apply_metrics,
             bloom_filter_apply_metrics,
             fulltext_index_apply_metrics,
@@ -548,6 +618,13 @@ impl ScanMetricsSet {
         self.rows_vector_filtered += *rows_vector_filtered;
         self.rows_precise_filtered += *rows_precise_filtered;
+        self.fulltext_index_cache_hit += *fulltext_index_cache_hit;
+        self.fulltext_index_cache_miss += *fulltext_index_cache_miss;
+        self.inverted_index_cache_hit += *inverted_index_cache_hit;
+        self.inverted_index_cache_miss += *inverted_index_cache_miss;
+        self.bloom_filter_cache_hit += *bloom_filter_cache_hit;
+        self.bloom_filter_cache_miss += *bloom_filter_cache_miss;
         self.num_sst_record_batches += *num_record_batches;
         self.num_sst_batches += *num_batches;
         self.num_sst_rows += *num_rows;
@@ -598,6 +675,7 @@ impl ScanMetricsSet {
             num_batches,
             scan_cost,
             yield_cost,
+            divider_cost,
         } = distributor_metrics;

         self.num_series_send_timeout += *num_series_send_timeout;
@@ -606,6 +684,7 @@ impl ScanMetricsSet {
         self.num_distributor_batches += *num_batches;
         self.distributor_scan_cost += *scan_cost;
         self.distributor_yield_cost += *yield_cost;
+        self.distributor_divider_cost += *divider_cost;
     }

     /// Observes metrics.
@@ -622,6 +701,11 @@ impl ScanMetricsSet {
         READ_STAGE_ELAPSED
             .with_label_values(&["yield"])
             .observe(self.yield_cost.as_secs_f64());
+        if let Some(time) = &self.convert_cost {
+            READ_STAGE_ELAPSED
+                .with_label_values(&["convert"])
+                .observe(Duration::from_nanos(time.value() as u64).as_secs_f64());
+        }
         READ_STAGE_ELAPSED
             .with_label_values(&["total"])
             .observe(self.total_cost.as_secs_f64());
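
For reference, this is roughly how a stage cost lands in Prometheus; a minimal sketch assuming the `prometheus` crate, with an illustrative metric name rather than mito2's actual `READ_STAGE_ELAPSED` definition:

use std::sync::LazyLock;
use std::time::Duration;
use prometheus::{register_histogram_vec, HistogramVec};

static STAGE_ELAPSED: LazyLock<HistogramVec> = LazyLock::new(|| {
    // One histogram family, labeled by stage ("scan", "yield", "convert", ...).
    register_histogram_vec!("read_stage_elapsed", "scan stage cost", &["stage"]).unwrap()
});

fn observe_stage(stage: &str, cost: Duration) {
    STAGE_ELAPSED
        .with_label_values(&[stage])
        .observe(cost.as_secs_f64());
}
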
@@ -746,21 +830,19 @@ impl Drop for PartitionMetricsInner {
         if self.explain_verbose {
             common_telemetry::info!(
-                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
+                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                 self.scanner_type,
                 self.region_id,
                 self.partition,
                 metrics,
-                self.convert_cost,
             );
         } else {
             common_telemetry::debug!(
-                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}, convert_batch_costs: {}",
+                "{} finished, region_id: {}, partition: {}, scan_metrics: {:?}",
                 self.scanner_type,
                 self.region_id,
                 self.partition,
                 metrics,
-                self.convert_cost,
             );
         }
     }
@@ -807,7 +889,10 @@ impl PartitionMetrics {
         let partition_str = partition.to_string();
         let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
         in_progress_scan.inc();
-        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
+        let convert_cost = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
+        let metrics = ScanMetricsSet::default()
+            .with_prepare_scan_cost(query_start.elapsed())
+            .with_convert_cost(convert_cost.clone());
         let inner = PartitionMetricsInner {
             region_id,
             partition,
@@ -822,7 +907,7 @@ impl PartitionMetrics {
                 .subset_time("build_reader_cost", partition),
             scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
             yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
-            convert_cost: MetricBuilder::new(metrics_set).subset_time("convert_cost", partition),
+            convert_cost,
             elapsed_compute: MetricBuilder::new(metrics_set).elapsed_compute(partition),
         };
         Self(Arc::new(inner))
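
`subset_time` here is DataFusion's API for creating a named per-partition `Time` metric; since `Time` handles are clonable and share one underlying counter, `convert_cost` can be created once and cloned into both the metrics-set entry and the `ScanMetricsSet` above. A small sketch, assuming the `datafusion` crate's metrics module:

use std::time::Duration;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};

// Create a per-partition "convert_cost" timer once, then clone the
// handle wherever the duration is accumulated.
fn convert_timer(metrics_set: &ExecutionPlanMetricsSet, partition: usize) -> Time {
    let time = MetricBuilder::new(metrics_set).subset_time("convert_cost", partition);
    let clone = time.clone(); // both handles update the same counter
    clone.add_duration(Duration::from_micros(250));
    time
}
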
@@ -874,9 +959,6 @@ impl PartitionMetrics {
/// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
self.0
.build_reader_cost
.add_duration(metrics.build_reader_cost);
self.0.scan_cost.add_duration(metrics.scan_cost);
self.record_elapsed_compute(metrics.scan_cost);
self.0.yield_cost.add_duration(metrics.yield_cost);
@@ -956,6 +1038,8 @@ pub(crate) struct SeriesDistributorMetrics {
pub(crate) scan_cost: Duration,
/// Duration of the series distributor to yield.
pub(crate) yield_cost: Duration,
/// Duration spent in divider operations.
pub(crate) divider_cost: Duration,
}
/// Scans memtable ranges at `index`.

View File

@@ -410,6 +410,9 @@ impl SeqScan {
         let stream = try_stream! {
             part_metrics.on_first_poll();

+            // Start fetch time before building sources so scan cost contains
+            // build part cost.
+            let mut fetch_start = Instant::now();
             let range_builder_list = Arc::new(RangeBuilderList::new(
                 stream_ctx.input.num_memtables(),
@@ -431,8 +434,6 @@ impl SeqScan {
                 file_scan_semaphore.clone(),
             ).await?;

-            let mut metrics = ScannerMetrics::default();
-            let mut fetch_start = Instant::now();
             let mut reader =
                 Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                     .await?;
@@ -441,6 +442,12 @@ impl SeqScan {
                 .with_start(Some(part_range.start))
                 .with_end(Some(part_range.end));

+            let mut metrics = ScannerMetrics {
+                scan_cost: fetch_start.elapsed(),
+                ..Default::default()
+            };
+            fetch_start = Instant::now();
             while let Some(batch) = reader.next_batch().await? {
                 metrics.scan_cost += fetch_start.elapsed();
                 metrics.num_batches += 1;
@@ -448,6 +455,7 @@ impl SeqScan {
                 debug_assert!(!batch.is_empty());
                 if batch.is_empty() {
+                    fetch_start = Instant::now();
                     continue;
                 }
@@ -476,6 +484,7 @@ impl SeqScan {
             }

             metrics.scan_cost += fetch_start.elapsed();
+            fetch_start = Instant::now();
             part_metrics.merge_metrics(&metrics);
         }
@@ -516,6 +525,9 @@ impl SeqScan {
         let stream = try_stream! {
             part_metrics.on_first_poll();

+            // Start fetch time before building sources so scan cost contains
+            // build part cost.
+            let mut fetch_start = Instant::now();
             let range_builder_list = Arc::new(RangeBuilderList::new(
                 stream_ctx.input.num_memtables(),
@@ -534,12 +546,16 @@ impl SeqScan {
                 file_scan_semaphore.clone(),
             ).await?;

-            let mut metrics = ScannerMetrics::default();
-            let mut fetch_start = Instant::now();
             let mut reader =
                 Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                     .await?;

+            let mut metrics = ScannerMetrics {
+                scan_cost: fetch_start.elapsed(),
+                ..Default::default()
+            };
+            fetch_start = Instant::now();
             while let Some(record_batch) = reader.try_next().await? {
                 metrics.scan_cost += fetch_start.elapsed();
                 metrics.num_batches += 1;
@@ -547,6 +563,7 @@ impl SeqScan {
                 debug_assert!(record_batch.num_rows() > 0);
                 if record_batch.num_rows() == 0 {
+                    fetch_start = Instant::now();
                     continue;
                 }
@@ -558,6 +575,7 @@ impl SeqScan {
             }

             metrics.scan_cost += fetch_start.elapsed();
+            fetch_start = Instant::now();
             part_metrics.merge_metrics(&metrics);
         }
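
The relocated `fetch_start` is what implements the commit's "scan cost contains part/reader build cost" change: the clock now starts before sources and the reader are built, and restarts around every poll. A condensed sketch of the accumulation pattern, with a synchronous closure standing in for the async reader:

use std::time::{Duration, Instant};

fn timed_scan(mut next_batch: impl FnMut() -> Option<u32>) -> Duration {
    let mut scan_cost = Duration::ZERO;
    let mut fetch_start = Instant::now();
    // ... building sources and the reader happens here, on the clock ...
    scan_cost += fetch_start.elapsed();
    fetch_start = Instant::now();
    while let Some(_batch) = next_batch() {
        scan_cost += fetch_start.elapsed();
        // ... yield the batch downstream (counted as yield, not scan) ...
        fetch_start = Instant::now();
    }
    // Count the final poll that returned None.
    scan_cost += fetch_start.elapsed();
    scan_cost
}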

View File

@@ -446,6 +446,9 @@ impl SeriesDistributor {
             &self.metrics_list,
         );
         part_metrics.on_first_poll();

+        // Start fetch time before building sources so scan cost contains
+        // build part cost.
+        let mut fetch_start = Instant::now();
         let range_builder_list = Arc::new(RangeBuilderList::new(
             self.stream_ctx.input.num_memtables(),
@@ -478,7 +481,6 @@ impl SeriesDistributor {
         )
         .await?;
         let mut metrics = SeriesDistributorMetrics::default();
-        let mut fetch_start = Instant::now();
         let mut divider = FlatSeriesBatchDivider::default();

         while let Some(record_batch) = reader.try_next().await? {
@@ -493,7 +495,10 @@ impl SeriesDistributor {
             }

             // Use divider to split series
-            if let Some(series_batch) = divider.push(record_batch) {
+            let divider_start = Instant::now();
+            let series_batch = divider.push(record_batch);
+            metrics.divider_cost += divider_start.elapsed();
+            if let Some(series_batch) = series_batch {
                 let yield_start = Instant::now();
                 self.senders
                     .send_batch(SeriesBatch::Flat(series_batch))
@@ -504,7 +509,10 @@ impl SeriesDistributor {
         }

         // Send any remaining batch in the divider
-        if let Some(series_batch) = divider.finish() {
+        let divider_start = Instant::now();
+        let series_batch = divider.finish();
+        metrics.divider_cost += divider_start.elapsed();
+        if let Some(series_batch) = series_batch {
             let yield_start = Instant::now();
             self.senders
                 .send_batch(SeriesBatch::Flat(series_batch))
@@ -536,6 +544,9 @@ impl SeriesDistributor {
             &self.metrics_list,
        );
         part_metrics.on_first_poll();

+        // Start fetch time before building sources so scan cost contains
+        // build part cost.
+        let mut fetch_start = Instant::now();
         let range_builder_list = Arc::new(RangeBuilderList::new(
             self.stream_ctx.input.num_memtables(),
@@ -568,7 +579,6 @@ impl SeriesDistributor {
         )
         .await?;
         let mut metrics = SeriesDistributorMetrics::default();
-        let mut fetch_start = Instant::now();
         let mut current_series = PrimaryKeySeriesBatch::default();

         while let Some(batch) = reader.next_batch().await? {
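
Splitting the old `if let Some(series_batch) = divider.push(...)` into a separate binding is what makes the divider measurable apart from the yield that follows it. A sketch of the same refactor, with `Divider` as an illustrative stand-in:

use std::time::{Duration, Instant};

struct Divider;

impl Divider {
    fn push(&mut self, batch: u32) -> Option<u32> {
        // Stand-in logic: only even batches complete a series.
        (batch % 2 == 0).then_some(batch)
    }
}

// Bind the result first so only the divider call is on the clock;
// whatever happens to `series_batch` afterwards is timed separately.
fn push_timed(divider: &mut Divider, batch: u32, divider_cost: &mut Duration) -> Option<u32> {
    let start = Instant::now();
    let series_batch = divider.push(batch);
    *divider_cost += start.elapsed();
    series_batch
}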

View File

@@ -13,6 +13,7 @@
 // limitations under the License.

 use std::result::Result as StdResult;
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

 use bytes::Bytes;
 use futures::FutureExt;
@@ -24,6 +25,7 @@ use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
 use snafu::{IntoError as _, ResultExt};

 use crate::error::{self, Result};
+use crate::sst::parquet::reader::MetadataCacheMetrics;

 /// The estimated size of the footer and metadata need to read from the end of parquet file.
 const DEFAULT_PREFETCH_SIZE: u64 = 64 * 1024;
@@ -65,24 +67,33 @@ impl<'a> MetadataLoader<'a> {
         Ok(file_size)
     }

-    pub async fn load(&self) -> Result<ParquetMetaData> {
+    pub async fn load(&self, cache_metrics: &mut MetadataCacheMetrics) -> Result<ParquetMetaData> {
         let path = self.file_path;
         let file_size = self.get_file_size().await?;
         let reader =
             ParquetMetaDataReader::new().with_prefetch_hint(Some(DEFAULT_PREFETCH_SIZE as usize));
+        let num_reads = AtomicUsize::new(0);
+        let bytes_read = AtomicU64::new(0);
         let fetch = ObjectStoreFetch {
             object_store: &self.object_store,
             file_path: self.file_path,
+            num_reads: &num_reads,
+            bytes_read: &bytes_read,
         };

-        reader
+        let metadata = reader
             .load_and_finish(fetch, file_size)
             .await
             .map_err(|e| match unbox_external_error(e) {
                 Ok(os_err) => error::OpenDalSnafu {}.into_error(os_err),
                 Err(parquet_err) => error::ReadParquetSnafu { path }.into_error(parquet_err),
-            })
+            })?;
+        cache_metrics.num_reads = num_reads.into_inner();
+        cache_metrics.bytes_read = bytes_read.into_inner();
+        Ok(metadata)
     }
 }
@@ -100,10 +111,13 @@ fn unbox_external_error(e: ParquetError) -> StdResult<object_store::Error, Parqu
 struct ObjectStoreFetch<'a> {
     object_store: &'a ObjectStore,
     file_path: &'a str,
+    num_reads: &'a AtomicUsize,
+    bytes_read: &'a AtomicU64,
 }

 impl MetadataFetch for ObjectStoreFetch<'_> {
     fn fetch(&mut self, range: std::ops::Range<u64>) -> BoxFuture<'_, ParquetResult<Bytes>> {
+        let bytes_to_read = range.end - range.start;
         async move {
             let data = self
                 .object_store
@@ -111,6 +125,8 @@ impl MetadataFetch for ObjectStoreFetch<'_> {
                 .range(range)
                 .await
                 .map_err(|e| ParquetError::External(Box::new(e)))?;
+            self.num_reads.fetch_add(1, Ordering::Relaxed);
+            self.bytes_read.fetch_add(bytes_to_read, Ordering::Relaxed);
             Ok(data.to_bytes())
         }
         .boxed()
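
The loader counts reads with relaxed atomics that `load` owns and the fetch callback only borrows, then reads the totals back with `into_inner` once loading completes. A condensed, self-contained sketch of that pattern:

use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

fn fetch_range(
    data: &[u8],
    range: std::ops::Range<usize>,
    num_reads: &AtomicUsize,
    bytes_read: &AtomicU64,
) -> Vec<u8> {
    // Relaxed is enough: these are pure counters with no ordering needs.
    num_reads.fetch_add(1, Ordering::Relaxed);
    bytes_read.fetch_add((range.end - range.start) as u64, Ordering::Relaxed);
    data[range].to_vec()
}

fn load(data: &[u8]) -> (usize, u64) {
    let num_reads = AtomicUsize::new(0);
    let bytes_read = AtomicU64::new(0);
    let footer = data.len().saturating_sub(8)..data.len();
    let _bytes = fetch_range(data, footer, &num_reads, &bytes_read);
    // Ownership returns to the caller, so the counters can be unwrapped.
    (num_reads.into_inner(), bytes_read.into_inner())
}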

View File

@@ -498,7 +498,7 @@ impl ParquetReaderBuilder {
         // Cache miss, load metadata directly.
         let metadata_loader = MetadataLoader::new(self.object_store.clone(), file_path, file_size);
-        let metadata = metadata_loader.load().await?;
+        let metadata = metadata_loader.load(cache_metrics).await?;
         let metadata = Arc::new(metadata);

         // Cache the metadata.
@@ -649,11 +649,13 @@ impl ParquetReaderBuilder {
                 && all_required_row_groups_searched(output, result)
             {
                 apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
+                metrics.fulltext_index_cache_hit += 1;
                 pruned = true;
                 continue;
             }

             // Slow path: apply the index from the file.
+            metrics.fulltext_index_cache_miss += 1;
             let file_size_hint = self.file_handle.meta_ref().index_file_size();
             let apply_res = index_applier
                 .apply_fine(
@@ -721,11 +723,13 @@ impl ParquetReaderBuilder {
                 && all_required_row_groups_searched(output, result)
             {
                 apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_INVERTED);
+                metrics.inverted_index_cache_hit += 1;
                 pruned = true;
                 continue;
             }

             // Slow path: apply the index from the file.
+            metrics.inverted_index_cache_miss += 1;
             let file_size_hint = self.file_handle.meta_ref().index_file_size();
             let apply_res = index_applier
                 .apply(
@@ -789,11 +793,13 @@ impl ParquetReaderBuilder {
                 && all_required_row_groups_searched(output, result)
             {
                 apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_BLOOM);
+                metrics.bloom_filter_cache_hit += 1;
                 pruned = true;
                 continue;
             }

             // Slow path: apply the index from the file.
+            metrics.bloom_filter_cache_miss += 1;
             let file_size_hint = self.file_handle.meta_ref().index_file_size();
             let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                 (
@@ -914,11 +920,13 @@ impl ParquetReaderBuilder {
                 && all_required_row_groups_searched(output, result)
             {
                 apply_selection_and_update_metrics(output, result, metrics, INDEX_TYPE_FULLTEXT);
+                metrics.fulltext_index_cache_hit += 1;
                 pruned = true;
                 continue;
             }

             // Slow path: apply the index from the file.
+            metrics.fulltext_index_cache_miss += 1;
             let file_size_hint = self.file_handle.meta_ref().index_file_size();
             let rgs = parquet_meta.row_groups().iter().enumerate().map(|(i, rg)| {
                 (
@@ -1132,6 +1140,19 @@ pub(crate) struct ReaderFilterMetrics {
     /// Number of rows filtered by precise filter.
     pub(crate) rows_precise_filtered: usize,
+    /// Number of index result cache hits for fulltext index.
+    pub(crate) fulltext_index_cache_hit: usize,
+    /// Number of index result cache misses for fulltext index.
+    pub(crate) fulltext_index_cache_miss: usize,
+    /// Number of index result cache hits for inverted index.
+    pub(crate) inverted_index_cache_hit: usize,
+    /// Number of index result cache misses for inverted index.
+    pub(crate) inverted_index_cache_miss: usize,
+    /// Number of index result cache hits for bloom filter index.
+    pub(crate) bloom_filter_cache_hit: usize,
+    /// Number of index result cache misses for bloom filter index.
+    pub(crate) bloom_filter_cache_miss: usize,
     /// Optional metrics for inverted index applier.
     pub(crate) inverted_index_apply_metrics: Option<InvertedIndexApplyMetrics>,
     /// Optional metrics for bloom filter index applier.
@@ -1157,6 +1178,13 @@ impl ReaderFilterMetrics {
         self.rows_vector_filtered += other.rows_vector_filtered;
         self.rows_precise_filtered += other.rows_precise_filtered;
+        self.fulltext_index_cache_hit += other.fulltext_index_cache_hit;
+        self.fulltext_index_cache_miss += other.fulltext_index_cache_miss;
+        self.inverted_index_cache_hit += other.inverted_index_cache_hit;
+        self.inverted_index_cache_miss += other.inverted_index_cache_miss;
+        self.bloom_filter_cache_hit += other.bloom_filter_cache_hit;
+        self.bloom_filter_cache_miss += other.bloom_filter_cache_miss;

         // Merge optional applier metrics
         if let Some(other_metrics) = &other.inverted_index_apply_metrics {
             self.inverted_index_apply_metrics
@@ -1302,6 +1330,10 @@ pub(crate) struct MetadataCacheMetrics {
     pub(crate) cache_miss: usize,
     /// Duration to load parquet metadata.
     pub(crate) metadata_load_cost: Duration,
+    /// Number of read operations performed.
+    pub(crate) num_reads: usize,
+    /// Total bytes read from storage.
+    pub(crate) bytes_read: u64,
 }

 impl std::fmt::Debug for MetadataCacheMetrics {
@@ -1311,6 +1343,8 @@ impl std::fmt::Debug for MetadataCacheMetrics {
             file_cache_hit,
             cache_miss,
             metadata_load_cost,
+            num_reads,
+            bytes_read,
         } = self;

         if self.is_empty() {
@@ -1329,6 +1363,12 @@ impl std::fmt::Debug for MetadataCacheMetrics {
         if *cache_miss > 0 {
             write!(f, ", \"cache_miss\":{}", cache_miss)?;
         }
+        if *num_reads > 0 {
+            write!(f, ", \"num_reads\":{}", num_reads)?;
+        }
+        if *bytes_read > 0 {
+            write!(f, ", \"bytes_read\":{}", bytes_read)?;
+        }

         write!(f, "}}")
     }
@@ -1346,6 +1386,8 @@ impl MetadataCacheMetrics {
         self.file_cache_hit += other.file_cache_hit;
         self.cache_miss += other.cache_miss;
         self.metadata_load_cost += other.metadata_load_cost;
+        self.num_reads += other.num_reads;
+        self.bytes_read += other.bytes_read;
     }
 }
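
Each index applier now records whether the cached apply result was usable (a hit) or whether it had to fall back to reading the index file (a miss). A minimal sketch of this accounting, with a plain HashMap standing in for the index result cache:

use std::collections::HashMap;

#[derive(Default)]
struct IndexMetrics {
    cache_hit: usize,
    cache_miss: usize,
}

fn lookup(cache: &HashMap<u64, Vec<usize>>, key: u64, metrics: &mut IndexMetrics) -> Vec<usize> {
    if let Some(selection) = cache.get(&key) {
        // Fast path: the cached selection covers the required row groups.
        metrics.cache_hit += 1;
        return selection.clone();
    }
    // Slow path: apply the index from the file (elided).
    metrics.cache_miss += 1;
    Vec::new()
}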

View File

@@ -520,9 +520,10 @@ async fn edit_region(
     {
         // Triggers the filling of the parquet metadata cache.
         // The parquet file is already downloaded.
+        let mut cache_metrics = Default::default();
         let _ = write_cache
             .file_cache()
-            .get_parquet_meta_data(index_key)
+            .get_parquet_meta_data(index_key, &mut cache_metrics)
             .await;
         listener.on_file_cache_filled(index_key.file_id);

View File

@@ -25,6 +25,7 @@ pub type PuffinMetadataCacheRef = Arc<PuffinMetadataCache>;
 /// A cache for storing the metadata of the index files.
 pub struct PuffinMetadataCache {
     cache: moka::sync::Cache<String, Arc<FileMetadata>>,
+    cache_bytes: IntGaugeVec,
 }

 fn puffin_metadata_weight(k: &str, v: &Arc<FileMetadata>) -> u32 {
@@ -38,13 +39,14 @@ impl PuffinMetadataCache {
             cache: moka::sync::CacheBuilder::new(capacity)
                 .name("puffin_metadata")
                 .weigher(|k: &String, v| puffin_metadata_weight(k, v))
-                .eviction_listener(|k, v, _cause| {
+                .eviction_listener(move |k, v, _cause| {
                     let size = puffin_metadata_weight(&k, &v);
                     cache_bytes
                         .with_label_values(&[PUFFIN_METADATA_TYPE])
                         .sub(size.into());
                 })
                 .build(),
+            cache_bytes: cache_bytes.clone(),
         }
     }
@@ -55,6 +57,10 @@ impl PuffinMetadataCache {
     /// Puts the metadata into the cache.
     pub fn put_metadata(&self, file_id: String, metadata: Arc<FileMetadata>) {
         let size = puffin_metadata_weight(&file_id, &metadata);
+        self.cache_bytes
+            .with_label_values(&[PUFFIN_METADATA_TYPE])
+            .add(size.into());
         self.cache.insert(file_id, metadata);
     }
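
The fix pairs the gauge decrement in the eviction listener with a matching increment in `put_metadata`; previously the gauge was only ever decreased. A self-contained sketch of the same pattern, assuming the `moka` and `prometheus` crates and an illustrative metric name:

use std::sync::Arc;
use moka::sync::Cache;
use prometheus::{IntGaugeVec, Opts};

fn build_cache(capacity: u64) -> (Cache<String, Arc<Vec<u8>>>, IntGaugeVec) {
    let gauge = IntGaugeVec::new(Opts::new("cache_bytes", "cached bytes"), &["type"]).unwrap();
    let g = gauge.clone();
    let cache = Cache::builder()
        .max_capacity(capacity)
        .weigher(|k: &String, v: &Arc<Vec<u8>>| (k.len() + v.len()) as u32)
        // The `move` closure owns its gauge clone, so the listener is 'static.
        .eviction_listener(move |k, v, _cause| {
            let size = (k.len() + v.len()) as u32;
            g.with_label_values(&["puffin"]).sub(size.into());
        })
        .build();
    (cache, gauge)
}

fn insert(cache: &Cache<String, Arc<Vec<u8>>>, gauge: &IntGaugeVec, key: String, val: Arc<Vec<u8>>) {
    // Increment on insert so the gauge mirrors the cache's weighted size.
    gauge.with_label_values(&["puffin"]).add((key.len() + val.len()) as i64);
    cache.insert(key, val);
}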

View File

@@ -90,6 +90,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
 -- SQLNESS REPLACE (Duration.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
 -- SQLNESS REPLACE (Hash.*) REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 10, '5s') test;

 +-+-+-+
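
(Reading the new directive like the existing REPLACE rules above: everything from `flat_format` to the end of the line, e.g. the `"flat_format": true` field that the explain output now prints, is collapsed to `REDACTED`, so these expected results stay stable whichever format the region scans with.)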

View File

@@ -44,6 +44,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
 -- SQLNESS REPLACE (Duration.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
 -- SQLNESS REPLACE (Hash.*) REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 10, '5s') test;

 DROP TABLE test;

View File

@@ -63,6 +63,7 @@ TQL EVAL (0, 100, '15s') test{host=~"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237
 -- SQLNESS REPLACE (\s\s+) _
 -- SQLNESS REPLACE (peers.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 0, '1s') test{host=~".*"};

 +-+-+-+
@@ -86,6 +87,7 @@ TQL ANALYZE VERBOSE (0, 0, '1s') test{host=~".*"};
 -- SQLNESS REPLACE (\s\s+) _
 -- SQLNESS REPLACE (peers.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 0, '1s') test{host=~".+"};

 +-+-+-+
@@ -109,6 +111,7 @@ TQL ANALYZE VERBOSE (0, 0, '1s') test{host=~".+"};
 -- SQLNESS REPLACE (\s\s+) _
 -- SQLNESS REPLACE (peers.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 0, '1s') test{host!~".*"};

 +-+-+-+
@@ -134,6 +137,7 @@ TQL ANALYZE VERBOSE (0, 0, '1s') test{host!~".*"};
 -- SQLNESS REPLACE (\s\s+) _
 -- SQLNESS REPLACE (peers.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 0, '1s') test{host!~".+"};

 +-+-+-+

View File

@@ -26,6 +26,7 @@ TQL EVAL (0, 100, '15s') test{host=~"(10\\.0\\.160\\.237:8080|10\\.0\\.160\\.237
 -- SQLNESS REPLACE (\s\s+) _
 -- SQLNESS REPLACE (peers.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 0, '1s') test{host=~".*"};

 -- SQLNESS REPLACE (metrics.*) REDACTED
@@ -35,6 +36,7 @@ TQL ANALYZE VERBOSE (0, 0, '1s') test{host=~".*"};
 -- SQLNESS REPLACE (\s\s+) _
 -- SQLNESS REPLACE (peers.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 0, '1s') test{host=~".+"};

 -- SQLNESS REPLACE (metrics.*) REDACTED
@@ -44,6 +46,7 @@ TQL ANALYZE VERBOSE (0, 0, '1s') test{host=~".+"};
 -- SQLNESS REPLACE (\s\s+) _
 -- SQLNESS REPLACE (peers.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 0, '1s') test{host!~".*"};

 -- SQLNESS REPLACE (metrics.*) REDACTED
@@ -53,6 +56,7 @@ TQL ANALYZE VERBOSE (0, 0, '1s') test{host!~".*"};
 -- SQLNESS REPLACE (\s\s+) _
 -- SQLNESS REPLACE (peers.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 0, '1s') test{host!~".+"};

 DROP TABLE test;

View File

@@ -90,6 +90,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
 -- SQLNESS REPLACE (Duration.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
 -- SQLNESS REPLACE (Hash.*) REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 10, '5s') test;

 +-+-+-+
@@ -208,6 +209,7 @@ TQL ANALYZE FORMAT JSON (0, 10, '5s') test;
 -- SQLNESS REPLACE (metrics.*) REDACTED
 -- SQLNESS REPLACE (Duration.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;

 +-+-+-+

View File

@@ -44,6 +44,7 @@ TQL ANALYZE ('1970-01-01T00:00:00'::timestamp, '1970-01-01T00:00:00'::timestamp
 -- SQLNESS REPLACE (Duration.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
 -- SQLNESS REPLACE (Hash.*) REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE (0, 10, '5s') test;

 DROP TABLE test;
@@ -89,6 +90,7 @@ TQL ANALYZE FORMAT JSON (0, 10, '5s') test;
 -- SQLNESS REPLACE (metrics.*) REDACTED
 -- SQLNESS REPLACE (Duration.*) REDACTED
 -- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
+-- SQLNESS REPLACE (flat_format.*) REDACTED
 TQL ANALYZE VERBOSE FORMAT JSON (0, 10, '5s') test;

 -- analyze with TEXT format (should be same as default)