1use std::path::PathBuf;
16use std::sync::Arc;
17use std::time::Instant;
18
19use clap::Parser;
20use colored::Colorize;
21use datanode::config::RegionEngineConfig;
22use datanode::store;
23use futures::stream;
24use mito2::access_layer::{
25 AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
26};
27use mito2::cache::{CacheManager, CacheManagerRef};
28use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
29use mito2::read::FlatSource;
30use mito2::sst::FormatType;
31use mito2::sst::file::{FileHandle, FileMeta};
32use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
33use mito2::sst::index::intermediate::IntermediateManager;
34use mito2::sst::index::puffin_manager::PuffinManagerFactory;
35use mito2::sst::parquet::reader::ParquetReaderBuilder;
36use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
37use mito2::worker::write_cache_from_config;
38use object_store::ObjectStore;
39use parquet::file::metadata::{FooterTail, KeyValue};
40use regex::Regex;
41use snafu::OptionExt;
42use store_api::metadata::{RegionMetadata, RegionMetadataRef};
43use store_api::path_utils::region_name;
44use store_api::region_request::PathType;
45use store_api::storage::FileId;
46
47use crate::datanode::{StorageConfig, StorageConfigWrapper};
48use crate::error;
49
// CLI definition for the `objbench` subcommand: reads an existing SST parquet
// file and rewrites it through the mito write path to benchmark object storage.
// NOTE(review): plain `//` comments are used on purpose — `///` doc comments on
// a clap derive struct would become `--help` text and change user-facing output.
#[derive(Debug, Parser)]
pub struct ObjbenchCommand {
    // Path to a datanode TOML config supplying storage/engine/WAL settings.
    #[clap(long, value_name = "FILE")]
    pub config: PathBuf,

    // Source SST path, e.g.
    // `data/<catalog>/<schema>/<table_id>/<table_id>_<seq>[/data|/metadata]/<file_id>.parquet`.
    #[clap(long, value_name = "PATH")]
    pub source: String,

    // Enables verbose (debug) logging via the test logging initializer.
    #[clap(short, long, default_value_t = false)]
    pub verbose: bool,

    // Optional output file for a pprof flamegraph (effective on unix only).
    #[clap(long, value_name = "FILE")]
    pub pprof_file: Option<PathBuf>,
}
69
70pub(super) fn parse_config(
71 config_path: &PathBuf,
72) -> error::Result<(
73 StorageConfig,
74 MitoConfig,
75 common_wal::config::DatanodeWalConfig,
76)> {
77 let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
78 error::IllegalConfigSnafu {
79 msg: format!("failed to read config {}: {e}", config_path.display()),
80 }
81 .build()
82 })?;
83
84 let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
85 error::IllegalConfigSnafu {
86 msg: format!("failed to parse config {}: {e}", config_path.display()),
87 }
88 .build()
89 })?;
90
91 let wal_config = store_cfg.wal;
92 let storage_config = store_cfg.storage;
93 let mito_engine_config = store_cfg
94 .region_engine
95 .into_iter()
96 .filter_map(|c| {
97 if let RegionEngineConfig::Mito(mito) = c {
98 Some(mito)
99 } else {
100 None
101 }
102 })
103 .next()
104 .with_context(|| error::IllegalConfigSnafu {
105 msg: format!("Engine config not found in {:?}", config_path),
106 })?;
107 Ok((storage_config, mito_engine_config, wal_config))
108}
109
impl ObjbenchCommand {
    /// Runs the benchmark end to end: loads the source SST's metadata, streams
    /// its rows through the full mito SST write path (including index
    /// creation), reports timings/metrics, then deletes the files it wrote.
    pub async fn run(&self) -> error::Result<()> {
        if self.verbose {
            common_telemetry::init_default_ut_logging();
        }

        println!("{}", "Starting objbench with config:".cyan().bold());

        // WAL config is unused here; only storage + engine settings matter.
        let (store_cfg, mut mito_engine_config, _wal_config) = parse_config(&self.config)?;

        let object_store = build_object_store(&store_cfg).await?;
        println!("{} Object store initialized", "✓".green());

        // Derive catalog/schema/table/region/file-id from the source path.
        let components = parse_file_dir_components(&self.source)?;
        println!(
            "{} Source path parsed: {}, components: {:?}",
            "✓".green(),
            self.source,
            components
        );

        println!("{}", "Loading parquet metadata...".yellow());
        // Stat first: the footer loader needs the file length for ranged reads.
        let file_size = object_store
            .stat(&self.source)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("stat failed: {e}"),
                }
                .build()
            })?
            .content_length();
        let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("read parquet metadata failed: {e}"),
                }
                .build()
            })?;

        // Region metadata is embedded in the parquet key-value metadata.
        let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
        let num_rows = parquet_meta.file_metadata().num_rows() as u64;
        let num_row_groups = parquet_meta.num_row_groups() as u64;
        // Largest uncompressed row group, summed over its column chunks.
        let max_row_group_uncompressed_size: u64 = parquet_meta
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);

        println!(
            "{} Metadata loaded - rows: {}, size: {} bytes",
            "✓".green(),
            num_rows,
            file_size
        );

        // Synthesize a FileMeta for the source file so the reader can treat it
        // like a regular region SST. Index fields are zeroed: only the parquet
        // data is read back, not any index files.
        let file_meta = FileMeta {
            region_id: region_meta.region_id,
            file_id: components.file_id,
            time_range: Default::default(),
            level: 0,
            file_size,
            max_row_group_uncompressed_size,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows,
            num_row_groups,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        };
        // No-op purger: dropping the handle must not delete the source file.
        let src_handle = FileHandle::new(file_meta, new_noop_file_purger());

        let table_dir = components.table_dir();
        let (src_access_layer, cache_manager) = build_access_layer_simple(
            &components,
            object_store.clone(),
            &mut mito_engine_config,
            &store_cfg.data_home,
        )
        .await?;
        let reader_build_start = Instant::now();

        let reader = ParquetReaderBuilder::new(
            table_dir,
            components.path_type,
            src_handle.clone(),
            object_store.clone(),
        )
        .expected_metadata(Some(region_meta.clone()))
        .build()
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("build reader failed: {e:?}"),
            }
            .build()
        })?;
        // The builder returns None when there is nothing readable to scan.
        let reader = reader.ok_or_else(|| {
            error::IllegalConfigSnafu {
                msg: format!(
                    "build reader returned no readable rows for source file {}",
                    src_handle.file_id()
                ),
            }
            .build()
        })?;

        let reader_build_elapsed = reader_build_start.elapsed();
        let total_rows = reader.parquet_metadata().file_metadata().num_rows();
        println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed);
        // Adapt the pull-based reader into a stream of record batches; the
        // reader is threaded through the unfold state.
        let reader_stream = Box::pin(stream::try_unfold(reader, |mut reader| async move {
            let batch = reader.next_record_batch().await?;
            Ok(batch.map(|batch| (batch, reader)))
        }));

        // Fulltext index creation on compaction is disabled; this write is
        // modeled as a flush (see op_type below).
        let fulltext_index_config = FulltextIndexConfig {
            create_on_compaction: Mode::Disable,
            ..Default::default()
        };

        let source =
            FlatSource::new_stream(region_meta.schema.arrow_schema().clone(), reader_stream);
        let write_req = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: region_meta,
            source,
            cache_manager,
            storage: None,
            max_sequence: None,
            sst_write_format: FormatType::PrimaryKey,
            index_options: Default::default(),
            index_config: mito_engine_config.index.clone(),
            inverted_index_config: MitoConfig::default().inverted_index,
            fulltext_index_config,
            bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        println!("{}", "Writing SST...".yellow());

        // Optionally sample the write under pprof (unix only); the guard lives
        // until after write_sst so the whole write is profiled.
        #[cfg(unix)]
        let profiler_guard = if self.pprof_file.is_some() {
            println!("{} Starting profiling...", "⚡".yellow());
            Some(
                pprof::ProfilerGuardBuilder::default()
                    .frequency(99)
                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                    .build()
                    .map_err(|e| {
                        error::IllegalConfigSnafu {
                            msg: format!("Failed to start profiler: {e}"),
                        }
                        .build()
                    })?,
            )
        } else {
            None
        };

        #[cfg(not(unix))]
        if self.pprof_file.is_some() {
            eprintln!(
                "{}: Profiling is not supported on this platform",
                "Warning".yellow()
            );
        }

        let write_start = Instant::now();
        let mut metrics = Metrics::new(WriteType::Flush);
        let infos = src_access_layer
            .write_sst(write_req, &WriteOptions::default(), &mut metrics)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("write_sst failed: {e:?}"),
                }
                .build()
            })?;

        let write_elapsed = write_start.elapsed();

        // Flamegraph generation failures are reported but never abort the run.
        #[cfg(unix)]
        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
            println!("{} Generating flamegraph...", "🔥".yellow());
            match guard.report().build() {
                Ok(report) => {
                    let mut flamegraph_data = Vec::new();
                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
                        println!(
                            "{}: Failed to write flamegraph to {}: {}",
                            "Error".red(),
                            pprof_file.display(),
                            e
                        );
                    } else {
                        println!(
                            "{} Flamegraph saved to {}",
                            "✓".green(),
                            pprof_file.display().to_string().cyan()
                        );
                    }
                }
                Err(e) => {
                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
                }
            }
        }
        // A single flush request is expected to produce exactly one SST.
        assert_eq!(infos.len(), 1);
        let dst_file_id = infos[0].file_id;
        let dst_file_path = format!("{}/{}.parquet", components.region_dir(), dst_file_id);
        let mut dst_index_path = None;
        // A puffin index file only exists when the write produced index data.
        if infos[0].index_metadata.file_size > 0 {
            dst_index_path = Some(format!(
                "{}/index/{}.puffin",
                components.region_dir(),
                dst_file_id
            ));
        }

        println!("\n{} {}", "Write complete!".green().bold(), "✓".green());
        println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
        println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
        println!(
            " {}: {}",
            "File size".bold(),
            format!("{} bytes", file_size).cyan()
        );
        println!(
            " {}: {:?}",
            "Reader build time".bold(),
            reader_build_elapsed
        );
        println!(" {}: {:?}", "Total time".bold(), write_elapsed);

        println!(" {}: {:?}", "Metrics".bold(), metrics,);

        println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata);

        // Clean up the benchmark's output so repeated runs don't accumulate.
        println!("\n{}", "Cleaning up...".yellow());
        object_store.delete(&dst_file_path).await.map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
            }
            .build()
        })?;
        println!("{} Temporary file {} deleted", "✓".green(), dst_file_path);

        if let Some(index_path) = dst_index_path {
            object_store.delete(&index_path).await.map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("Failed to delete dest index file {}: {}", index_path, e),
                }
                .build()
            })?;
            println!(
                "{} Temporary index file {} deleted",
                "✓".green(),
                index_path
            );
        }

        println!("\n{}", "Benchmark completed successfully!".green().bold());
        Ok(())
    }
}
402
// Components parsed out of a source SST file path by `parse_file_dir_components`.
#[derive(Debug)]
struct FileDirComponents {
    // Catalog name (first segment after the leading `data/`).
    catalog: String,
    // Schema name.
    schema: String,
    // Numeric table id.
    table_id: u32,
    // Region sequence parsed from the `<table_id>_<sequence>` directory name.
    region_sequence: u32,
    // Whether SSTs live directly in the region dir, under `/data`, or `/metadata`.
    path_type: PathType,
    // File id (UUID) taken from the parquet file name.
    file_id: FileId,
}
412
413impl FileDirComponents {
414 fn table_dir(&self) -> String {
415 format!("data/{}/{}/{}", self.catalog, self.schema, self.table_id)
416 }
417
418 fn region_dir(&self) -> String {
419 let region_name = region_name(self.table_id, self.region_sequence);
420 match self.path_type {
421 PathType::Bare => {
422 format!(
423 "data/{}/{}/{}/{}",
424 self.catalog, self.schema, self.table_id, region_name
425 )
426 }
427 PathType::Data => {
428 format!(
429 "data/{}/{}/{}/{}/data",
430 self.catalog, self.schema, self.table_id, region_name
431 )
432 }
433 PathType::Metadata => {
434 format!(
435 "data/{}/{}/{}/{}/metadata",
436 self.catalog, self.schema, self.table_id, region_name
437 )
438 }
439 }
440 }
441}
442
443fn parse_file_dir_components(path: &str) -> error::Result<FileDirComponents> {
444 let pattern =
446 r"^data/([^/]+)/([^/]+)/([^/]+)/([^/]+)_([^/]+)(?:/data|/metadata)?/(.+).parquet$";
447
448 let re = Regex::new(pattern).expect("Invalid regex pattern");
450
451 let path_type = if path.contains("/data/") {
453 PathType::Data
454 } else if path.contains("/metadata/") {
455 PathType::Metadata
456 } else {
457 PathType::Bare
458 };
459
460 let components = (|| {
462 let captures = re.captures(path)?;
463 if captures.len() != 7 {
464 return None;
465 }
466 let mut components = FileDirComponents {
467 catalog: "".to_string(),
468 schema: "".to_string(),
469 table_id: 0,
470 region_sequence: 0,
471 path_type,
472 file_id: FileId::default(),
473 };
474 components.catalog = captures.get(1)?.as_str().to_string();
476 components.schema = captures.get(2)?.as_str().to_string();
477 components.table_id = captures[3].parse().ok()?;
478 components.region_sequence = captures[5].parse().ok()?;
479 let file_id_str = &captures[6];
480 components.file_id = FileId::parse_str(file_id_str).ok()?;
481 Some(components)
482 })();
483 components.context(error::IllegalConfigSnafu {
484 msg: format!("Expect valid source file path, got: {}", path),
485 })
486}
487
488fn extract_region_metadata(
489 file_path: &str,
490 meta: &parquet::file::metadata::ParquetMetaData,
491) -> error::Result<RegionMetadataRef> {
492 let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
493 let Some(kvs) = kvs else {
494 return Err(error::IllegalConfigSnafu {
495 msg: format!("{file_path}: missing parquet key_value metadata"),
496 }
497 .build());
498 };
499 let json = kvs
500 .iter()
501 .find(|kv| kv.key == PARQUET_METADATA_KEY)
502 .and_then(|kv| kv.value.as_ref())
503 .ok_or_else(|| {
504 error::IllegalConfigSnafu {
505 msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
506 }
507 .build()
508 })?;
509 let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
510 error::IllegalConfigSnafu {
511 msg: format!("invalid region metadata json: {e}"),
512 }
513 .build()
514 })?;
515 Ok(Arc::new(region))
516}
517
518pub(super) async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
519 store::new_object_store(sc.store.clone(), &sc.data_home)
520 .await
521 .map_err(|e| {
522 error::IllegalConfigSnafu {
523 msg: format!("Failed to build object store: {e:?}"),
524 }
525 .build()
526 })
527}
528
// Builds an `AccessLayer` (plus its cache manager) over the table dir of
// `components`, initializing the puffin and intermediate index managers the
// SST write path requires.
async fn build_access_layer_simple(
    components: &FileDirComponents,
    object_store: ObjectStore,
    config: &mut MitoConfig,
    data_home: &str,
) -> error::Result<(AccessLayerRef, CacheManagerRef)> {
    // Sanitize first: the `config.index` fields read below (aux_path, sizes)
    // presumably get defaults filled in here — TODO confirm. The result is
    // deliberately ignored (best effort).
    let _ = config.index.sanitize(data_home, &config.inverted_index);
    let puffin_manager = PuffinManagerFactory::new(
        &config.index.aux_path,
        config.index.staging_size.as_bytes(),
        Some(config.index.write_buffer_size.as_bytes() as _),
        config.index.staging_ttl,
    )
    .await
    .map_err(|e| {
        error::IllegalConfigSnafu {
            msg: format!("Failed to build access layer: {e:?}"),
        }
        .build()
    })?;

    // Intermediate index data is staged on the local filesystem under aux_path.
    let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to build IntermediateManager: {e:?}"),
            }
            .build()
        })?
        .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));

    let cache_manager =
        build_cache_manager(config, puffin_manager.clone(), intermediate_manager.clone()).await?;
    let layer = AccessLayer::new(
        components.table_dir(),
        components.path_type,
        object_store,
        puffin_manager,
        intermediate_manager,
    );
    Ok((Arc::new(layer), cache_manager))
}
571
572async fn build_cache_manager(
573 config: &MitoConfig,
574 puffin_manager: PuffinManagerFactory,
575 intermediate_manager: IntermediateManager,
576) -> error::Result<CacheManagerRef> {
577 let write_cache = write_cache_from_config(config, puffin_manager, intermediate_manager)
578 .await
579 .map_err(|e| {
580 error::IllegalConfigSnafu {
581 msg: format!("Failed to build write cache: {e:?}"),
582 }
583 .build()
584 })?;
585 let cache_manager = Arc::new(
586 CacheManager::builder()
587 .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
588 .vector_cache_size(config.vector_cache_size.as_bytes())
589 .page_cache_size(config.page_cache_size.as_bytes())
590 .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
591 .index_metadata_size(config.index.metadata_cache_size.as_bytes())
592 .index_content_size(config.index.content_cache_size.as_bytes())
593 .index_content_page_size(config.index.content_cache_page_size.as_bytes())
594 .index_result_cache_size(config.index.result_cache_size.as_bytes())
595 .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
596 .write_cache(write_cache)
597 .build(),
598 );
599 Ok(cache_manager)
600}
601
602fn new_noop_file_purger() -> FilePurgerRef {
603 #[derive(Debug)]
604 struct Noop;
605 impl FilePurger for Noop {
606 fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {}
607 }
608 Arc::new(Noop)
609}
610
611async fn load_parquet_metadata(
612 object_store: ObjectStore,
613 path: &str,
614 file_size: u64,
615) -> Result<parquet::file::metadata::ParquetMetaData, Box<dyn std::error::Error + Send + Sync>> {
616 use parquet::file::FOOTER_SIZE;
617 use parquet::file::metadata::ParquetMetaDataReader;
618 let actual_size = if file_size == 0 {
619 object_store.stat(path).await?.content_length()
620 } else {
621 file_size
622 };
623 if actual_size < FOOTER_SIZE as u64 {
624 return Err("file too small".into());
625 }
626 let prefetch: u64 = 64 * 1024;
627 let start = actual_size.saturating_sub(prefetch);
628 let buffer = object_store
629 .read_with(path)
630 .range(start..actual_size)
631 .await?
632 .to_vec();
633 let buffer_len = buffer.len();
634 let mut footer = [0; 8];
635 footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
636 let footer = FooterTail::try_new(&footer)?;
637 let metadata_len = footer.metadata_length() as u64;
638 if actual_size - (FOOTER_SIZE as u64) < metadata_len {
639 return Err("invalid footer/metadata length".into());
640 }
641 if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
642 let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
643 let meta = ParquetMetaDataReader::decode_metadata(
644 &buffer[metadata_start..buffer_len - FOOTER_SIZE],
645 )?;
646 Ok(meta)
647 } else {
648 let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
649 let data = object_store
650 .read_with(path)
651 .range(metadata_start..(actual_size - FOOTER_SIZE as u64))
652 .await?
653 .to_vec();
654 let meta = ParquetMetaDataReader::decode_metadata(&data)?;
655 Ok(meta)
656 }
657}
658
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::str::FromStr;

    use common_base::readable_size::ReadableSize;
    use store_api::region_request::PathType;

    use crate::datanode::objbench::{parse_config, parse_file_dir_components};

    #[test]
    fn test_parse_dir() {
        // Same file id under all three layout flavors; only the path type differs.
        let cases = [
            (
                "data/greptime/public/1024/1024_0000000000/metadata/00020380-009c-426d-953e-b4e34c15af34.parquet",
                PathType::Metadata,
            ),
            (
                "data/greptime/public/1024/1024_0000000000/data/00020380-009c-426d-953e-b4e34c15af34.parquet",
                PathType::Data,
            ),
            (
                "data/greptime/public/1024/1024_0000000000/00020380-009c-426d-953e-b4e34c15af34.parquet",
                PathType::Bare,
            ),
        ];

        for (path, expected_path_type) in cases {
            let c = parse_file_dir_components(path).unwrap();
            assert_eq!(
                c.file_id.to_string(),
                "00020380-009c-426d-953e-b4e34c15af34"
            );
            assert_eq!(c.catalog, "greptime");
            assert_eq!(c.schema, "public");
            assert_eq!(c.table_id, 1024);
            assert_eq!(c.region_sequence, 0);
            assert_eq!(c.path_type, expected_path_type);
        }
    }

    #[test]
    fn test_parse_config() {
        let path = "../../config/datanode.example.toml";
        let (storage, engine, _wal) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
        assert_eq!(storage.data_home, "./greptimedb_data");
        assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
    }
}