1use std::path::PathBuf;
16use std::sync::Arc;
17use std::time::Instant;
18
19use clap::Parser;
20use colored::Colorize;
21use datanode::config::RegionEngineConfig;
22use datanode::store;
23use either::Either;
24use mito2::access_layer::{
25 AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
26};
27use mito2::cache::{CacheManager, CacheManagerRef};
28use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
29use mito2::read::Source;
30use mito2::sst::file::{FileHandle, FileMeta};
31use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
32use mito2::sst::index::intermediate::IntermediateManager;
33use mito2::sst::index::puffin_manager::PuffinManagerFactory;
34use mito2::sst::parquet::reader::ParquetReaderBuilder;
35use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
36use mito2::worker::write_cache_from_config;
37use object_store::ObjectStore;
38use parquet::file::metadata::{FooterTail, KeyValue};
39use regex::Regex;
40use snafu::OptionExt;
41use store_api::metadata::{RegionMetadata, RegionMetadataRef};
42use store_api::path_utils::region_name;
43use store_api::region_request::PathType;
44use store_api::storage::FileId;
45
46use crate::datanode::{StorageConfig, StorageConfigWrapper};
47use crate::error;
48
49#[derive(Debug, Parser)]
51pub struct ObjbenchCommand {
52 #[clap(long, value_name = "FILE")]
54 pub config: PathBuf,
55
56 #[clap(long, value_name = "PATH")]
58 pub source: String,
59
60 #[clap(short, long, default_value_t = false)]
62 pub verbose: bool,
63
64 #[clap(long, value_name = "FILE")]
66 pub pprof_file: Option<PathBuf>,
67}
68
69pub(super) fn parse_config(
70 config_path: &PathBuf,
71) -> error::Result<(
72 StorageConfig,
73 MitoConfig,
74 common_wal::config::DatanodeWalConfig,
75)> {
76 let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
77 error::IllegalConfigSnafu {
78 msg: format!("failed to read config {}: {e}", config_path.display()),
79 }
80 .build()
81 })?;
82
83 let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
84 error::IllegalConfigSnafu {
85 msg: format!("failed to parse config {}: {e}", config_path.display()),
86 }
87 .build()
88 })?;
89
90 let wal_config = store_cfg.wal;
91 let storage_config = store_cfg.storage;
92 let mito_engine_config = store_cfg
93 .region_engine
94 .into_iter()
95 .filter_map(|c| {
96 if let RegionEngineConfig::Mito(mito) = c {
97 Some(mito)
98 } else {
99 None
100 }
101 })
102 .next()
103 .with_context(|| error::IllegalConfigSnafu {
104 msg: format!("Engine config not found in {:?}", config_path),
105 })?;
106 Ok((storage_config, mito_engine_config, wal_config))
107}
108
109impl ObjbenchCommand {
110 pub async fn run(&self) -> error::Result<()> {
111 if self.verbose {
112 common_telemetry::init_default_ut_logging();
113 }
114
115 println!("{}", "Starting objbench with config:".cyan().bold());
116
117 let (store_cfg, mut mito_engine_config, _wal_config) = parse_config(&self.config)?;
119
120 let object_store = build_object_store(&store_cfg).await?;
121 println!("{} Object store initialized", "✓".green());
122
123 let components = parse_file_dir_components(&self.source)?;
125 println!(
126 "{} Source path parsed: {}, components: {:?}",
127 "✓".green(),
128 self.source,
129 components
130 );
131
132 println!("{}", "Loading parquet metadata...".yellow());
134 let file_size = object_store
135 .stat(&self.source)
136 .await
137 .map_err(|e| {
138 error::IllegalConfigSnafu {
139 msg: format!("stat failed: {e}"),
140 }
141 .build()
142 })?
143 .content_length();
144 let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
145 .await
146 .map_err(|e| {
147 error::IllegalConfigSnafu {
148 msg: format!("read parquet metadata failed: {e}"),
149 }
150 .build()
151 })?;
152
153 let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
154 let num_rows = parquet_meta.file_metadata().num_rows() as u64;
155 let num_row_groups = parquet_meta.num_row_groups() as u64;
156 let max_row_group_uncompressed_size: u64 = parquet_meta
157 .row_groups()
158 .iter()
159 .map(|rg| {
160 rg.columns()
161 .iter()
162 .map(|c| c.uncompressed_size() as u64)
163 .sum::<u64>()
164 })
165 .max()
166 .unwrap_or(0);
167
168 println!(
169 "{} Metadata loaded - rows: {}, size: {} bytes",
170 "✓".green(),
171 num_rows,
172 file_size
173 );
174
175 let file_meta = FileMeta {
177 region_id: region_meta.region_id,
178 file_id: components.file_id,
179 time_range: Default::default(),
180 level: 0,
181 file_size,
182 max_row_group_uncompressed_size,
183 available_indexes: Default::default(),
184 indexes: Default::default(),
185 index_file_size: 0,
186 index_version: 0,
187 num_rows,
188 num_row_groups,
189 sequence: None,
190 partition_expr: None,
191 num_series: 0,
192 };
193 let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
194
195 let table_dir = components.table_dir();
197 let (src_access_layer, cache_manager) = build_access_layer_simple(
198 &components,
199 object_store.clone(),
200 &mut mito_engine_config,
201 &store_cfg.data_home,
202 )
203 .await?;
204 let reader_build_start = Instant::now();
205
206 let reader = ParquetReaderBuilder::new(
207 table_dir,
208 components.path_type,
209 src_handle.clone(),
210 object_store.clone(),
211 )
212 .expected_metadata(Some(region_meta.clone()))
213 .build()
214 .await
215 .map_err(|e| {
216 error::IllegalConfigSnafu {
217 msg: format!("build reader failed: {e:?}"),
218 }
219 .build()
220 })?;
221 let reader = reader.ok_or_else(|| {
222 error::IllegalConfigSnafu {
223 msg: format!(
224 "build reader returned no readable rows for source file {}",
225 src_handle.file_id()
226 ),
227 }
228 .build()
229 })?;
230
231 let reader_build_elapsed = reader_build_start.elapsed();
232 let total_rows = reader.parquet_metadata().file_metadata().num_rows();
233 println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed);
234
235 let fulltext_index_config = FulltextIndexConfig {
237 create_on_compaction: Mode::Disable,
238 ..Default::default()
239 };
240
241 let write_req = SstWriteRequest {
242 op_type: OperationType::Flush,
243 metadata: region_meta,
244 source: Either::Left(Source::Reader(Box::new(reader))),
245 cache_manager,
246 storage: None,
247 max_sequence: None,
248 index_options: Default::default(),
249 index_config: mito_engine_config.index.clone(),
250 inverted_index_config: MitoConfig::default().inverted_index,
251 fulltext_index_config,
252 bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
253 #[cfg(feature = "vector_index")]
254 vector_index_config: Default::default(),
255 };
256
257 println!("{}", "Writing SST...".yellow());
259
260 #[cfg(unix)]
262 let profiler_guard = if self.pprof_file.is_some() {
263 println!("{} Starting profiling...", "⚡".yellow());
264 Some(
265 pprof::ProfilerGuardBuilder::default()
266 .frequency(99)
267 .blocklist(&["libc", "libgcc", "pthread", "vdso"])
268 .build()
269 .map_err(|e| {
270 error::IllegalConfigSnafu {
271 msg: format!("Failed to start profiler: {e}"),
272 }
273 .build()
274 })?,
275 )
276 } else {
277 None
278 };
279
280 #[cfg(not(unix))]
281 if self.pprof_file.is_some() {
282 eprintln!(
283 "{}: Profiling is not supported on this platform",
284 "Warning".yellow()
285 );
286 }
287
288 let write_start = Instant::now();
289 let mut metrics = Metrics::new(WriteType::Flush);
290 let infos = src_access_layer
291 .write_sst(write_req, &WriteOptions::default(), &mut metrics)
292 .await
293 .map_err(|e| {
294 error::IllegalConfigSnafu {
295 msg: format!("write_sst failed: {e:?}"),
296 }
297 .build()
298 })?;
299
300 let write_elapsed = write_start.elapsed();
301
302 #[cfg(unix)]
304 if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
305 println!("{} Generating flamegraph...", "🔥".yellow());
306 match guard.report().build() {
307 Ok(report) => {
308 let mut flamegraph_data = Vec::new();
309 if let Err(e) = report.flamegraph(&mut flamegraph_data) {
310 println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
311 } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
312 println!(
313 "{}: Failed to write flamegraph to {}: {}",
314 "Error".red(),
315 pprof_file.display(),
316 e
317 );
318 } else {
319 println!(
320 "{} Flamegraph saved to {}",
321 "✓".green(),
322 pprof_file.display().to_string().cyan()
323 );
324 }
325 }
326 Err(e) => {
327 println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
328 }
329 }
330 }
331 assert_eq!(infos.len(), 1);
332 let dst_file_id = infos[0].file_id;
333 let dst_file_path = format!("{}/{}.parquet", components.region_dir(), dst_file_id);
334 let mut dst_index_path = None;
335 if infos[0].index_metadata.file_size > 0 {
336 dst_index_path = Some(format!(
337 "{}/index/{}.puffin",
338 components.region_dir(),
339 dst_file_id
340 ));
341 }
342
343 println!("\n{} {}", "Write complete!".green().bold(), "✓".green());
345 println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
346 println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
347 println!(
348 " {}: {}",
349 "File size".bold(),
350 format!("{} bytes", file_size).cyan()
351 );
352 println!(
353 " {}: {:?}",
354 "Reader build time".bold(),
355 reader_build_elapsed
356 );
357 println!(" {}: {:?}", "Total time".bold(), write_elapsed);
358
359 println!(" {}: {:?}", "Metrics".bold(), metrics,);
361
362 println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata);
364
365 println!("\n{}", "Cleaning up...".yellow());
367 object_store.delete(&dst_file_path).await.map_err(|e| {
368 error::IllegalConfigSnafu {
369 msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
370 }
371 .build()
372 })?;
373 println!("{} Temporary file {} deleted", "✓".green(), dst_file_path);
374
375 if let Some(index_path) = dst_index_path {
376 object_store.delete(&index_path).await.map_err(|e| {
377 error::IllegalConfigSnafu {
378 msg: format!("Failed to delete dest index file {}: {}", index_path, e),
379 }
380 .build()
381 })?;
382 println!(
383 "{} Temporary index file {} deleted",
384 "✓".green(),
385 index_path
386 );
387 }
388
389 println!("\n{}", "Benchmark completed successfully!".green().bold());
390 Ok(())
391 }
392}
393
394#[derive(Debug)]
395struct FileDirComponents {
396 catalog: String,
397 schema: String,
398 table_id: u32,
399 region_sequence: u32,
400 path_type: PathType,
401 file_id: FileId,
402}
403
404impl FileDirComponents {
405 fn table_dir(&self) -> String {
406 format!("data/{}/{}/{}", self.catalog, self.schema, self.table_id)
407 }
408
409 fn region_dir(&self) -> String {
410 let region_name = region_name(self.table_id, self.region_sequence);
411 match self.path_type {
412 PathType::Bare => {
413 format!(
414 "data/{}/{}/{}/{}",
415 self.catalog, self.schema, self.table_id, region_name
416 )
417 }
418 PathType::Data => {
419 format!(
420 "data/{}/{}/{}/{}/data",
421 self.catalog, self.schema, self.table_id, region_name
422 )
423 }
424 PathType::Metadata => {
425 format!(
426 "data/{}/{}/{}/{}/metadata",
427 self.catalog, self.schema, self.table_id, region_name
428 )
429 }
430 }
431 }
432}
433
434fn parse_file_dir_components(path: &str) -> error::Result<FileDirComponents> {
435 let pattern =
437 r"^data/([^/]+)/([^/]+)/([^/]+)/([^/]+)_([^/]+)(?:/data|/metadata)?/(.+).parquet$";
438
439 let re = Regex::new(pattern).expect("Invalid regex pattern");
441
442 let path_type = if path.contains("/data/") {
444 PathType::Data
445 } else if path.contains("/metadata/") {
446 PathType::Metadata
447 } else {
448 PathType::Bare
449 };
450
451 let components = (|| {
453 let captures = re.captures(path)?;
454 if captures.len() != 7 {
455 return None;
456 }
457 let mut components = FileDirComponents {
458 catalog: "".to_string(),
459 schema: "".to_string(),
460 table_id: 0,
461 region_sequence: 0,
462 path_type,
463 file_id: FileId::default(),
464 };
465 components.catalog = captures.get(1)?.as_str().to_string();
467 components.schema = captures.get(2)?.as_str().to_string();
468 components.table_id = captures[3].parse().ok()?;
469 components.region_sequence = captures[5].parse().ok()?;
470 let file_id_str = &captures[6];
471 components.file_id = FileId::parse_str(file_id_str).ok()?;
472 Some(components)
473 })();
474 components.context(error::IllegalConfigSnafu {
475 msg: format!("Expect valid source file path, got: {}", path),
476 })
477}
478
479fn extract_region_metadata(
480 file_path: &str,
481 meta: &parquet::file::metadata::ParquetMetaData,
482) -> error::Result<RegionMetadataRef> {
483 let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
484 let Some(kvs) = kvs else {
485 return Err(error::IllegalConfigSnafu {
486 msg: format!("{file_path}: missing parquet key_value metadata"),
487 }
488 .build());
489 };
490 let json = kvs
491 .iter()
492 .find(|kv| kv.key == PARQUET_METADATA_KEY)
493 .and_then(|kv| kv.value.as_ref())
494 .ok_or_else(|| {
495 error::IllegalConfigSnafu {
496 msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
497 }
498 .build()
499 })?;
500 let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
501 error::IllegalConfigSnafu {
502 msg: format!("invalid region metadata json: {e}"),
503 }
504 .build()
505 })?;
506 Ok(Arc::new(region))
507}
508
509pub(super) async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
510 store::new_object_store(sc.store.clone(), &sc.data_home)
511 .await
512 .map_err(|e| {
513 error::IllegalConfigSnafu {
514 msg: format!("Failed to build object store: {e:?}"),
515 }
516 .build()
517 })
518}
519
520async fn build_access_layer_simple(
521 components: &FileDirComponents,
522 object_store: ObjectStore,
523 config: &mut MitoConfig,
524 data_home: &str,
525) -> error::Result<(AccessLayerRef, CacheManagerRef)> {
526 let _ = config.index.sanitize(data_home, &config.inverted_index);
527 let puffin_manager = PuffinManagerFactory::new(
528 &config.index.aux_path,
529 config.index.staging_size.as_bytes(),
530 Some(config.index.write_buffer_size.as_bytes() as _),
531 config.index.staging_ttl,
532 )
533 .await
534 .map_err(|e| {
535 error::IllegalConfigSnafu {
536 msg: format!("Failed to build access layer: {e:?}"),
537 }
538 .build()
539 })?;
540
541 let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
542 .await
543 .map_err(|e| {
544 error::IllegalConfigSnafu {
545 msg: format!("Failed to build IntermediateManager: {e:?}"),
546 }
547 .build()
548 })?
549 .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
550
551 let cache_manager =
552 build_cache_manager(config, puffin_manager.clone(), intermediate_manager.clone()).await?;
553 let layer = AccessLayer::new(
554 components.table_dir(),
555 components.path_type,
556 object_store,
557 puffin_manager,
558 intermediate_manager,
559 );
560 Ok((Arc::new(layer), cache_manager))
561}
562
563async fn build_cache_manager(
564 config: &MitoConfig,
565 puffin_manager: PuffinManagerFactory,
566 intermediate_manager: IntermediateManager,
567) -> error::Result<CacheManagerRef> {
568 let write_cache = write_cache_from_config(config, puffin_manager, intermediate_manager)
569 .await
570 .map_err(|e| {
571 error::IllegalConfigSnafu {
572 msg: format!("Failed to build write cache: {e:?}"),
573 }
574 .build()
575 })?;
576 let cache_manager = Arc::new(
577 CacheManager::builder()
578 .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
579 .vector_cache_size(config.vector_cache_size.as_bytes())
580 .page_cache_size(config.page_cache_size.as_bytes())
581 .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
582 .index_metadata_size(config.index.metadata_cache_size.as_bytes())
583 .index_content_size(config.index.content_cache_size.as_bytes())
584 .index_content_page_size(config.index.content_cache_page_size.as_bytes())
585 .index_result_cache_size(config.index.result_cache_size.as_bytes())
586 .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
587 .write_cache(write_cache)
588 .build(),
589 );
590 Ok(cache_manager)
591}
592
593fn new_noop_file_purger() -> FilePurgerRef {
594 #[derive(Debug)]
595 struct Noop;
596 impl FilePurger for Noop {
597 fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {}
598 }
599 Arc::new(Noop)
600}
601
602async fn load_parquet_metadata(
603 object_store: ObjectStore,
604 path: &str,
605 file_size: u64,
606) -> Result<parquet::file::metadata::ParquetMetaData, Box<dyn std::error::Error + Send + Sync>> {
607 use parquet::file::FOOTER_SIZE;
608 use parquet::file::metadata::ParquetMetaDataReader;
609 let actual_size = if file_size == 0 {
610 object_store.stat(path).await?.content_length()
611 } else {
612 file_size
613 };
614 if actual_size < FOOTER_SIZE as u64 {
615 return Err("file too small".into());
616 }
617 let prefetch: u64 = 64 * 1024;
618 let start = actual_size.saturating_sub(prefetch);
619 let buffer = object_store
620 .read_with(path)
621 .range(start..actual_size)
622 .await?
623 .to_vec();
624 let buffer_len = buffer.len();
625 let mut footer = [0; 8];
626 footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
627 let footer = FooterTail::try_new(&footer)?;
628 let metadata_len = footer.metadata_length() as u64;
629 if actual_size - (FOOTER_SIZE as u64) < metadata_len {
630 return Err("invalid footer/metadata length".into());
631 }
632 if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
633 let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
634 let meta = ParquetMetaDataReader::decode_metadata(
635 &buffer[metadata_start..buffer_len - FOOTER_SIZE],
636 )?;
637 Ok(meta)
638 } else {
639 let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
640 let data = object_store
641 .read_with(path)
642 .range(metadata_start..(actual_size - FOOTER_SIZE as u64))
643 .await?
644 .to_vec();
645 let meta = ParquetMetaDataReader::decode_metadata(&data)?;
646 Ok(meta)
647 }
648}
649
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::str::FromStr;

    use common_base::readable_size::ReadableSize;
    use store_api::region_request::PathType;

    use crate::datanode::objbench::{parse_config, parse_file_dir_components};

    /// The same file under each of the three directory layouts parses to the same
    /// components, differing only in path type.
    #[test]
    fn test_parse_dir() {
        let cases = [
            (
                "data/greptime/public/1024/1024_0000000000/metadata/00020380-009c-426d-953e-b4e34c15af34.parquet",
                PathType::Metadata,
            ),
            (
                "data/greptime/public/1024/1024_0000000000/data/00020380-009c-426d-953e-b4e34c15af34.parquet",
                PathType::Data,
            ),
            (
                "data/greptime/public/1024/1024_0000000000/00020380-009c-426d-953e-b4e34c15af34.parquet",
                PathType::Bare,
            ),
        ];

        for (path, expected_path_type) in cases {
            let c = parse_file_dir_components(path).unwrap();
            assert_eq!(
                c.file_id.to_string(),
                "00020380-009c-426d-953e-b4e34c15af34"
            );
            assert_eq!(c.catalog, "greptime");
            assert_eq!(c.schema, "public");
            assert_eq!(c.table_id, 1024);
            assert_eq!(c.region_sequence, 0);
            assert_eq!(c.path_type, expected_path_type);
        }
    }

    /// The example datanode config parses and exposes the expected storage and index
    /// settings.
    #[test]
    fn test_parse_config() {
        let config_path = PathBuf::from_str("../../config/datanode.example.toml").unwrap();
        let (storage, engine, _wal) = parse_config(&config_path).unwrap();
        assert_eq!(storage.data_home, "./greptimedb_data");
        assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
    }
}