// cmd/datanode/objbench.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
use std::path::PathBuf;
use std::sync::{Arc, LazyLock};
use std::time::Instant;

use clap::Parser;
use colored::Colorize;
use datanode::config::RegionEngineConfig;
use datanode::store;
use either::Either;
use mito2::access_layer::{
    AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
};
use mito2::cache::{CacheManager, CacheManagerRef};
use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
use mito2::read::Source;
use mito2::sst::file::{FileHandle, FileMeta};
use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
use mito2::sst::index::intermediate::IntermediateManager;
use mito2::sst::index::puffin_manager::PuffinManagerFactory;
use mito2::sst::parquet::reader::ParquetReaderBuilder;
use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
use mito2::worker::write_cache_from_config;
use object_store::ObjectStore;
use parquet::file::metadata::{FooterTail, KeyValue};
use regex::Regex;
use snafu::OptionExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::path_utils::region_name;
use store_api::region_request::PathType;
use store_api::storage::FileId;

use crate::datanode::{StorageConfig, StorageConfigWrapper};
use crate::error;
48
/// Object storage benchmark command
// Rewrites an existing SST file through the mito2 write path (including index
// creation) against the configured object store and reports timings; see
// `ObjbenchCommand::run`.
// NOTE: the `///` doc comments below double as the clap `--help` text, so
// they are intentionally left unchanged.
#[derive(Debug, Parser)]
pub struct ObjbenchCommand {
    /// Path to the object-store config file (TOML). Must deserialize into object_store::config::ObjectStoreConfig.
    #[clap(long, value_name = "FILE")]
    pub config: PathBuf,

    /// Source SST file path in object-store (e.g. "region_dir/<uuid>.parquet").
    #[clap(long, value_name = "PATH")]
    pub source: String,

    /// Verbose output
    #[clap(short, long, default_value_t = false)]
    pub verbose: bool,

    /// Output file path for pprof flamegraph (enables profiling)
    #[clap(long, value_name = "FILE")]
    pub pprof_file: Option<PathBuf>,
}
68
69fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConfig)> {
70    let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
71        error::IllegalConfigSnafu {
72            msg: format!("failed to read config {}: {e}", config_path.display()),
73        }
74        .build()
75    })?;
76
77    let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
78        error::IllegalConfigSnafu {
79            msg: format!("failed to parse config {}: {e}", config_path.display()),
80        }
81        .build()
82    })?;
83
84    let storage_config = store_cfg.storage;
85    let mito_engine_config = store_cfg
86        .region_engine
87        .into_iter()
88        .filter_map(|c| {
89            if let RegionEngineConfig::Mito(mito) = c {
90                Some(mito)
91            } else {
92                None
93            }
94        })
95        .next()
96        .with_context(|| error::IllegalConfigSnafu {
97            msg: format!("Engine config not found in {:?}", config_path),
98        })?;
99    Ok((storage_config, mito_engine_config))
100}
101
impl ObjbenchCommand {
    /// Executes the benchmark end to end:
    /// 1. builds an object store from the TOML config,
    /// 2. parses the source SST path and loads its parquet metadata,
    /// 3. rewrites the SST through the mito2 write path (optionally profiled
    ///    with pprof on unix), printing timings and metrics,
    /// 4. deletes the data (and index) files the run produced.
    pub async fn run(&self) -> error::Result<()> {
        if self.verbose {
            common_telemetry::init_default_ut_logging();
        }

        println!("{}", "Starting objbench with config:".cyan().bold());

        // Build object store from config
        let (store_cfg, mut mito_engine_config) = parse_config(&self.config)?;

        let object_store = build_object_store(&store_cfg).await?;
        println!("{} Object store initialized", "✓".green());

        // Prepare source identifiers
        let components = parse_file_dir_components(&self.source)?;
        println!(
            "{} Source path parsed: {}, components: {:?}",
            "✓".green(),
            self.source,
            components
        );

        // Load parquet metadata to extract RegionMetadata and file stats
        println!("{}", "Loading parquet metadata...".yellow());
        let file_size = object_store
            .stat(&self.source)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("stat failed: {e}"),
                }
                .build()
            })?
            .content_length();
        let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("read parquet metadata failed: {e}"),
                }
                .build()
            })?;

        let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
        let num_rows = parquet_meta.file_metadata().num_rows() as u64;
        let num_row_groups = parquet_meta.num_row_groups() as u64;
        // Largest per-row-group uncompressed size (sum of its column sizes);
        // 0 when the file has no row groups.
        let max_row_group_uncompressed_size: u64 = parquet_meta
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);

        println!(
            "{} Metadata loaded - rows: {}, size: {} bytes",
            "✓".green(),
            num_rows,
            file_size
        );

        // Build a FileHandle for the source file. Fields the benchmark does
        // not exercise (time range, indexes, sequence, ...) stay at defaults.
        let file_meta = FileMeta {
            region_id: region_meta.region_id,
            file_id: components.file_id,
            time_range: Default::default(),
            level: 0,
            file_size,
            max_row_group_uncompressed_size,
            available_indexes: Default::default(),
            indexes: Default::default(),
            index_file_size: 0,
            index_version: 0,
            num_rows,
            num_row_groups,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        };
        // Noop purger: removal requests on this handle are ignored, so the
        // real source file is never deleted (see `new_noop_file_purger`).
        let src_handle = FileHandle::new(file_meta, new_noop_file_purger());

        // Build the reader for a single file via ParquetReaderBuilder
        let table_dir = components.table_dir();
        let (src_access_layer, cache_manager) = build_access_layer_simple(
            &components,
            object_store.clone(),
            &mut mito_engine_config,
            &store_cfg.data_home,
        )
        .await?;
        let reader_build_start = Instant::now();

        let reader = ParquetReaderBuilder::new(
            table_dir,
            components.path_type,
            src_handle.clone(),
            object_store.clone(),
        )
        .expected_metadata(Some(region_meta.clone()))
        .build()
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("build reader failed: {e:?}"),
            }
            .build()
        })?;

        let reader_build_elapsed = reader_build_start.elapsed();
        let total_rows = reader.parquet_metadata().file_metadata().num_rows();
        println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed);

        // Build write request. Fulltext index creation on compaction is
        // disabled; only the flush-style write path is measured.
        let fulltext_index_config = FulltextIndexConfig {
            create_on_compaction: Mode::Disable,
            ..Default::default()
        };

        let write_req = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: region_meta,
            source: Either::Left(Source::Reader(Box::new(reader))),
            cache_manager,
            storage: None,
            max_sequence: None,
            index_options: Default::default(),
            index_config: mito_engine_config.index.clone(),
            inverted_index_config: MitoConfig::default().inverted_index,
            fulltext_index_config,
            bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };

        // Write SST
        println!("{}", "Writing SST...".yellow());

        // Start profiling if pprof_file is specified (pprof is unix-only).
        #[cfg(unix)]
        let profiler_guard = if self.pprof_file.is_some() {
            println!("{} Starting profiling...", "⚡".yellow());
            Some(
                pprof::ProfilerGuardBuilder::default()
                    .frequency(99)
                    .blocklist(&["libc", "libgcc", "pthread", "vdso"])
                    .build()
                    .map_err(|e| {
                        error::IllegalConfigSnafu {
                            msg: format!("Failed to start profiler: {e}"),
                        }
                        .build()
                    })?,
            )
        } else {
            None
        };

        #[cfg(not(unix))]
        if self.pprof_file.is_some() {
            eprintln!(
                "{}: Profiling is not supported on this platform",
                "Warning".yellow()
            );
        }

        let write_start = Instant::now();
        let mut metrics = Metrics::new(WriteType::Flush);
        let infos = src_access_layer
            .write_sst(write_req, &WriteOptions::default(), &mut metrics)
            .await
            .map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("write_sst failed: {e:?}"),
                }
                .build()
            })?;

        let write_elapsed = write_start.elapsed();

        // Stop profiling and generate flamegraph if enabled. Failures here
        // are reported but never abort the benchmark.
        #[cfg(unix)]
        if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
            println!("{} Generating flamegraph...", "🔥".yellow());
            match guard.report().build() {
                Ok(report) => {
                    let mut flamegraph_data = Vec::new();
                    if let Err(e) = report.flamegraph(&mut flamegraph_data) {
                        println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
                    } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
                        println!(
                            "{}: Failed to write flamegraph to {}: {}",
                            "Error".red(),
                            pprof_file.display(),
                            e
                        );
                    } else {
                        println!(
                            "{} Flamegraph saved to {}",
                            "✓".green(),
                            pprof_file.display().to_string().cyan()
                        );
                    }
                }
                Err(e) => {
                    println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
                }
            }
        }
        // A single-source write is expected to produce exactly one SST.
        assert_eq!(infos.len(), 1);
        let dst_file_id = infos[0].file_id;
        let dst_file_path = format!("{}/{}.parquet", components.region_dir(), dst_file_id);
        let mut dst_index_path = None;
        // A non-zero index size means a puffin index file was also written.
        if infos[0].index_metadata.file_size > 0 {
            dst_index_path = Some(format!(
                "{}/index/{}.puffin",
                components.region_dir(),
                dst_file_id
            ));
        }

        // Report results with ANSI colors
        println!("\n{} {}", "Write complete!".green().bold(), "✓".green());
        println!("  {}: {}", "Destination file".bold(), dst_file_path.cyan());
        println!("  {}: {}", "Rows".bold(), total_rows.to_string().cyan());
        println!(
            "  {}: {}",
            "File size".bold(),
            format!("{} bytes", file_size).cyan()
        );
        println!(
            "  {}: {:?}",
            "Reader build time".bold(),
            reader_build_elapsed
        );
        println!("  {}: {:?}", "Total time".bold(), write_elapsed);

        // Print metrics in a formatted way
        println!("  {}: {:?}", "Metrics".bold(), metrics,);

        // Print infos
        println!("  {}: {:?}", "Index".bold(), infos[0].index_metadata);

        // Cleanup: the benchmark must not leave behind the files it wrote.
        println!("\n{}", "Cleaning up...".yellow());
        object_store.delete(&dst_file_path).await.map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
            }
            .build()
        })?;
        println!("{} Temporary file {} deleted", "✓".green(), dst_file_path);

        if let Some(index_path) = dst_index_path {
            object_store.delete(&index_path).await.map_err(|e| {
                error::IllegalConfigSnafu {
                    msg: format!("Failed to delete dest index file {}: {}", index_path, e),
                }
                .build()
            })?;
            println!(
                "{} Temporary index file {} deleted",
                "✓".green(),
                index_path
            );
        }

        println!("\n{}", "Benchmark completed successfully!".green().bold());
        Ok(())
    }
}
377
/// Components parsed from an SST object-store path of the form
/// `data/<catalog>/<schema>/<table_id>/<table_id>_<region_seq>[/data|/metadata]/<file_id>.parquet`.
#[derive(Debug)]
struct FileDirComponents {
    /// Catalog name (first segment after the `data/` prefix).
    catalog: String,
    /// Schema (database) name.
    schema: String,
    /// Numeric table id segment.
    table_id: u32,
    /// Region sequence parsed from the `<table_id>_<seq>` segment.
    region_sequence: u32,
    /// Region dir layout: bare, `/data`-suffixed, or `/metadata`-suffixed.
    path_type: PathType,
    /// File id parsed from the parquet file stem.
    file_id: FileId,
}
387
388impl FileDirComponents {
389    fn table_dir(&self) -> String {
390        format!("data/{}/{}/{}", self.catalog, self.schema, self.table_id)
391    }
392
393    fn region_dir(&self) -> String {
394        let region_name = region_name(self.table_id, self.region_sequence);
395        match self.path_type {
396            PathType::Bare => {
397                format!(
398                    "data/{}/{}/{}/{}",
399                    self.catalog, self.schema, self.table_id, region_name
400                )
401            }
402            PathType::Data => {
403                format!(
404                    "data/{}/{}/{}/{}/data",
405                    self.catalog, self.schema, self.table_id, region_name
406                )
407            }
408            PathType::Metadata => {
409                format!(
410                    "data/{}/{}/{}/{}/metadata",
411                    self.catalog, self.schema, self.table_id, region_name
412                )
413            }
414        }
415    }
416}
417
418fn parse_file_dir_components(path: &str) -> error::Result<FileDirComponents> {
419    // Define the regex pattern to match all three path styles
420    let pattern =
421        r"^data/([^/]+)/([^/]+)/([^/]+)/([^/]+)_([^/]+)(?:/data|/metadata)?/(.+).parquet$";
422
423    // Compile the regex
424    let re = Regex::new(pattern).expect("Invalid regex pattern");
425
426    // Determine the path type
427    let path_type = if path.contains("/data/") {
428        PathType::Data
429    } else if path.contains("/metadata/") {
430        PathType::Metadata
431    } else {
432        PathType::Bare
433    };
434
435    // Try to match the path
436    let components = (|| {
437        let captures = re.captures(path)?;
438        if captures.len() != 7 {
439            return None;
440        }
441        let mut components = FileDirComponents {
442            catalog: "".to_string(),
443            schema: "".to_string(),
444            table_id: 0,
445            region_sequence: 0,
446            path_type,
447            file_id: FileId::default(),
448        };
449        // Extract the components
450        components.catalog = captures.get(1)?.as_str().to_string();
451        components.schema = captures.get(2)?.as_str().to_string();
452        components.table_id = captures[3].parse().ok()?;
453        components.region_sequence = captures[5].parse().ok()?;
454        let file_id_str = &captures[6];
455        components.file_id = FileId::parse_str(file_id_str).ok()?;
456        Some(components)
457    })();
458    components.context(error::IllegalConfigSnafu {
459        msg: format!("Expect valid source file path, got: {}", path),
460    })
461}
462
463fn extract_region_metadata(
464    file_path: &str,
465    meta: &parquet::file::metadata::ParquetMetaData,
466) -> error::Result<RegionMetadataRef> {
467    let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
468    let Some(kvs) = kvs else {
469        return Err(error::IllegalConfigSnafu {
470            msg: format!("{file_path}: missing parquet key_value metadata"),
471        }
472        .build());
473    };
474    let json = kvs
475        .iter()
476        .find(|kv| kv.key == PARQUET_METADATA_KEY)
477        .and_then(|kv| kv.value.as_ref())
478        .ok_or_else(|| {
479            error::IllegalConfigSnafu {
480                msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
481            }
482            .build()
483        })?;
484    let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
485        error::IllegalConfigSnafu {
486            msg: format!("invalid region metadata json: {e}"),
487        }
488        .build()
489    })?;
490    Ok(Arc::new(region))
491}
492
493async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
494    store::new_object_store(sc.store.clone(), &sc.data_home)
495        .await
496        .map_err(|e| {
497            error::IllegalConfigSnafu {
498                msg: format!("Failed to build object store: {e:?}"),
499            }
500            .build()
501        })
502}
503
/// Builds an [`AccessLayer`] (plus its [`CacheManager`]) rooted at the table
/// dir of `components`, with puffin and intermediate managers for index
/// writing placed under the engine's index aux path.
async fn build_access_layer_simple(
    components: &FileDirComponents,
    object_store: ObjectStore,
    config: &mut MitoConfig,
    data_home: &str,
) -> error::Result<(AccessLayerRef, CacheManagerRef)> {
    // NOTE(review): sanitize's result is deliberately discarded here —
    // confirm its failures are safe to ignore for a one-shot benchmark run.
    let _ = config.index.sanitize(data_home, &config.inverted_index);
    let puffin_manager = PuffinManagerFactory::new(
        &config.index.aux_path,
        config.index.staging_size.as_bytes(),
        Some(config.index.write_buffer_size.as_bytes() as _),
        config.index.staging_ttl,
    )
    .await
    .map_err(|e| {
        error::IllegalConfigSnafu {
            msg: format!("Failed to build access layer: {e:?}"),
        }
        .build()
    })?;

    // Intermediate files for index construction live under the same aux path.
    let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
        .await
        .map_err(|e| {
            error::IllegalConfigSnafu {
                msg: format!("Failed to build IntermediateManager: {e:?}"),
            }
            .build()
        })?
        .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));

    let cache_manager =
        build_cache_manager(config, puffin_manager.clone(), intermediate_manager.clone()).await?;
    let layer = AccessLayer::new(
        components.table_dir(),
        components.path_type,
        object_store,
        puffin_manager,
        intermediate_manager,
    );
    Ok((Arc::new(layer), cache_manager))
}
546
547async fn build_cache_manager(
548    config: &MitoConfig,
549    puffin_manager: PuffinManagerFactory,
550    intermediate_manager: IntermediateManager,
551) -> error::Result<CacheManagerRef> {
552    let write_cache = write_cache_from_config(config, puffin_manager, intermediate_manager)
553        .await
554        .map_err(|e| {
555            error::IllegalConfigSnafu {
556                msg: format!("Failed to build write cache: {e:?}"),
557            }
558            .build()
559        })?;
560    let cache_manager = Arc::new(
561        CacheManager::builder()
562            .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
563            .vector_cache_size(config.vector_cache_size.as_bytes())
564            .page_cache_size(config.page_cache_size.as_bytes())
565            .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
566            .index_metadata_size(config.index.metadata_cache_size.as_bytes())
567            .index_content_size(config.index.content_cache_size.as_bytes())
568            .index_content_page_size(config.index.content_cache_page_size.as_bytes())
569            .index_result_cache_size(config.index.result_cache_size.as_bytes())
570            .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
571            .write_cache(write_cache)
572            .build(),
573    );
574    Ok(cache_manager)
575}
576
577fn new_noop_file_purger() -> FilePurgerRef {
578    #[derive(Debug)]
579    struct Noop;
580    impl FilePurger for Noop {
581        fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool, _index_outdated: bool) {}
582    }
583    Arc::new(Noop)
584}
585
/// Loads the parquet footer metadata for `path`.
///
/// Prefetches the last 64 KiB of the file in one read; if the metadata block
/// fits inside that prefetch it is decoded directly, otherwise a second read
/// fetches exactly the metadata range. A `file_size` of 0 triggers a `stat`
/// call to discover the real size.
async fn load_parquet_metadata(
    object_store: ObjectStore,
    path: &str,
    file_size: u64,
) -> Result<parquet::file::metadata::ParquetMetaData, Box<dyn std::error::Error + Send + Sync>> {
    use parquet::file::FOOTER_SIZE;
    use parquet::file::metadata::ParquetMetaDataReader;
    let actual_size = if file_size == 0 {
        object_store.stat(path).await?.content_length()
    } else {
        file_size
    };
    // A valid parquet file is at least one footer long.
    if actual_size < FOOTER_SIZE as u64 {
        return Err("file too small".into());
    }
    // Read the file tail; 64 KiB usually covers footer + whole metadata.
    let prefetch: u64 = 64 * 1024;
    let start = actual_size.saturating_sub(prefetch);
    let buffer = object_store
        .read_with(path)
        .range(start..actual_size)
        .await?
        .to_vec();
    let buffer_len = buffer.len();
    // The trailing FOOTER_SIZE (8) bytes encode the metadata length + magic.
    let mut footer = [0; 8];
    footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
    let footer = FooterTail::try_new(&footer)?;
    let metadata_len = footer.metadata_length() as u64;
    // Metadata cannot extend past the start of the file.
    if actual_size - (FOOTER_SIZE as u64) < metadata_len {
        return Err("invalid footer/metadata length".into());
    }
    if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
        // Fast path: metadata is fully contained in the prefetched tail.
        let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
        let meta = ParquetMetaDataReader::decode_metadata(
            &buffer[metadata_start..buffer_len - FOOTER_SIZE],
        )?;
        Ok(meta)
    } else {
        // Slow path: metadata is larger than the prefetch — fetch exactly
        // the [metadata_start, footer) range with a second read.
        let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
        let data = object_store
            .read_with(path)
            .range(metadata_start..(actual_size - FOOTER_SIZE as u64))
            .await?
            .to_vec();
        let meta = ParquetMetaDataReader::decode_metadata(&data)?;
        Ok(meta)
    }
}
633
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::str::FromStr;

    use common_base::readable_size::ReadableSize;
    use store_api::region_request::PathType;

    use crate::datanode::objbench::{parse_config, parse_file_dir_components};

    #[test]
    fn test_parse_dir() {
        // All three supported layouts must parse to identical components,
        // differing only in the detected path type.
        let cases = [
            (
                "data/greptime/public/1024/1024_0000000000/metadata/00020380-009c-426d-953e-b4e34c15af34.parquet",
                PathType::Metadata,
            ),
            (
                "data/greptime/public/1024/1024_0000000000/data/00020380-009c-426d-953e-b4e34c15af34.parquet",
                PathType::Data,
            ),
            (
                "data/greptime/public/1024/1024_0000000000/00020380-009c-426d-953e-b4e34c15af34.parquet",
                PathType::Bare,
            ),
        ];
        for (path, expected_type) in cases {
            let c = parse_file_dir_components(path).unwrap();
            assert_eq!(
                c.file_id.to_string(),
                "00020380-009c-426d-953e-b4e34c15af34"
            );
            assert_eq!(c.catalog, "greptime");
            assert_eq!(c.schema, "public");
            assert_eq!(c.table_id, 1024);
            assert_eq!(c.region_sequence, 0);
            assert_eq!(c.path_type, expected_type);
        }
    }

    #[test]
    fn test_parse_config() {
        // Relies on the example config shipped in the repository.
        let path = "../../config/datanode.example.toml";
        let (storage, engine) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
        assert_eq!(storage.data_home, "./greptimedb_data");
        assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
    }
}