Mirror of https://github.com/GreptimeTeam/greptimedb.git
chore: add metrics for active series and field builders (#6332)
* chore/series-metrics: Add Metrics for Active Series and Values in Memtable

  - **`simple_bulk_memtable.rs`**: Implemented `Drop` trait for `SimpleBulkMemtable` to decrement `MEMTABLE_ACTIVE_SERIES_COUNT` and `MEMTABLE_ACTIVE_VALUES_COUNT` upon dropping.
  - **`time_series.rs`**:
    - Introduced `SeriesMap` with `Drop` implementation to manage active series and values count.
    - Updated `SeriesSet` and `Iter` to use `SeriesMap`.
    - Added `num_values` method in `Series` to calculate the number of values.
  - **`metrics.rs`**: Added `MEMTABLE_ACTIVE_SERIES_COUNT` and `MEMTABLE_ACTIVE_VALUES_COUNT` metrics to track active series and values in `TimeSeriesMemtable`.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/series-metrics:

  - Add metrics for active series and field builders
  - Update dashboard

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore/series-metrics: Add Series Count Tracking in Memtables

  - **`flush.rs`**: Updated `RegionFlushTask` to track and log the series count during memtable flush operations.
  - **`memtable.rs`**: Introduced `series_count` in `MemtableStats` and added a method to retrieve it.
  - **`partition_tree.rs`, `partition.rs`, `tree.rs`**: Implemented series count calculation in `PartitionTreeMemtable` and its components.
  - **`simple_bulk_memtable.rs`, `time_series.rs`**: Integrated series count tracking in `SimpleBulkMemtable` and `TimeSeriesMemtable` implementations.

  Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* Update src/mito2/src/memtable.rs

  Co-authored-by: Yingwen <realevenyag@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Co-authored-by: Yingwen <realevenyag@gmail.com>
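The bookkeeping pattern underlying this commit is RAII-style gauge accounting: increment a Prometheus gauge when a tracked object is created, decrement it when the object is dropped. A minimal standalone sketch of that pattern, using the `prometheus` and `lazy_static` crates the codebase already uses (`TrackedSeries` is a hypothetical stand-in, not a type from this commit):

```rust
use lazy_static::lazy_static;
use prometheus::{register_int_gauge, IntGauge};

lazy_static! {
    // Same registration style as metrics.rs in this commit.
    static ref ACTIVE_COUNT: IntGauge =
        register_int_gauge!("example_active_count", "objects currently alive").unwrap();
}

/// Hypothetical stand-in for `Series` / `SimpleBulkMemtable`.
struct TrackedSeries;

impl TrackedSeries {
    fn new() -> Self {
        ACTIVE_COUNT.inc(); // counted as soon as it exists
        TrackedSeries
    }
}

impl Drop for TrackedSeries {
    fn drop(&mut self) {
        ACTIVE_COUNT.dec(); // gauge falls back when the object goes away
    }
}

fn main() {
    let s = TrackedSeries::new();
    assert_eq!(ACTIVE_COUNT.get(), 1);
    drop(s);
    assert_eq!(ACTIVE_COUNT.get(), 0);
}
```

Because the gauge moves in lockstep with object lifetimes, it reads as a live count at any scrape instant, with no periodic recounting needed.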
Dashboard (docs table and panel definition):

```diff
@@ -70,6 +70,7 @@
 | Inflight Flush | `greptime_mito_inflight_flush_count` | `timeseries` | Ongoing flush task count | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]` |
 | Compaction Input/Output Bytes | `sum by(instance, pod) (greptime_mito_compaction_input_bytes)`<br/>`sum by(instance, pod) (greptime_mito_compaction_output_bytes)` | `timeseries` | Compaction input/output bytes | `prometheus` | `bytes` | `[{{instance}}]-[{{pod}}]-input` |
 | Region Worker Handle Bulk Insert Requests | `histogram_quantile(0.95, sum by(le,instance, stage, pod) (rate(greptime_region_worker_handle_write_bucket[$__rate_interval])))`<br/>`sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_sum[$__rate_interval]))/sum by(instance, stage, pod) (rate(greptime_region_worker_handle_write_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to handle bulk insert region requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
+| Active Series and Field Builders Count | `sum by(instance, pod) (greptime_mito_memtable_active_series_count)`<br/>`sum by(instance, pod) (greptime_mito_memtable_field_builder_count)` | `timeseries` | Active series and field builder count in memtables | `prometheus` | `none` | `[{{instance}}]-[{{pod}}]-series` |
 | Region Worker Convert Requests | `histogram_quantile(0.95, sum by(le, instance, stage, pod) (rate(greptime_datanode_convert_region_request_bucket[$__rate_interval])))`<br/>`sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_sum[$__rate_interval]))/sum by(le,instance, stage, pod) (rate(greptime_datanode_convert_region_request_count[$__rate_interval]))` | `timeseries` | Per-stage elapsed time for region worker to decode requests. | `prometheus` | `s` | `[{{instance}}]-[{{pod}}]-[{{stage}}]-P95` |
 
 # OpenDAL
 
 | Title | Query | Type | Description | Datasource | Unit | Legend Format |
@@ -612,6 +612,21 @@ groups:
       type: prometheus
       uid: ${metrics}
     legendFormat: '[{{instance}}]-[{{pod}}]-[{{stage}}]-AVG'
+- title: Active Series and Field Builders Count
+  type: timeseries
+  description: Active series and field builder count in memtables
+  unit: none
+  queries:
+    - expr: sum by(instance, pod) (greptime_mito_memtable_active_series_count)
+      datasource:
+        type: prometheus
+        uid: ${metrics}
+      legendFormat: '[{{instance}}]-[{{pod}}]-series'
+    - expr: sum by(instance, pod) (greptime_mito_memtable_field_builder_count)
+      datasource:
+        type: prometheus
+        uid: ${metrics}
+      legendFormat: '[{{instance}}]-[{{pod}}]-field_builders'
 - title: Region Worker Convert Requests
   type: timeseries
   description: Per-stage elapsed time for region worker to decode requests.
```
flush.rs:

```diff
@@ -340,13 +340,16 @@ impl RegionFlushTask {
         let memtables = version.memtables.immutables();
         let mut file_metas = Vec::with_capacity(memtables.len());
         let mut flushed_bytes = 0;
+        let mut series_count = 0;
         for mem in memtables {
             if mem.is_empty() {
                 // Skip empty memtables.
                 continue;
             }
 
-            let max_sequence = mem.stats().max_sequence();
+            let stats = mem.stats();
+            let max_sequence = stats.max_sequence();
+            series_count += stats.series_count();
             let iter = mem.iter(None, None, None)?;
             let source = Source::Iter(iter);
 
@@ -396,10 +399,11 @@ impl RegionFlushTask {
 
         let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect();
         info!(
-            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, cost: {:?}s",
+            "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}s",
            self.region_id,
             self.reason.as_str(),
             file_ids,
+            series_count,
             timer.stop_and_record(),
         );
 
```
memtable.rs:

```diff
@@ -86,6 +86,8 @@ pub struct MemtableStats {
     num_ranges: usize,
     /// The maximum sequence number in the memtable.
     max_sequence: SequenceNumber,
+    /// Number of estimated timeseries in memtable.
+    series_count: usize,
 }
 
 impl MemtableStats {
@@ -120,6 +122,11 @@ impl MemtableStats {
     pub fn max_sequence(&self) -> SequenceNumber {
         self.max_sequence
     }
+
+    /// Series count in memtable.
+    pub fn series_count(&self) -> usize {
+        self.series_count
+    }
 }
 
 pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
```
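Taken together with the `flush.rs` hunk above, the flow is: each memtable reports its own `series_count` through `MemtableStats`, and the flush task sums those counts over the immutable memtables it is about to write out. A compressed, self-contained sketch of that flow (struct and method names follow the diff; everything else, including returning stats by reference, is a simplification):

```rust
/// Simplified stand-in for mito2's `MemtableStats` (the real struct has more fields).
struct MemtableStats {
    series_count: usize,
}

impl MemtableStats {
    /// Series count in memtable.
    fn series_count(&self) -> usize {
        self.series_count
    }
}

/// Simplified stand-in for a memtable: here it only knows how to report stats.
struct Memtable {
    stats: MemtableStats,
}

impl Memtable {
    fn stats(&self) -> &MemtableStats {
        &self.stats
    }
}

fn main() {
    // The flush task walks the immutable memtables and accumulates the count,
    // mirroring `series_count += stats.series_count()` in RegionFlushTask.
    let memtables = vec![
        Memtable { stats: MemtableStats { series_count: 3 } },
        Memtable { stats: MemtableStats { series_count: 5 } },
    ];
    let series_count: usize = memtables.iter().map(|m| m.stats().series_count()).sum();
    println!("flushed memtables contained {series_count} series"); // 8
}
```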
partition_tree.rs:

```diff
@@ -227,6 +227,7 @@ impl Memtable for PartitionTreeMemtable {
                 num_rows: 0,
                 num_ranges: 0,
                 max_sequence: 0,
+                series_count: 0,
             };
         }
 
@@ -241,12 +242,14 @@ impl Memtable for PartitionTreeMemtable {
             .expect("Timestamp column must have timestamp type");
         let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
         let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
+        let series_count = self.tree.series_count();
         MemtableStats {
             estimated_bytes,
             time_range: Some((min_timestamp, max_timestamp)),
             num_rows: self.num_rows.load(Ordering::Relaxed),
             num_ranges: 1,
             max_sequence: self.max_sequence.load(Ordering::Relaxed),
+            series_count,
         }
     }
 
```
partition.rs:

```diff
@@ -308,6 +308,10 @@ impl Partition {
             .map(|meta| meta.column_schema.name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME)
             .unwrap_or(false)
     }
+
+    pub(crate) fn series_count(&self) -> usize {
+        self.inner.read().unwrap().series_count()
+    }
 }
 
 pub(crate) struct PartitionStats {
@@ -577,4 +581,9 @@ impl Inner {
         }
         Ok(())
     }
+
+    /// Returns count of timeseries.
+    fn series_count(&self) -> usize {
+        self.pk_to_pk_id.len()
+    }
 }
```
tree.rs:

```diff
@@ -439,6 +439,16 @@ impl PartitionTree {
         metrics.partitions_after_pruning = pruned.len();
         pruned
     }
+
+    /// Returns all series count in all partitions.
+    pub(crate) fn series_count(&self) -> usize {
+        self.partitions
+            .read()
+            .unwrap()
+            .values()
+            .map(|p| p.series_count())
+            .sum()
+    }
 }
 
 #[derive(Default)]
```
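The three hunks above form one chain: `PartitionTreeMemtable::stats` asks the tree, the tree sums over its partitions under a read lock, and each partition counts the distinct primary keys in its inner map. A self-contained sketch of that chain (type and method names follow the diff; the fields and key types are simplified stand-ins):

```rust
use std::collections::{BTreeMap, HashMap};
use std::sync::RwLock;

/// Simplified `Inner`: the series count is the number of distinct primary keys.
struct Inner {
    pk_to_pk_id: HashMap<Vec<u8>, u64>,
}

impl Inner {
    fn series_count(&self) -> usize {
        self.pk_to_pk_id.len()
    }
}

/// Simplified `Partition`: its state lives behind its own lock.
struct Partition {
    inner: RwLock<Inner>,
}

impl Partition {
    fn series_count(&self) -> usize {
        self.inner.read().unwrap().series_count()
    }
}

/// Simplified `PartitionTree`: partitions behind a tree-wide lock.
struct PartitionTree {
    partitions: RwLock<BTreeMap<u32, Partition>>,
}

impl PartitionTree {
    /// Total series count across all partitions, as in the diff.
    fn series_count(&self) -> usize {
        self.partitions
            .read()
            .unwrap()
            .values()
            .map(|p| p.series_count())
            .sum()
    }
}

fn main() {
    let mk = |n: usize| Partition {
        inner: RwLock::new(Inner {
            pk_to_pk_id: (0..n).map(|i| (vec![i as u8], i as u64)).collect(),
        }),
    };
    let tree = PartitionTree {
        partitions: RwLock::new(BTreeMap::from([(0, mk(2)), (1, mk(3))])),
    };
    assert_eq!(tree.series_count(), 5);
}
```

Counting on demand this way keeps the write path free of extra atomics; the cost is taking the read locks whenever stats are requested.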
simple_bulk_memtable.rs:

```diff
@@ -33,6 +33,7 @@ use crate::memtable::{
     AllocTracker, BoxedBatchIterator, IterBuilder, KeyValues, Memtable, MemtableId, MemtableRange,
     MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
 };
+use crate::metrics::MEMTABLE_ACTIVE_SERIES_COUNT;
 use crate::read::dedup::LastNonNullIter;
 use crate::read::scan_region::PredicateGroup;
 use crate::read::Batch;
@@ -52,6 +53,12 @@ pub struct SimpleBulkMemtable {
     series: RwLock<Series>,
 }
 
+impl Drop for SimpleBulkMemtable {
+    fn drop(&mut self) {
+        MEMTABLE_ACTIVE_SERIES_COUNT.dec();
+    }
+}
+
 impl SimpleBulkMemtable {
     pub(crate) fn new(
         id: MemtableId,
@@ -278,6 +285,7 @@ impl Memtable for SimpleBulkMemtable {
                 num_rows: 0,
                 num_ranges: 0,
                 max_sequence: 0,
+                series_count: 0,
             };
         }
         let ts_type = self
@@ -296,6 +304,7 @@ impl Memtable for SimpleBulkMemtable {
             num_rows,
             num_ranges: 1,
             max_sequence: self.max_sequence.load(Ordering::Relaxed),
+            series_count: 1,
         }
     }
 
```
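Note the asymmetry with `TimeSeriesMemtable` below: a `SimpleBulkMemtable` holds exactly one `Series`, so its stats report `series_count: 1` and its `Drop` impl decrements the gauge by one, with no per-key map to walk.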
time_series.rs:

```diff
@@ -55,7 +55,10 @@ use crate::memtable::{
     MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
     PredicateGroup,
 };
-use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
+use crate::metrics::{
+    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT, MEMTABLE_ACTIVE_SERIES_COUNT, READ_ROWS_TOTAL,
+    READ_STAGE_ELAPSED,
+};
 use crate::read::dedup::LastNonNullIter;
 use crate::read::{Batch, BatchBuilder, BatchColumn};
 use crate::region::options::MergeMode;
@@ -316,7 +319,7 @@ impl Memtable for TimeSeriesMemtable {
     }
 
     fn is_empty(&self) -> bool {
-        self.series_set.series.read().unwrap().is_empty()
+        self.series_set.series.read().unwrap().0.is_empty()
     }
 
     fn freeze(&self) -> Result<()> {
@@ -336,6 +339,7 @@ impl Memtable for TimeSeriesMemtable {
                 num_rows: 0,
                 num_ranges: 0,
                 max_sequence: 0,
+                series_count: 0,
             };
         }
         let ts_type = self
@@ -348,12 +352,14 @@ impl Memtable for TimeSeriesMemtable {
             .expect("Timestamp column must have timestamp type");
         let max_timestamp = ts_type.create_timestamp(self.max_timestamp.load(Ordering::Relaxed));
         let min_timestamp = ts_type.create_timestamp(self.min_timestamp.load(Ordering::Relaxed));
+        let series_count = self.series_set.series.read().unwrap().0.len();
         MemtableStats {
             estimated_bytes,
             time_range: Some((min_timestamp, max_timestamp)),
             num_rows: self.num_rows.load(Ordering::Relaxed),
             num_ranges: 1,
             max_sequence: self.max_sequence.load(Ordering::Relaxed),
+            series_count,
         }
     }
 
@@ -368,13 +374,27 @@ impl Memtable for TimeSeriesMemtable {
     }
 }
 
-type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
-
+#[derive(Default)]
+struct SeriesMap(BTreeMap<Vec<u8>, Arc<RwLock<Series>>>);
+
+impl Drop for SeriesMap {
+    fn drop(&mut self) {
+        let num_series = self.0.len();
+        let num_field_builders = self
+            .0
+            .values()
+            .map(|v| v.read().unwrap().active.num_field_builders())
+            .sum::<usize>();
+        MEMTABLE_ACTIVE_SERIES_COUNT.sub(num_series as i64);
+        MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.sub(num_field_builders as i64);
+    }
+}
+
 #[derive(Clone)]
 pub(crate) struct SeriesSet {
-    pub(crate) region_metadata: RegionMetadataRef,
-    pub(crate) series: Arc<SeriesRwLockMap>,
-    pub(crate) codec: Arc<DensePrimaryKeyCodec>,
+    region_metadata: RegionMetadataRef,
+    series: Arc<RwLock<SeriesMap>>,
+    codec: Arc<DensePrimaryKeyCodec>,
 }
 
 impl SeriesSet {
@@ -390,7 +410,7 @@ impl SeriesSet {
 impl SeriesSet {
     /// Push [KeyValue] to SeriesSet with given primary key and return key/value allocated memory size.
     fn push_to_series(&self, primary_key: Vec<u8>, kv: &KeyValue) -> (usize, usize) {
-        if let Some(series) = self.series.read().unwrap().get(&primary_key) {
+        if let Some(series) = self.series.read().unwrap().0.get(&primary_key) {
             let value_allocated = series.write().unwrap().push(
                 kv.timestamp(),
                 kv.sequence(),
@@ -401,7 +421,7 @@ impl SeriesSet {
         };
 
         let mut indices = self.series.write().unwrap();
-        match indices.entry(primary_key) {
+        match indices.0.entry(primary_key) {
             Entry::Vacant(v) => {
                 let key_len = v.key().len();
                 let mut series = Series::new(&self.region_metadata);
@@ -425,7 +445,7 @@ impl SeriesSet {
 
     #[cfg(test)]
     fn get_series(&self, primary_key: &[u8]) -> Option<Arc<RwLock<Series>>> {
-        self.series.read().unwrap().get(primary_key).cloned()
+        self.series.read().unwrap().0.get(primary_key).cloned()
     }
 
     /// Iterates all series in [SeriesSet].
@@ -492,7 +512,7 @@ struct Metrics {
 
 struct Iter {
     metadata: RegionMetadataRef,
-    series: Arc<SeriesRwLockMap>,
+    series: Arc<RwLock<SeriesMap>>,
     projection: HashSet<ColumnId>,
     last_key: Option<Vec<u8>>,
     predicate: Vec<SimpleFilterEvaluator>,
@@ -508,7 +528,7 @@ impl Iter {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn try_new(
         metadata: RegionMetadataRef,
-        series: Arc<SeriesRwLockMap>,
+        series: Arc<RwLock<SeriesMap>>,
         projection: HashSet<ColumnId>,
         predicate: Option<Predicate>,
         pk_schema: arrow::datatypes::SchemaRef,
@@ -565,10 +585,10 @@ impl Iterator for Iter {
         let start = Instant::now();
         let map = self.series.read().unwrap();
         let range = match &self.last_key {
-            None => map.range::<Vec<u8>, _>(..),
-            Some(last_key) => {
-                map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
-            }
+            None => map.0.range::<Vec<u8>, _>(..),
+            Some(last_key) => map
+                .0
+                .range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded)),
         };
 
         // TODO(hl): maybe yield more than one time series to amortize range overhead.
@@ -668,6 +688,7 @@ pub(crate) struct Series {
 
 impl Series {
     pub(crate) fn with_capacity(region_metadata: &RegionMetadataRef, builder_cap: usize) -> Self {
+        MEMTABLE_ACTIVE_SERIES_COUNT.inc();
         Self {
            pk_cache: None,
             active: ValueBuilder::new(region_metadata, builder_cap),
@@ -788,7 +809,6 @@ impl ValueBuilder {
             .map(|c| c.column_schema.data_type.clone())
             .collect::<Vec<_>>();
         let fields = (0..field_types.len()).map(|_| None).collect();
-
         Self {
             timestamp: Vec::with_capacity(capacity),
             timestamp_type,
@@ -799,6 +819,11 @@ impl ValueBuilder {
         }
     }
 
+    /// Returns number of field builders.
+    pub fn num_field_builders(&self) -> usize {
+        self.fields.iter().flatten().count()
+    }
+
     /// Pushes a new row to `ValueBuilder`.
     /// We don't need primary keys since they've already be encoded.
     /// Returns the size of field values.
@@ -842,6 +867,7 @@ impl ValueBuilder {
                 mutable_vector.push_nulls(num_rows - 1);
                 let _ = mutable_vector.push(field_value);
                 self.fields[idx] = Some(mutable_vector);
+                MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.inc();
             }
         }
     }
@@ -1028,6 +1054,7 @@ impl From<ValueBuilder> for Values {
             .enumerate()
             .map(|(i, v)| {
                 if let Some(v) = v {
+                    MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT.dec();
                     v.finish()
                 } else {
                     let mut single_null = value.field_types[i].create_mutable_vector(num_rows);
```
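The design choice here is worth spelling out: gauges are incremented one at a time as series and field builders are created, but decremented in bulk by `SeriesMap::drop` when a whole memtable goes away. A minimal self-contained sketch of that container-level accounting (the gauge name is illustrative; the real gauges are registered in metrics.rs below):

```rust
use std::collections::BTreeMap;

use lazy_static::lazy_static;
use prometheus::{register_int_gauge, IntGauge};

lazy_static! {
    static ref ACTIVE_SERIES: IntGauge =
        register_int_gauge!("example_active_series", "live series").unwrap();
}

/// Simplified `Series`: increments the gauge on creation.
struct Series;

impl Series {
    fn new() -> Self {
        ACTIVE_SERIES.inc();
        Series
    }
}

/// Simplified `SeriesMap`: owns all series of one memtable and gives the
/// gauge back in a single `sub()` instead of one `dec()` per entry.
#[derive(Default)]
struct SeriesMap(BTreeMap<Vec<u8>, Series>);

impl Drop for SeriesMap {
    fn drop(&mut self) {
        ACTIVE_SERIES.sub(self.0.len() as i64);
    }
}

fn main() {
    let mut map = SeriesMap::default();
    map.0.insert(b"host=a".to_vec(), Series::new());
    map.0.insert(b"host=b".to_vec(), Series::new());
    assert_eq!(ACTIVE_SERIES.get(), 2);
    drop(map); // whole-memtable teardown: gauge falls back in one step
    assert_eq!(ACTIVE_SERIES.get(), 0);
}
```

As in the diff above, `Series` itself carries no `Drop` decrement, so the container-level `sub()` is the single point where the count falls and there is no risk of double counting.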
metrics.rs:

```diff
@@ -414,6 +414,18 @@ lazy_static! {
         "greptime_mito_compaction_output_bytes",
         "mito compaction output file size",
     ).unwrap();
+
+    /// Active series count in TimeSeriesMemtable
+    pub static ref MEMTABLE_ACTIVE_SERIES_COUNT: IntGauge = register_int_gauge!(
+        "greptime_mito_memtable_active_series_count",
+        "active time series count in TimeSeriesMemtable",
+    ).unwrap();
+
+    /// Active field builder count in TimeSeriesMemtable
+    pub static ref MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT: IntGauge = register_int_gauge!(
+        "greptime_mito_memtable_field_builder_count",
+        "active field builder count in TimeSeriesMemtable",
+    ).unwrap();
 }
 
 /// Stager notifier to collect metrics.
```
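Once registered in the default registry, the gauges surface through the normal Prometheus scrape path under exactly the names the dashboard queries above select on. A hedged sketch of how one could verify the exported names locally with the standard `prometheus` crate API (not code from this commit):

```rust
use prometheus::{register_int_gauge, Encoder, TextEncoder};

fn main() {
    // Register gauges under the same names as metrics.rs above.
    let series = register_int_gauge!(
        "greptime_mito_memtable_active_series_count",
        "active time series count in TimeSeriesMemtable"
    )
    .unwrap();
    let builders = register_int_gauge!(
        "greptime_mito_memtable_field_builder_count",
        "active field builder count in TimeSeriesMemtable"
    )
    .unwrap();
    series.inc();
    builders.add(3);

    // Render the default registry the way a /metrics endpoint would.
    let mut buf = Vec::new();
    TextEncoder::new()
        .encode(&prometheus::gather(), &mut buf)
        .unwrap();
    print!("{}", String::from_utf8(buf).unwrap());
}
```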