1use std::path::PathBuf;
16use std::sync::Arc;
17use std::time::Instant;
18
19use clap::Parser;
20use colored::Colorize;
21use datanode::config::RegionEngineConfig;
22use datanode::store;
23use either::Either;
24use mito2::access_layer::{
25 AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
26};
27use mito2::cache::{CacheManager, CacheManagerRef};
28use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
29use mito2::read::Source;
30use mito2::sst::file::{FileHandle, FileMeta};
31use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
32use mito2::sst::index::intermediate::IntermediateManager;
33use mito2::sst::index::puffin_manager::PuffinManagerFactory;
34use mito2::sst::parquet::reader::ParquetReaderBuilder;
35use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
36use mito2::worker::write_cache_from_config;
37use object_store::ObjectStore;
38use parquet::file::metadata::{FooterTail, KeyValue};
39use regex::Regex;
40use snafu::OptionExt;
41use store_api::metadata::{RegionMetadata, RegionMetadataRef};
42use store_api::path_utils::region_name;
43use store_api::region_request::PathType;
44use store_api::storage::FileId;
45
46use crate::datanode::{StorageConfig, StorageConfigWrapper};
47use crate::error;
48
// Benchmarks object-store SST write throughput by re-writing an existing
// SST file through the mito access layer.
//
// NOTE: plain `//` comments are used intentionally — clap derives `--help`
// text from `///` doc comments, and adding those would change CLI output.
#[derive(Debug, Parser)]
pub struct ObjbenchCommand {
    // Path to a datanode TOML config providing storage + mito engine settings.
    #[clap(long, value_name = "FILE")]
    pub config: PathBuf,

    // Object-store path of the source SST, e.g.
    // `data/{catalog}/{schema}/{table_id}/{table_id}_{seq}/data/{file_id}.parquet`.
    #[clap(long, value_name = "PATH")]
    pub source: String,

    // Enables default unit-test logging when set.
    #[clap(short, long, default_value_t = false)]
    pub verbose: bool,

    // Optional output file for a CPU flamegraph (Unix only).
    #[clap(long, value_name = "FILE")]
    pub pprof_file: Option<PathBuf>,
}
68
69fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConfig)> {
70 let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
71 error::IllegalConfigSnafu {
72 msg: format!("failed to read config {}: {e}", config_path.display()),
73 }
74 .build()
75 })?;
76
77 let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
78 error::IllegalConfigSnafu {
79 msg: format!("failed to parse config {}: {e}", config_path.display()),
80 }
81 .build()
82 })?;
83
84 let storage_config = store_cfg.storage;
85 let mito_engine_config = store_cfg
86 .region_engine
87 .into_iter()
88 .filter_map(|c| {
89 if let RegionEngineConfig::Mito(mito) = c {
90 Some(mito)
91 } else {
92 None
93 }
94 })
95 .next()
96 .with_context(|| error::IllegalConfigSnafu {
97 msg: format!("Engine config not found in {:?}", config_path),
98 })?;
99 Ok((storage_config, mito_engine_config))
100}
101
impl ObjbenchCommand {
    /// Runs the benchmark end to end: loads the source SST's metadata,
    /// re-writes the SST (and its indexes) through the mito access layer
    /// while measuring timings, prints a report, then deletes the files
    /// the run produced.
    pub async fn run(&self) -> error::Result<()> {
        if self.verbose {
            common_telemetry::init_default_ut_logging();
        }

        println!("{}", "Starting objbench with config:".cyan().bold());

        // Mutable because the access-layer builder sanitizes the index
        // config section in place.
        let (store_cfg, mut mito_engine_config) = parse_config(&self.config)?;

        let object_store = build_object_store(&store_cfg).await?;
        println!("{} Object store initialized", "✓".green());

        let components = parse_file_dir_components(&self.source)?;
        println!(
            "{} Source path parsed: {}, components: {:?}",
            "✓".green(),
            self.source,
            components
        );

        println!("{}", "Loading parquet metadata...".yellow());
        let file_size = object_store
            .stat(&self.source)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("stat failed: {e}"),
                }
                .build()
            })?
            .content_length();
        let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("read parquet metadata failed: {e}"),
                }
                .build()
            })?;

        // Region metadata is embedded in the parquet key-value metadata by mito.
        let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
        let num_rows = parquet_meta.file_metadata().num_rows() as u64;
        let num_row_groups = parquet_meta.num_row_groups() as u64;
        // Largest uncompressed row-group size across the file (0 if no row groups).
        let max_row_group_uncompressed_size: u64 = parquet_meta
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);

        println!(
            "{} Metadata loaded - rows: {}, size: {} bytes",
            "✓".green(),
            num_rows,
            file_size
        );

        // Synthesize a FileMeta for the source file so it can be wrapped in a
        // FileHandle; fields not needed for reading (time range, indexes,
        // sequence, ...) are left at their defaults.
        let file_meta = FileMeta {
            region_id: region_meta.region_id,
            file_id: components.file_id,
            time_range: Default::default(),
            level: 0,
            file_size,
            max_row_group_uncompressed_size,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows,
            num_row_groups,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        // The noop purger prevents any deletion of the source file when the
        // handle is dropped.
        let src_handle = FileHandle::new(file_meta, new_noop_file_purger());

        let table_dir = components.table_dir();
        let (src_access_layer, cache_manager) = build_access_layer_simple(
            &components,
            object_store.clone(),
            &mut mito_engine_config,
            &store_cfg.data_home,
        )
        .await?;
        let reader_build_start = Instant::now();

        let reader = ParquetReaderBuilder::new(
            table_dir,
            components.path_type,
            src_handle.clone(),
            object_store.clone(),
        )
        .expected_metadata(Some(region_meta.clone()))
        .build()
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("build reader failed: {e:?}"),
            }
            .build()
        })?;

        let reader_build_elapsed = reader_build_start.elapsed();
        let total_rows = reader.parquet_metadata().file_metadata().num_rows();
        println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed);

        // Fulltext index creation on compaction is disabled for the benchmark.
        let fulltext_index_config = FulltextIndexConfig {
            create_on_compaction: Mode::Disable,
            ..Default::default()
        };

        // NOTE(review): inverted/bloom-filter index configs come from
        // `MitoConfig::default()` while `index_config` comes from the parsed
        // file — confirm ignoring the file's settings for those two is intended.
        let write_req = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: region_meta,
            source: Either::Left(Source::Reader(Box::new(reader))),
            cache_manager,
            storage: None,
            max_sequence: None,
            index_options: Default::default(),
            index_config: mito_engine_config.index.clone(),
            inverted_index_config: MitoConfig::default().inverted_index,
            fulltext_index_config,
            bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        println!("{}", "Writing SST...".yellow());

        // Start CPU profiling (Unix only) when --pprof-file is given; the
        // guard must stay alive across the write below.
        #[cfg(unix)]
        let profiler_guard = if self.pprof_file.is_some() {
            println!("{} Starting profiling...", "⚡".yellow());
            Some(
                pprof::ProfilerGuardBuilder::default()
                    .frequency(99)
                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                    .build()
                    .map_err(|e| {
                        error::IllegalConfigSnafu {
                            msg: format!("Failed to start profiler: {e}"),
                        }
                        .build()
                    })?,
            )
        } else {
            None
        };

        #[cfg(not(unix))]
        if self.pprof_file.is_some() {
            eprintln!(
                "{}: Profiling is not supported on this platform",
                "Warning".yellow()
            );
        }

        // The timed section: one full SST write through the access layer.
        let write_start = Instant::now();
        let mut metrics = Metrics::new(WriteType::Flush);
        let infos = src_access_layer
            .write_sst(write_req, &WriteOptions::default(), &mut metrics)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("write_sst failed: {e:?}"),
                }
                .build()
            })?;

        let write_elapsed = write_start.elapsed();

        // Flamegraph generation failures are reported but never abort the run.
        #[cfg(unix)]
        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
            println!("{} Generating flamegraph...", "🔥".yellow());
            match guard.report().build() {
                Ok(report) => {
                    let mut flamegraph_data = Vec::new();
                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
                        println!(
                            "{}: Failed to write flamegraph to {}: {}",
                            "Error".red(),
                            pprof_file.display(),
                            e
                        );
                    } else {
                        println!(
                            "{} Flamegraph saved to {}",
                            "✓".green(),
                            pprof_file.display().to_string().cyan()
                        );
                    }
                }
                Err(e) => {
                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
                }
            }
        }
        // The benchmark writes a single SST; more than one output would be a bug.
        assert_eq!(infos.len(), 1);
        let dst_file_id = infos[0].file_id;
        let dst_file_path = format!("{}/{}.parquet", components.region_dir(), dst_file_id);
        let mut dst_index_path = None;
        // A non-empty index file means a puffin index was written alongside the SST.
        if infos[0].index_metadata.file_size > 0 {
            dst_index_path = Some(format!(
                "{}/index/{}.puffin",
                components.region_dir(),
                dst_file_id
            ));
        }

        println!("\n{} {}", "Write complete!".green().bold(), "✓".green());
        println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
        println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
        println!(
            " {}: {}",
            "File size".bold(),
            format!("{} bytes", file_size).cyan()
        );
        println!(
            " {}: {:?}",
            "Reader build time".bold(),
            reader_build_elapsed
        );
        println!(" {}: {:?}", "Total time".bold(), write_elapsed);

        println!(" {}: {:?}", "Metrics".bold(), metrics,);

        println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata);

        // Clean up the files this run created (never the source file).
        println!("\n{}", "Cleaning up...".yellow());
        object_store.delete(&dst_file_path).await.map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
            }
            .build()
        })?;
        println!("{} Temporary file {} deleted", "✓".green(), dst_file_path);

        if let Some(index_path) = dst_index_path {
            object_store.delete(&index_path).await.map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("Failed to delete dest index file {}: {}", index_path, e),
                }
                .build()
            })?;
            println!(
                "{} Temporary index file {} deleted",
                "✓".green(),
                index_path
            );
        }

        println!("\n{}", "Benchmark completed successfully!".green().bold());
        Ok(())
    }
}
377
/// Components extracted from an SST file path such as
/// `data/{catalog}/{schema}/{table_id}/{table_id}_{seq}[/data|/metadata]/{file_id}.parquet`.
#[derive(Debug)]
struct FileDirComponents {
    // Catalog name segment of the path.
    catalog: String,
    // Schema name segment of the path.
    schema: String,
    // Table id parsed from the table directory segment.
    table_id: u32,
    // Region sequence parsed from the `{table_id}_{seq}` directory segment.
    region_sequence: u32,
    // Layout variant: bare region dir, `/data`, or `/metadata` subdirectory.
    path_type: PathType,
    // File id parsed from the parquet file stem.
    file_id: FileId,
}
387
388impl FileDirComponents {
389 fn table_dir(&self) -> String {
390 format!("data/{}/{}/{}", self.catalog, self.schema, self.table_id)
391 }
392
393 fn region_dir(&self) -> String {
394 let region_name = region_name(self.table_id, self.region_sequence);
395 match self.path_type {
396 PathType::Bare => {
397 format!(
398 "data/{}/{}/{}/{}",
399 self.catalog, self.schema, self.table_id, region_name
400 )
401 }
402 PathType::Data => {
403 format!(
404 "data/{}/{}/{}/{}/data",
405 self.catalog, self.schema, self.table_id, region_name
406 )
407 }
408 PathType::Metadata => {
409 format!(
410 "data/{}/{}/{}/{}/metadata",
411 self.catalog, self.schema, self.table_id, region_name
412 )
413 }
414 }
415 }
416}
417
418fn parse_file_dir_components(path: &str) -> error::Result<FileDirComponents> {
419 let pattern =
421 r"^data/([^/]+)/([^/]+)/([^/]+)/([^/]+)_([^/]+)(?:/data|/metadata)?/(.+).parquet$";
422
423 let re = Regex::new(pattern).expect("Invalid regex pattern");
425
426 let path_type = if path.contains("/data/") {
428 PathType::Data
429 } else if path.contains("/metadata/") {
430 PathType::Metadata
431 } else {
432 PathType::Bare
433 };
434
435 let components = (|| {
437 let captures = re.captures(path)?;
438 if captures.len() != 7 {
439 return None;
440 }
441 let mut components = FileDirComponents {
442 catalog: "".to_string(),
443 schema: "".to_string(),
444 table_id: 0,
445 region_sequence: 0,
446 path_type,
447 file_id: FileId::default(),
448 };
449 components.catalog = captures.get(1)?.as_str().to_string();
451 components.schema = captures.get(2)?.as_str().to_string();
452 components.table_id = captures[3].parse().ok()?;
453 components.region_sequence = captures[5].parse().ok()?;
454 let file_id_str = &captures[6];
455 components.file_id = FileId::parse_str(file_id_str).ok()?;
456 Some(components)
457 })();
458 components.context(error::IllegalConfigSnafu {
459 msg: format!("Expect valid source file path, got: {}", path),
460 })
461}
462
463fn extract_region_metadata(
464 file_path: &str,
465 meta: &parquet::file::metadata::ParquetMetaData,
466) -> error::Result<RegionMetadataRef> {
467 let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
468 let Some(kvs) = kvs else {
469 return Err(error::IllegalConfigSnafu {
470 msg: format!("{file_path}: missing parquet key_value metadata"),
471 }
472 .build());
473 };
474 let json = kvs
475 .iter()
476 .find(|kv| kv.key == PARQUET_METADATA_KEY)
477 .and_then(|kv| kv.value.as_ref())
478 .ok_or_else(|| {
479 error::IllegalConfigSnafu {
480 msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
481 }
482 .build()
483 })?;
484 let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
485 error::IllegalConfigSnafu {
486 msg: format!("invalid region metadata json: {e}"),
487 }
488 .build()
489 })?;
490 Ok(Arc::new(region))
491}
492
493async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
494 store::new_object_store(sc.store.clone(), &sc.data_home)
495 .await
496 .map_err(|e| {
497 error::IllegalConfigSnafu {
498 msg: format!("Failed to build object store: {e:?}"),
499 }
500 .build()
501 })
502}
503
/// Builds an access layer rooted at `components`' table dir, together with a
/// cache manager, using the index settings from `config`.
///
/// `config` is `&mut` because the index section is sanitized in place
/// against `data_home` before the aux paths are used.
async fn build_access_layer_simple(
    components: &FileDirComponents,
    object_store: ObjectStore,
    config: &mut MitoConfig,
    data_home: &str,
) -> error::Result<(AccessLayerRef, CacheManagerRef)> {
    // NOTE(review): the sanitize result is deliberately discarded here;
    // confirm a sanitize failure is acceptable to ignore for this tool.
    let _ = config.index.sanitize(data_home, &config.inverted_index);
    let puffin_manager = PuffinManagerFactory::new(
        &config.index.aux_path,
        config.index.staging_size.as_bytes(),
        Some(config.index.write_buffer_size.as_bytes() as _),
        config.index.staging_ttl,
    )
    .await
    .map_err(|e| {
        error::IllegalConfigSnafu {
            msg: format!("Failed to build access layer: {e:?}"),
        }
        .build()
    })?;

    // Intermediate index data is staged on the local filesystem under the
    // (sanitized) aux path.
    let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to build IntermediateManager: {e:?}"),
            }
            .build()
        })?
        .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));

    let cache_manager =
        build_cache_manager(config, puffin_manager.clone(), intermediate_manager.clone()).await?;
    let layer = AccessLayer::new(
        components.table_dir(),
        components.path_type,
        object_store,
        puffin_manager,
        intermediate_manager,
    );
    Ok((Arc::new(layer), cache_manager))
}
546
/// Builds a cache manager whose cache sizes mirror the mito engine config,
/// including an optional write cache derived from that config.
async fn build_cache_manager(
    config: &MitoConfig,
    puffin_manager: PuffinManagerFactory,
    intermediate_manager: IntermediateManager,
) -> error::Result<CacheManagerRef> {
    let write_cache = write_cache_from_config(config, puffin_manager, intermediate_manager)
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to build write cache: {e:?}"),
            }
            .build()
        })?;
    // Note: `metadata_cache_size` is used for both the index metadata cache
    // and the puffin metadata cache below.
    let cache_manager = Arc::new(
        CacheManager::builder()
            .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
            .vector_cache_size(config.vector_cache_size.as_bytes())
            .page_cache_size(config.page_cache_size.as_bytes())
            .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
            .index_metadata_size(config.index.metadata_cache_size.as_bytes())
            .index_content_size(config.index.content_cache_size.as_bytes())
            .index_content_page_size(config.index.content_cache_page_size.as_bytes())
            .index_result_cache_size(config.index.result_cache_size.as_bytes())
            .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
            .write_cache(write_cache)
            .build(),
    );
    Ok(cache_manager)
}
576
577fn new_noop_file_purger() -> FilePurgerRef {
578 #[derive(Debug)]
579 struct Noop;
580 impl FilePurger for Noop {
581 fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {}
582 }
583 Arc::new(Noop)
584}
585
/// Loads the parquet footer metadata for `path`, prefetching the last 64 KiB
/// so that small metadata blocks need only a single ranged read.
///
/// `file_size` of 0 means "unknown"; the size is then fetched via `stat`.
///
/// # Errors
/// Fails on object-store errors, on files smaller than the parquet footer,
/// or on an inconsistent footer/metadata length.
async fn load_parquet_metadata(
    object_store: ObjectStore,
    path: &str,
    file_size: u64,
) -> Result<parquet::file::metadata::ParquetMetaData, Box<dyn std::error::Error + Send + Sync>> {
    use parquet::file::FOOTER_SIZE;
    use parquet::file::metadata::ParquetMetaDataReader;
    let actual_size = if file_size == 0 {
        object_store.stat(path).await?.content_length()
    } else {
        file_size
    };
    if actual_size < FOOTER_SIZE as u64 {
        return Err("file too small".into());
    }
    // Read the tail of the file in one shot; most footers fit in 64 KiB.
    let prefetch: u64 = 64 * 1024;
    let start = actual_size.saturating_sub(prefetch);
    let buffer = object_store
        .read_with(path)
        .range(start..actual_size)
        .await?
        .to_vec();
    let buffer_len = buffer.len();
    // The last FOOTER_SIZE (8) bytes hold the metadata length + magic.
    let mut footer = [0; 8];
    footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
    let footer = FooterTail::try_new(&footer)?;
    let metadata_len = footer.metadata_length() as u64;
    if actual_size - (FOOTER_SIZE as u64) < metadata_len {
        return Err("invalid footer/metadata length".into());
    }
    if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
        // Fast path: the whole metadata block is already inside the prefetch buffer.
        let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
        let meta = ParquetMetaDataReader::decode_metadata(
            &buffer[metadata_start..buffer_len - FOOTER_SIZE],
        )?;
        Ok(meta)
    } else {
        // Slow path: metadata exceeds the prefetch window — issue a second
        // ranged read covering exactly the metadata block.
        let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
        let data = object_store
            .read_with(path)
            .range(metadata_start..(actual_size - FOOTER_SIZE as u64))
            .await?
            .to_vec();
        let meta = ParquetMetaDataReader::decode_metadata(&data)?;
        Ok(meta)
    }
}
633
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::str::FromStr;

    use common_base::readable_size::ReadableSize;
    use store_api::region_request::PathType;

    use crate::datanode::objbench::{parse_config, parse_file_dir_components};

    // Verifies that all three on-disk layouts (metadata, data, bare region
    // dir) parse into the same components with the correct `PathType`.
    #[test]
    fn test_parse_dir() {
        let meta_path = "data/greptime/public/1024/1024_0000000000/metadata/00020380-009c-426d-953e-b4e34c15af34.parquet";
        let c = parse_file_dir_components(meta_path).unwrap();
        assert_eq!(
            c.file_id.to_string(),
            "00020380-009c-426d-953e-b4e34c15af34"
        );
        assert_eq!(c.catalog, "greptime");
        assert_eq!(c.schema, "public");
        assert_eq!(c.table_id, 1024);
        assert_eq!(c.region_sequence, 0);
        assert_eq!(c.path_type, PathType::Metadata);

        let c = parse_file_dir_components(
            "data/greptime/public/1024/1024_0000000000/data/00020380-009c-426d-953e-b4e34c15af34.parquet",
        ).unwrap();
        assert_eq!(
            c.file_id.to_string(),
            "00020380-009c-426d-953e-b4e34c15af34"
        );
        assert_eq!(c.catalog, "greptime");
        assert_eq!(c.schema, "public");
        assert_eq!(c.table_id, 1024);
        assert_eq!(c.region_sequence, 0);
        assert_eq!(c.path_type, PathType::Data);

        let c = parse_file_dir_components(
            "data/greptime/public/1024/1024_0000000000/00020380-009c-426d-953e-b4e34c15af34.parquet",
        ).unwrap();
        assert_eq!(
            c.file_id.to_string(),
            "00020380-009c-426d-953e-b4e34c15af34"
        );
        assert_eq!(c.catalog, "greptime");
        assert_eq!(c.schema, "public");
        assert_eq!(c.table_id, 1024);
        assert_eq!(c.region_sequence, 0);
        assert_eq!(c.path_type, PathType::Bare);
    }

    // Smoke-tests config parsing against the example datanode config shipped
    // in the repo (path is relative to this crate's directory).
    #[test]
    fn test_parse_config() {
        let path = "../../config/datanode.example.toml";
        let (storage, engine) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
        assert_eq!(storage.data_home, "./greptimedb_data");
        assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
    }
}