feat: Adds some metrics for write path and flush (#1726)

* feat: more metrics

* feat: Add preprocess elapsed

* chore(storage): rename metric

* test: fix tests
This commit is contained in:
Yingwen
2023-06-05 21:35:44 +08:00
committed by GitHub
parent 166fb8871e
commit 1b4976b077
9 changed files with 34 additions and 5 deletions

1
Cargo.lock generated
View File

@@ -5307,6 +5307,7 @@ dependencies = [
"futures",
"key-lock",
"log-store",
"metrics",
"object-store",
"serde",
"serde_json",

View File

@@ -21,6 +21,7 @@ common-query = { path = "../common/query" }
common-recordbatch = { path = "../common/recordbatch" }
common-datasource = { path = "../common/datasource" }
common-telemetry = { path = "../common/telemetry" }
common-test-util = { path = "../common/test-util", optional = true }
common-time = { path = "../common/time" }
dashmap = "5.4"
datafusion.workspace = true
@@ -29,6 +30,7 @@ datatypes = { path = "../datatypes" }
futures.workspace = true
key-lock = "0.1"
log-store = { path = "../log-store" }
metrics.workspace = true
object-store = { path = "../object-store" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
@@ -36,7 +38,6 @@ snafu.workspace = true
storage = { path = "../storage" }
store-api = { path = "../store-api" }
table = { path = "../table" }
common-test-util = { path = "../common/test-util", optional = true }
tokio.workspace = true
[dev-dependencies]

View File

@@ -25,3 +25,7 @@ pub const MITO_CREATE_TABLE_UPDATE_MANIFEST_ELAPSED: &str =
pub const MITO_OPEN_TABLE_ELAPSED: &str = "datanode.mito.open_table";
/// Elapsed time of altering tables
pub const MITO_ALTER_TABLE_ELAPSED: &str = "datanode.mito.alter_table";
/// Elapsed time of inserting into a mito table (timed around `MitoTable::insert`).
pub const MITO_INSERT_ELAPSED: &str = "datanode.mito.insert";
/// Number of rows in each insert batch, recorded as a histogram.
pub const MITO_INSERT_BATCH_SIZE: &str = "datanode.mito.insert_batch_size";

View File

@@ -29,6 +29,7 @@ use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_telemetry::{info, logging};
use datatypes::schema::Schema;
use metrics::histogram;
use object_store::ObjectStore;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator};
@@ -57,6 +58,8 @@ use crate::error::{
};
use crate::manifest::action::*;
use crate::manifest::TableManifest;
use crate::metrics::{MITO_INSERT_BATCH_SIZE, MITO_INSERT_ELAPSED};
#[inline]
fn table_manifest_dir(table_dir: &str) -> String {
assert!(table_dir.ends_with('/'));
@@ -83,6 +86,8 @@ impl<R: Region> Table for MitoTable<R> {
}
async fn insert(&self, request: InsertRequest) -> TableResult<usize> {
let _timer = common_telemetry::timer!(MITO_INSERT_ELAPSED);
if request.columns_values.is_empty() {
return Ok(0);
}
@@ -105,6 +110,8 @@ impl<R: Region> Table for MitoTable<R> {
// `columns_values` is not empty, so it's safe to unwrap.
let rows_num = columns_values.values().next().unwrap().len();
histogram!(MITO_INSERT_BATCH_SIZE, rows_num as f64);
logging::trace!(
"Insert into table {} region {} with data: {:?}",
self.table_info().name,

View File

@@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use common_telemetry::{logging, timer};
use metrics::counter;
pub use picker::{FlushPicker, PickerConfig};
pub use scheduler::{
FlushHandle, FlushRegionRequest, FlushRequest, FlushScheduler, FlushSchedulerRef,
@@ -32,7 +33,7 @@ use crate::error::Result;
use crate::manifest::action::*;
use crate::manifest::region::RegionManifest;
use crate::memtable::{IterContext, MemtableId, MemtableRef};
use crate::metrics::FLUSH_ELAPSED;
use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED};
use crate::region::{RegionWriterRef, SharedDataRef};
use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions};
use crate::wal::Wal;
@@ -297,6 +298,9 @@ impl<S: LogStore> FlushJob<S> {
.flatten()
.collect();
let flush_bytes = metas.iter().map(|f| f.file_size).sum();
counter!(FLUSH_BYTES_TOTAL, flush_bytes);
let file_ids = metas.iter().map(|f| f.file_id).collect::<Vec<_>>();
logging::info!("Successfully flush memtables, region:{region_id}, files: {file_ids:?}");
Ok(metas)

View File

@@ -341,12 +341,13 @@ mod tests {
builder.root(&tmp_dir.path().to_string_lossy());
let object_store = ObjectStore::new(builder).unwrap().finish();
let test_gc_duration = Duration::from_millis(50);
let manifest = RegionManifest::with_checkpointer(
"/manifest/",
object_store,
manifest_compress_type(compress),
None,
Some(Duration::from_millis(50)),
Some(test_gc_duration),
);
manifest.start().await.unwrap();
@@ -492,7 +493,7 @@ mod tests {
);
// wait for gc
tokio::time::sleep(Duration::from_millis(60)).await;
tokio::time::sleep(test_gc_duration * 3).await;
for v in checkpoint_versions {
if v < 4 {

View File

@@ -17,6 +17,7 @@ use store_api::storage::{OpType, SequenceNumber};
use super::MemtableRef;
use crate::error::Result;
use crate::memtable::KeyValues;
use crate::metrics::MEMTABLE_WRITE_ELAPSED;
use crate::write_batch::{Mutation, Payload};
/// Wraps logic of inserting key/values in [WriteBatch] to [Memtable].
@@ -40,6 +41,8 @@ impl Inserter {
/// Won't do schema validation if not configured. Caller (mostly the [`RegionWriter`]) should ensure the
/// schemas of `memtable` are consistent with `payload`'s.
pub fn insert_memtable(&mut self, payload: &Payload, memtable: &MemtableRef) -> Result<()> {
let _timer = common_telemetry::timer!(MEMTABLE_WRITE_ELAPSED);
if payload.is_empty() {
return Ok(());
}

View File

@@ -22,6 +22,8 @@ pub const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total";
pub const FLUSH_ERRORS_TOTAL: &str = "storage.flush.errors_total";
/// Elapsed time of a flush job.
pub const FLUSH_ELAPSED: &str = "storage.flush.elapsed";
/// Counter of flushed bytes, i.e. the summed size of the files written by flush jobs.
pub const FLUSH_BYTES_TOTAL: &str = "storage.flush.bytes_total";
/// Reason to flush.
pub const FLUSH_REASON: &str = "reason";
/// Gauge for open regions
@@ -32,3 +34,7 @@ pub const LOG_STORE_WRITE_ELAPSED: &str = "storage.logstore.write.elapsed";
pub const COMPACT_ELAPSED: &str = "storage.compact.elapsed";
/// Global write buffer size in bytes.
pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes";
/// Elapsed time of inserting a payload into a memtable.
pub const MEMTABLE_WRITE_ELAPSED: &str = "storage.memtable.write.elapsed";
/// Elapsed time of the preprocessing step that runs before a write batch is written
/// (e.g. checking whether the memtable is full or a flush should be triggered).
pub const PREPROCESS_ELAPSED: &str = "storage.write.preprocess.elapsed";

View File

@@ -39,7 +39,7 @@ use crate::manifest::action::{
};
use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::metadata::RegionMetadataRef;
use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL};
use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL, PREPROCESS_ELAPSED};
use crate::proto::wal::WalHeader;
use crate::region::{
CompactContext, RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef,
@@ -670,6 +670,8 @@ impl WriterInner {
&mut self,
writer_ctx: &WriterContext<'_, S>,
) -> Result<()> {
let _timer = common_telemetry::timer!(PREPROCESS_ELAPSED);
let version_control = writer_ctx.version_control();
// Check whether memtable is full or flush should be triggered. We need to do this first since
// switching memtables will clear all mutable memtables.