diff --git a/Cargo.lock b/Cargo.lock
index 71caf8c8b0..7b076d9273 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1896,6 +1896,7 @@ dependencies = [
  "clap 4.5.40",
  "cli",
  "client",
+ "colored",
  "common-base",
  "common-catalog",
  "common-config",
@@ -1917,6 +1918,7 @@ dependencies = [
  "common-wal",
  "datanode",
  "datatypes",
+ "either",
  "etcd-client",
  "file-engine",
  "flow",
@@ -1932,7 +1934,9 @@ dependencies = [
  "moka",
  "nu-ansi-term",
  "object-store",
+ "parquet",
  "plugins",
+ "pprof",
  "prometheus",
  "prost 0.13.5",
  "query",
@@ -1975,6 +1979,16 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
 
+[[package]]
+name = "colored"
+version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
+dependencies = [
+ "lazy_static",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "comfy-table"
 version = "7.1.2"
diff --git a/src/cmd/Cargo.toml b/src/cmd/Cargo.toml
index 744d13faeb..94dc3da56b 100644
--- a/src/cmd/Cargo.toml
+++ b/src/cmd/Cargo.toml
@@ -29,9 +29,11 @@ base64.workspace = true
 cache.workspace = true
 catalog.workspace = true
 chrono.workspace = true
+either = "1.15"
 clap.workspace = true
 cli.workspace = true
 client.workspace = true
+colored = "2.1.0"
 common-base.workspace = true
 common-catalog.workspace = true
 common-config.workspace = true
@@ -63,10 +65,13 @@ lazy_static.workspace = true
 meta-client.workspace = true
 meta-srv.workspace = true
 metric-engine.workspace = true
+mito2.workspace = true
 moka.workspace = true
 nu-ansi-term = "0.46"
 object-store.workspace = true
+parquet = { workspace = true, features = ["object_store"] }
 plugins.workspace = true
+pprof = "0.14.0"
 prometheus.workspace = true
 prost.workspace = true
 query.workspace = true
diff --git a/src/cmd/src/bin/greptime.rs b/src/cmd/src/bin/greptime.rs
index cf72b3d32f..f6bbebf7fb 100644
--- a/src/cmd/src/bin/greptime.rs
+++ b/src/cmd/src/bin/greptime.rs
@@ -103,12 +103,15 @@ async fn main_body() -> Result<()> {
 
 async fn start(cli: Command) -> Result<()> {
     match cli.subcmd {
-        SubCommand::Datanode(cmd) => {
-            let opts = cmd.load_options(&cli.global_options)?;
-            let plugins = Plugins::new();
-            let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
-            cmd.build_with(builder).await?.run().await
-        }
+        SubCommand::Datanode(cmd) => match cmd.subcmd {
+            datanode::SubCommand::Start(ref start) => {
+                let opts = start.load_options(&cli.global_options)?;
+                let plugins = Plugins::new();
+                let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
+                cmd.build_with(builder).await?.run().await
+            }
+            datanode::SubCommand::Objbench(ref bench) => bench.run().await,
+        },
         SubCommand::Flownode(cmd) => {
             cmd.build(cmd.load_options(&cli.global_options)?)
                 .await?
diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 641d3fc5fd..23ca644ffc 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -13,6 +13,8 @@
 // limitations under the License.
 
 pub mod builder;
+#[allow(clippy::print_stdout)]
+mod objbench;
 
 use std::path::Path;
 use std::time::Duration;
@@ -23,13 +25,16 @@ use common_config::Configurable;
 use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
 use common_telemetry::{info, warn};
 use common_wal::config::DatanodeWalConfig;
+use datanode::config::RegionEngineConfig;
 use datanode::datanode::Datanode;
 use meta_client::MetaClientOptions;
+use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, ensure};
 use tracing_appender::non_blocking::WorkerGuard;
 
 use crate::App;
 use crate::datanode::builder::InstanceBuilder;
+use crate::datanode::objbench::ObjbenchCommand;
 use crate::error::{
     LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
 };
@@ -89,7 +94,7 @@ impl App for Instance {
 #[derive(Parser)]
 pub struct Command {
     #[clap(subcommand)]
-    subcmd: SubCommand,
+    pub subcmd: SubCommand,
 }
 
 impl Command {
@@ -100,13 +105,26 @@ impl Command {
     pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
         match &self.subcmd {
             SubCommand::Start(cmd) => cmd.load_options(global_options),
+            SubCommand::Objbench(_) => {
+                // The objbench command is a standalone utility, so it doesn't
+                // need to load the full DatanodeOptions.
+                let mut opts = datanode::config::DatanodeOptions::default();
+                opts.sanitize();
+                Ok(DatanodeOptions {
+                    runtime: Default::default(),
+                    plugins: Default::default(),
+                    component: opts,
+                })
+            }
         }
     }
 }
 
 #[derive(Parser)]
-enum SubCommand {
+pub enum SubCommand {
     Start(StartCommand),
+    /// Object storage benchmark tool.
+    Objbench(ObjbenchCommand),
 }
 
 impl SubCommand {
@@ -116,12 +134,33 @@ impl SubCommand {
             info!("Building datanode with {:#?}", cmd);
             builder.build().await
         }
+            SubCommand::Objbench(cmd) => {
+                cmd.run().await?;
+                std::process::exit(0);
+            }
         }
     }
 }
 
+/// Storage engine config.
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct StorageConfig {
+    /// The working directory of the database.
+    pub data_home: String,
+    #[serde(flatten)]
+    pub store: object_store::config::ObjectStoreConfig,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+struct StorageConfigWrapper {
+    storage: StorageConfig,
+    region_engine: Vec<RegionEngineConfig>,
+}
+
 #[derive(Debug, Parser, Default)]
-struct StartCommand {
+pub struct StartCommand {
     #[clap(long)]
     node_id: Option<u64>,
     /// The address to bind the gRPC server.
@@ -149,7 +188,7 @@
 }
 
 impl StartCommand {
-    fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
+    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
         let mut opts = DatanodeOptions::load_layered_options(
             self.config_file.as_deref(),
             self.env_prefix.as_ref(),
diff --git a/src/cmd/src/datanode/objbench.rs b/src/cmd/src/datanode/objbench.rs
new file mode 100644
index 0000000000..564e8c744b
--- /dev/null
+++ b/src/cmd/src/datanode/objbench.rs
@@ -0,0 +1,676 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::path::PathBuf;
+use std::sync::Arc;
+use std::time::Instant;
+
+use clap::Parser;
+use colored::Colorize;
+use datanode::config::RegionEngineConfig;
+use datanode::store;
+use either::Either;
+use mito2::access_layer::{
+    AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
+};
+use mito2::cache::{CacheManager, CacheManagerRef};
+use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
+use mito2::read::Source;
+use mito2::sst::file::{FileHandle, FileMeta};
+use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
+use mito2::sst::index::intermediate::IntermediateManager;
+use mito2::sst::index::puffin_manager::PuffinManagerFactory;
+use mito2::sst::parquet::reader::ParquetReaderBuilder;
+use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
+use mito2::worker::write_cache_from_config;
+use object_store::ObjectStore;
+use regex::Regex;
+use snafu::OptionExt;
+use store_api::metadata::{RegionMetadata, RegionMetadataRef};
+use store_api::path_utils::region_name;
+use store_api::region_request::PathType;
+use store_api::storage::FileId;
+
+use crate::datanode::{StorageConfig, StorageConfigWrapper};
+use crate::error;
+
+/// Object storage benchmark command.
+#[derive(Debug, Parser)]
+pub struct ObjbenchCommand {
+    /// Path to the storage config file (TOML). Must contain `storage` and
+    /// `region_engine` sections, like the datanode config.
+    #[clap(long, value_name = "FILE")]
+    pub config: PathBuf,
+
+    /// Source SST file path in the object store (e.g. "region_dir/<file_id>.parquet").
+    #[clap(long, value_name = "PATH")]
+    pub source: String,
+
+    /// Verbose output.
+    #[clap(short, long, default_value_t = false)]
+    pub verbose: bool,
+
+    /// Output file path for the pprof flamegraph (enables profiling).
+    #[clap(long, value_name = "FILE")]
+    pub pprof_file: Option<PathBuf>,
+}
+
+fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConfig)> {
+    let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
+        error::IllegalConfigSnafu {
+            msg: format!("failed to read config {}: {e}", config_path.display()),
+        }
+        .build()
+    })?;
+
+    let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
+        error::IllegalConfigSnafu {
+            msg: format!("failed to parse config {}: {e}", config_path.display()),
+        }
+        .build()
+    })?;
+
+    let storage_config = store_cfg.storage;
+    let mito_engine_config = store_cfg
+        .region_engine
+        .into_iter()
+        .filter_map(|c| {
+            if let RegionEngineConfig::Mito(mito) = c {
+                Some(mito)
+            } else {
+                None
+            }
+        })
+        .next()
+        .with_context(|| error::IllegalConfigSnafu {
+            msg: format!("Engine config not found in {:?}", config_path),
+        })?;
+    Ok((storage_config, mito_engine_config))
+}
+
+impl ObjbenchCommand {
+    pub async fn run(&self) -> error::Result<()> {
+        if self.verbose {
+            common_telemetry::init_default_ut_logging();
+        }
+
+        println!("{}", "Starting objbench with config:".cyan().bold());
+
+        // Build object store from config.
+        let (store_cfg, mut mito_engine_config) = parse_config(&self.config)?;
+
+        let object_store = build_object_store(&store_cfg).await?;
+        println!("{} Object store initialized", "✓".green());
+
+        // Prepare source identifiers.
+        let components = parse_file_dir_components(&self.source)?;
+        println!(
+            "{} Source path parsed: {}, components: {:?}",
+            "✓".green(),
+            self.source,
+            components
+        );
+
+        // Load parquet metadata to extract RegionMetadata and file stats.
+        println!("{}", "Loading parquet metadata...".yellow());
metadata...".yellow()); + let file_size = object_store + .stat(&self.source) + .await + .map_err(|e| { + error::IllegalConfigSnafu { + msg: format!("stat failed: {e}"), + } + .build() + })? + .content_length(); + let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size) + .await + .map_err(|e| { + error::IllegalConfigSnafu { + msg: format!("read parquet metadata failed: {e}"), + } + .build() + })?; + + let region_meta = extract_region_metadata(&self.source, &parquet_meta)?; + let num_rows = parquet_meta.file_metadata().num_rows() as u64; + let num_row_groups = parquet_meta.num_row_groups() as u64; + + println!( + "{} Metadata loaded - rows: {}, size: {} bytes", + "✓".green(), + num_rows, + file_size + ); + + // Build a FileHandle for the source file + let file_meta = FileMeta { + region_id: region_meta.region_id, + file_id: components.file_id, + time_range: Default::default(), + level: 0, + file_size, + available_indexes: Default::default(), + index_file_size: 0, + num_rows, + num_row_groups, + sequence: None, + partition_expr: None, + num_series: 0, + }; + let src_handle = FileHandle::new(file_meta, new_noop_file_purger()); + + // Build the reader for a single file via ParquetReaderBuilder + let table_dir = components.table_dir(); + let (src_access_layer, cache_manager) = build_access_layer_simple( + &components, + object_store.clone(), + &mut mito_engine_config, + &store_cfg.data_home, + ) + .await?; + let reader_build_start = Instant::now(); + + let reader = ParquetReaderBuilder::new( + table_dir, + components.path_type, + src_handle.clone(), + object_store.clone(), + ) + .expected_metadata(Some(region_meta.clone())) + .build() + .await + .map_err(|e| { + error::IllegalConfigSnafu { + msg: format!("build reader failed: {e:?}"), + } + .build() + })?; + + let reader_build_elapsed = reader_build_start.elapsed(); + let total_rows = reader.parquet_metadata().file_metadata().num_rows(); + println!("{} Reader built in {:?}", "✓".green(), reader_build_elapsed); + + // Build write request + let fulltext_index_config = FulltextIndexConfig { + create_on_compaction: Mode::Disable, + ..Default::default() + }; + + let write_req = SstWriteRequest { + op_type: OperationType::Flush, + metadata: region_meta, + source: Either::Left(Source::Reader(Box::new(reader))), + cache_manager, + storage: None, + max_sequence: None, + index_options: Default::default(), + index_config: mito_engine_config.index.clone(), + inverted_index_config: MitoConfig::default().inverted_index, + fulltext_index_config, + bloom_filter_index_config: MitoConfig::default().bloom_filter_index, + }; + + // Write SST + println!("{}", "Writing SST...".yellow()); + + // Start profiling if pprof_file is specified + #[cfg(unix)] + let profiler_guard = if self.pprof_file.is_some() { + println!("{} Starting profiling...", "⚡".yellow()); + Some( + pprof::ProfilerGuardBuilder::default() + .frequency(99) + .blocklist(&["libc", "libgcc", "pthread", "vdso"]) + .build() + .map_err(|e| { + error::IllegalConfigSnafu { + msg: format!("Failed to start profiler: {e}"), + } + .build() + })?, + ) + } else { + None + }; + + #[cfg(not(unix))] + if self.pprof_file.is_some() { + eprintln!( + "{}: Profiling is not supported on this platform", + "Warning".yellow() + ); + } + + let write_start = Instant::now(); + let mut metrics = Metrics::new(WriteType::Flush); + let infos = src_access_layer + .write_sst(write_req, &WriteOptions::default(), &mut metrics) + .await + .map_err(|e| { + error::IllegalConfigSnafu { + msg: 
format!("write_sst failed: {e:?}"), + } + .build() + })?; + + let write_elapsed = write_start.elapsed(); + + // Stop profiling and generate flamegraph if enabled + #[cfg(unix)] + if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) { + println!("{} Generating flamegraph...", "🔥".yellow()); + match guard.report().build() { + Ok(report) => { + let mut flamegraph_data = Vec::new(); + if let Err(e) = report.flamegraph(&mut flamegraph_data) { + println!("{}: Failed to generate flamegraph: {}", "Error".red(), e); + } else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) { + println!( + "{}: Failed to write flamegraph to {}: {}", + "Error".red(), + pprof_file.display(), + e + ); + } else { + println!( + "{} Flamegraph saved to {}", + "✓".green(), + pprof_file.display().to_string().cyan() + ); + } + } + Err(e) => { + println!("{}: Failed to generate pprof report: {}", "Error".red(), e); + } + } + } + assert_eq!(infos.len(), 1); + let dst_file_id = infos[0].file_id; + let dst_file_path = format!("{}/{}.parquet", components.region_dir(), dst_file_id); + let mut dst_index_path = None; + if infos[0].index_metadata.file_size > 0 { + dst_index_path = Some(format!( + "{}/index/{}.puffin", + components.region_dir(), + dst_file_id + )); + } + + // Report results with ANSI colors + println!("\n{} {}", "Write complete!".green().bold(), "✓".green()); + println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan()); + println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan()); + println!( + " {}: {}", + "File size".bold(), + format!("{} bytes", file_size).cyan() + ); + println!( + " {}: {:?}", + "Reader build time".bold(), + reader_build_elapsed + ); + println!(" {}: {:?}", "Total time".bold(), write_elapsed); + + // Print metrics in a formatted way + println!(" {}: {:?}", "Metrics".bold(), metrics,); + + // Print infos + println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata); + + // Cleanup + println!("\n{}", "Cleaning up...".yellow()); + object_store.delete(&dst_file_path).await.map_err(|e| { + error::IllegalConfigSnafu { + msg: format!("Failed to delete dest file {}: {}", dst_file_path, e), + } + .build() + })?; + println!("{} Temporary file {} deleted", "✓".green(), dst_file_path); + + if let Some(index_path) = dst_index_path { + object_store.delete(&index_path).await.map_err(|e| { + error::IllegalConfigSnafu { + msg: format!("Failed to delete dest index file {}: {}", index_path, e), + } + .build() + })?; + println!( + "{} Temporary index file {} deleted", + "✓".green(), + index_path + ); + } + + println!("\n{}", "Benchmark completed successfully!".green().bold()); + Ok(()) + } +} + +#[derive(Debug)] +struct FileDirComponents { + catalog: String, + schema: String, + table_id: u32, + region_sequence: u32, + path_type: PathType, + file_id: FileId, +} + +impl FileDirComponents { + fn table_dir(&self) -> String { + format!("data/{}/{}/{}", self.catalog, self.schema, self.table_id) + } + + fn region_dir(&self) -> String { + let region_name = region_name(self.table_id, self.region_sequence); + match self.path_type { + PathType::Bare => { + format!( + "data/{}/{}/{}/{}", + self.catalog, self.schema, self.table_id, region_name + ) + } + PathType::Data => { + format!( + "data/{}/{}/{}/{}/data", + self.catalog, self.schema, self.table_id, region_name + ) + } + PathType::Metadata => { + format!( + "data/{}/{}/{}/{}/metadata", + self.catalog, self.schema, self.table_id, region_name + ) + } + } + } +} + +fn parse_file_dir_components(path: &str) -> error::Result { + 
+    // The regex pattern matches all three path styles.
+    let pattern =
+        r"^data/([^/]+)/([^/]+)/([^/]+)/([^/]+)_([^/]+)(?:/data|/metadata)?/(.+).parquet$";
+
+    // Compile the regex.
+    let re = Regex::new(pattern).expect("Invalid regex pattern");
+
+    // Determine the path type.
+    let path_type = if path.contains("/data/") {
+        PathType::Data
+    } else if path.contains("/metadata/") {
+        PathType::Metadata
+    } else {
+        PathType::Bare
+    };
+
+    // Try to match the path.
+    let components = (|| {
+        let captures = re.captures(path)?;
+        if captures.len() != 7 {
+            return None;
+        }
+        let mut components = FileDirComponents {
+            catalog: "".to_string(),
+            schema: "".to_string(),
+            table_id: 0,
+            region_sequence: 0,
+            path_type,
+            file_id: FileId::default(),
+        };
+        // Extract the components.
+        components.catalog = captures.get(1)?.as_str().to_string();
+        components.schema = captures.get(2)?.as_str().to_string();
+        components.table_id = captures[3].parse().ok()?;
+        components.region_sequence = captures[5].parse().ok()?;
+        let file_id_str = &captures[6];
+        components.file_id = FileId::parse_str(file_id_str).ok()?;
+        Some(components)
+    })();
+    components.context(error::IllegalConfigSnafu {
+        msg: format!("Expect valid source file path, got: {}", path),
+    })
+}
+
+fn extract_region_metadata(
+    file_path: &str,
+    meta: &parquet::file::metadata::ParquetMetaData,
+) -> error::Result<RegionMetadataRef> {
+    use parquet::format::KeyValue;
+    let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
+    let Some(kvs) = kvs else {
+        return Err(error::IllegalConfigSnafu {
+            msg: format!("{file_path}: missing parquet key_value metadata"),
+        }
+        .build());
+    };
+    let json = kvs
+        .iter()
+        .find(|kv| kv.key == PARQUET_METADATA_KEY)
+        .and_then(|kv| kv.value.as_ref())
+        .ok_or_else(|| {
+            error::IllegalConfigSnafu {
+                msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
+            }
+            .build()
+        })?;
+    let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
+        error::IllegalConfigSnafu {
+            msg: format!("invalid region metadata json: {e}"),
+        }
+        .build()
+    })?;
+    Ok(Arc::new(region))
+}
+
+async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
+    store::new_object_store(sc.store.clone(), &sc.data_home)
+        .await
+        .map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("Failed to build object store: {e:?}"),
+            }
+            .build()
+        })
+}
+
+async fn build_access_layer_simple(
+    components: &FileDirComponents,
+    object_store: ObjectStore,
+    config: &mut MitoConfig,
+    data_home: &str,
+) -> error::Result<(AccessLayerRef, CacheManagerRef)> {
+    let _ = config.index.sanitize(data_home, &config.inverted_index);
+    let puffin_manager = PuffinManagerFactory::new(
+        &config.index.aux_path,
+        config.index.staging_size.as_bytes(),
+        Some(config.index.write_buffer_size.as_bytes() as _),
+        config.index.staging_ttl,
+    )
+    .await
+    .map_err(|e| {
+        error::IllegalConfigSnafu {
+            msg: format!("Failed to build access layer: {e:?}"),
+        }
+        .build()
+    })?;
+
+    let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
+        .await
+        .map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("Failed to build IntermediateManager: {e:?}"),
+            }
+            .build()
+        })?
+        .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
+
+    let cache_manager =
+        build_cache_manager(config, puffin_manager.clone(), intermediate_manager.clone()).await?;
+    let layer = AccessLayer::new(
+        components.table_dir(),
+        components.path_type,
+        object_store,
+        puffin_manager,
+        intermediate_manager,
+    );
+    Ok((Arc::new(layer), cache_manager))
+}
+
+async fn build_cache_manager(
+    config: &MitoConfig,
+    puffin_manager: PuffinManagerFactory,
+    intermediate_manager: IntermediateManager,
+) -> error::Result<CacheManagerRef> {
+    let write_cache = write_cache_from_config(config, puffin_manager, intermediate_manager)
+        .await
+        .map_err(|e| {
+            error::IllegalConfigSnafu {
+                msg: format!("Failed to build write cache: {e:?}"),
+            }
+            .build()
+        })?;
+    let cache_manager = Arc::new(
+        CacheManager::builder()
+            .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
+            .vector_cache_size(config.vector_cache_size.as_bytes())
+            .page_cache_size(config.page_cache_size.as_bytes())
+            .selector_result_cache_size(config.selector_result_cache_size.as_bytes())
+            .index_metadata_size(config.index.metadata_cache_size.as_bytes())
+            .index_content_size(config.index.content_cache_size.as_bytes())
+            .index_content_page_size(config.index.content_cache_page_size.as_bytes())
+            .index_result_cache_size(config.index.result_cache_size.as_bytes())
+            .puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
+            .write_cache(write_cache)
+            .build(),
+    );
+    Ok(cache_manager)
+}
+
+fn new_noop_file_purger() -> FilePurgerRef {
+    #[derive(Debug)]
+    struct Noop;
+    impl FilePurger for Noop {
+        fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
+    }
+    Arc::new(Noop)
+}
+
+async fn load_parquet_metadata(
+    object_store: ObjectStore,
+    path: &str,
+    file_size: u64,
+) -> Result<parquet::file::metadata::ParquetMetaData, Box<dyn std::error::Error>> {
+    use parquet::file::FOOTER_SIZE;
+    use parquet::file::metadata::ParquetMetaDataReader;
+    let actual_size = if file_size == 0 {
+        object_store.stat(path).await?.content_length()
+    } else {
+        file_size
+    };
+    if actual_size < FOOTER_SIZE as u64 {
+        return Err("file too small".into());
+    }
+    let prefetch: u64 = 64 * 1024;
+    let start = actual_size.saturating_sub(prefetch);
+    let buffer = object_store
+        .read_with(path)
+        .range(start..actual_size)
+        .await?
+        .to_vec();
+    let buffer_len = buffer.len();
+    let mut footer = [0; 8];
+    footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
+    let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
+    let metadata_len = footer.metadata_length() as u64;
+    if actual_size - (FOOTER_SIZE as u64) < metadata_len {
+        return Err("invalid footer/metadata length".into());
+    }
+    if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
+        let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
+        let meta = ParquetMetaDataReader::decode_metadata(
+            &buffer[metadata_start..buffer_len - FOOTER_SIZE],
+        )?;
+        Ok(meta)
+    } else {
+        let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
+        let data = object_store
+            .read_with(path)
+            .range(metadata_start..(actual_size - FOOTER_SIZE as u64))
+            .await?
+            .to_vec();
+        let meta = ParquetMetaDataReader::decode_metadata(&data)?;
+        Ok(meta)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::path::PathBuf;
+    use std::str::FromStr;
+
+    use common_base::readable_size::ReadableSize;
+    use store_api::region_request::PathType;
+
+    use crate::datanode::objbench::{parse_config, parse_file_dir_components};
+
+    #[test]
+    fn test_parse_dir() {
+        let meta_path = "data/greptime/public/1024/1024_0000000000/metadata/00020380-009c-426d-953e-b4e34c15af34.parquet";
+        let c = parse_file_dir_components(meta_path).unwrap();
+        assert_eq!(
+            c.file_id.to_string(),
+            "00020380-009c-426d-953e-b4e34c15af34"
+        );
+        assert_eq!(c.catalog, "greptime");
+        assert_eq!(c.schema, "public");
+        assert_eq!(c.table_id, 1024);
+        assert_eq!(c.region_sequence, 0);
+        assert_eq!(c.path_type, PathType::Metadata);
+
+        let c = parse_file_dir_components(
+            "data/greptime/public/1024/1024_0000000000/data/00020380-009c-426d-953e-b4e34c15af34.parquet",
+        ).unwrap();
+        assert_eq!(
+            c.file_id.to_string(),
+            "00020380-009c-426d-953e-b4e34c15af34"
+        );
+        assert_eq!(c.catalog, "greptime");
+        assert_eq!(c.schema, "public");
+        assert_eq!(c.table_id, 1024);
+        assert_eq!(c.region_sequence, 0);
+        assert_eq!(c.path_type, PathType::Data);
+
+        let c = parse_file_dir_components(
+            "data/greptime/public/1024/1024_0000000000/00020380-009c-426d-953e-b4e34c15af34.parquet",
+        ).unwrap();
+        assert_eq!(
+            c.file_id.to_string(),
+            "00020380-009c-426d-953e-b4e34c15af34"
+        );
+        assert_eq!(c.catalog, "greptime");
+        assert_eq!(c.schema, "public");
+        assert_eq!(c.table_id, 1024);
+        assert_eq!(c.region_sequence, 0);
+        assert_eq!(c.path_type, PathType::Bare);
+    }
+
+    #[test]
+    fn test_parse_config() {
+        let path = "../../config/datanode.example.toml";
+        let (storage, engine) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
+        assert_eq!(storage.data_home, "./greptimedb_data");
+        assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
+    }
+}
diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs
index 8d1c8c99dc..6dc6f280c6 100644
--- a/src/datanode/src/store.rs
+++ b/src/datanode/src/store.rs
@@ -47,10 +47,7 @@ pub(crate) async fn new_object_store_without_cache(
     Ok(object_store)
 }
 
-pub(crate) async fn new_object_store(
-    store: ObjectStoreConfig,
-    data_home: &str,
-) -> Result<ObjectStore> {
+pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Result<ObjectStore> {
     let object_store = new_raw_object_store(&store, data_home)
         .await
         .context(error::ObjectStoreSnafu)?;
@@ -59,7 +56,7 @@ pub(crate) async fn new_object_store(
     let object_store = {
         // It's safe to unwrap here because we already checked above.
         let cache_config = store.cache_config().unwrap();
-        if let Some(cache_layer) = build_cache_layer(cache_config).await? {
+        if let Some(cache_layer) = build_cache_layer(cache_config, data_home).await? {
             // Adds cache layer
             object_store.layer(cache_layer)
         } else {
@@ -79,17 +76,22 @@ pub(crate) async fn new_object_store(
 
 async fn build_cache_layer(
     cache_config: &ObjectStorageCacheConfig,
+    data_home: &str,
 ) -> Result<Option<LruCacheLayer<impl Access>>> {
     // No need to build cache layer if read cache is disabled.
     if !cache_config.enable_read_cache {
         return Ok(None);
     }
-
-    let atomic_temp_dir = join_dir(&cache_config.cache_path, ATOMIC_WRITE_DIR);
+    let cache_base_dir = if cache_config.cache_path.is_empty() {
+        data_home
+    } else {
+        &cache_config.cache_path
+    };
+    let atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
     clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
 
     let cache_store = Fs::default()
-        .root(&cache_config.cache_path)
+        .root(cache_base_dir)
         .atomic_write_dir(&atomic_temp_dir)
         .build()
         .context(error::BuildCacheStoreSnafu)?;
diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index e5401209ca..2eb8a2ea0e 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -72,7 +72,7 @@ pub struct Metrics {
 }
 
 impl Metrics {
-    pub(crate) fn new(write_type: WriteType) -> Self {
+    pub fn new(write_type: WriteType) -> Self {
         Self {
             write_type,
             iter_source: Default::default(),
@@ -255,12 +255,12 @@ impl AccessLayer {
         &self,
         request: SstWriteRequest,
         write_opts: &WriteOptions,
-        write_type: WriteType,
-    ) -> Result<(SstInfoArray, Metrics)> {
+        metrics: &mut Metrics,
+    ) -> Result<SstInfoArray> {
         let region_id = request.metadata.region_id;
         let cache_manager = request.cache_manager.clone();
 
-        let (sst_info, metrics) = if let Some(write_cache) = cache_manager.write_cache() {
+        let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
             // Write to the write cache.
             write_cache
                 .write_and_upload_sst(
@@ -273,7 +273,7 @@ impl AccessLayer {
                         remote_store: self.object_store.clone(),
                     },
                     write_opts,
-                    write_type,
+                    metrics,
                 )
                 .await?
         } else {
@@ -303,11 +303,11 @@ impl AccessLayer {
                 request.index_config,
                 indexer_builder,
                 path_provider,
-                Metrics::new(write_type),
+                metrics,
             )
             .await
             .with_file_cleaner(cleaner);
-            let ssts = match request.source {
+            match request.source {
                 Either::Left(source) => {
                     writer
                         .write_all(source, request.max_sequence, write_opts)
@@ -316,9 +316,7 @@ impl AccessLayer {
                 Either::Right(flat_source) => {
                     writer.write_all_flat(flat_source, write_opts).await?
                 }
-            };
-            let metrics = writer.into_metrics();
-            (ssts, metrics)
+            }
         };
 
         // Put parquet metadata to cache manager.
@@ -333,7 +331,7 @@ impl AccessLayer {
             }
         }
 
-        Ok((sst_info, metrics))
+        Ok(sst_info)
     }
 
     /// Puts encoded SST bytes to the write cache (if enabled) and uploads it to the object store.
diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index d2b7e34997..96f2030562 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -169,8 +169,8 @@ impl WriteCache {
         write_request: SstWriteRequest,
         upload_request: SstUploadRequest,
         write_opts: &WriteOptions,
-        write_type: WriteType,
-    ) -> Result<(SstInfoArray, Metrics)> {
+        metrics: &mut Metrics,
+    ) -> Result<SstInfoArray> {
         let region_id = write_request.metadata.region_id;
         let store = self.file_cache.local_store();
@@ -197,7 +197,7 @@ impl WriteCache {
             write_request.index_config,
             indexer,
             path_provider.clone(),
-            Metrics::new(write_type),
+            metrics,
         )
         .await
         .with_file_cleaner(cleaner);
@@ -210,11 +210,10 @@ impl WriteCache {
             }
             either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
         };
-        let mut metrics = writer.into_metrics();
 
         // Upload sst file to remote object store.
         if sst_info.is_empty() {
-            return Ok((sst_info, metrics));
+            return Ok(sst_info);
         }
 
         let mut upload_tracker = UploadTracker::new(region_id);
@@ -256,7 +255,7 @@ impl WriteCache {
             return Err(err);
         }
 
-        Ok((sst_info, metrics))
+        Ok(sst_info)
     }
 
     /// Removes a file from the cache by `index_key`.
@@ -559,8 +558,9 @@ mod tests {
         };
 
         // Write to cache and upload sst to mock remote store
-        let (mut sst_infos, _) = write_cache
-            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
+        let mut metrics = Metrics::new(WriteType::Flush);
+        let mut sst_infos = write_cache
+            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
             .await
             .unwrap();
         let sst_info = sst_infos.remove(0);
@@ -655,8 +655,9 @@ mod tests {
             remote_store: mock_store.clone(),
         };
 
-        let (mut sst_infos, _) = write_cache
-            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
+        let mut metrics = Metrics::new(WriteType::Flush);
+        let mut sst_infos = write_cache
+            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
             .await
             .unwrap();
         let sst_info = sst_infos.remove(0);
@@ -735,8 +736,9 @@ mod tests {
             remote_store: mock_store.clone(),
         };
 
+        let mut metrics = Metrics::new(WriteType::Flush);
         write_cache
-            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
+            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
             .await
             .unwrap_err();
         let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index 2b871947c0..52b8fe068a 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -30,7 +30,9 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::region_request::PathType;
 use store_api::storage::RegionId;
 
-use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
+use crate::access_layer::{
+    AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
+};
 use crate::cache::{CacheManager, CacheManagerRef};
 use crate::compaction::picker::{PickerOutput, new_picker};
 use crate::compaction::{CompactionSstReaderBuilder, find_ttl};
@@ -387,7 +389,8 @@ impl Compactor for DefaultCompactor {
                     let reader = builder.build_sst_reader().await?;
                     either::Left(Source::Reader(reader))
                 };
-                let (sst_infos, metrics) = sst_layer
+                let mut metrics = Metrics::new(WriteType::Compaction);
+                let sst_infos = sst_layer
                     .write_sst(
                         SstWriteRequest {
                             op_type: OperationType::Compact,
@@ -403,7 +406,7 @@ impl Compactor for DefaultCompactor {
                             bloom_filter_index_config,
                         },
                         &write_opts,
-                        WriteType::Compaction,
+                        &mut metrics,
                     )
                     .await?;
                 // Convert partition expression once outside the map
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index ddad947f8a..0b0b4b05db 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -525,21 +525,19 @@ impl RegionFlushTask {
             let source = Either::Left(source);
             let write_request = self.new_write_request(version, max_sequence, source);
 
-            let (ssts_written, metrics) = self
+            let mut metrics = Metrics::new(WriteType::Flush);
+            let ssts_written = self
                 .access_layer
-                .write_sst(write_request, &write_opts, WriteType::Flush)
+                .write_sst(write_request, &write_opts, &mut metrics)
                 .await?;
             if ssts_written.is_empty() {
                 // No data written.
                 continue;
             }
 
-            common_telemetry::debug!(
+            debug!(
                 "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
-                self.region_id,
-                num_mem_ranges,
-                num_mem_rows,
-                metrics
+                self.region_id, num_mem_ranges, num_mem_rows, metrics
             );
 
             flush_metrics = flush_metrics.merge(metrics);
@@ -591,9 +589,11 @@ impl RegionFlushTask {
             let semaphore = self.flush_semaphore.clone();
             let task = common_runtime::spawn_global(async move {
                 let _permit = semaphore.acquire().await.unwrap();
-                access_layer
-                    .write_sst(write_request, &write_opts, WriteType::Flush)
-                    .await
+                let mut metrics = Metrics::new(WriteType::Flush);
+                let ssts = access_layer
+                    .write_sst(write_request, &write_opts, &mut metrics)
+                    .await?;
+                Ok((ssts, metrics))
             });
             tasks.push(task);
         }
diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs
index 45ce635148..a15711b34a 100644
--- a/src/mito2/src/lib.rs
+++ b/src/mito2/src/lib.rs
@@ -47,7 +47,7 @@ pub mod schedule;
 pub mod sst;
 mod time_provider;
 pub mod wal;
-mod worker;
+pub mod worker;
 
 #[cfg_attr(doc, aquamarine::aquamarine)]
 /// # Mito developer document
diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs
index ae255e9407..dc5727f9cc 100644
--- a/src/mito2/src/sst/file.rs
+++ b/src/mito2/src/sst/file.rs
@@ -295,8 +295,8 @@ impl FileHandle {
     }
 
     /// Returns the complete file path of the file.
-    pub fn file_path(&self, file_dir: &str, path_type: PathType) -> String {
-        location::sst_file_path(file_dir, self.file_id(), path_type)
+    pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
+        location::sst_file_path(table_dir, self.file_id(), path_type)
     }
 
     /// Returns the time range of the file.
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs
index cc8469332a..e67a4b6e98 100644
--- a/src/mito2/src/sst/index.rs
+++ b/src/mito2/src/sst/index.rs
@@ -791,7 +791,7 @@ mod tests {
     use tokio::sync::mpsc;
 
     use super::*;
-    use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType};
+    use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
     use crate::cache::write_cache::WriteCache;
     use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
     use crate::memtable::time_partition::TimePartitions;
@@ -927,11 +927,11 @@ mod tests {
             fulltext_index_config: Default::default(),
             bloom_filter_index_config: Default::default(),
         };
+        let mut metrics = Metrics::new(WriteType::Flush);
         env.access_layer
-            .write_sst(write_request, &WriteOptions::default(), WriteType::Flush)
+            .write_sst(write_request, &WriteOptions::default(), &mut metrics)
             .await
             .unwrap()
-            .0
             .remove(0)
     }
 
diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs
index 83cd17acc8..5a11a15e70 100644
--- a/src/mito2/src/sst/parquet.rs
+++ b/src/mito2/src/sst/parquet.rs
@@ -181,13 +181,14 @@ mod tests {
             ..Default::default()
         };
 
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             NoopIndexBuilder,
             file_path,
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
 
@@ -243,6 +244,7 @@ mod tests {
             ..Default::default()
         };
         // Prepare data.
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -251,7 +253,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
 
@@ -329,6 +331,7 @@ mod tests {
 
         // write the sst file and get sst info
        // sst info contains the parquet metadata, which is converted from FileMetaData
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -337,7 +340,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
 
@@ -378,6 +381,7 @@ mod tests {
             ..Default::default()
         };
         // Prepare data.
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -386,7 +390,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
         writer
@@ -437,6 +441,7 @@ mod tests {
             ..Default::default()
         };
         // Prepare data.
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -445,7 +450,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
         writer
@@ -481,6 +486,7 @@ mod tests {
             ..Default::default()
         };
         // Prepare data.
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -489,7 +495,7 @@ mod tests {
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
 
@@ -639,13 +645,14 @@ mod tests {
             table_dir: "test".to_string(),
             path_type: PathType::Bare,
         };
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             NoopIndexBuilder,
             path_provider,
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
 
@@ -716,13 +723,14 @@ mod tests {
             bloom_filter_index_config: Default::default(),
         };
 
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             indexer_builder,
             file_path.clone(),
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
 
@@ -1092,13 +1100,14 @@ mod tests {
             bloom_filter_index_config: Default::default(),
         };
 
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             indexer_builder,
             file_path.clone(),
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
 
@@ -1148,13 +1157,14 @@ mod tests {
             ..Default::default()
         };
 
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             NoopIndexBuilder,
             file_path,
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
 
diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs
index 60cf654380..21fc6511f8 100644
--- a/src/mito2/src/sst/parquet/reader.rs
+++ b/src/mito2/src/sst/parquet/reader.rs
@@ -91,7 +91,7 @@ macro_rules! handle_index_error {
 
 /// Parquet SST reader builder.
 pub struct ParquetReaderBuilder {
     /// SST directory.
-    file_dir: String,
+    table_dir: String,
     /// Path type for generating file paths.
     path_type: PathType,
     file_handle: FileHandle,
@@ -122,13 +122,13 @@ pub struct ParquetReaderBuilder {
 
 impl ParquetReaderBuilder {
     /// Returns a new [ParquetReaderBuilder] to read specific SST.
     pub fn new(
-        file_dir: String,
+        table_dir: String,
         path_type: PathType,
         file_handle: FileHandle,
         object_store: ObjectStore,
     ) -> ParquetReaderBuilder {
         ParquetReaderBuilder {
-            file_dir,
+            table_dir,
             path_type,
             file_handle,
             object_store,
@@ -237,7 +237,7 @@ impl ParquetReaderBuilder {
     ) -> Result<(FileRangeContext, RowGroupSelection)> {
         let start = Instant::now();
 
-        let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
+        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
         let file_size = self.file_handle.meta_ref().file_size;
 
         // Loads parquet metadata of the file.
@@ -1227,7 +1227,6 @@ impl ParquetReader {
         self.context.read_format().metadata()
     }
 
-    #[cfg(test)]
     pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
         self.context.reader_builder().parquet_meta.clone()
     }
diff --git a/src/mito2/src/sst/parquet/writer.rs b/src/mito2/src/sst/parquet/writer.rs
index d52615690f..857ec08878 100644
--- a/src/mito2/src/sst/parquet/writer.rs
+++ b/src/mito2/src/sst/parquet/writer.rs
@@ -62,7 +62,7 @@ use crate::sst::{
 };
 
 /// Parquet SST writer.
-pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
+pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
     /// Path provider that creates SST and index file paths according to file id.
     path_provider: P,
     writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
@@ -81,7 +81,7 @@ pub struct ParquetWriter
     /// Write metrics
-    metrics: Metrics,
+    metrics: &'a mut Metrics,
 }
 
 pub trait WriterFactory {
@@ -107,7 +107,7 @@ impl WriterFactory for ObjectStoreWriterFactory {
     }
 }
 
-impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
+impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
 where
     P: FilePathProvider,
     I: IndexerBuilder,
@@ -118,8 +118,8 @@ where
         index_config: IndexConfig,
         indexer_builder: I,
         path_provider: P,
-        metrics: Metrics,
-    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
+        metrics: &'a mut Metrics,
+    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
         ParquetWriter::new(
             ObjectStoreWriterFactory { object_store },
             metadata,
@@ -137,7 +137,7 @@ where
     }
 }
 
-impl<F, I, P> ParquetWriter<F, I, P>
+impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
 where
     F: WriterFactory,
     I: IndexerBuilder,
@@ -150,8 +150,8 @@ where
         index_config: IndexConfig,
         indexer_builder: I,
         path_provider: P,
-        metrics: Metrics,
-    ) -> ParquetWriter<F, I, P> {
+        metrics: &'a mut Metrics,
+    ) -> ParquetWriter<'a, F, I, P> {
         let init_file = FileId::random();
         let indexer = indexer_builder.build(init_file).await;
 
@@ -487,11 +487,6 @@ where
             Ok(self.writer.as_mut().unwrap())
         }
     }
-
-    /// Consumes write and return the collected metrics.
-    pub fn into_metrics(self) -> Metrics {
-        self.metrics
-    }
 }
 
 #[derive(Default)]
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 75aff36b52..adb88f3467 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -423,7 +423,7 @@ fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
         % num_workers
 }
 
-async fn write_cache_from_config(
+pub async fn write_cache_from_config(
     config: &MitoConfig,
     puffin_manager_factory: PuffinManagerFactory,
     intermediate_manager: IntermediateManager,
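Note on the `mito2` API change in this patch: `AccessLayer::write_sst` and `WriteCache::write_and_upload_sst` now borrow a caller-owned `&mut Metrics` and return only the SST list, instead of taking a `WriteType` and returning a `(SstInfoArray, Metrics)` tuple. A minimal sketch of the new calling convention, modeled on the updated call sites in `flush.rs` and `compactor.rs` (the `write_one` helper is illustrative and not part of this diff):

```rust
use mito2::access_layer::{AccessLayerRef, Metrics, SstWriteRequest, WriteType};
use mito2::sst::parquet::WriteOptions;

// Illustrative helper: `layer` and `request` are assumed to be prepared the
// same way the flush/compaction call sites in this diff prepare them.
async fn write_one(
    layer: AccessLayerRef,
    request: SstWriteRequest,
) -> mito2::error::Result<Metrics> {
    // The caller owns the metrics; the writer accumulates into the borrow
    // rather than returning a fresh Metrics alongside the SSTs.
    let mut metrics = Metrics::new(WriteType::Flush);
    let _ssts = layer
        .write_sst(request, &WriteOptions::default(), &mut metrics)
        .await?;
    Ok(metrics)
}
```

The same pattern appears in the updated tests in `write_cache.rs`, `index.rs`, and `parquet.rs`, and it is what lets the parallel flush task in `flush.rs` return `Ok((ssts, metrics))` explicitly instead of relying on the old tuple return.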