diff --git a/Cargo.lock b/Cargo.lock index a3d70a001a..a1818a6b9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5307,6 +5307,7 @@ dependencies = [ "futures", "key-lock", "log-store", + "metrics", "object-store", "serde", "serde_json", diff --git a/src/mito/Cargo.toml b/src/mito/Cargo.toml index f0c2647285..3638187217 100644 --- a/src/mito/Cargo.toml +++ b/src/mito/Cargo.toml @@ -21,6 +21,7 @@ common-query = { path = "../common/query" } common-recordbatch = { path = "../common/recordbatch" } common-datasource = { path = "../common/datasource" } common-telemetry = { path = "../common/telemetry" } +common-test-util = { path = "../common/test-util", optional = true } common-time = { path = "../common/time" } dashmap = "5.4" datafusion.workspace = true @@ -29,6 +30,7 @@ datatypes = { path = "../datatypes" } futures.workspace = true key-lock = "0.1" log-store = { path = "../log-store" } +metrics.workspace = true object-store = { path = "../object-store" } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" @@ -36,7 +38,6 @@ snafu.workspace = true storage = { path = "../storage" } store-api = { path = "../store-api" } table = { path = "../table" } -common-test-util = { path = "../common/test-util", optional = true } tokio.workspace = true [dev-dependencies] diff --git a/src/mito/src/metrics.rs b/src/mito/src/metrics.rs index 63137279c4..1149206432 100644 --- a/src/mito/src/metrics.rs +++ b/src/mito/src/metrics.rs @@ -25,3 +25,7 @@ pub const MITO_CREATE_TABLE_UPDATE_MANIFEST_ELAPSED: &str = pub const MITO_OPEN_TABLE_ELAPSED: &str = "datanode.mito.open_table"; /// Elapsed time of altering tables pub const MITO_ALTER_TABLE_ELAPSED: &str = "datanode.mito.alter_table"; +/// Elapsed time of insertion +pub const MITO_INSERT_ELAPSED: &str = "datanode.mito.insert"; +/// Insert batch size. +pub const MITO_INSERT_BATCH_SIZE: &str = "datanode.mito.insert_batch_size"; diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index 849e5a702d..4b758785c1 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -29,6 +29,7 @@ use common_recordbatch::error::ExternalSnafu; use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream}; use common_telemetry::{info, logging}; use datatypes::schema::Schema; +use metrics::histogram; use object_store::ObjectStore; use snafu::{ensure, OptionExt, ResultExt}; use store_api::manifest::{self, Manifest, ManifestVersion, MetaActionIterator}; @@ -57,6 +58,8 @@ use crate::error::{ }; use crate::manifest::action::*; use crate::manifest::TableManifest; +use crate::metrics::{MITO_INSERT_BATCH_SIZE, MITO_INSERT_ELAPSED}; + #[inline] fn table_manifest_dir(table_dir: &str) -> String { assert!(table_dir.ends_with('/')); @@ -83,6 +86,8 @@ impl Table for MitoTable { } async fn insert(&self, request: InsertRequest) -> TableResult { + let _timer = common_telemetry::timer!(MITO_INSERT_ELAPSED); + if request.columns_values.is_empty() { return Ok(0); } @@ -105,6 +110,8 @@ impl Table for MitoTable { // columns_values is not empty, it's safe to unwrap let rows_num = columns_values.values().next().unwrap().len(); + histogram!(MITO_INSERT_BATCH_SIZE, rows_num as f64); + logging::trace!( "Insert into table {} region {} with data: {:?}", self.table_info().name, diff --git a/src/storage/src/flush.rs b/src/storage/src/flush.rs index a5bd8f9149..2b91774c79 100644 --- a/src/storage/src/flush.rs +++ b/src/storage/src/flush.rs @@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use common_telemetry::{logging, timer}; +use metrics::counter; pub use picker::{FlushPicker, PickerConfig}; pub use scheduler::{ FlushHandle, FlushRegionRequest, FlushRequest, FlushScheduler, FlushSchedulerRef, @@ -32,7 +33,7 @@ use crate::error::Result; use crate::manifest::action::*; use crate::manifest::region::RegionManifest; use crate::memtable::{IterContext, MemtableId, MemtableRef}; -use crate::metrics::FLUSH_ELAPSED; +use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED}; use crate::region::{RegionWriterRef, SharedDataRef}; use crate::sst::{AccessLayerRef, FileId, FileMeta, Source, SstInfo, WriteOptions}; use crate::wal::Wal; @@ -297,6 +298,9 @@ impl FlushJob { .flatten() .collect(); + let flush_bytes = metas.iter().map(|f| f.file_size).sum(); + counter!(FLUSH_BYTES_TOTAL, flush_bytes); + let file_ids = metas.iter().map(|f| f.file_id).collect::>(); logging::info!("Successfully flush memtables, region:{region_id}, files: {file_ids:?}"); Ok(metas) diff --git a/src/storage/src/manifest/region.rs b/src/storage/src/manifest/region.rs index 337b32077f..5938134870 100644 --- a/src/storage/src/manifest/region.rs +++ b/src/storage/src/manifest/region.rs @@ -341,12 +341,13 @@ mod tests { builder.root(&tmp_dir.path().to_string_lossy()); let object_store = ObjectStore::new(builder).unwrap().finish(); + let test_gc_duration = Duration::from_millis(50); let manifest = RegionManifest::with_checkpointer( "/manifest/", object_store, manifest_compress_type(compress), None, - Some(Duration::from_millis(50)), + Some(test_gc_duration), ); manifest.start().await.unwrap(); @@ -492,7 +493,7 @@ mod tests { ); // wait for gc - tokio::time::sleep(Duration::from_millis(60)).await; + tokio::time::sleep(test_gc_duration * 3).await; for v in checkpoint_versions { if v < 4 { diff --git a/src/storage/src/memtable/inserter.rs b/src/storage/src/memtable/inserter.rs index 299f0d64a2..87cd702754 100644 --- a/src/storage/src/memtable/inserter.rs +++ b/src/storage/src/memtable/inserter.rs @@ -17,6 +17,7 @@ use store_api::storage::{OpType, SequenceNumber}; use super::MemtableRef; use crate::error::Result; use crate::memtable::KeyValues; +use crate::metrics::MEMTABLE_WRITE_ELAPSED; use crate::write_batch::{Mutation, Payload}; /// Wraps logic of inserting key/values in [WriteBatch] to [Memtable]. @@ -40,6 +41,8 @@ impl Inserter { /// Won't do schema validation if not configured. Caller (mostly the [`RegionWriter`]) should ensure the /// schemas of `memtable` are consistent with `payload`'s. pub fn insert_memtable(&mut self, payload: &Payload, memtable: &MemtableRef) -> Result<()> { + let _timer = common_telemetry::timer!(MEMTABLE_WRITE_ELAPSED); + if payload.is_empty() { return Ok(()); } diff --git a/src/storage/src/metrics.rs b/src/storage/src/metrics.rs index 4667eb64fb..ce70a14b9a 100644 --- a/src/storage/src/metrics.rs +++ b/src/storage/src/metrics.rs @@ -22,6 +22,8 @@ pub const FLUSH_REQUESTS_TOTAL: &str = "storage.flush.requests_total"; pub const FLUSH_ERRORS_TOTAL: &str = "storage.flush.errors_total"; /// Elapsed time of a flush job. pub const FLUSH_ELAPSED: &str = "storage.flush.elapsed"; +/// Counter of flushed bytes. +pub const FLUSH_BYTES_TOTAL: &str = "storage.flush.bytes_total"; /// Reason to flush. pub const FLUSH_REASON: &str = "reason"; /// Gauge for open regions @@ -32,3 +34,7 @@ pub const LOG_STORE_WRITE_ELAPSED: &str = "storage.logstore.write.elapsed"; pub const COMPACT_ELAPSED: &str = "storage.compact.elapsed"; /// Global write buffer size in bytes. pub const WRITE_BUFFER_BYTES: &str = "storage.write_buffer_bytes"; +/// Elapsed time of inserting memtable. +pub const MEMTABLE_WRITE_ELAPSED: &str = "storage.memtable.write.elapsed"; +/// Elapsed time of preprocessing write batch. +pub const PREPROCESS_ELAPSED: &str = "storage.write.preprocess.elapsed"; diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs index 473fa02150..fc366c38b6 100644 --- a/src/storage/src/region/writer.rs +++ b/src/storage/src/region/writer.rs @@ -39,7 +39,7 @@ use crate::manifest::action::{ }; use crate::memtable::{Inserter, MemtableBuilderRef, MemtableId, MemtableRef}; use crate::metadata::RegionMetadataRef; -use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL}; +use crate::metrics::{FLUSH_REASON, FLUSH_REQUESTS_TOTAL, PREPROCESS_ELAPSED}; use crate::proto::wal::WalHeader; use crate::region::{ CompactContext, RecoverdMetadata, RecoveredMetadataMap, RegionManifest, SharedDataRef, @@ -670,6 +670,8 @@ impl WriterInner { &mut self, writer_ctx: &WriterContext<'_, S>, ) -> Result<()> { + let _timer = common_telemetry::timer!(PREPROCESS_ELAPSED); + let version_control = writer_ctx.version_control(); // Check whether memtable is full or flush should be triggered. We need to do this first since // switching memtables will clear all mutable memtables.