diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index 6efa0c67b3..ef60fea1cb 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::sync::Arc; +use std::time::Duration; use object_store::services::Fs; use object_store::util::{join_dir, with_instrument_layers}; @@ -28,6 +29,7 @@ use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig}; use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; +use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED}; use crate::read::Source; use crate::region::options::IndexOptions; use crate::sst::file::{FileHandle, FileId, FileMeta, RegionFileId}; @@ -43,6 +45,87 @@ pub type AccessLayerRef = Arc; /// SST write results. pub type SstInfoArray = SmallVec<[SstInfo; 2]>; +/// Write operation type. +#[derive(Eq, PartialEq, Debug)] +pub enum WriteType { + /// Writes from flush + Flush, + /// Writes from compaction. 
+ Compaction, +} + +#[derive(Debug)] +pub struct Metrics { + pub(crate) write_type: WriteType, + pub(crate) iter_source: Duration, + pub(crate) write_batch: Duration, + pub(crate) update_index: Duration, + pub(crate) upload_parquet: Duration, + pub(crate) upload_puffin: Duration, +} + +impl Metrics { + pub(crate) fn new(write_type: WriteType) -> Self { + Self { + write_type, + iter_source: Default::default(), + write_batch: Default::default(), + update_index: Default::default(), + upload_parquet: Default::default(), + upload_puffin: Default::default(), + } + } + + pub(crate) fn merge(mut self, other: Self) -> Self { + assert_eq!(self.write_type, other.write_type); + self.iter_source += other.iter_source; + self.write_batch += other.write_batch; + self.update_index += other.update_index; + self.upload_parquet += other.upload_parquet; + self.upload_puffin += other.upload_puffin; + self + } + + pub(crate) fn observe(self) { + match self.write_type { + WriteType::Flush => { + FLUSH_ELAPSED + .with_label_values(&["iter_source"]) + .observe(self.iter_source.as_secs_f64()); + FLUSH_ELAPSED + .with_label_values(&["write_batch"]) + .observe(self.write_batch.as_secs_f64()); + FLUSH_ELAPSED + .with_label_values(&["update_index"]) + .observe(self.update_index.as_secs_f64()); + FLUSH_ELAPSED + .with_label_values(&["upload_parquet"]) + .observe(self.upload_parquet.as_secs_f64()); + FLUSH_ELAPSED + .with_label_values(&["upload_puffin"]) + .observe(self.upload_puffin.as_secs_f64()); + } + WriteType::Compaction => { + COMPACTION_STAGE_ELAPSED + .with_label_values(&["iter_source"]) + .observe(self.iter_source.as_secs_f64()); + COMPACTION_STAGE_ELAPSED + .with_label_values(&["write_batch"]) + .observe(self.write_batch.as_secs_f64()); + COMPACTION_STAGE_ELAPSED + .with_label_values(&["update_index"]) + .observe(self.update_index.as_secs_f64()); + COMPACTION_STAGE_ELAPSED + .with_label_values(&["upload_parquet"]) + .observe(self.upload_parquet.as_secs_f64()); + COMPACTION_STAGE_ELAPSED 
+ .with_label_values(&["upload_puffin"]) + .observe(self.upload_puffin.as_secs_f64()); + } + }; + } +} + /// A layer to access SST files under the same directory. pub struct AccessLayer { table_dir: String, @@ -145,11 +228,12 @@ impl AccessLayer { &self, request: SstWriteRequest, write_opts: &WriteOptions, - ) -> Result { + write_type: WriteType, + ) -> Result<(SstInfoArray, Metrics)> { let region_id = request.metadata.region_id; let cache_manager = request.cache_manager.clone(); - let sst_info = if let Some(write_cache) = cache_manager.write_cache() { + let (sst_info, metrics) = if let Some(write_cache) = cache_manager.write_cache() { // Write to the write cache. write_cache .write_and_upload_sst( @@ -162,6 +246,7 @@ impl AccessLayer { remote_store: self.object_store.clone(), }, write_opts, + write_type, ) .await? } else { @@ -190,12 +275,15 @@ impl AccessLayer { request.metadata, indexer_builder, path_provider, + Metrics::new(write_type), ) .await .with_file_cleaner(cleaner); - writer + let ssts = writer .write_all(request.source, request.max_sequence, write_opts) - .await? + .await?; + let metrics = writer.into_metrics(); + (ssts, metrics) }; // Put parquet metadata to cache manager. @@ -210,7 +298,7 @@ impl AccessLayer { } } - Ok(sst_info) + Ok((sst_info, metrics)) } } diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 82bf5b94eb..0132a3de1c 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -15,7 +15,7 @@ //! A write-through cache for remote object stores. 
use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use common_base::readable_size::ReadableSize; use common_telemetry::{debug, info}; @@ -25,14 +25,13 @@ use snafu::ResultExt; use store_api::storage::RegionId; use crate::access_layer::{ - new_fs_cache_store, FilePathProvider, RegionFilePathFactory, SstInfoArray, SstWriteRequest, - TempFileCleaner, WriteCachePathProvider, + new_fs_cache_store, FilePathProvider, Metrics, RegionFilePathFactory, SstInfoArray, + SstWriteRequest, TempFileCleaner, WriteCachePathProvider, WriteType, }; use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, IndexValue}; use crate::error::{self, Result}; use crate::metrics::{ - FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, - WRITE_CACHE_DOWNLOAD_ELAPSED, + UPLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL, WRITE_CACHE_DOWNLOAD_ELAPSED, }; use crate::sst::file::RegionFileId; use crate::sst::index::intermediate::IntermediateManager; @@ -108,11 +107,8 @@ impl WriteCache { write_request: SstWriteRequest, upload_request: SstUploadRequest, write_opts: &WriteOptions, - ) -> Result { - let timer = FLUSH_ELAPSED - .with_label_values(&["write_sst"]) - .start_timer(); - + write_type: WriteType, + ) -> Result<(SstInfoArray, Metrics)> { let region_id = write_request.metadata.region_id; let store = self.file_cache.local_store(); @@ -138,6 +134,7 @@ impl WriteCache { write_request.metadata, indexer, path_provider.clone(), + Metrics::new(write_type), ) .await .with_file_cleaner(cleaner); @@ -145,12 +142,11 @@ impl WriteCache { let sst_info = writer .write_all(write_request.source, write_request.max_sequence, write_opts) .await?; - - timer.stop_and_record(); + let mut metrics = writer.into_metrics(); // Upload sst file to remote object store. 
if sst_info.is_empty() { - return Ok(sst_info); + return Ok((sst_info, metrics)); } let mut upload_tracker = UploadTracker::new(region_id); @@ -161,10 +157,12 @@ impl WriteCache { let parquet_path = upload_request .dest_path_provider .build_sst_file_path(RegionFileId::new(region_id, sst.file_id)); + let start = Instant::now(); if let Err(e) = self.upload(parquet_key, &parquet_path, remote_store).await { err = Some(e); break; } + metrics.upload_parquet += start.elapsed(); upload_tracker.push_uploaded_file(parquet_path); if sst.index_metadata.file_size > 0 { @@ -172,10 +170,12 @@ impl WriteCache { let puffin_path = upload_request .dest_path_provider .build_index_file_path(RegionFileId::new(region_id, sst.file_id)); + let start = Instant::now(); if let Err(e) = self.upload(puffin_key, &puffin_path, remote_store).await { err = Some(e); break; } + metrics.upload_puffin += start.elapsed(); upload_tracker.push_uploaded_file(puffin_path); } } @@ -188,7 +188,7 @@ impl WriteCache { return Err(err); } - Ok(sst_info) + Ok((sst_info, metrics)) } /// Removes a file from the cache by `index_key`. 
@@ -298,13 +298,7 @@ impl WriteCache { let file_type = index_key.file_type; let cache_path = self.file_cache.cache_file_path(index_key); - let timer = FLUSH_ELAPSED - .with_label_values(&[match file_type { - FileType::Parquet => "upload_parquet", - FileType::Puffin => "upload_puffin", - }]) - .start_timer(); - + let start = Instant::now(); let cached_value = self .file_cache .local_store() @@ -348,11 +342,11 @@ impl WriteCache { UPLOAD_BYTES_TOTAL.inc_by(bytes_written); debug!( - "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}s", + "Successfully upload file to remote, region: {}, file: {}, upload_path: {}, cost: {:?}", region_id, file_id, upload_path, - timer.stop_and_record() + start.elapsed(), ); let index_value = IndexValue { @@ -496,11 +490,11 @@ mod tests { }; // Write to cache and upload sst to mock remote store - let sst_info = write_cache - .write_and_upload_sst(write_request, upload_request, &write_opts) + let (mut sst_infos, _) = write_cache + .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush) .await - .unwrap() - .remove(0); //todo(hl): we assume it only creates one file. 
+ .unwrap(); + let sst_info = sst_infos.remove(0); let file_id = sst_info.file_id; let sst_upload_path = @@ -591,11 +585,11 @@ mod tests { remote_store: mock_store.clone(), }; - let sst_info = write_cache - .write_and_upload_sst(write_request, upload_request, &write_opts) + let (mut sst_infos, _) = write_cache + .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush) .await - .unwrap() - .remove(0); + .unwrap(); + let sst_info = sst_infos.remove(0); let write_parquet_metadata = sst_info.file_metadata.unwrap(); // Read metadata from write cache @@ -671,7 +665,7 @@ mod tests { }; write_cache - .write_and_upload_sst(write_request, upload_request, &write_opts) + .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush) .await .unwrap_err(); let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR); diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs index 172c94a9df..e9259baf65 100644 --- a/src/mito2/src/compaction/compactor.rs +++ b/src/mito2/src/compaction/compactor.rs @@ -29,7 +29,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::region_request::PathType; use store_api::storage::RegionId; -use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest}; +use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType}; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::picker::{new_picker, PickerOutput}; use crate::compaction::{find_ttl, CompactionSstReaderBuilder}; @@ -352,7 +352,7 @@ impl Compactor for DefaultCompactor { } .build_sst_reader() .await?; - let output_files = sst_layer + let (sst_infos, metrics) = sst_layer .write_sst( SstWriteRequest { op_type: OperationType::Compact, @@ -367,8 +367,10 @@ impl Compactor for DefaultCompactor { bloom_filter_index_config, }, &write_opts, + WriteType::Compaction, ) - .await? 
+ .await?; + let output_files = sst_infos .into_iter() .map(|sst_info| FileMeta { region_id, @@ -386,9 +388,10 @@ impl Compactor for DefaultCompactor { let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(","); info!( - "Region {} compaction inputs: [{}], outputs: [{}]", - region_id, input_file_names, output_file_names + "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}", + region_id, input_file_names, output_file_names, metrics ); + metrics.observe(); Ok(output_files) }); } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index e497c12357..dee6901c9a 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -25,7 +25,7 @@ use store_api::storage::RegionId; use strum::IntoStaticStr; use tokio::sync::{mpsc, watch}; -use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest}; +use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType}; use crate::cache::CacheManagerRef; use crate::config::MitoConfig; use crate::error::{ @@ -345,6 +345,7 @@ impl RegionFlushTask { let mut file_metas = Vec::with_capacity(memtables.len()); let mut flushed_bytes = 0; let mut series_count = 0; + let mut flush_metrics = Metrics::new(WriteType::Flush); for mem in memtables { if mem.is_empty() { // Skip empty memtables. @@ -399,14 +400,15 @@ impl RegionFlushTask { bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(), }; - let ssts_written = self + let (ssts_written, metrics) = self .access_layer - .write_sst(write_request, &write_opts) + .write_sst(write_request, &write_opts, WriteType::Flush) .await?; if ssts_written.is_empty() { // No data written. 
continue; } + flush_metrics = flush_metrics.merge(metrics); file_metas.extend(ssts_written.into_iter().map(|sst_info| { flushed_bytes += sst_info.file_size; @@ -431,13 +433,15 @@ impl RegionFlushTask { let file_ids: Vec<_> = file_metas.iter().map(|f| f.file_id).collect(); info!( - "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}s", + "Successfully flush memtables, region: {}, reason: {}, files: {:?}, series count: {}, cost: {:?}, metrics: {:?}", self.region_id, self.reason.as_str(), file_ids, series_count, timer.stop_and_record(), + flush_metrics, ); + flush_metrics.observe(); let edit = RegionEdit { files_to_add: file_metas, diff --git a/src/mito2/src/metrics.rs b/src/mito2/src/metrics.rs index 745fc3bec4..39ae53f5ab 100644 --- a/src/mito2/src/metrics.rs +++ b/src/mito2/src/metrics.rs @@ -119,6 +119,14 @@ lazy_static! { // Compaction metrics /// Timer of different stages in compaction. + /// - pick + /// - merge (in parallel) + /// - iter_source + /// - write_batch + /// - update_index + /// - upload_parquet + /// - upload_puffin + /// - write_manifest pub static ref COMPACTION_STAGE_ELAPSED: HistogramVec = register_histogram_vec!( "greptime_mito_compaction_stage_elapsed", "mito compaction stage elapsed", diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 0d7b0aa739..c591fb233c 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -106,7 +106,9 @@ mod tests { use tokio_util::compat::FuturesAsyncWriteCompatExt; use super::*; - use crate::access_layer::{FilePathProvider, OperationType, RegionFilePathFactory}; + use crate::access_layer::{ + FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType, + }; use crate::cache::{CacheManager, CacheStrategy, PageKey}; use crate::read::{BatchBuilder, BatchReader}; use crate::region::options::{IndexOptions, InvertedIndexOptions}; @@ -177,6 +179,7 @@ mod tests { metadata.clone(), NoopIndexBuilder, file_path, + 
Metrics::new(WriteType::Flush), ) .await; @@ -239,6 +242,7 @@ mod tests { FixedPathProvider { region_file_id: handle.file_id(), }, + Metrics::new(WriteType::Flush), ) .await; @@ -318,6 +322,7 @@ mod tests { FixedPathProvider { region_file_id: handle.file_id(), }, + Metrics::new(WriteType::Flush), ) .await; @@ -365,6 +370,7 @@ mod tests { FixedPathProvider { region_file_id: handle.file_id(), }, + Metrics::new(WriteType::Flush), ) .await; writer @@ -422,6 +428,7 @@ mod tests { FixedPathProvider { region_file_id: handle.file_id(), }, + Metrics::new(WriteType::Flush), ) .await; writer @@ -464,6 +471,7 @@ mod tests { FixedPathProvider { region_file_id: handle.file_id(), }, + Metrics::new(WriteType::Flush), ) .await; @@ -617,6 +625,7 @@ mod tests { metadata.clone(), NoopIndexBuilder, path_provider, + Metrics::new(WriteType::Flush), ) .await; @@ -692,6 +701,7 @@ mod tests { metadata.clone(), indexer_builder, file_path.clone(), + Metrics::new(WriteType::Flush), ) .await; @@ -979,6 +989,7 @@ mod tests { metadata.clone(), NoopIndexBuilder, file_path, + Metrics::new(WriteType::Flush), ) .await; diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs index fcceae23ec..eb79bbd5fe 100644 --- a/src/mito2/src/sst/parquet/writer.rs +++ b/src/mito2/src/sst/parquet/writer.rs @@ -20,6 +20,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Instant; use common_telemetry::debug; use common_time::Timestamp; @@ -38,7 +39,7 @@ use store_api::storage::SequenceNumber; use tokio::io::AsyncWrite; use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt}; -use crate::access_layer::{FilePathProvider, SstInfoArray, TempFileCleaner}; +use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner}; use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu}; use crate::read::{Batch, Source}; use crate::sst::file::{FileId, RegionFileId}; 
@@ -65,6 +66,8 @@ pub struct ParquetWriter, /// Cleaner to remove temp files on failure. file_cleaner: Option, + /// Write metrics + metrics: Metrics, } pub trait WriterFactory { @@ -100,12 +103,14 @@ where metadata: RegionMetadataRef, indexer_builder: I, path_provider: P, + metrics: Metrics, ) -> ParquetWriter { ParquetWriter::new( ObjectStoreWriterFactory { object_store }, metadata, indexer_builder, path_provider, + metrics, ) .await } @@ -128,6 +133,7 @@ where metadata: RegionMetadataRef, indexer_builder: I, path_provider: P, + metrics: Metrics, ) -> ParquetWriter { let init_file = FileId::random(); let indexer = indexer_builder.build(init_file).await; @@ -142,6 +148,7 @@ where current_indexer: Some(indexer), bytes_written: Arc::new(AtomicUsize::new(0)), file_cleaner: None, + metrics, } } @@ -234,12 +241,14 @@ where match res { Ok(mut batch) => { stats.update(&batch); + let start = Instant::now(); // safety: self.current_indexer must be set when first batch has been written. self.current_indexer .as_mut() .unwrap() .update(&mut batch) .await; + self.metrics.update_index += start.elapsed(); if let Some(max_file_size) = opts.max_file_size && self.bytes_written.load(Ordering::Relaxed) > max_file_size { @@ -286,16 +295,21 @@ where write_format: &WriteFormat, opts: &WriteOptions, ) -> Result> { + let start = Instant::now(); let Some(batch) = source.next_batch().await? else { return Ok(None); }; + self.metrics.iter_source += start.elapsed(); let arrow_batch = write_format.convert_batch(&batch)?; + + let start = Instant::now(); self.maybe_init_writer(write_format.arrow_schema(), opts) .await? .write(&arrow_batch) .await .context(WriteParquetSnafu)?; + self.metrics.write_batch += start.elapsed(); Ok(Some(batch)) } @@ -340,6 +354,11 @@ where Ok(self.writer.as_mut().unwrap()) } } + + /// Consumes write and return the collected metrics. + pub fn into_metrics(self) -> Metrics { + self.metrics + } } #[derive(Default)]