From d68215dc88ed1884000f9cfee67e9207117eb287 Mon Sep 17 00:00:00 2001
From: "Lei, HUANG"
Date: Tue, 23 Sep 2025 14:49:33 +0800
Subject: [PATCH] feat/objbench: Add `objbench` binary and enhance metrics
 collection

- **New Binary**: Introduced a new binary `objbench` in
  `src/cmd/src/bin/objbench.rs` for benchmarking object store operations.
- **Metrics Collection**: Enhanced metrics collection by adding a `Metrics`
  struct in `access_layer.rs` and integrating it into the SST write path
  across multiple files, including `write_cache.rs`, `compactor.rs`,
  `flush.rs`, and `sst/parquet/writer.rs`.

Signed-off-by: Lei, HUANG
---
 Cargo.lock                            |   2 +
 src/cmd/Cargo.toml                    |   6 +
 src/cmd/src/bin/objbench.rs           | 432 ++++++++++++++++++++++++++
 src/mito2/src/access_layer.rs         |  63 +++-
 src/mito2/src/cache/write_cache.rs    |   8 +-
 src/mito2/src/compaction/compactor.rs |   3 +-
 src/mito2/src/flush.rs                |   4 +-
 src/mito2/src/lib.rs                  |   6 +
 src/mito2/src/sst/parquet.rs          |  13 +-
 src/mito2/src/sst/parquet/reader.rs   |   1 -
 src/mito2/src/sst/parquet/writer.rs   |  14 +-
 11 files changed, 525 insertions(+), 27 deletions(-)
 create mode 100644 src/cmd/src/bin/objbench.rs

diff --git a/Cargo.lock b/Cargo.lock
index 047312b125..74168c5874 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1825,6 +1825,8 @@ dependencies = [
  "mito2",
  "moka",
  "nu-ansi-term",
+ "object-store",
+ "parquet",
  "plugins",
  "prometheus",
  "prost 0.13.3",
diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml
index c3328fbc8d..922eecb5ed 100644
--- a/src/cmd/Cargo.toml
+++ b/src/cmd/Cargo.toml
@@ -9,6 +9,10 @@ default-run = "greptime"
 name = "greptime"
 path = "src/bin/greptime.rs"
 
+[[bin]]
+name = "objbench"
+path = "src/bin/objbench.rs"
+
 [features]
 default = ["servers/pprof", "servers/mem-prof"]
 tokio-console = ["common-telemetry/tokio-console"]
@@ -55,6 +59,8 @@ futures.workspace = true
 human-panic = "2.0"
 humantime.workspace = true
 lazy_static.workspace = true
+object-store.workspace = true
+parquet = "53"
 meta-client.workspace = true
 meta-srv.workspace = true
 metric-engine.workspace = true
diff --git a/src/cmd/src/bin/objbench.rs b/src/cmd/src/bin/objbench.rs
new file mode 100644
index 0000000000..a3e6357831
--- /dev/null
+++ b/src/cmd/src/bin/objbench.rs
@@ -0,0 +1,432 @@
+// Copyright 2025 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
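+
+//! `objbench` copies a single SST file between object-store locations through
+//! the mito2 read/write path and prints per-phase timings collected in the
+//! new `Metrics` struct.
+//!
+//! A minimal invocation sketch; the config values, paths, and file ids below
+//! are illustrative only:
+//!
+//! ```text
+//! # object_store.toml must deserialize into datanode's ObjectStoreConfig,
+//! # e.g. for S3:
+//! #   type = "S3"
+//! #   bucket = "my-bucket"
+//! #   root = "data"
+//! #   access_key_id = "..."
+//! #   secret_access_key = "..."
+//! objbench \
+//!     --config object_store.toml \
+//!     --source "my_region_dir/11d8d926-8a8c-409d-8a56-9a5ed33e5e44.parquet" \
+//!     --target "bench_out/5a2c8f2a-4b1d-4f6e-9c6a-2e8f1a3b4c5d.parquet"
+//! ```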
+
+use std::path::{Path, PathBuf};
+use std::time::Instant;
+
+use clap::Parser;
+use cmd::error::{self, Result};
+use datanode::config::ObjectStoreConfig;
+use mito2::config::MitoConfig;
+use mito2::read::Source;
+use mito2::sst::file::{FileHandle, FileId, FileMeta};
+use mito2::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
+use mito2::sst::parquet::{WriteOptions, PARQUET_METADATA_KEY};
+use mito2::{build_access_layer, Metrics, OperationType, SstWriteRequest};
+use object_store::ObjectStore;
+use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+
+#[tokio::main]
+pub async fn main() {
+    // Uncomment to enable verbose logging while debugging:
+    // common_telemetry::init_default_ut_logging();
+    let cmd = Command::parse();
+    if let Err(e) = cmd.run().await {
+        eprintln!("objbench failed: {e}");
+        std::process::exit(1);
+    }
+}
+
+#[derive(Debug, Parser)]
+pub struct Command {
+    /// Path to the object-store config file (TOML). Must deserialize into
+    /// `datanode::config::ObjectStoreConfig`.
+    #[clap(long, value_name = "FILE")]
+    pub config: PathBuf,
+
+    /// Source SST file path in the object store
+    /// (e.g. "region_dir/<file_id>.parquet").
+    #[clap(long, value_name = "PATH")]
+    pub source: String,
+
+    /// Target SST file path in the object store; its parent directory is used
+    /// as the destination region dir.
+    #[clap(long, value_name = "PATH")]
+    pub target: String,
+}
+
+impl Command {
+    pub async fn run(&self) -> Result<()> {
+        // Build the object store from the config file.
+        let cfg_str = std::fs::read_to_string(&self.config).map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("failed to read config {}: {e}", self.config.display()),
+            }
+            .build()
+        })?;
+        let store_cfg: ObjectStoreConfig = toml::from_str(&cfg_str).map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("failed to parse config {}: {e}", self.config.display()),
+            }
+            .build()
+        })?;
+
+        let object_store = build_object_store(&store_cfg).await?;
+
+        // Prepare source identifiers.
+        let (src_region_dir, src_file_id) = split_sst_path(&self.source)?;
+
+        // Load parquet metadata to extract RegionMetadata and file stats.
+        let file_size = object_store
+            .stat(&self.source)
+            .await
+            .map_err(|e| {
+                error::IllegalConfigSnafu {
+                    msg: format!("stat failed: {e}"),
+                }
+                .build()
+            })?
+            .content_length();
+        let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
+            .await
+            .map_err(|e| {
+                error::IllegalConfigSnafu {
+                    msg: format!("read parquet metadata failed: {e}"),
+                }
+                .build()
+            })?;
+
+        let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
+        let num_rows = parquet_meta.file_metadata().num_rows() as u64;
+        let num_row_groups = parquet_meta.num_row_groups() as u64;
+
+        // Build a FileHandle for the source file.
+        let file_meta = FileMeta {
+            region_id: region_meta.region_id,
+            file_id: src_file_id,
+            time_range: Default::default(),
+            level: 0,
+            file_size,
+            available_indexes: Default::default(),
+            index_file_size: 0,
+            num_rows,
+            num_row_groups,
+            sequence: None,
+        };
+        let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
+
+        // Build the reader for a single file via ParquetReaderBuilder.
+        let reader_build_start = Instant::now();
+        let reader = mito2::sst::parquet::reader::ParquetReaderBuilder::new(
+            src_region_dir.clone(),
+            src_handle.clone(),
+            object_store.clone(),
+        )
+        .expected_metadata(Some(region_meta.clone()))
+        .build()
+        .await
+        .map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("build reader failed: {e}"),
+            }
+            .build()
+        })?;
+        let reader_build_elapsed = reader_build_start.elapsed();
+        let total_rows = reader.parquet_metadata().file_metadata().num_rows();
+
+        // Prepare the target access layer for writing.
+        let (tgt_region_dir, _tgt_file_id_hint) = split_sst_path(&self.target)?;
+        let (tgt_access_layer, tgt_cache_manager) =
+            build_access_layer_simple(tgt_region_dir.clone(), object_store.clone()).await?;
+
+        // Build the write request.
+        let write_opts = WriteOptions::default();
+        let write_req = SstWriteRequest {
+            op_type: OperationType::Compact,
+            metadata: region_meta,
+            source: Source::Reader(Box::new(reader)),
+            cache_manager: tgt_cache_manager,
+            storage: None,
+            max_sequence: None,
+            index_options: Default::default(),
+            inverted_index_config: MitoConfig::default().inverted_index,
+            fulltext_index_config: MitoConfig::default().fulltext_index,
+            bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
+        };
+
+        let mut metrics = Metrics::default();
+        let infos = tgt_access_layer
+            .write_sst(write_req, &write_opts, &mut metrics)
+            .await
+            .map_err(|e| {
+                error::IllegalConfigSnafu {
+                    msg: format!("write_sst failed: {e}"),
+                }
+                .build()
+            })?;
+
+        // Report results.
+        println!(
+            "Copy completed: rows={}, source_size={} bytes, build_reader={:?}, ssts_written={}, metrics: {:?}",
+            total_rows,
+            file_size,
+            reader_build_elapsed,
+            infos.len(),
+            metrics
+        );
+        Ok(())
+    }
+}
+
+fn split_sst_path(path: &str) -> Result<(String, FileId)> {
+    let p = Path::new(path);
+    let file_name = p.file_name().and_then(|s| s.to_str()).ok_or_else(|| {
+        error::IllegalConfigSnafu {
+            msg: "invalid source path".to_string(),
+        }
+        .build()
+    })?;
+    let uuid_str = file_name.strip_suffix(".parquet").ok_or_else(|| {
+        error::IllegalConfigSnafu {
+            msg: "expect .parquet file".to_string(),
+        }
+        .build()
+    })?;
+    let file_id = FileId::parse_str(uuid_str).map_err(|e| {
+        error::IllegalConfigSnafu {
+            msg: format!("invalid file id: {e}"),
+        }
+        .build()
+    })?;
+    let parent = p
+        .parent()
+        .and_then(|s| s.to_str())
+        .unwrap_or("")
+        .to_string();
+    Ok((parent, file_id))
+}
+
+fn extract_region_metadata(
+    file_path: &str,
+    meta: &parquet::file::metadata::ParquetMetaData,
+) -> Result<RegionMetadataRef> {
+    use parquet::format::KeyValue;
+    let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
+    let Some(kvs) = kvs else {
+        return Err(error::IllegalConfigSnafu {
+            msg: format!("{file_path}: missing parquet key_value metadata"),
+        }
+        .build());
+    };
+    let json = kvs
+        .iter()
+        .find(|kv| kv.key == PARQUET_METADATA_KEY)
+        .and_then(|kv| kv.value.as_ref())
+        .ok_or_else(|| {
+            error::IllegalConfigSnafu {
+                msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
+            }
+            .build()
+        })?;
+    let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
+        error::IllegalConfigSnafu {
+            msg: format!("invalid region metadata json: {e}"),
+        }
+        .build()
+    })?;
+    Ok(std::sync::Arc::new(region))
+}
+
+async fn build_object_store(cfg: &ObjectStoreConfig) -> Result<ObjectStore> {
+    use datanode::config::ObjectStoreConfig::*;
+    match cfg {
+        File(_) => {
+            use object_store::services::Fs;
+            // The file backend is rooted at "/", so source/target are
+            // interpreted as filesystem paths.
+            let builder = Fs::default();
+            Ok(ObjectStore::new(builder)
+                .map_err(|e| {
+                    error::IllegalConfigSnafu {
+                        msg: format!("init fs backend failed: {e}"),
+                    }
+                    .build()
+                })?
+                .finish())
+        }
+        S3(s3) => {
+            use common_base::secrets::ExposeSecret;
+            use object_store::services::S3;
+            use object_store::util;
+            let root = util::normalize_dir(&s3.root);
+            let mut builder = S3::default()
+                .root(&root)
+                .bucket(&s3.bucket)
+                .access_key_id(s3.access_key_id.expose_secret())
+                .secret_access_key(s3.secret_access_key.expose_secret());
+            if let Some(ep) = &s3.endpoint {
+                builder = builder.endpoint(ep);
+            }
+            if let Some(region) = &s3.region {
+                builder = builder.region(region);
+            }
+            if s3.enable_virtual_host_style {
+                builder = builder.enable_virtual_host_style();
+            }
+            Ok(ObjectStore::new(builder)
+                .map_err(|e| {
+                    error::IllegalConfigSnafu {
+                        msg: format!("init s3 backend failed: {e}"),
+                    }
+                    .build()
+                })?
+                .finish())
+        }
+        Oss(oss) => {
+            use common_base::secrets::ExposeSecret;
+            use object_store::services::Oss;
+            use object_store::util;
+            let root = util::normalize_dir(&oss.root);
+            let builder = Oss::default()
+                .root(&root)
+                .bucket(&oss.bucket)
+                .endpoint(&oss.endpoint)
+                .access_key_id(oss.access_key_id.expose_secret())
+                .access_key_secret(oss.access_key_secret.expose_secret());
+            Ok(ObjectStore::new(builder)
+                .map_err(|e| {
+                    error::IllegalConfigSnafu {
+                        msg: format!("init oss backend failed: {e}"),
+                    }
+                    .build()
+                })?
+                .finish())
+        }
+        Azblob(az) => {
+            use common_base::secrets::ExposeSecret;
+            use object_store::services::Azblob;
+            use object_store::util;
+            let root = util::normalize_dir(&az.root);
+            let mut builder = Azblob::default()
+                .root(&root)
+                .container(&az.container)
+                .endpoint(&az.endpoint)
+                .account_name(az.account_name.expose_secret())
+                .account_key(az.account_key.expose_secret());
+            if let Some(token) = &az.sas_token {
+                builder = builder.sas_token(token);
+            }
+            Ok(ObjectStore::new(builder)
+                .map_err(|e| {
+                    error::IllegalConfigSnafu {
+                        msg: format!("init azblob backend failed: {e}"),
+                    }
+                    .build()
+                })?
+                .finish())
+        }
+        Gcs(gcs) => {
+            use common_base::secrets::ExposeSecret;
+            use object_store::services::Gcs;
+            use object_store::util;
+            let root = util::normalize_dir(&gcs.root);
+            let builder = Gcs::default()
+                .root(&root)
+                .bucket(&gcs.bucket)
+                .scope(&gcs.scope)
+                .credential_path(gcs.credential_path.expose_secret())
+                .credential(gcs.credential.expose_secret())
+                .endpoint(&gcs.endpoint);
+            Ok(ObjectStore::new(builder)
+                .map_err(|e| {
+                    error::IllegalConfigSnafu {
+                        msg: format!("init gcs backend failed: {e}"),
+                    }
+                    .build()
+                })?
+                .finish())
+        }
+    }
+}
+
+async fn build_access_layer_simple(
+    region_dir: String,
+    object_store: ObjectStore,
+) -> Result<(mito2::AccessLayerRef, mito2::CacheManagerRef)> {
+    // Minimal index aux path setup.
+    let mut mito_cfg = MitoConfig::default();
+    // Use a temporary directory as the aux path.
+    let data_home = std::env::temp_dir().join("greptime_objbench");
+    let _ = std::fs::create_dir_all(&data_home);
+    let _ = mito_cfg.index.sanitize(
+        data_home.to_str().unwrap_or("/tmp"),
+        &mito_cfg.inverted_index,
+    );
+    let access_layer = build_access_layer(&region_dir, object_store, &mito_cfg)
+        .await
+        .map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("build_access_layer failed: {e}"),
+            }
+            .build()
+        })?;
+    Ok((
+        access_layer,
+        std::sync::Arc::new(mito2::CacheManager::default()),
+    ))
+}
+
+fn new_noop_file_purger() -> FilePurgerRef {
+    // A purger that ignores purge requests; the source SST must outlive the
+    // benchmark run.
+    #[derive(Debug)]
+    struct Noop;
+    impl FilePurger for Noop {
+        fn send_request(&self, _request: PurgeRequest) {}
+    }
+    std::sync::Arc::new(Noop)
+}
+
+async fn load_parquet_metadata(
+    object_store: ObjectStore,
+    path: &str,
+    file_size: u64,
+) -> std::result::Result<
+    parquet::file::metadata::ParquetMetaData,
+    Box<dyn std::error::Error>,
+> {
+    use parquet::file::metadata::ParquetMetaDataReader;
+    use parquet::file::FOOTER_SIZE;
+    let actual_size = if file_size == 0 {
+        object_store.stat(path).await?.content_length()
+    } else {
+        file_size
+    };
+    if actual_size < FOOTER_SIZE as u64 {
+        return Err("file too small".into());
+    }
+    // Prefetch the file tail; most footers fit in 64 KiB.
+    let prefetch: u64 = 64 * 1024;
+    let start = actual_size.saturating_sub(prefetch);
+    let buffer = object_store
+        .read_with(path)
+        .range(start..actual_size)
+        .await?
+        .to_vec();
+    let buffer_len = buffer.len();
+    let mut footer = [0u8; FOOTER_SIZE];
+    footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
+    let metadata_len = ParquetMetaDataReader::decode_footer(&footer)? as u64;
+    if actual_size - (FOOTER_SIZE as u64) < metadata_len {
+        return Err("invalid footer/metadata length".into());
+    }
+    if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
+        // The whole metadata block is already in the prefetched buffer.
+        let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
+        let meta = ParquetMetaDataReader::decode_metadata(
+            &buffer[metadata_start..buffer_len - FOOTER_SIZE],
+        )?;
+        Ok(meta)
+    } else {
+        // Metadata is larger than the prefetch window; fetch the exact range.
+        let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
+        let data = object_store
+            .read_with(path)
+            .range(metadata_start..(actual_size - FOOTER_SIZE as u64))
+            .await?
+            .to_vec();
+        let meta = ParquetMetaDataReader::decode_metadata(&data)?;
+        Ok(meta)
+    }
+}
diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index f1a22cf54d..08cc80396f 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::sync::Arc;
+use std::time::Duration;
 
 use object_store::services::Fs;
 use object_store::util::{join_dir, with_instrument_layers};
@@ -42,6 +43,14 @@ pub type AccessLayerRef = Arc<AccessLayer>;
 /// SST write results.
 pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
 
+/// Per-phase timings collected while writing an SST.
+#[derive(Debug, Default)]
+pub struct Metrics {
+    /// Time spent reading batches from the source.
+    pub read: Duration,
+    /// Time spent converting and writing batches to the parquet writer.
+    pub write: Duration,
+    /// Time spent flushing and closing the parquet writer.
+    pub close: Duration,
+}
+
 /// A layer to access SST files under the same directory.
 pub struct AccessLayer {
     region_dir: String,
@@ -121,10 +130,11 @@ impl AccessLayer {
     /// Writes a SST with specific `file_id` and `metadata` to the layer.
     ///
     /// Returns the info of the SST. If no data written, returns None.
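+    /// `metrics` accumulates the time spent reading the source, writing
+    /// batches, and closing the writer.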
-    pub(crate) async fn write_sst(
+    pub async fn write_sst(
         &self,
         request: SstWriteRequest,
         write_opts: &WriteOptions,
+        metrics: &mut Metrics,
     ) -> Result<SstInfoArray> {
         let region_id = request.metadata.region_id;
         let cache_manager = request.cache_manager.clone();
@@ -168,7 +178,7 @@
             )
             .await;
             writer
-                .write_all(request.source, request.max_sequence, write_opts)
+                .write_all(request.source, request.max_sequence, write_opts, metrics)
                 .await?
         };
@@ -189,28 +199,53 @@
     }
 }
 
+/// Helper to build an [AccessLayerRef] with internal index managers.
+///
+/// This is a convenience constructor intended for tooling that needs to
+/// interact with SSTs without wiring all indexing internals manually.
+pub async fn build_access_layer(
+    region_dir: &str,
+    object_store: ObjectStore,
+    config: &crate::config::MitoConfig,
+) -> Result<AccessLayerRef> {
+    let puffin_manager_factory = PuffinManagerFactory::new(
+        &config.index.aux_path,
+        config.index.staging_size.as_bytes(),
+        Some(config.index.write_buffer_size.as_bytes() as _),
+        config.index.staging_ttl,
+    )
+    .await?;
+    let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path).await?;
+    Ok(Arc::new(AccessLayer::new(
+        region_dir,
+        object_store,
+        puffin_manager_factory,
+        intermediate_manager,
+    )))
+}
+
 /// `OperationType` represents the origin of the `SstWriteRequest`.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
-pub(crate) enum OperationType {
+pub enum OperationType {
     Flush,
     Compact,
 }
 
 /// Contents to build a SST.
-pub(crate) struct SstWriteRequest {
-    pub(crate) op_type: OperationType,
-    pub(crate) metadata: RegionMetadataRef,
-    pub(crate) source: Source,
-    pub(crate) cache_manager: CacheManagerRef,
+pub struct SstWriteRequest {
+    pub op_type: OperationType,
+    pub metadata: RegionMetadataRef,
+    pub source: Source,
+    pub cache_manager: CacheManagerRef,
     #[allow(dead_code)]
-    pub(crate) storage: Option<String>,
-    pub(crate) max_sequence: Option<SequenceNumber>,
+    pub storage: Option<String>,
+    pub max_sequence: Option<SequenceNumber>,
     /// Configs for index
-    pub(crate) index_options: IndexOptions,
-    pub(crate) inverted_index_config: InvertedIndexConfig,
-    pub(crate) fulltext_index_config: FulltextIndexConfig,
-    pub(crate) bloom_filter_index_config: BloomFilterConfig,
+    pub index_options: IndexOptions,
+    pub inverted_index_config: InvertedIndexConfig,
+    pub fulltext_index_config: FulltextIndexConfig,
+    pub bloom_filter_index_config: BloomFilterConfig,
 }
 
 pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 974f0caef0..d36d0899d9 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -40,6 +40,7 @@
 use crate::sst::index::IndexerBuilderImpl;
 use crate::sst::parquet::writer::ParquetWriter;
 use crate::sst::parquet::WriteOptions;
 use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
+use crate::Metrics;
 
 /// A cache for uploading files to remote object stores.
 ///
@@ -140,7 +141,12 @@ impl WriteCache {
             .await;
 
         let sst_info = writer
-            .write_all(write_request.source, write_request.max_sequence, write_opts)
+            .write_all(
+                write_request.source,
+                write_request.max_sequence,
+                write_opts,
+                &mut Metrics::default(),
+            )
             .await?;
 
         timer.stop_and_record();
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index fb6e7bd03f..7831bdea69 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -27,7 +27,7 @@
 use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::RegionId;
 
-use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
+use crate::access_layer::{AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest};
 use crate::cache::{CacheManager, CacheManagerRef};
 use crate::compaction::picker::{new_picker, PickerOutput};
 use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
@@ -340,6 +340,7 @@ impl Compactor for DefaultCompactor {
                     bloom_filter_index_config,
                 },
                 &write_opts,
+                &mut Metrics::default(),
             )
             .await?
             .into_iter()
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index ffd6e896a3..9482db8740 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -25,7 +25,7 @@
 use store_api::storage::RegionId;
 use strum::IntoStaticStr;
 use tokio::sync::{mpsc, watch};
 
-use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
+use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest};
 use crate::cache::CacheManagerRef;
 use crate::config::MitoConfig;
 use crate::error::{
@@ -366,7 +366,7 @@ impl RegionFlushTask {
         let ssts_written = self
             .access_layer
-            .write_sst(write_request, &write_opts)
+            .write_sst(write_request, &write_opts, &mut Metrics::default())
             .await?;
         if ssts_written.is_empty() {
             // No data written.
diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs
index cd0cb37102..87988c78fc 100644
--- a/src/mito2/src/lib.rs
+++ b/src/mito2/src/lib.rs
@@ -44,6 +44,12 @@ mod time_provider;
 pub mod wal;
 mod worker;
 
+// Public re-exports for tooling convenience
+pub use access_layer::{
+    build_access_layer, AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest,
+};
+pub use cache::{CacheManager, CacheManagerRef};
+
 #[cfg_attr(doc, aquamarine::aquamarine)]
 /// # Mito developer document
 ///
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 14496312e3..a297c9a4ea 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -109,6 +109,7 @@ mod tests {
         new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
     };
     use crate::test_util::{check_reader_result, TestEnv};
+    use crate::Metrics;
 
     const FILE_DIR: &str = "/";
 
@@ -165,7 +166,7 @@
         .await;
 
         let info = writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);
@@ -222,7 +223,7 @@
         .await;
 
         writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);
@@ -293,7 +294,7 @@
         .await;
 
         let sst_info = writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);
@@ -334,7 +335,7 @@
         )
         .await;
         writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);
@@ -389,7 +390,7 @@
         )
         .await;
         writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);
@@ -427,7 +428,7 @@
         .await;
 
         writer
-            .write_all(source, None, &write_opts)
+            .write_all(source, None, &write_opts, &mut Metrics::default())
             .await
             .unwrap()
             .remove(0);
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index d34aaf2229..c761ecdaa3 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -1117,7 +1117,6 @@ impl ParquetReader {
         self.context.read_format().metadata()
     }
 
-    #[cfg(test)]
     pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
         self.context.reader_builder().parquet_meta.clone()
     }
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index 3aad380eb5..9ff7d7df5f 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -19,6 +19,7 @@
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use std::time::Instant;
 
 use common_time::Timestamp;
 use datatypes::arrow::datatypes::SchemaRef;
@@ -45,6 +46,7 @@
 use crate::sst::parquet::format::WriteFormat;
 use crate::sst::parquet::helper::parse_parquet_metadata;
 use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
 use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
+use crate::Metrics;
 
 /// Parquet SST writer.
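+///
+/// `write_all` accumulates per-phase timings (source reads, batch writes, and
+/// the final flush/close) into a caller-provided [`Metrics`].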
 pub struct ParquetWriter {
@@ -156,13 +158,14 @@
         mut source: Source,
         override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
         opts: &WriteOptions,
+        metrics: &mut Metrics,
     ) -> Result<SstInfoArray> {
         let write_format =
             WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
         let mut stats = SourceStats::default();
 
         while let Some(res) = self
-            .write_next_batch(&mut source, &write_format, opts)
+            .write_next_batch(&mut source, &write_format, opts, metrics)
             .await
             .transpose()
         {
@@ -189,9 +192,10 @@
             return Ok(smallvec![]);
         };
 
+        let close_start = Instant::now();
         arrow_writer.flush().await.context(WriteParquetSnafu)?;
-
         let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
+        metrics.close += close_start.elapsed();
 
         let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
 
         // Safety: num rows > 0 so we must have min/max.
@@ -238,17 +242,23 @@
         source: &mut Source,
         write_format: &WriteFormat,
         opts: &WriteOptions,
+        metrics: &mut Metrics,
     ) -> Result<Option<Batch>> {
+        let read_start = Instant::now();
         let Some(batch) = source.next_batch().await? else {
             return Ok(None);
         };
+        metrics.read += read_start.elapsed();
 
         let arrow_batch = write_format.convert_batch(&batch)?;
+
+        let write_start = Instant::now();
         self.maybe_init_writer(write_format.arrow_schema(), opts)
             .await?
             .write(&arrow_batch)
             .await
             .context(WriteParquetSnafu)?;
+        metrics.write += write_start.elapsed();
 
         Ok(Some(batch))
     }