Compare commits

..

13 Commits

Author SHA1 Message Date
evenyag
0d5b423eb7 feat: opendal metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-09-29 18:23:41 +08:00
evenyag
26bdb6a413 fix: disable on compaction
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-09-26 17:40:00 +08:00
evenyag
2fe21469f8 chore: also print infos
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-09-26 17:29:37 +08:00
evenyag
3aa67c7af4 feat: add series num to metrics
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-09-26 17:08:27 +08:00
evenyag
e0d3e6ae97 chore: disable fulltext index
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-09-26 17:01:53 +08:00
evenyag
2ce476dc42 feat: add prof-file flag to get flamegraph
Signed-off-by: evenyag <realevenyag@gmail.com>
2025-09-26 15:36:15 +08:00
Lei, HUANG
69a816fa0c feat/objbench:
### Update Metrics and Command Output

 - **`objbench.rs`**:
   - Renamed "Write time" to "Total time" in output.
   - Enhanced metrics output to include a sum of all metrics.

 - **`access_layer.rs`**:
   - Split `index` duration into `index_update` and `index_finish`.
   - Added a `sum` method to `Metrics` to calculate the total duration.

 - **`writer.rs`**:
   - Updated metrics to use `index_update` and `index_finish` for more granular tracking of index operations.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-09-24 20:46:27 +08:00
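
The `sum` helper described in this commit appears in the access_layer.rs hunk further down. As a minimal, self-contained sketch of the idea (the field set is abbreviated to the phases named in the commit; the real struct carries more fields):

```rust
use std::time::Duration;

/// Abbreviated stand-in for the `Metrics` struct added in `access_layer.rs`;
/// only the phases named in the commit are kept here.
#[derive(Debug, Default)]
struct Metrics {
    read: Duration,
    write: Duration,
    convert: Duration,
    index_update: Duration,
    index_finish: Duration,
    close: Duration,
}

impl Metrics {
    /// Total wall time across the tracked phases, printed alongside the
    /// per-phase values in the objbench output.
    fn sum(&self) -> Duration {
        self.read + self.write + self.convert + self.index_update + self.index_finish + self.close
    }
}

fn main() {
    let m = Metrics {
        index_update: Duration::from_millis(120),
        index_finish: Duration::from_millis(30),
        ..Default::default()
    };
    println!("{:?}, sum: {:?}", m, m.sum());
}
```
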
Lei, HUANG
dcf5a62014 feat/objbench:
### Add Metrics for Indexing and Conversion in `access_layer.rs` and `writer.rs`

 - **Enhancements in `access_layer.rs`:**
   - Added new metrics `convert` and `index` to the `Metrics` struct to track conversion and indexing durations.

 - **Updates in `writer.rs`:**
   - Implemented tracking of indexing duration by measuring the time taken for `update` in the indexer.
   - Added measurement of conversion duration for `convert_batch` to enhance performance monitoring.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-09-24 18:14:00 +08:00
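
The durations mentioned above are collected with a plain `Instant` pattern, visible in the writer.rs hunk near the end of this diff. A reduced sketch of that pattern (the conversion and indexer calls are hypothetical stand-ins, not the real writer internals):

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-ins for the real writer internals; only the timing
// pattern described in the commit above is illustrated here.
fn convert_batch(batch: &str) -> String {
    batch.to_uppercase()
}
fn index_update(_converted: &str) {}

fn main() {
    let mut convert_cost = Duration::ZERO;
    let mut index_update_cost = Duration::ZERO;

    for batch in ["a", "bb", "ccc"] {
        // Time the batch conversion (convert_batch in writer.rs).
        let start = Instant::now();
        let converted = convert_batch(batch);
        convert_cost += start.elapsed();

        // Time the indexer update.
        let start = Instant::now();
        index_update(&converted);
        index_update_cost += start.elapsed();
    }

    println!("convert: {convert_cost:?}, index_update: {index_update_cost:?}");
}
```
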
Lei, HUANG
f3aa967aae fix storage config
Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-09-24 16:01:19 +08:00
Lei, HUANG
93e8510b2a pretty print
2025-09-23 16:01:50 +08:00
Lei, HUANG
53c58494fd feat/objbench:
### Add verbose logging and file deletion in `objbench.rs`

 - **Verbose Logging**: Introduced a `--verbose` flag in `Command` to enable detailed logging using `common_telemetry::init_default_ut_logging()`.
 - **File Deletion**: Implemented automatic deletion of the destination file after processing in `Command::run()`.

### Update tests in `parquet.rs`

 - Removed unused parameters in test functions to streamline the code.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-09-23 15:42:21 +08:00
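
A rough sketch of the control flow this commit describes, using only the standard library; the flag handling, logging call, and file path are stand-ins rather than the actual objbench code (which appears in full in the objbench.rs diff below):

```rust
// Minimal sketch: gate verbose logging on a flag and delete the output file
// after the run, as described in the commit above.
fn main() {
    let verbose = std::env::args().any(|a| a == "--verbose" || a == "-v");
    if verbose {
        // objbench calls common_telemetry::init_default_ut_logging() here.
        eprintln!("verbose logging enabled");
    }

    // Stand-in for the destination file produced during the run.
    let dst = std::env::temp_dir().join("objbench_demo.parquet");
    std::fs::write(&dst, b"demo").expect("write demo file");

    // ... benchmark work would happen here ...

    // The destination file is deleted once the run finishes.
    std::fs::remove_file(&dst).expect("delete demo file");
}
```
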
Lei, HUANG
741c5e2fb1 feat/objbench:
### Update `objbench.rs` and `parquet.rs` for Improved File Handling

 - **`objbench.rs`:**
   - Simplified target access layer initialization by directly using `self.target`.
   - Added assertion to ensure single file info and constructed destination file path for reporting.
   - Enhanced logging to include destination file path in write completion message.

 - **`parquet.rs`:**
   - Updated test cases to include `None` for additional parameter in function calls.

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-09-23 15:26:09 +08:00
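
The assertion and destination-path construction mentioned above correspond to the `assert_eq!(infos.len(), 1)` / `format!` pair in objbench.rs. A simplified, self-contained sketch (the `SstInfo` type and the file id value are stand-ins for the real `write_sst` return type):

```rust
// Sketch of the single-output assertion and destination-path handling.
struct SstInfo {
    file_id: String,
}

fn main() {
    // Hypothetical target region dir.
    let target = "data/region_42/";
    let infos = vec![SstInfo {
        file_id: "8f3c1b2e-5a7d-4f6e-9c0a-1d2e3f4a5b6c".to_string(),
    }];

    // objbench writes exactly one SST per run, so anything else is a bug.
    assert_eq!(infos.len(), 1);

    // In the real code, FileId::as_parquet() appends the ".parquet" suffix.
    let dst_file_path = format!("{}{}.parquet", target, infos[0].file_id);
    println!("Destination file: {dst_file_path}");
}
```
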
Lei, HUANG
d68215dc88 feat/objbench:
### Add `objbench` Binary and Enhance Metrics Collection

 - **New Binary**: Introduced a new binary `objbench` in `src/cmd/src/bin/objbench.rs` for benchmarking object store operations.
 - **Metrics Collection**: Enhanced metrics collection by adding a `Metrics` struct in `access_layer.rs` and integrating it into SST writing processes across multiple files, including `write_cache.rs`, `compactor.rs`, and `flush.rs` (see the usage sketch after this entry).

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
2025-09-23 14:49:33 +08:00
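
The commit above adds the `objbench` binary itself. Going by the clap definitions in the objbench.rs diff below, an invocation looks roughly like `objbench --config <store.toml> --source <region_dir>/<uuid>.parquet --target <target_path> --verbose --pprof-file <flame.svg>`, where `<store.toml>` carries the usual datanode `[storage]` table. A minimal sketch of the config shape it parses, using the `serde` and `toml` crates (the store-specific field names below are assumptions; the real `store` field flattens `datanode::config::ObjectStoreConfig`):

```rust
use serde::Deserialize;

// Simplified stand-in for the config objbench reads; only a file-system-like
// store is sketched here.
#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct StorageConfig {
    data_home: String,
    #[serde(rename = "type")]
    store_type: String,
}

#[derive(Debug, Default, Deserialize)]
#[serde(default)]
struct StorageConfigWrapper {
    storage: StorageConfig,
}

fn main() {
    // The same [storage] table a datanode config file uses.
    let toml_src = r#"
        [storage]
        type = "File"
        data_home = "/tmp/greptimedb"
    "#;
    let cfg: StorageConfigWrapper = toml::from_str(toml_src).expect("parse config");
    println!("{cfg:?}");
}
```
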
12 changed files with 870 additions and 37 deletions

Cargo.lock (generated)

@@ -1791,6 +1791,7 @@ dependencies = [
"clap 4.5.19",
"cli",
"client",
"colored",
"common-base",
"common-catalog",
"common-config",
@@ -1825,7 +1826,10 @@ dependencies = [
"mito2",
"moka",
"nu-ansi-term",
"object-store",
"parquet",
"plugins",
"pprof",
"prometheus",
"prost 0.13.3",
"query",
@@ -1858,6 +1862,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0"
[[package]]
name = "colored"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
dependencies = [
"lazy_static",
"windows-sys 0.59.0",
]
[[package]]
name = "combine"
version = "4.6.7"

src/cmd/Cargo.toml

@@ -9,6 +9,10 @@ default-run = "greptime"
name = "greptime"
path = "src/bin/greptime.rs"
[[bin]]
name = "objbench"
path = "src/bin/objbench.rs"
[features]
default = ["servers/pprof", "servers/mem-prof"]
tokio-console = ["common-telemetry/tokio-console"]
@@ -20,6 +24,7 @@ workspace = true
async-trait.workspace = true
auth.workspace = true
base64.workspace = true
colored = "2.0"
cache.workspace = true
catalog.workspace = true
chrono.workspace = true
@@ -55,6 +60,9 @@ futures.workspace = true
human-panic = "2.0"
humantime.workspace = true
lazy_static.workspace = true
object-store.workspace = true
parquet = "53"
pprof = "0.14"
meta-client.workspace = true
meta-srv.workspace = true
metric-engine.workspace = true

src/cmd/src/bin/greptime.rs

@@ -21,6 +21,8 @@ use cmd::{cli, datanode, flownode, frontend, metasrv, standalone, App};
use common_version::version;
use servers::install_ring_crypto_provider;
pub mod objbench;
#[derive(Parser)]
#[command(name = "greptime", author, version, long_version = version(), about)]
#[command(propagate_version = true)]

src/cmd/src/bin/objbench.rs (new file)

@@ -0,0 +1,602 @@
// Copyright 2025 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::{Path, PathBuf};
use std::time::Instant;
use clap::Parser;
use cmd::error::{self, Result};
use colored::Colorize;
use datanode::config::ObjectStoreConfig;
use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
use mito2::read::Source;
use mito2::sst::file::{FileHandle, FileId, FileMeta};
use mito2::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
use mito2::sst::parquet::{WriteOptions, PARQUET_METADATA_KEY};
use mito2::{build_access_layer, Metrics, OperationType, SstWriteRequest};
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
#[tokio::main]
pub async fn main() {
// common_telemetry::init_default_ut_logging();
let cmd = Command::parse();
if let Err(e) = cmd.run().await {
eprintln!("{}: {}", "Error".red().bold(), e);
std::process::exit(1);
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfigWrapper {
storage: StorageConfig,
}
/// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
/// The working directory of database
pub data_home: String,
#[serde(flatten)]
pub store: ObjectStoreConfig,
}
#[derive(Debug, Parser)]
pub struct Command {
/// Path to the object-store config file (TOML). Must deserialize into datanode::config::ObjectStoreConfig.
#[clap(long, value_name = "FILE")]
pub config: PathBuf,
/// Source SST file path in object-store (e.g. "region_dir/<uuid>.parquet").
#[clap(long, value_name = "PATH")]
pub source: String,
/// Target SST file path in object-store; its parent directory is used as destination region dir.
#[clap(long, value_name = "PATH")]
pub target: String,
/// Verbose output
#[clap(short, long, default_value_t = false)]
pub verbose: bool,
/// Output file path for pprof flamegraph (enables profiling)
#[clap(long, value_name = "FILE")]
pub pprof_file: Option<PathBuf>,
}
impl Command {
pub async fn run(&self) -> Result<()> {
if self.verbose {
common_telemetry::init_default_ut_logging();
}
println!("{}", "Starting objbench...".cyan().bold());
// Build object store from config
let cfg_str = std::fs::read_to_string(&self.config).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to read config {}: {e}", self.config.display()),
}
.build()
})?;
let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to parse config {}: {e}", self.config.display()),
}
.build()
})?;
let object_store = build_object_store(&store_cfg.storage).await?;
println!("{} Object store initialized", "".green());
// Prepare source identifiers
let (src_region_dir, src_file_id) = split_sst_path(&self.source)?;
println!("{} Source path parsed: {}", "".green(), self.source);
// Load parquet metadata to extract RegionMetadata and file stats
println!("{}", "Loading parquet metadata...".yellow());
let file_size = object_store
.stat(&self.source)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("stat failed: {e}"),
}
.build()
})?
.content_length();
let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("read parquet metadata failed: {e}"),
}
.build()
})?;
let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
let num_row_groups = parquet_meta.num_row_groups() as u64;
println!(
"{} Metadata loaded - rows: {}, size: {} bytes",
"".green(),
num_rows,
file_size
);
// Build a FileHandle for the source file
let file_meta = FileMeta {
region_id: region_meta.region_id,
file_id: src_file_id,
time_range: Default::default(),
level: 0,
file_size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows,
num_row_groups,
sequence: None,
};
let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
// Build the reader for a single file via ParquetReaderBuilder
println!("{}", "Building reader...".yellow());
let (_src_access_layer, _cache_manager) =
build_access_layer_simple(src_region_dir.clone(), object_store.clone()).await?;
let reader_build_start = Instant::now();
let reader = mito2::sst::parquet::reader::ParquetReaderBuilder::new(
src_region_dir.clone(),
src_handle.clone(),
object_store.clone(),
)
.expected_metadata(Some(region_meta.clone()))
.build()
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("build reader failed: {e}"),
}
.build()
})?;
let reader_build_elapsed = reader_build_start.elapsed();
let total_rows = reader.parquet_metadata().file_metadata().num_rows();
println!("{} Reader built in {:?}", "".green(), reader_build_elapsed);
// Prepare target access layer for writing
println!("{}", "Preparing target access layer...".yellow());
let (tgt_access_layer, tgt_cache_manager) =
build_access_layer_simple(self.target.clone(), object_store.clone()).await?;
// Build write request
let fulltext_index_config = FulltextIndexConfig {
create_on_compaction: Mode::Disable,
..Default::default()
};
let write_opts = WriteOptions::default();
let write_req = SstWriteRequest {
op_type: OperationType::Compact,
metadata: region_meta,
source: Source::Reader(Box::new(reader)),
cache_manager: tgt_cache_manager,
storage: None,
max_sequence: None,
index_options: Default::default(),
inverted_index_config: MitoConfig::default().inverted_index,
fulltext_index_config,
bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
};
// Write SST
println!("{}", "Writing SST...".yellow());
let mut metrics = Metrics::default();
// Start profiling if pprof_file is specified
#[cfg(unix)]
let profiler_guard = if self.pprof_file.is_some() {
println!("{} Starting profiling...", "".yellow());
Some(
pprof::ProfilerGuardBuilder::default()
.frequency(99)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to start profiler: {e}"),
}
.build()
})?,
)
} else {
None
};
#[cfg(not(unix))]
if self.pprof_file.is_some() {
eprintln!(
"{}: Profiling is not supported on this platform",
"Warning".yellow()
);
}
let write_start = Instant::now();
let infos = tgt_access_layer
.write_sst(write_req, &write_opts, &mut metrics)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("write_sst failed: {e}"),
}
.build()
})?;
let write_elapsed = write_start.elapsed();
// Stop profiling and generate flamegraph if enabled
#[cfg(unix)]
if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
println!("{} Generating flamegraph...", "🔥".yellow());
match guard.report().build() {
Ok(report) => {
let mut flamegraph_data = Vec::new();
if let Err(e) = report.flamegraph(&mut flamegraph_data) {
eprintln!(
"{}: Failed to generate flamegraph: {}",
"Warning".yellow(),
e
);
} else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
eprintln!(
"{}: Failed to write flamegraph to {}: {}",
"Warning".yellow(),
pprof_file.display(),
e
);
} else {
println!(
"{} Flamegraph saved to {}",
"".green(),
pprof_file.display().to_string().cyan()
);
}
}
Err(e) => {
eprintln!(
"{}: Failed to generate pprof report: {}",
"Warning".yellow(),
e
);
}
}
}
assert_eq!(infos.len(), 1);
let dst_file_id = infos[0].file_id;
let dst_file_path = format!("{}{}", self.target, dst_file_id.as_parquet(),);
// Report results with ANSI colors
println!("\n{} {}", "Write complete!".green().bold(), "".green());
println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
println!(
" {}: {}",
"File size".bold(),
format!("{} bytes", file_size).cyan()
);
println!(
" {}: {:?}",
"Reader build time".bold(),
reader_build_elapsed
);
println!(" {}: {:?}", "Total time".bold(), write_elapsed);
// Print metrics in a formatted way
println!(
" {}: {:?}, sum: {:?}",
"Metrics".bold(),
metrics,
metrics.sum()
);
// Print infos
println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata);
// Cleanup
println!("\n{}", "Cleaning up...".yellow());
object_store.delete(&dst_file_path).await.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
}
.build()
})?;
println!("{} Temporary file deleted", "".green());
println!("\n{}", "Benchmark completed successfully!".green().bold());
Ok(())
}
}
fn split_sst_path(path: &str) -> Result<(String, FileId)> {
let p = Path::new(path);
let file_name = p.file_name().and_then(|s| s.to_str()).ok_or_else(|| {
error::IllegalConfigSnafu {
msg: "invalid source path".to_string(),
}
.build()
})?;
let uuid_str = file_name.strip_suffix(".parquet").ok_or_else(|| {
error::IllegalConfigSnafu {
msg: "expect .parquet file".to_string(),
}
.build()
})?;
let file_id = FileId::parse_str(uuid_str).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid file id: {e}"),
}
.build()
})?;
let parent = p
.parent()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_string();
Ok((parent, file_id))
}
fn extract_region_metadata(
file_path: &str,
meta: &parquet::file::metadata::ParquetMetaData,
) -> Result<RegionMetadataRef> {
use parquet::format::KeyValue;
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
let Some(kvs) = kvs else {
return Err(error::IllegalConfigSnafu {
msg: format!("{file_path}: missing parquet key_value metadata"),
}
.build());
};
let json = kvs
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.and_then(|kv| kv.value.as_ref())
.ok_or_else(|| {
error::IllegalConfigSnafu {
msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
}
.build()
})?;
let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid region metadata json: {e}"),
}
.build()
})?;
Ok(std::sync::Arc::new(region))
}
async fn build_object_store(sc: &StorageConfig) -> Result<ObjectStore> {
use datanode::config::ObjectStoreConfig::*;
let oss = &sc.store;
match oss {
File(_) => {
use object_store::services::Fs;
let builder = Fs::default().root(&sc.data_home);
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init fs backend failed: {e}"),
}
.build()
})?
.finish())
}
S3(s3) => {
use common_base::secrets::ExposeSecret;
use object_store::services::S3;
use object_store::util;
let root = util::normalize_dir(&s3.root);
let mut builder = S3::default()
.root(&root)
.bucket(&s3.bucket)
.access_key_id(s3.access_key_id.expose_secret())
.secret_access_key(s3.secret_access_key.expose_secret());
if let Some(ep) = &s3.endpoint {
builder = builder.endpoint(ep);
}
if let Some(region) = &s3.region {
builder = builder.region(region);
}
if s3.enable_virtual_host_style {
builder = builder.enable_virtual_host_style();
}
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init s3 backend failed: {e}"),
}
.build()
})?
.finish())
}
Oss(oss) => {
use common_base::secrets::ExposeSecret;
use object_store::services::Oss;
use object_store::util;
let root = util::normalize_dir(&oss.root);
let builder = Oss::default()
.root(&root)
.bucket(&oss.bucket)
.endpoint(&oss.endpoint)
.access_key_id(oss.access_key_id.expose_secret())
.access_key_secret(oss.access_key_secret.expose_secret());
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init oss backend failed: {e}"),
}
.build()
})?
.finish())
}
Azblob(az) => {
use common_base::secrets::ExposeSecret;
use object_store::services::Azblob;
use object_store::util;
let root = util::normalize_dir(&az.root);
let mut builder = Azblob::default()
.root(&root)
.container(&az.container)
.endpoint(&az.endpoint)
.account_name(az.account_name.expose_secret())
.account_key(az.account_key.expose_secret());
if let Some(token) = &az.sas_token {
builder = builder.sas_token(token);
}
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init azblob backend failed: {e}"),
}
.build()
})?
.finish())
}
Gcs(gcs) => {
use common_base::secrets::ExposeSecret;
use object_store::services::Gcs;
use object_store::util;
let root = util::normalize_dir(&gcs.root);
let builder = Gcs::default()
.root(&root)
.bucket(&gcs.bucket)
.scope(&gcs.scope)
.credential_path(gcs.credential_path.expose_secret())
.credential(gcs.credential.expose_secret())
.endpoint(&gcs.endpoint);
Ok(ObjectStore::new(builder)
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("init gcs backend failed: {e}"),
}
.build()
})?
.finish())
}
}
}
async fn build_access_layer_simple(
region_dir: String,
object_store: ObjectStore,
) -> Result<(
std::sync::Arc<mito2::AccessLayer>,
std::sync::Arc<mito2::CacheManager>,
)> {
// Minimal index aux path setup
let mut mito_cfg = MitoConfig::default();
// Use a temporary directory as aux path
let data_home = std::env::temp_dir().join("greptime_objbench");
let _ = std::fs::create_dir_all(&data_home);
let _ = mito_cfg.index.sanitize(
data_home.to_str().unwrap_or("/tmp"),
&mito_cfg.inverted_index,
);
let access_layer = build_access_layer(&region_dir, object_store, &mito_cfg)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("build_access_layer failed: {e}"),
}
.build()
})?;
Ok((
access_layer,
std::sync::Arc::new(mito2::CacheManager::default()),
))
}
fn new_noop_file_purger() -> FilePurgerRef {
#[derive(Debug)]
struct Noop;
impl FilePurger for Noop {
fn send_request(&self, _request: PurgeRequest) {}
}
std::sync::Arc::new(Noop)
}
async fn load_parquet_metadata(
object_store: ObjectStore,
path: &str,
file_size: u64,
) -> std::result::Result<
parquet::file::metadata::ParquetMetaData,
Box<dyn std::error::Error + Send + Sync>,
> {
use parquet::file::metadata::ParquetMetaDataReader;
use parquet::file::FOOTER_SIZE;
let actual_size = if file_size == 0 {
object_store.stat(path).await?.content_length()
} else {
file_size
};
if actual_size < FOOTER_SIZE as u64 {
return Err("file too small".into());
}
let prefetch: u64 = 64 * 1024;
let start = actual_size.saturating_sub(prefetch);
let buffer = object_store
.read_with(path)
.range(start..actual_size)
.await?
.to_vec();
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let metadata_len = ParquetMetaDataReader::decode_footer(&footer)? as u64;
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
return Err("invalid footer/metadata length".into());
}
if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
let meta = ParquetMetaDataReader::decode_metadata(
&buffer[metadata_start..buffer_len - FOOTER_SIZE],
)?;
Ok(meta)
} else {
let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
let data = object_store
.read_with(path)
.range(metadata_start..(actual_size - FOOTER_SIZE as u64))
.await?
.to_vec();
let meta = ParquetMetaDataReader::decode_metadata(&data)?;
Ok(meta)
}
}
#[cfg(test)]
mod tests {
use super::StorageConfigWrapper;
#[test]
fn test_decode() {
let cfg = std::fs::read_to_string("/home/lei/datanode-bulk.toml").unwrap();
let storage: StorageConfigWrapper = toml::from_str(&cfg).unwrap();
println!("{:?}", storage);
}
}

src/mito2/src/access_layer.rs

@@ -13,6 +13,7 @@
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
@@ -42,6 +43,29 @@ pub type AccessLayerRef = Arc<AccessLayer>;
/// SST write results.
pub type SstInfoArray = SmallVec<[SstInfo; 2]>;
#[derive(Debug, Default)]
pub struct Metrics {
pub read: Duration,
pub write: Duration,
pub convert: Duration,
pub index_update: Duration,
pub index_finish: Duration,
pub close: Duration,
pub num_series: usize,
// SST Opendal metrics.
pub opendal_create_cost: Duration,
pub opendal_num_writes: usize,
pub opendal_write_cost: Duration,
pub opendal_complete_cost: Duration,
}
impl Metrics {
pub fn sum(&self) -> Duration {
self.read + self.write + self.convert + self.index_update + self.index_finish + self.close
}
}
/// A layer to access SST files under the same directory.
pub struct AccessLayer {
region_dir: String,
@@ -121,10 +145,11 @@ impl AccessLayer {
/// Writes a SST with specific `file_id` and `metadata` to the layer.
///
/// Returns the info of the SST. If no data written, returns None.
pub(crate) async fn write_sst(
pub async fn write_sst(
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
metrics: &mut Metrics,
) -> Result<SstInfoArray> {
let region_id = request.metadata.region_id;
let cache_manager = request.cache_manager.clone();
@@ -167,9 +192,16 @@ impl AccessLayer {
path_provider,
)
.await;
writer
.write_all(request.source, request.max_sequence, write_opts)
.await?
let sst_info = writer
.write_all(request.source, request.max_sequence, write_opts, metrics)
.await?;
let opendal_metrics = writer.opendal_metrics_val();
metrics.opendal_create_cost += opendal_metrics.create_cost;
metrics.opendal_num_writes += opendal_metrics.num_writes;
metrics.opendal_write_cost += opendal_metrics.write_cost;
metrics.opendal_complete_cost += opendal_metrics.complete_cost;
sst_info
};
// Put parquet metadata to cache manager.
@@ -189,28 +221,53 @@ impl AccessLayer {
}
}
/// Helper to build an [AccessLayerRef] with internal index managers.
///
/// This is a convenience constructor intended for tooling that needs to
/// interact with SSTs without wiring all indexing internals manually.
pub async fn build_access_layer(
region_dir: &str,
object_store: ObjectStore,
config: &crate::config::MitoConfig,
) -> Result<AccessLayerRef> {
let puffin_manager_factory = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
Some(config.index.write_buffer_size.as_bytes() as _),
config.index.staging_ttl,
)
.await?;
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path).await?;
Ok(Arc::new(AccessLayer::new(
region_dir,
object_store,
puffin_manager_factory,
intermediate_manager,
)))
}
/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum OperationType {
pub enum OperationType {
Flush,
Compact,
}
/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
pub struct SstWriteRequest {
pub op_type: OperationType,
pub metadata: RegionMetadataRef,
pub source: Source,
pub cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
pub(crate) max_sequence: Option<SequenceNumber>,
pub storage: Option<String>,
pub max_sequence: Option<SequenceNumber>,
/// Configs for index
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
pub index_options: IndexOptions,
pub inverted_index_config: InvertedIndexConfig,
pub fulltext_index_config: FulltextIndexConfig,
pub bloom_filter_index_config: BloomFilterConfig,
}
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {

src/mito2/src/cache/write_cache.rs

@@ -40,6 +40,7 @@ use crate::sst::index::IndexerBuilderImpl;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::WriteOptions;
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
use crate::Metrics;
/// A cache for uploading files to remote object stores.
///
@@ -140,7 +141,12 @@ impl WriteCache {
.await;
let sst_info = writer
.write_all(write_request.source, write_request.max_sequence, write_opts)
.write_all(
write_request.source,
write_request.max_sequence,
write_opts,
&mut Metrics::default(),
)
.await?;
timer.stop_and_record();

src/mito2/src/compaction/compactor.rs

@@ -27,7 +27,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
use crate::access_layer::{AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{new_picker, PickerOutput};
use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
@@ -340,6 +340,7 @@ impl Compactor for DefaultCompactor {
bloom_filter_index_config,
},
&write_opts,
&mut Metrics::default(),
)
.await?
.into_iter()

src/mito2/src/flush.rs

@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::{mpsc, watch};
use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
use crate::access_layer::{AccessLayerRef, Metrics, OperationType, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
@@ -366,7 +366,7 @@ impl RegionFlushTask {
let ssts_written = self
.access_layer
.write_sst(write_request, &write_opts)
.write_sst(write_request, &write_opts, &mut Metrics::default())
.await?;
if ssts_written.is_empty() {
// No data written.

src/mito2/src/lib.rs

@@ -44,6 +44,12 @@ mod time_provider;
pub mod wal;
mod worker;
// Public re-exports for tooling convenience
pub use access_layer::{
build_access_layer, AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest,
};
pub use cache::{CacheManager, CacheManagerRef};
#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Mito developer document
///

src/mito2/src/sst/parquet.rs

@@ -109,6 +109,7 @@ mod tests {
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};
use crate::Metrics;
const FILE_DIR: &str = "/";
@@ -165,7 +166,7 @@ mod tests {
.await;
let info = writer
.write_all(source, None, &write_opts)
.write_all(source, None, &write_opts, &mut Metrics::default())
.await
.unwrap()
.remove(0);
@@ -222,7 +223,7 @@ mod tests {
.await;
writer
.write_all(source, None, &write_opts)
.write_all(source, None, &write_opts, &mut Metrics::default())
.await
.unwrap()
.remove(0);
@@ -293,7 +294,7 @@ mod tests {
.await;
let sst_info = writer
.write_all(source, None, &write_opts)
.write_all(source, None, &write_opts, &mut Metrics::default())
.await
.unwrap()
.remove(0);
@@ -334,7 +335,7 @@ mod tests {
)
.await;
writer
.write_all(source, None, &write_opts)
.write_all(source, None, &write_opts, &mut Metrics::default())
.await
.unwrap()
.remove(0);
@@ -389,7 +390,7 @@ mod tests {
)
.await;
writer
.write_all(source, None, &write_opts)
.write_all(source, None, &write_opts, &mut Metrics::default())
.await
.unwrap()
.remove(0);
@@ -427,7 +428,7 @@ mod tests {
.await;
writer
.write_all(source, None, &write_opts)
.write_all(source, None, &write_opts, &mut Metrics::default())
.await
.unwrap()
.remove(0);

src/mito2/src/sst/parquet/reader.rs

@@ -1117,7 +1117,6 @@ impl ParquetReader {
self.context.read_format().metadata()
}
#[cfg(test)]
pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
self.context.reader_builder().parquet_meta.clone()
}

src/mito2/src/sst/parquet/writer.rs

@@ -17,14 +17,19 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use object_store::{FuturesAsyncWriter, ObjectStore};
use futures::future::BoxFuture;
use object_store::{FuturesAsyncWriter, ObjectStore, Writer};
use parquet::arrow::async_writer::AsyncFileWriter;
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::errors::ParquetError;
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
@@ -45,12 +50,13 @@ use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
use crate::Metrics;
/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
/// Path provider that creates SST and index file paths according to file id.
path_provider: P,
writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
writer: Option<AsyncArrowWriter<OpenDalWriter>>,
/// Current active file id.
current_file: FileId,
writer_factory: F,
@@ -61,11 +67,18 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
/// Current active indexer.
current_indexer: Option<Indexer>,
bytes_written: Arc<AtomicUsize>,
opendal_metrics: Arc<Mutex<OpenDalMetrics>>,
}
pub trait WriterFactory {
type Writer: AsyncWrite + Send + Unpin;
fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
fn create_opendal(
&mut self,
file_path: &str,
size: Arc<AtomicUsize>,
) -> impl Future<Output = Result<OpenDalWriter>>;
}
pub struct ObjectStoreWriterFactory {
@@ -84,6 +97,22 @@ impl WriterFactory for ObjectStoreWriterFactory {
.map(|v| v.into_futures_async_write().compat_write())
.context(OpenDalSnafu)
}
async fn create_opendal(
&mut self,
file_path: &str,
size: Arc<AtomicUsize>,
) -> Result<OpenDalWriter> {
let writer = self
.object_store
.writer_with(file_path)
.chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
.context(OpenDalSnafu)?;
Ok(OpenDalWriter::new(writer, size))
}
}
impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
@@ -105,6 +134,10 @@ where
)
.await
}
pub fn opendal_metrics_val(&self) -> OpenDalMetrics {
self.opendal_metrics.lock().unwrap().clone()
}
}
impl<F, I, P> ParquetWriter<F, I, P>
@@ -132,6 +165,7 @@ where
indexer_builder,
current_indexer: Some(indexer),
bytes_written: Arc::new(AtomicUsize::new(0)),
opendal_metrics: Arc::new(Mutex::new(OpenDalMetrics::default())),
}
}
@@ -156,20 +190,33 @@ where
mut source: Source,
override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
opts: &WriteOptions,
metrics: &mut Metrics,
) -> Result<SstInfoArray> {
let write_format =
WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
let mut stats = SourceStats::default();
let mut last_key = None;
while let Some(res) = self
.write_next_batch(&mut source, &write_format, opts)
.write_next_batch(&mut source, &write_format, opts, metrics)
.await
.transpose()
{
match res {
Ok(mut batch) => {
if let Some(last) = &last_key {
if last != batch.primary_key() {
metrics.num_series += 1;
last_key = Some(batch.primary_key().to_vec());
}
} else {
metrics.num_series += 1;
}
stats.update(&batch);
let index_start = Instant::now();
self.get_or_create_indexer().await.update(&mut batch).await;
metrics.index_update += index_start.elapsed();
}
Err(e) => {
self.get_or_create_indexer().await.abort().await;
@@ -178,7 +225,9 @@ where
}
}
let index_finish_start = Instant::now();
let index_output = self.get_or_create_indexer().await.finish().await;
metrics.index_finish += index_finish_start.elapsed();
if stats.num_rows == 0 {
return Ok(smallvec![]);
@@ -189,9 +238,10 @@ where
return Ok(smallvec![]);
};
let close_start = Instant::now();
arrow_writer.flush().await.context(WriteParquetSnafu)?;
let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
metrics.close += close_start.elapsed();
let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
// Safety: num rows > 0 so we must have min/max.
@@ -238,17 +288,25 @@ where
source: &mut Source,
write_format: &WriteFormat,
opts: &WriteOptions,
metrics: &mut Metrics,
) -> Result<Option<Batch>> {
let read_start = Instant::now();
let Some(batch) = source.next_batch().await? else {
return Ok(None);
};
metrics.read += read_start.elapsed();
let convert_start = Instant::now();
let arrow_batch = write_format.convert_batch(&batch)?;
metrics.convert += convert_start.elapsed();
let write_start = Instant::now();
self.maybe_init_writer(write_format.arrow_schema(), opts)
.await?
.write(&arrow_batch)
.await
.context(WriteParquetSnafu)?;
metrics.write += write_start.elapsed();
Ok(Some(batch))
}
@@ -256,7 +314,7 @@ where
&mut self,
schema: &SchemaRef,
opts: &WriteOptions,
) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
) -> Result<&mut AsyncArrowWriter<OpenDalWriter>> {
if let Some(ref mut w) = self.writer {
Ok(w)
} else {
@@ -274,10 +332,17 @@ where
let writer_props = props_builder.build();
let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
let writer = SizeAwareWriter::new(
self.writer_factory.create(&sst_file_path).await?,
self.bytes_written.clone(),
);
// let writer = SizeAwareWriter::new(
// self.writer_factory.create(&sst_file_path).await?,
// self.bytes_written.clone(),
// );
let create_start = Instant::now();
let mut writer = self
.writer_factory
.create_opendal(&sst_file_path, self.bytes_written.clone())
.await?;
self.opendal_metrics.lock().unwrap().create_cost += create_start.elapsed();
writer = writer.with_metrics(self.opendal_metrics.clone());
let arrow_writer =
AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
.context(WriteParquetSnafu)?;
@@ -317,6 +382,78 @@ impl SourceStats {
}
}
#[derive(Default, Debug, Clone)]
pub(crate) struct OpenDalMetrics {
pub(crate) create_cost: Duration,
pub(crate) num_writes: usize,
pub(crate) write_cost: Duration,
pub(crate) complete_cost: Duration,
}
/// Workaround for [AsyncArrowWriter] does not provide a method to
/// get total bytes written after close.
pub struct OpenDalWriter {
inner: Writer,
size: Arc<AtomicUsize>,
metrics: Option<Arc<Mutex<OpenDalMetrics>>>,
}
impl OpenDalWriter {
fn new(inner: Writer, size: Arc<AtomicUsize>) -> Self {
Self {
inner,
size: size.clone(),
metrics: None,
}
}
fn with_metrics(mut self, metrics: Arc<Mutex<OpenDalMetrics>>) -> Self {
self.metrics = Some(metrics);
self
}
}
impl AsyncFileWriter for OpenDalWriter {
fn write(&mut self, bs: Bytes) -> BoxFuture<'_, Result<(), ParquetError>> {
let write_start = Instant::now();
let size = self.size.clone();
let metrics = self.metrics.clone();
Box::pin(async move {
let bytes_written = bs.len();
self.inner
.write(bs)
.await
.map_err(|err| ParquetError::External(Box::new(err)))?;
size.fetch_add(bytes_written, Ordering::Relaxed);
if let Some(metrics) = metrics {
let mut m = metrics.lock().unwrap();
m.num_writes += 1;
m.write_cost += write_start.elapsed();
}
Ok(())
})
}
fn complete(&mut self) -> BoxFuture<'_, Result<(), ParquetError>> {
let complete_start = Instant::now();
let metrics = self.metrics.clone();
Box::pin(async move {
self.inner
.close()
.await
.map(|_| ())
.map_err(|err| ParquetError::External(Box::new(err)))?;
if let Some(metrics) = metrics {
let mut m = metrics.lock().unwrap();
m.complete_cost += complete_start.elapsed();
}
Ok(())
})
}
}
/// Workaround for [AsyncArrowWriter] does not provide a method to
/// get total bytes written after close.
struct SizeAwareWriter<W> {