pretty print

This commit is contained in:
Lei, HUANG
2025-09-23 16:01:50 +08:00
parent 53c58494fd
commit 93e8510b2a
3 changed files with 68 additions and 13 deletions

11
Cargo.lock generated
View File

@@ -1791,6 +1791,7 @@ dependencies = [
"clap 4.5.19",
"cli",
"client",
"colored",
"common-base",
"common-catalog",
"common-config",
@@ -1860,6 +1861,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "colored"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.59.0",
]
[[package]]
name = "combine"
version = "4.6.7"

View File

@@ -24,6 +24,7 @@ workspace = true
async-trait.workspace = true
auth.workspace = true
base64.workspace = true
colored = "2.0"
cache.workspace = true
catalog.workspace = true
chrono.workspace = true

View File

@@ -17,7 +17,7 @@ use std::time::Instant;
use clap::Parser;
use cmd::error::{self, Result};
use common_telemetry::info;
use colored::Colorize;
use datanode::config::ObjectStoreConfig;
use mito2::config::MitoConfig;
use mito2::read::Source;
@@ -32,7 +32,10 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef};
pub async fn main() {
// common_telemetry::init_default_ut_logging();
let cmd = Command::parse();
cmd.run().await.unwrap();
if let Err(e) = cmd.run().await {
eprintln!("{}: {}", "Error".red().bold(), e);
std::process::exit(1);
}
}
#[derive(Debug, Parser)]
@@ -59,6 +62,9 @@ impl Command {
if self.verbose {
common_telemetry::init_default_ut_logging();
}
println!("{}", "Starting objbench...".cyan().bold());
// Build object store from config
let cfg_str = std::fs::read_to_string(&self.config).map_err(|e| {
error::IllegalConfigSnafu {
@@ -74,11 +80,14 @@ impl Command {
})?;
let object_store = build_object_store(&store_cfg).await?;
println!("{} Object store initialized", "".green());
// Prepare source identifiers
let (src_region_dir, src_file_id) = split_sst_path(&self.source)?;
println!("{} Source path parsed: {}", "".green(), self.source);
// Load parquet metadata to extract RegionMetadata and file stats
println!("{}", "Loading parquet metadata...".yellow());
let file_size = object_store
.stat(&self.source)
.await
@@ -102,6 +111,13 @@ impl Command {
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
let num_row_groups = parquet_meta.num_row_groups() as u64;
println!(
"{} Metadata loaded - rows: {}, size: {} bytes",
"".green(),
num_rows,
file_size
);
// Build a FileHandle for the source file
let file_meta = FileMeta {
region_id: region_meta.region_id,
@@ -118,6 +134,7 @@ impl Command {
let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
// Build the reader for a single file via ParquetReaderBuilder
println!("{}", "Building reader...".yellow());
let (_src_access_layer, _cache_manager) =
build_access_layer_simple(src_region_dir.clone(), object_store.clone()).await?;
let reader_build_start = Instant::now();
@@ -138,8 +155,10 @@ impl Command {
let reader_build_elapsed = reader_build_start.elapsed();
let total_rows = reader.parquet_metadata().file_metadata().num_rows();
println!("{} Reader built in {:?}", "".green(), reader_build_elapsed);
// Prepare target access layer for writing
println!("{}", "Preparing target access layer...".yellow());
let (tgt_access_layer, tgt_cache_manager) =
build_access_layer_simple(self.target.clone(), object_store.clone()).await?;
@@ -158,7 +177,10 @@ impl Command {
bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
};
// Write SST
println!("{}", "Writing SST...".yellow());
let mut metrics = Metrics::default();
let write_start = Instant::now();
let infos = tgt_access_layer
.write_sst(write_req, &write_opts, &mut metrics)
.await
@@ -169,20 +191,41 @@ impl Command {
.build()
})?;
let write_elapsed = write_start.elapsed();
assert_eq!(infos.len(), 1);
let dst_file_id = infos[0].file_id;
let dst_file_path = format!("{}/{}", self.target, dst_file_id.as_parquet(),);
// Report results
println!(
"Write complete, dest file: {}, rows={}, size={} bytes, build_reader={:?}, metrics: {:?}",
dst_file_path,
total_rows, file_size, reader_build_elapsed, metrics
);
let dst_file_path = format!("{}{}", self.target, dst_file_id.as_parquet(),);
object_store
.delete(&dst_file_path)
.await
.expect(&format!("Failed to delete dest file: {}", dst_file_path));
// Report results with ANSI colors
println!("\n{} {}", "Write complete!".green().bold(), "".green());
println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
println!(
" {}: {}",
"File size".bold(),
format!("{} bytes", file_size).cyan()
);
println!(
" {}: {:?}",
"Reader build time".bold(),
reader_build_elapsed
);
println!(" {}: {:?}", "Write time".bold(), write_elapsed);
// Print metrics in a formatted way
println!(" {}: {:?}", "Metrics".bold(), metrics);
// Cleanup
println!("\n{}", "Cleaning up...".yellow());
object_store.delete(&dst_file_path).await.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
}
.build()
})?;
println!("{} Temporary file deleted", "".green());
println!("\n{}", "Benchmark completed successfully!".green().bold());
Ok(())
}
}