mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-20 06:50:37 +00:00
feat(mito): Add metrics to read path (#2701)
* feat: SST metrics and debug log * feat: add parquet metrics * feat: iter memtable metrics * feat: memtable metrics and read elapsed * feat: merge metrics * feat: seq scan metrics * chore: typo * test: fix merge test * feat: fix compiler errors * feat: scan region log level * feat: add build cost to seq scan metrics * feat: adjust memtable log level * fix: correct merge metrics
This commit is contained in:
@@ -17,9 +17,10 @@ use std::collections::{BTreeMap, Bound, HashSet};
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use api::v1::OpType;
|
||||
use common_telemetry::{debug, error};
|
||||
use common_telemetry::{debug, error, trace};
|
||||
use common_time::Timestamp;
|
||||
use datafusion::physical_plan::PhysicalExpr;
|
||||
use datafusion_common::ScalarValue;
|
||||
@@ -47,6 +48,7 @@ use crate::memtable::{
|
||||
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
|
||||
MemtableRef, MemtableStats,
|
||||
};
|
||||
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
|
||||
use crate::read::{Batch, BatchBuilder, BatchColumn};
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
|
||||
|
||||
@@ -319,6 +321,7 @@ impl SeriesSet {
|
||||
pk_schema: primary_key_schema,
|
||||
primary_key_builders,
|
||||
codec: self.codec.clone(),
|
||||
metrics: Metrics::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -346,6 +349,21 @@ fn primary_key_builders(
|
||||
(builders, Arc::new(arrow::datatypes::Schema::new(fields)))
|
||||
}
|
||||
|
||||
/// Metrics for reading the memtable.
|
||||
#[derive(Debug, Default)]
|
||||
struct Metrics {
|
||||
/// Total series in the memtable.
|
||||
total_series: usize,
|
||||
/// Number of series pruned.
|
||||
num_pruned_series: usize,
|
||||
/// Number of rows read.
|
||||
num_rows: usize,
|
||||
/// Number of batches read.
|
||||
num_batches: usize,
|
||||
/// Duration to scan the memtable.
|
||||
scan_cost: Duration,
|
||||
}
|
||||
|
||||
struct Iter {
|
||||
metadata: RegionMetadataRef,
|
||||
series: Arc<SeriesRwLockMap>,
|
||||
@@ -355,12 +373,30 @@ struct Iter {
|
||||
pk_schema: arrow::datatypes::SchemaRef,
|
||||
primary_key_builders: Vec<Box<dyn MutableVector>>,
|
||||
codec: Arc<McmpRowCodec>,
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
impl Drop for Iter {
|
||||
fn drop(&mut self) {
|
||||
debug!(
|
||||
"Iter {} time series memtable, metrics: {:?}",
|
||||
self.metadata.region_id, self.metrics
|
||||
);
|
||||
|
||||
READ_ROWS_TOTAL
|
||||
.with_label_values(&["time_series_memtable"])
|
||||
.inc_by(self.metrics.num_rows as u64);
|
||||
READ_STAGE_ELAPSED
|
||||
.with_label_values(&["scan_memtable"])
|
||||
.observe(self.metrics.scan_cost.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for Iter {
|
||||
type Item = Result<Batch>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
let start = Instant::now();
|
||||
let map = self.series.read().unwrap();
|
||||
let range = match &self.last_key {
|
||||
None => map.range::<Vec<u8>, _>(..),
|
||||
@@ -371,7 +407,10 @@ impl Iterator for Iter {
|
||||
|
||||
// TODO(hl): maybe yield more than one time series to amortize range overhead.
|
||||
for (primary_key, series) in range {
|
||||
self.metrics.total_series += 1;
|
||||
|
||||
let mut series = series.write().unwrap();
|
||||
let start = Instant::now();
|
||||
if !self.predicate.is_empty()
|
||||
&& !prune_primary_key(
|
||||
&self.codec,
|
||||
@@ -383,15 +422,23 @@ impl Iterator for Iter {
|
||||
)
|
||||
{
|
||||
// read next series
|
||||
self.metrics.num_pruned_series += 1;
|
||||
continue;
|
||||
}
|
||||
self.last_key = Some(primary_key.clone());
|
||||
|
||||
let values = series.compact(&self.metadata);
|
||||
return Some(
|
||||
values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)),
|
||||
);
|
||||
let batch =
|
||||
values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection));
|
||||
|
||||
// Update metrics.
|
||||
self.metrics.num_batches += 1;
|
||||
self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
return Some(batch);
|
||||
}
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
@@ -410,12 +457,7 @@ fn prune_primary_key(
|
||||
}
|
||||
|
||||
if let Some(rb) = series.pk_cache.as_ref() {
|
||||
let res = prune_inner(predicate, rb).unwrap_or(true);
|
||||
debug!(
|
||||
"Prune primary key: {:?}, predicate: {:?}, res: {:?}",
|
||||
rb, predicate, res
|
||||
);
|
||||
res
|
||||
prune_inner(predicate, rb).unwrap_or(true)
|
||||
} else {
|
||||
let rb = match pk_to_record_batch(codec, pk, builders, pk_schema) {
|
||||
Ok(rb) => rb,
|
||||
@@ -425,7 +467,6 @@ fn prune_primary_key(
|
||||
}
|
||||
};
|
||||
let res = prune_inner(predicate, &rb).unwrap_or(true);
|
||||
debug!("Prune primary key: {:?}, res: {:?}", rb, res);
|
||||
series.update_pk_cache(rb);
|
||||
res
|
||||
}
|
||||
@@ -452,9 +493,11 @@ fn prune_inner(predicates: &[Arc<dyn PhysicalExpr>], primary_key: &RecordBatch)
|
||||
unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key);
|
||||
}
|
||||
};
|
||||
debug!(
|
||||
trace!(
|
||||
"Evaluate primary key {:?} against filter: {:?}, result: {:?}",
|
||||
primary_key, expr, result
|
||||
primary_key,
|
||||
expr,
|
||||
result
|
||||
);
|
||||
if !result {
|
||||
return Ok(false);
|
||||
|
||||
@@ -105,4 +105,21 @@ lazy_static! {
|
||||
/// Counter of failed compaction task.
|
||||
pub static ref COMPACTION_FAILURE_COUNT: IntCounter =
|
||||
register_int_counter!("mito_compaction_failure_total", "mito compaction failure total").unwrap();
|
||||
// ------- End of compaction metrics.
|
||||
|
||||
// Query metrics.
|
||||
/// Timer of different stages in query.
|
||||
pub static ref READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
|
||||
"mito_read_stage_elapsed",
|
||||
"mito read stage elapsed",
|
||||
&[STAGE_LABEL]
|
||||
)
|
||||
.unwrap();
|
||||
/// Counter of rows read.
|
||||
pub static ref READ_ROWS_TOTAL: IntCounterVec =
|
||||
register_int_counter_vec!("mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();
|
||||
/// Counter of filtered rows during merge.
|
||||
pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec =
|
||||
register_int_counter_vec!("mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap();
|
||||
// ------- End of query metrics.
|
||||
}
|
||||
|
||||
@@ -17,12 +17,15 @@
|
||||
use std::cmp::Ordering;
|
||||
use std::collections::BinaryHeap;
|
||||
use std::mem;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::debug;
|
||||
use common_time::Timestamp;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::metrics::{MERGE_FILTER_ROWS_TOTAL, READ_STAGE_ELAPSED};
|
||||
use crate::read::{Batch, BatchReader, BoxedBatchReader, Source};
|
||||
|
||||
/// Minimum batch size to output.
|
||||
@@ -51,11 +54,14 @@ pub struct MergeReader {
|
||||
/// Suggested size of each batch. The batch returned by the reader can have more rows than the
|
||||
/// batch size.
|
||||
batch_size: usize,
|
||||
/// Local metrics.
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BatchReader for MergeReader {
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
let start = Instant::now();
|
||||
while !self.hot.is_empty() && self.batch_merger.num_rows() < self.batch_size {
|
||||
if let Some(current_key) = self.batch_merger.primary_key() {
|
||||
// If the hottest node has a different key, we have finished collecting the current key.
|
||||
@@ -68,28 +74,55 @@ impl BatchReader for MergeReader {
|
||||
if self.hot.len() == 1 {
|
||||
// No need to do merge sort if only one batch in the hot heap.
|
||||
self.fetch_batch_from_hottest().await?;
|
||||
self.metrics.num_fetch_by_batches += 1;
|
||||
} else {
|
||||
// We can only fetch rows that are less than the next node from the hottest node.
|
||||
self.fetch_rows_from_hottest().await?;
|
||||
self.metrics.num_fetch_by_rows += 1;
|
||||
}
|
||||
}
|
||||
|
||||
if self.batch_merger.is_empty() {
|
||||
// Nothing fetched.
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
// Update deleted rows num.
|
||||
self.metrics.num_deleted_rows = self.batch_merger.num_deleted_rows();
|
||||
Ok(None)
|
||||
} else {
|
||||
self.batch_merger.merge_batches()
|
||||
let batch = self.batch_merger.merge_batches()?;
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
self.metrics.num_output_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
|
||||
Ok(batch)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for MergeReader {
|
||||
fn drop(&mut self) {
|
||||
debug!("Merge reader finished, metrics: {:?}", self.metrics);
|
||||
|
||||
MERGE_FILTER_ROWS_TOTAL
|
||||
.with_label_values(&["dedup"])
|
||||
.inc_by(self.metrics.num_duplicate_rows as u64);
|
||||
MERGE_FILTER_ROWS_TOTAL
|
||||
.with_label_values(&["delete"])
|
||||
.inc_by(self.metrics.num_deleted_rows as u64);
|
||||
READ_STAGE_ELAPSED
|
||||
.with_label_values(&["merge"])
|
||||
.observe(self.metrics.scan_cost.as_secs_f64());
|
||||
}
|
||||
}
|
||||
|
||||
impl MergeReader {
|
||||
/// Creates and initializes a new [MergeReader].
|
||||
pub async fn new(sources: Vec<Source>, batch_size: usize) -> Result<MergeReader> {
|
||||
let start = Instant::now();
|
||||
let mut metrics = Metrics::default();
|
||||
|
||||
let mut cold = BinaryHeap::with_capacity(sources.len());
|
||||
let hot = BinaryHeap::with_capacity(sources.len());
|
||||
for source in sources {
|
||||
let node = Node::new(source).await?;
|
||||
let node = Node::new(source, &mut metrics).await?;
|
||||
if !node.is_eof() {
|
||||
// Ensure `cold` doesn't have EOF nodes.
|
||||
cold.push(node);
|
||||
@@ -101,10 +134,12 @@ impl MergeReader {
|
||||
cold,
|
||||
batch_merger: BatchMerger::new(),
|
||||
batch_size,
|
||||
metrics,
|
||||
};
|
||||
// Initializes the reader.
|
||||
reader.refill_hot();
|
||||
|
||||
reader.metrics.scan_cost += start.elapsed();
|
||||
Ok(reader)
|
||||
}
|
||||
|
||||
@@ -132,7 +167,7 @@ impl MergeReader {
|
||||
assert_eq!(1, self.hot.len());
|
||||
|
||||
let mut hottest = self.hot.pop().unwrap();
|
||||
let batch = hottest.fetch_batch().await?;
|
||||
let batch = hottest.fetch_batch(&mut self.metrics).await?;
|
||||
self.batch_merger.push(batch)?;
|
||||
self.reheap(hottest)
|
||||
}
|
||||
@@ -161,12 +196,12 @@ impl MergeReader {
|
||||
// value directly.
|
||||
match timestamps.binary_search(&next_min_ts.value()) {
|
||||
Ok(pos) => {
|
||||
// They have duplicate timestamps. Outputs timestamps before the duplciated timestamp.
|
||||
// They have duplicate timestamps. Outputs timestamps before the duplicated timestamp.
|
||||
// Batch itself doesn't contain duplicate timestamps so timestamps before `pos`
|
||||
// must be less than `next_min_ts`.
|
||||
self.batch_merger.push(top.slice(0, pos))?;
|
||||
// This keeps the duplicate timestamp in the node.
|
||||
top_node.skip_rows(pos).await?;
|
||||
top_node.skip_rows(pos, &mut self.metrics).await?;
|
||||
// The merge window should contain this timestamp so only nodes in the hot heap
|
||||
// have this timestamp.
|
||||
self.filter_first_duplicate_timestamp_in_hot(top_node, next_min_ts)
|
||||
@@ -175,7 +210,7 @@ impl MergeReader {
|
||||
Err(pos) => {
|
||||
// No duplicate timestamp. Outputs timestamp before `pos`.
|
||||
self.batch_merger.push(top.slice(0, pos))?;
|
||||
top_node.skip_rows(pos).await?;
|
||||
top_node.skip_rows(pos, &mut self.metrics).await?;
|
||||
self.reheap(top_node)?;
|
||||
}
|
||||
}
|
||||
@@ -211,16 +246,18 @@ impl MergeReader {
|
||||
|
||||
if max_seq < next_first_seq {
|
||||
// The next node has larger seq.
|
||||
max_seq_node.skip_rows(1).await?;
|
||||
max_seq_node.skip_rows(1, &mut self.metrics).await?;
|
||||
self.metrics.num_duplicate_rows += 1;
|
||||
if !max_seq_node.is_eof() {
|
||||
self.cold.push(max_seq_node);
|
||||
}
|
||||
max_seq_node = next_node;
|
||||
max_seq = next_first_seq;
|
||||
} else {
|
||||
next_node.skip_rows(1).await?;
|
||||
next_node.skip_rows(1, &mut self.metrics).await?;
|
||||
self.metrics.num_duplicate_rows += 1;
|
||||
if !next_node.is_eof() {
|
||||
// If the next node is
|
||||
// If the next node has smaller seq, skip that row.
|
||||
self.cold.push(next_node);
|
||||
}
|
||||
}
|
||||
@@ -315,12 +352,33 @@ impl Default for MergeReaderBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for the merge reader.
|
||||
#[derive(Debug, Default)]
|
||||
struct Metrics {
|
||||
/// Total scan cost of the reader.
|
||||
scan_cost: Duration,
|
||||
/// Number of times to fetch batches.
|
||||
num_fetch_by_batches: usize,
|
||||
/// Number of times to fetch rows.
|
||||
num_fetch_by_rows: usize,
|
||||
/// Number of input rows.
|
||||
num_input_rows: usize,
|
||||
/// Number of skipped duplicate rows.
|
||||
num_duplicate_rows: usize,
|
||||
/// Number of output rows.
|
||||
num_output_rows: usize,
|
||||
/// Number of deleted rows.
|
||||
num_deleted_rows: usize,
|
||||
}
|
||||
|
||||
/// Helper to collect and merge small batches for same primary key.
|
||||
struct BatchMerger {
|
||||
/// Buffered non-empty batches to merge.
|
||||
batches: Vec<Batch>,
|
||||
/// Number of rows in the batch.
|
||||
num_rows: usize,
|
||||
/// Number of rows deleted.
|
||||
num_deleted_rows: usize,
|
||||
}
|
||||
|
||||
impl BatchMerger {
|
||||
@@ -329,6 +387,7 @@ impl BatchMerger {
|
||||
BatchMerger {
|
||||
batches: Vec::new(),
|
||||
num_rows: 0,
|
||||
num_deleted_rows: 0,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -337,6 +396,11 @@ impl BatchMerger {
|
||||
self.num_rows
|
||||
}
|
||||
|
||||
/// Returns the number of rows deleted.
|
||||
fn num_deleted_rows(&self) -> usize {
|
||||
self.num_deleted_rows
|
||||
}
|
||||
|
||||
/// Returns true if the merger is empty.
|
||||
fn is_empty(&self) -> bool {
|
||||
self.num_rows() == 0
|
||||
@@ -360,7 +424,9 @@ impl BatchMerger {
|
||||
.map(|b| b.primary_key() == batch.primary_key())
|
||||
.unwrap_or(true));
|
||||
|
||||
let num_rows = batch.num_rows();
|
||||
batch.filter_deleted()?;
|
||||
self.num_deleted_rows += num_rows - batch.num_rows();
|
||||
if batch.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
@@ -402,9 +468,11 @@ impl Node {
|
||||
/// Initialize a node.
|
||||
///
|
||||
/// It tries to fetch one batch from the `source`.
|
||||
async fn new(mut source: Source) -> Result<Node> {
|
||||
async fn new(mut source: Source, metrics: &mut Metrics) -> Result<Node> {
|
||||
// Ensures batch is not empty.
|
||||
let current_batch = source.next_batch().await?.map(CompareFirst);
|
||||
metrics.num_input_rows += current_batch.as_ref().map(|b| b.0.num_rows()).unwrap_or(0);
|
||||
|
||||
Ok(Node {
|
||||
source,
|
||||
current_batch,
|
||||
@@ -437,10 +505,15 @@ impl Node {
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the node has reached EOF.
|
||||
async fn fetch_batch(&mut self) -> Result<Batch> {
|
||||
async fn fetch_batch(&mut self, metrics: &mut Metrics) -> Result<Batch> {
|
||||
let current = self.current_batch.take().unwrap();
|
||||
// Ensures batch is not empty.
|
||||
self.current_batch = self.source.next_batch().await?.map(CompareFirst);
|
||||
metrics.num_input_rows += self
|
||||
.current_batch
|
||||
.as_ref()
|
||||
.map(|b| b.0.num_rows())
|
||||
.unwrap_or(0);
|
||||
Ok(current.0)
|
||||
}
|
||||
|
||||
@@ -468,13 +541,14 @@ impl Node {
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the node is EOF.
|
||||
async fn skip_rows(&mut self, num_to_skip: usize) -> Result<()> {
|
||||
async fn skip_rows(&mut self, num_to_skip: usize, metrics: &mut Metrics) -> Result<()> {
|
||||
let batch = self.current_batch();
|
||||
debug_assert!(batch.num_rows() >= num_to_skip);
|
||||
|
||||
let remaining = batch.num_rows() - num_to_skip;
|
||||
if remaining == 0 {
|
||||
// Nothing remains, we need to fetch next batch to ensure the batch is not empty.
|
||||
self.fetch_batch().await?;
|
||||
self.fetch_batch(metrics).await?;
|
||||
} else {
|
||||
debug_assert!(!batch.is_empty());
|
||||
self.current_batch = Some(CompareFirst(batch.slice(num_to_skip, remaining)));
|
||||
@@ -610,6 +684,10 @@ mod tests {
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(8, reader.metrics.num_input_rows);
|
||||
assert_eq!(6, reader.metrics.num_output_rows);
|
||||
assert_eq!(2, reader.metrics.num_deleted_rows);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -722,6 +800,11 @@ mod tests {
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
assert_eq!(11, reader.metrics.num_input_rows);
|
||||
assert_eq!(7, reader.metrics.num_output_rows);
|
||||
assert_eq!(2, reader.metrics.num_deleted_rows);
|
||||
assert_eq!(2, reader.metrics.num_duplicate_rows);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -1051,6 +1134,11 @@ mod tests {
|
||||
.push(new_batch(b"k1", &[2], &[10], &[OpType::Put], &[22]))
|
||||
.unwrap();
|
||||
assert_eq!(2, merger.num_rows());
|
||||
merger
|
||||
.push(new_batch(b"k1", &[3], &[10], &[OpType::Delete], &[23]))
|
||||
.unwrap();
|
||||
assert_eq!(2, merger.num_rows());
|
||||
|
||||
let batch = merger.merge_batches().unwrap().unwrap();
|
||||
assert_eq!(2, batch.num_rows());
|
||||
assert_eq!(
|
||||
@@ -1064,5 +1152,6 @@ mod tests {
|
||||
)
|
||||
);
|
||||
assert!(merger.is_empty());
|
||||
assert_eq!(1, merger.num_deleted_rows());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,19 +15,22 @@
|
||||
//! Sequential scan.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_stream::try_stream;
|
||||
use common_error::ext::BoxedError;
|
||||
use common_recordbatch::error::ExternalSnafu;
|
||||
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
|
||||
use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream};
|
||||
use common_telemetry::debug;
|
||||
use common_time::range::TimestampRange;
|
||||
use snafu::ResultExt;
|
||||
use table::predicate::Predicate;
|
||||
|
||||
use crate::access_layer::AccessLayerRef;
|
||||
use crate::cache::CacheManagerRef;
|
||||
use crate::cache::{CacheManager, CacheManagerRef};
|
||||
use crate::error::Result;
|
||||
use crate::memtable::MemtableRef;
|
||||
use crate::metrics::READ_STAGE_ELAPSED;
|
||||
use crate::read::compat::{self, CompatReader};
|
||||
use crate::read::merge::MergeReaderBuilder;
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
@@ -105,22 +108,27 @@ impl SeqScan {
|
||||
|
||||
/// Builds a stream for the query.
|
||||
pub async fn build_stream(&self) -> Result<SendableRecordBatchStream> {
|
||||
let start = Instant::now();
|
||||
// Scans all memtables and SSTs. Builds a merge reader to merge results.
|
||||
let mut reader = self.build_reader().await?;
|
||||
let mut metrics = Metrics {
|
||||
scan_cost: start.elapsed(),
|
||||
};
|
||||
|
||||
// Creates a stream to poll the batch reader and convert batch into record batch.
|
||||
let mapper = self.mapper.clone();
|
||||
let cache_manager = self.cache_manager.clone();
|
||||
let stream = try_stream! {
|
||||
let cache = cache_manager.as_ref().map(|cache| cache.as_ref());
|
||||
while let Some(batch) = reader
|
||||
.next_batch()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
while let Some(batch) =
|
||||
Self::fetch_record_batch(&mut reader, &mapper, cache, &mut metrics).await?
|
||||
{
|
||||
yield mapper.convert(&batch, cache)?;
|
||||
yield batch;
|
||||
}
|
||||
|
||||
debug!("Seq scan finished, region_id: {:?}, metrics: {:?}", mapper.metadata().region_id, metrics);
|
||||
// Update metrics.
|
||||
READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.scan_cost.as_secs_f64());
|
||||
};
|
||||
let stream = Box::pin(RecordBatchStreamAdaptor::new(
|
||||
self.mapper.output_schema(),
|
||||
@@ -160,6 +168,39 @@ impl SeqScan {
|
||||
}
|
||||
Ok(Box::new(builder.build().await?))
|
||||
}
|
||||
|
||||
/// Fetch a batch from the reader and convert it into a record batch.
|
||||
async fn fetch_record_batch(
|
||||
reader: &mut dyn BatchReader,
|
||||
mapper: &ProjectionMapper,
|
||||
cache: Option<&CacheManager>,
|
||||
metrics: &mut Metrics,
|
||||
) -> common_recordbatch::error::Result<Option<RecordBatch>> {
|
||||
let start = Instant::now();
|
||||
|
||||
let Some(batch) = reader
|
||||
.next_batch()
|
||||
.await
|
||||
.map_err(BoxedError::new)
|
||||
.context(ExternalSnafu)?
|
||||
else {
|
||||
metrics.scan_cost += start.elapsed();
|
||||
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let record_batch = mapper.convert(&batch, cache)?;
|
||||
metrics.scan_cost += start.elapsed();
|
||||
|
||||
Ok(Some(record_batch))
|
||||
}
|
||||
}
|
||||
|
||||
/// Metrics for [SeqScan].
|
||||
#[derive(Debug, Default)]
|
||||
struct Metrics {
|
||||
/// Duration to scan data.
|
||||
scan_cost: Duration,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -16,9 +16,11 @@
|
||||
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_compat::{Compat, CompatExt};
|
||||
use async_trait::async_trait;
|
||||
use common_telemetry::debug;
|
||||
use common_time::range::TimestampRange;
|
||||
use datatypes::arrow::record_batch::RecordBatch;
|
||||
use object_store::{ObjectStore, Reader};
|
||||
@@ -38,6 +40,7 @@ use crate::error::{
|
||||
ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, OpenDalSnafu, ReadParquetSnafu,
|
||||
Result,
|
||||
};
|
||||
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::sst::file::FileHandle;
|
||||
use crate::sst::parquet::format::ReadFormat;
|
||||
@@ -112,6 +115,8 @@ impl ParquetReaderBuilder {
|
||||
///
|
||||
/// This needs to perform IO operation.
|
||||
pub async fn build(&self) -> Result<ParquetReader> {
|
||||
let start = Instant::now();
|
||||
|
||||
let file_path = self.file_handle.file_path(&self.file_dir);
|
||||
// Now we create a reader to read the whole file.
|
||||
let reader = self
|
||||
@@ -172,6 +177,7 @@ impl ParquetReaderBuilder {
|
||||
.context(ReadParquetSnafu { path: &file_path })?;
|
||||
|
||||
let reader_builder = RowGroupReaderBuilder {
|
||||
file_handle: self.file_handle.clone(),
|
||||
file_path,
|
||||
parquet_meta,
|
||||
file_reader: reader,
|
||||
@@ -179,13 +185,19 @@ impl ParquetReaderBuilder {
|
||||
field_levels,
|
||||
};
|
||||
|
||||
let metrics = Metrics {
|
||||
read_row_groups: row_groups.len(),
|
||||
build_cost: start.elapsed(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
Ok(ParquetReader {
|
||||
_file_handle: self.file_handle.clone(),
|
||||
row_groups,
|
||||
read_format,
|
||||
reader_builder,
|
||||
current_reader: None,
|
||||
batches: VecDeque::new(),
|
||||
metrics,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -247,8 +259,29 @@ impl ParquetReaderBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Parquet reader metrics.
|
||||
#[derive(Debug, Default)]
|
||||
struct Metrics {
|
||||
/// Number of row groups to read.
|
||||
read_row_groups: usize,
|
||||
/// Duration to build the parquet reader.
|
||||
build_cost: Duration,
|
||||
/// Duration to scan the reader.
|
||||
scan_cost: Duration,
|
||||
/// Number of record batches read.
|
||||
num_record_batches: usize,
|
||||
/// Number of batches decoded.
|
||||
num_batches: usize,
|
||||
/// Number of rows read.
|
||||
num_rows: usize,
|
||||
}
|
||||
|
||||
/// Builder to build a [ParquetRecordBatchReader] for a row group.
|
||||
struct RowGroupReaderBuilder {
|
||||
/// SST file to read.
|
||||
///
|
||||
/// Holds the file handle to avoid the file being purged.
|
||||
file_handle: FileHandle,
|
||||
/// Path of the file.
|
||||
file_path: String,
|
||||
/// Metadata of the parquet file.
|
||||
@@ -294,10 +327,6 @@ impl RowGroupReaderBuilder {
|
||||
|
||||
/// Parquet batch reader to read our SST format.
|
||||
pub struct ParquetReader {
|
||||
/// SST file to read.
|
||||
///
|
||||
/// Holds the file handle to avoid the file purge purge it.
|
||||
_file_handle: FileHandle,
|
||||
/// Indices of row groups to read.
|
||||
row_groups: VecDeque<usize>,
|
||||
/// Helper to read record batches.
|
||||
@@ -310,24 +339,60 @@ pub struct ParquetReader {
|
||||
current_reader: Option<ParquetRecordBatchReader>,
|
||||
/// Buffered batches to return.
|
||||
batches: VecDeque<Batch>,
|
||||
/// Local metrics.
|
||||
metrics: Metrics,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl BatchReader for ParquetReader {
|
||||
async fn next_batch(&mut self) -> Result<Option<Batch>> {
|
||||
let start = Instant::now();
|
||||
if let Some(batch) = self.batches.pop_front() {
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
self.metrics.num_rows += batch.num_rows();
|
||||
return Ok(Some(batch));
|
||||
}
|
||||
|
||||
// We need to fetch next record batch and convert it to batches.
|
||||
let Some(record_batch) = self.fetch_next_record_batch().await? else {
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
return Ok(None);
|
||||
};
|
||||
self.metrics.num_record_batches += 1;
|
||||
|
||||
self.read_format
|
||||
.convert_record_batch(&record_batch, &mut self.batches)?;
|
||||
self.metrics.num_batches += self.batches.len();
|
||||
|
||||
Ok(self.batches.pop_front())
|
||||
let batch = self.batches.pop_front();
|
||||
self.metrics.scan_cost += start.elapsed();
|
||||
self.metrics.num_rows += batch.as_ref().map(|b| b.num_rows()).unwrap_or(0);
|
||||
Ok(batch)
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for ParquetReader {
|
||||
fn drop(&mut self) {
|
||||
debug!(
|
||||
"Read parquet {} {}, range: {:?}, {}/{} row groups, metrics: {:?}",
|
||||
self.reader_builder.file_handle.region_id(),
|
||||
self.reader_builder.file_handle.file_id(),
|
||||
self.reader_builder.file_handle.time_range(),
|
||||
self.metrics.read_row_groups,
|
||||
self.reader_builder.parquet_meta.num_row_groups(),
|
||||
self.metrics
|
||||
);
|
||||
|
||||
// Report metrics.
|
||||
READ_STAGE_ELAPSED
|
||||
.with_label_values(&["build_parquet_reader"])
|
||||
.observe(self.metrics.build_cost.as_secs_f64());
|
||||
READ_STAGE_ELAPSED
|
||||
.with_label_values(&["scan_row_groups"])
|
||||
.observe(self.metrics.scan_cost.as_secs_f64());
|
||||
READ_ROWS_TOTAL
|
||||
.with_label_values(&["parquet"])
|
||||
.inc_by(self.metrics.num_rows as u64);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user