feat: objbench subcommand for datanode (#7114)

* feat/objbench-subcmd:
 ### Add Object Storage Benchmark Tool and Update Dependencies

 - **`Cargo.lock` & `Cargo.toml`**: Added dependencies for `colored`, `parquet`, and `pprof` to support new features.
 - **`datanode.rs`**: Introduced `ObjbenchCommand` for benchmarking object storage, including command-line options for configuration and execution. Added `StorageConfig` and `StorageConfigWrapper` for storage engine configuration.
 - **`datanode.rs`**: Implemented a stub for the `build_object_store` function to initialize object storage.

 These changes introduce a new subcommand for object storage benchmarking and update dependencies to support additional functionality.
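
 Example invocation (the `greptime` binary name is assumed here; the flags come from the `ObjbenchCommand` definition below):

     greptime datanode objbench \
         --config /path/to/datanode.toml \
         --source "data/greptime/public/1024/1024_0000000000/<uuid>.parquet" \
         --pprof-file /tmp/objbench-flamegraph.svg \
         --verbose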

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* init

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* fix: code style and clippy

* feat/objbench-subcmd:
 Improve error handling in `objbench.rs`

 - Enhanced error handling in the `parse_config` and `parse_file_dir_components` functions by replacing `unwrap` with snafu's `OptionExt` and `context`, producing better error messages (see the sketch after this list).
 - Updated `build_access_layer_simple` and `build_cache_manager` functions to use `map_err` for more descriptive error handling.
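
 For illustration, the resulting pattern in `parse_config` (shown in full in `objbench.rs` below) looks roughly like this:

     // OptionExt::with_context turns the `None` case into a typed error instead of a panic.
     let mito_engine_config = store_cfg
         .region_engine
         .into_iter()
         .filter_map(|c| match c {
             RegionEngineConfig::Mito(mito) => Some(mito),
             _ => None,
         })
         .next()
         .with_context(|| error::IllegalConfigSnafu {
             msg: format!("Engine config not found in {:?}", config_path),
         })?;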

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

* chore: rebase main

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>

---------

Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
Lei, HUANG authored 2025-10-28 22:26:29 -07:00, committed by GitHub
parent 37bc2e6b07
commit f0afd675e3
17 changed files with 834 additions and 88 deletions

Cargo.lock generated
View File

@@ -1896,6 +1896,7 @@ dependencies = [
  "clap 4.5.40",
  "cli",
  "client",
+ "colored",
  "common-base",
  "common-catalog",
  "common-config",
@@ -1917,6 +1918,7 @@ dependencies = [
  "common-wal",
  "datanode",
  "datatypes",
+ "either",
  "etcd-client",
  "file-engine",
  "flow",
@@ -1932,7 +1934,9 @@ dependencies = [
  "moka",
  "nu-ansi-term",
  "object-store",
+ "parquet",
  "plugins",
+ "pprof",
  "prometheus",
  "prost 0.13.5",
  "query",
@@ -1975,6 +1979,16 @@ version = "1.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
 
+[[package]]
+name = "colored"
+version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c"
+dependencies = [
+ "lazy_static",
+ "windows-sys 0.59.0",
+]
+
 [[package]]
 name = "comfy-table"
 version = "7.1.2"

View File

@@ -29,9 +29,11 @@ base64.workspace = true
 cache.workspace = true
 catalog.workspace = true
 chrono.workspace = true
+either = "1.15"
 clap.workspace = true
 cli.workspace = true
 client.workspace = true
+colored = "2.1.0"
 common-base.workspace = true
 common-catalog.workspace = true
 common-config.workspace = true
@@ -63,10 +65,13 @@ lazy_static.workspace = true
 meta-client.workspace = true
 meta-srv.workspace = true
 metric-engine.workspace = true
+mito2.workspace = true
 moka.workspace = true
 nu-ansi-term = "0.46"
 object-store.workspace = true
+parquet = { workspace = true, features = ["object_store"] }
 plugins.workspace = true
+pprof = "0.14.0"
 prometheus.workspace = true
 prost.workspace = true
 query.workspace = true

View File

@@ -103,12 +103,15 @@ async fn main_body() -> Result<()> {
 async fn start(cli: Command) -> Result<()> {
     match cli.subcmd {
-        SubCommand::Datanode(cmd) => {
-            let opts = cmd.load_options(&cli.global_options)?;
-            let plugins = Plugins::new();
-            let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
-            cmd.build_with(builder).await?.run().await
-        }
+        SubCommand::Datanode(cmd) => match cmd.subcmd {
+            datanode::SubCommand::Start(ref start) => {
+                let opts = start.load_options(&cli.global_options)?;
+                let plugins = Plugins::new();
+                let builder = InstanceBuilder::try_new_with_init(opts, plugins).await?;
+                cmd.build_with(builder).await?.run().await
+            }
+            datanode::SubCommand::Objbench(ref bench) => bench.run().await,
+        },
         SubCommand::Flownode(cmd) => {
             cmd.build(cmd.load_options(&cli.global_options)?)
                 .await?

View File

@@ -13,6 +13,8 @@
 // limitations under the License.
 
 pub mod builder;
+#[allow(clippy::print_stdout)]
+mod objbench;
 
 use std::path::Path;
 use std::time::Duration;
@@ -23,13 +25,16 @@ use common_config::Configurable;
 use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
 use common_telemetry::{info, warn};
 use common_wal::config::DatanodeWalConfig;
+use datanode::config::RegionEngineConfig;
 use datanode::datanode::Datanode;
 use meta_client::MetaClientOptions;
+use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, ensure};
 use tracing_appender::non_blocking::WorkerGuard;
 
 use crate::App;
 use crate::datanode::builder::InstanceBuilder;
+use crate::datanode::objbench::ObjbenchCommand;
 use crate::error::{
     LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
 };
@@ -89,7 +94,7 @@ impl App for Instance {
 #[derive(Parser)]
 pub struct Command {
     #[clap(subcommand)]
-    subcmd: SubCommand,
+    pub subcmd: SubCommand,
 }
 
 impl Command {
@@ -100,13 +105,26 @@ impl Command {
     pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
         match &self.subcmd {
             SubCommand::Start(cmd) => cmd.load_options(global_options),
+            SubCommand::Objbench(_) => {
+                // For objbench command, we don't need to load DatanodeOptions
+                // It's a standalone utility command
+                let mut opts = datanode::config::DatanodeOptions::default();
+                opts.sanitize();
+                Ok(DatanodeOptions {
+                    runtime: Default::default(),
+                    plugins: Default::default(),
+                    component: opts,
+                })
+            }
         }
     }
 }
 
 #[derive(Parser)]
-enum SubCommand {
+pub enum SubCommand {
     Start(StartCommand),
+    /// Object storage benchmark tool
+    Objbench(ObjbenchCommand),
 }
 
 impl SubCommand {
@@ -116,12 +134,33 @@ impl SubCommand {
                 info!("Building datanode with {:#?}", cmd);
                 builder.build().await
             }
+            SubCommand::Objbench(cmd) => {
+                cmd.run().await?;
+                std::process::exit(0);
+            }
         }
     }
 }
 
+/// Storage engine config
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+pub struct StorageConfig {
+    /// The working directory of database
+    pub data_home: String,
+    #[serde(flatten)]
+    pub store: object_store::config::ObjectStoreConfig,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
+#[serde(default)]
+struct StorageConfigWrapper {
+    storage: StorageConfig,
+    region_engine: Vec<RegionEngineConfig>,
+}
+
 #[derive(Debug, Parser, Default)]
-struct StartCommand {
+pub struct StartCommand {
     #[clap(long)]
     node_id: Option<u64>,
     /// The address to bind the gRPC server.
@@ -149,7 +188,7 @@ struct StartCommand {
 }
 
 impl StartCommand {
-    fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
+    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
         let mut opts = DatanodeOptions::load_layered_options(
             self.config_file.as_deref(),
             self.env_prefix.as_ref(),

View File

@@ -0,0 +1,676 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
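//! Object storage benchmark tool: reads an existing SST file from the configured
//! object store, rewrites it through the mito2 access layer (optionally under a
//! pprof profiler), reports timings and write metrics, then deletes the rewritten
//! file so the benchmark leaves no data behind.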
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use clap::Parser;
use colored::Colorize;
use datanode::config::RegionEngineConfig;
use datanode::store;
use either::Either;
use mito2::access_layer::{
AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
};
use mito2::cache::{CacheManager, CacheManagerRef};
use mito2::config::{FulltextIndexConfig, MitoConfig, Mode};
use mito2::read::Source;
use mito2::sst::file::{FileHandle, FileMeta};
use mito2::sst::file_purger::{FilePurger, FilePurgerRef};
use mito2::sst::index::intermediate::IntermediateManager;
use mito2::sst::index::puffin_manager::PuffinManagerFactory;
use mito2::sst::parquet::reader::ParquetReaderBuilder;
use mito2::sst::parquet::{PARQUET_METADATA_KEY, WriteOptions};
use mito2::worker::write_cache_from_config;
use object_store::ObjectStore;
use regex::Regex;
use snafu::OptionExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::path_utils::region_name;
use store_api::region_request::PathType;
use store_api::storage::FileId;
use crate::datanode::{StorageConfig, StorageConfigWrapper};
use crate::error;
/// Object storage benchmark command
#[derive(Debug, Parser)]
pub struct ObjbenchCommand {
/// Path to the object-store config file (TOML). Must deserialize into object_store::config::ObjectStoreConfig.
#[clap(long, value_name = "FILE")]
pub config: PathBuf,
/// Source SST file path in object-store (e.g. "region_dir/<uuid>.parquet").
#[clap(long, value_name = "PATH")]
pub source: String,
/// Verbose output
#[clap(short, long, default_value_t = false)]
pub verbose: bool,
/// Output file path for pprof flamegraph (enables profiling)
#[clap(long, value_name = "FILE")]
pub pprof_file: Option<PathBuf>,
}
fn parse_config(config_path: &PathBuf) -> error::Result<(StorageConfig, MitoConfig)> {
let cfg_str = std::fs::read_to_string(config_path).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to read config {}: {e}", config_path.display()),
}
.build()
})?;
let store_cfg: StorageConfigWrapper = toml::from_str(&cfg_str).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("failed to parse config {}: {e}", config_path.display()),
}
.build()
})?;
let storage_config = store_cfg.storage;
let mito_engine_config = store_cfg
.region_engine
.into_iter()
.filter_map(|c| {
if let RegionEngineConfig::Mito(mito) = c {
Some(mito)
} else {
None
}
})
.next()
.with_context(|| error::IllegalConfigSnafu {
msg: format!("Engine config not found in {:?}", config_path),
})?;
Ok((storage_config, mito_engine_config))
}
impl ObjbenchCommand {
pub async fn run(&self) -> error::Result<()> {
if self.verbose {
common_telemetry::init_default_ut_logging();
}
println!("{}", "Starting objbench with config:".cyan().bold());
// Build object store from config
let (store_cfg, mut mito_engine_config) = parse_config(&self.config)?;
let object_store = build_object_store(&store_cfg).await?;
println!("{} Object store initialized", "".green());
// Prepare source identifiers
let components = parse_file_dir_components(&self.source)?;
println!(
"{} Source path parsed: {}, components: {:?}",
"".green(),
self.source,
components
);
// Load parquet metadata to extract RegionMetadata and file stats
println!("{}", "Loading parquet metadata...".yellow());
let file_size = object_store
.stat(&self.source)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("stat failed: {e}"),
}
.build()
})?
.content_length();
let parquet_meta = load_parquet_metadata(object_store.clone(), &self.source, file_size)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("read parquet metadata failed: {e}"),
}
.build()
})?;
let region_meta = extract_region_metadata(&self.source, &parquet_meta)?;
let num_rows = parquet_meta.file_metadata().num_rows() as u64;
let num_row_groups = parquet_meta.num_row_groups() as u64;
println!(
"{} Metadata loaded - rows: {}, size: {} bytes",
"".green(),
num_rows,
file_size
);
// Build a FileHandle for the source file
let file_meta = FileMeta {
region_id: region_meta.region_id,
file_id: components.file_id,
time_range: Default::default(),
level: 0,
file_size,
available_indexes: Default::default(),
index_file_size: 0,
num_rows,
num_row_groups,
sequence: None,
partition_expr: None,
num_series: 0,
};
let src_handle = FileHandle::new(file_meta, new_noop_file_purger());
// Build the reader for a single file via ParquetReaderBuilder
let table_dir = components.table_dir();
let (src_access_layer, cache_manager) = build_access_layer_simple(
&components,
object_store.clone(),
&mut mito_engine_config,
&store_cfg.data_home,
)
.await?;
let reader_build_start = Instant::now();
let reader = ParquetReaderBuilder::new(
table_dir,
components.path_type,
src_handle.clone(),
object_store.clone(),
)
.expected_metadata(Some(region_meta.clone()))
.build()
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("build reader failed: {e:?}"),
}
.build()
})?;
let reader_build_elapsed = reader_build_start.elapsed();
let total_rows = reader.parquet_metadata().file_metadata().num_rows();
println!("{} Reader built in {:?}", "".green(), reader_build_elapsed);
// Build write request
let fulltext_index_config = FulltextIndexConfig {
create_on_compaction: Mode::Disable,
..Default::default()
};
let write_req = SstWriteRequest {
op_type: OperationType::Flush,
metadata: region_meta,
source: Either::Left(Source::Reader(Box::new(reader))),
cache_manager,
storage: None,
max_sequence: None,
index_options: Default::default(),
index_config: mito_engine_config.index.clone(),
inverted_index_config: MitoConfig::default().inverted_index,
fulltext_index_config,
bloom_filter_index_config: MitoConfig::default().bloom_filter_index,
};
// Write SST
println!("{}", "Writing SST...".yellow());
// Start profiling if pprof_file is specified
#[cfg(unix)]
let profiler_guard = if self.pprof_file.is_some() {
println!("{} Starting profiling...", "".yellow());
Some(
pprof::ProfilerGuardBuilder::default()
.frequency(99)
.blocklist(&["libc", "libgcc", "pthread", "vdso"])
.build()
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to start profiler: {e}"),
}
.build()
})?,
)
} else {
None
};
#[cfg(not(unix))]
if self.pprof_file.is_some() {
eprintln!(
"{}: Profiling is not supported on this platform",
"Warning".yellow()
);
}
let write_start = Instant::now();
let mut metrics = Metrics::new(WriteType::Flush);
let infos = src_access_layer
.write_sst(write_req, &WriteOptions::default(), &mut metrics)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("write_sst failed: {e:?}"),
}
.build()
})?;
let write_elapsed = write_start.elapsed();
// Stop profiling and generate flamegraph if enabled
#[cfg(unix)]
if let (Some(guard), Some(pprof_file)) = (profiler_guard, &self.pprof_file) {
println!("{} Generating flamegraph...", "🔥".yellow());
match guard.report().build() {
Ok(report) => {
let mut flamegraph_data = Vec::new();
if let Err(e) = report.flamegraph(&mut flamegraph_data) {
println!("{}: Failed to generate flamegraph: {}", "Error".red(), e);
} else if let Err(e) = std::fs::write(pprof_file, flamegraph_data) {
println!(
"{}: Failed to write flamegraph to {}: {}",
"Error".red(),
pprof_file.display(),
e
);
} else {
println!(
"{} Flamegraph saved to {}",
"".green(),
pprof_file.display().to_string().cyan()
);
}
}
Err(e) => {
println!("{}: Failed to generate pprof report: {}", "Error".red(), e);
}
}
}
assert_eq!(infos.len(), 1);
let dst_file_id = infos[0].file_id;
let dst_file_path = format!("{}/{}.parquet", components.region_dir(), dst_file_id);
let mut dst_index_path = None;
if infos[0].index_metadata.file_size > 0 {
dst_index_path = Some(format!(
"{}/index/{}.puffin",
components.region_dir(),
dst_file_id
));
}
// Report results with ANSI colors
println!("\n{} {}", "Write complete!".green().bold(), "".green());
println!(" {}: {}", "Destination file".bold(), dst_file_path.cyan());
println!(" {}: {}", "Rows".bold(), total_rows.to_string().cyan());
println!(
" {}: {}",
"File size".bold(),
format!("{} bytes", file_size).cyan()
);
println!(
" {}: {:?}",
"Reader build time".bold(),
reader_build_elapsed
);
println!(" {}: {:?}", "Total time".bold(), write_elapsed);
// Print metrics in a formatted way
println!(" {}: {:?}", "Metrics".bold(), metrics,);
// Print infos
println!(" {}: {:?}", "Index".bold(), infos[0].index_metadata);
// Cleanup
println!("\n{}", "Cleaning up...".yellow());
object_store.delete(&dst_file_path).await.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to delete dest file {}: {}", dst_file_path, e),
}
.build()
})?;
println!("{} Temporary file {} deleted", "".green(), dst_file_path);
if let Some(index_path) = dst_index_path {
object_store.delete(&index_path).await.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to delete dest index file {}: {}", index_path, e),
}
.build()
})?;
println!(
"{} Temporary index file {} deleted",
"".green(),
index_path
);
}
println!("\n{}", "Benchmark completed successfully!".green().bold());
Ok(())
}
}
#[derive(Debug)]
struct FileDirComponents {
catalog: String,
schema: String,
table_id: u32,
region_sequence: u32,
path_type: PathType,
file_id: FileId,
}
impl FileDirComponents {
fn table_dir(&self) -> String {
format!("data/{}/{}/{}", self.catalog, self.schema, self.table_id)
}
fn region_dir(&self) -> String {
let region_name = region_name(self.table_id, self.region_sequence);
match self.path_type {
PathType::Bare => {
format!(
"data/{}/{}/{}/{}",
self.catalog, self.schema, self.table_id, region_name
)
}
PathType::Data => {
format!(
"data/{}/{}/{}/{}/data",
self.catalog, self.schema, self.table_id, region_name
)
}
PathType::Metadata => {
format!(
"data/{}/{}/{}/{}/metadata",
self.catalog, self.schema, self.table_id, region_name
)
}
}
}
}
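/// Parses a source SST path into its catalog/schema/table/region components and file id.
/// Accepts the bare, `/data` and `/metadata` layouts, e.g.
/// `data/<catalog>/<schema>/<table_id>/<table_id>_<region_seq>[/data|/metadata]/<file_id>.parquet`.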
fn parse_file_dir_components(path: &str) -> error::Result<FileDirComponents> {
// Define the regex pattern to match all three path styles
let pattern =
r"^data/([^/]+)/([^/]+)/([^/]+)/([^/]+)_([^/]+)(?:/data|/metadata)?/(.+).parquet$";
// Compile the regex
let re = Regex::new(pattern).expect("Invalid regex pattern");
// Determine the path type
let path_type = if path.contains("/data/") {
PathType::Data
} else if path.contains("/metadata/") {
PathType::Metadata
} else {
PathType::Bare
};
// Try to match the path
let components = (|| {
let captures = re.captures(path)?;
if captures.len() != 7 {
return None;
}
let mut components = FileDirComponents {
catalog: "".to_string(),
schema: "".to_string(),
table_id: 0,
region_sequence: 0,
path_type,
file_id: FileId::default(),
};
// Extract the components
components.catalog = captures.get(1)?.as_str().to_string();
components.schema = captures.get(2)?.as_str().to_string();
components.table_id = captures[3].parse().ok()?;
components.region_sequence = captures[5].parse().ok()?;
let file_id_str = &captures[6];
components.file_id = FileId::parse_str(file_id_str).ok()?;
Some(components)
})();
components.context(error::IllegalConfigSnafu {
msg: format!("Expect valid source file path, got: {}", path),
})
}
fn extract_region_metadata(
file_path: &str,
meta: &parquet::file::metadata::ParquetMetaData,
) -> error::Result<RegionMetadataRef> {
use parquet::format::KeyValue;
let kvs: Option<&Vec<KeyValue>> = meta.file_metadata().key_value_metadata();
let Some(kvs) = kvs else {
return Err(error::IllegalConfigSnafu {
msg: format!("{file_path}: missing parquet key_value metadata"),
}
.build());
};
let json = kvs
.iter()
.find(|kv| kv.key == PARQUET_METADATA_KEY)
.and_then(|kv| kv.value.as_ref())
.ok_or_else(|| {
error::IllegalConfigSnafu {
msg: format!("{file_path}: key {PARQUET_METADATA_KEY} not found or empty"),
}
.build()
})?;
let region: RegionMetadata = RegionMetadata::from_json(json).map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("invalid region metadata json: {e}"),
}
.build()
})?;
Ok(Arc::new(region))
}
async fn build_object_store(sc: &StorageConfig) -> error::Result<ObjectStore> {
store::new_object_store(sc.store.clone(), &sc.data_home)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to build object store: {e:?}"),
}
.build()
})
}
async fn build_access_layer_simple(
components: &FileDirComponents,
object_store: ObjectStore,
config: &mut MitoConfig,
data_home: &str,
) -> error::Result<(AccessLayerRef, CacheManagerRef)> {
let _ = config.index.sanitize(data_home, &config.inverted_index);
let puffin_manager = PuffinManagerFactory::new(
&config.index.aux_path,
config.index.staging_size.as_bytes(),
Some(config.index.write_buffer_size.as_bytes() as _),
config.index.staging_ttl,
)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to build access layer: {e:?}"),
}
.build()
})?;
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to build IntermediateManager: {e:?}"),
}
.build()
})?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let cache_manager =
build_cache_manager(config, puffin_manager.clone(), intermediate_manager.clone()).await?;
let layer = AccessLayer::new(
components.table_dir(),
components.path_type,
object_store,
puffin_manager,
intermediate_manager,
);
Ok((Arc::new(layer), cache_manager))
}
async fn build_cache_manager(
config: &MitoConfig,
puffin_manager: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> error::Result<CacheManagerRef> {
let write_cache = write_cache_from_config(config, puffin_manager, intermediate_manager)
.await
.map_err(|e| {
error::IllegalConfigSnafu {
msg: format!("Failed to build write cache: {e:?}"),
}
.build()
})?;
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
.vector_cache_size(config.vector_cache_size.as_bytes())
.page_cache_size(config.page_cache_size.as_bytes())
.selector_result_cache_size(config.selector_result_cache_size.as_bytes())
.index_metadata_size(config.index.metadata_cache_size.as_bytes())
.index_content_size(config.index.content_cache_size.as_bytes())
.index_content_page_size(config.index.content_cache_page_size.as_bytes())
.index_result_cache_size(config.index.result_cache_size.as_bytes())
.puffin_metadata_size(config.index.metadata_cache_size.as_bytes())
.write_cache(write_cache)
.build(),
);
Ok(cache_manager)
}
fn new_noop_file_purger() -> FilePurgerRef {
#[derive(Debug)]
struct Noop;
impl FilePurger for Noop {
fn remove_file(&self, _file_meta: FileMeta, _is_delete: bool) {}
}
Arc::new(Noop)
}
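/// Loads the parquet metadata with at most two ranged reads: prefetch the last 64 KiB,
/// decode the footer, and issue a second read only if the metadata section is larger
/// than the prefetched tail.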
async fn load_parquet_metadata(
object_store: ObjectStore,
path: &str,
file_size: u64,
) -> Result<parquet::file::metadata::ParquetMetaData, Box<dyn std::error::Error + Send + Sync>> {
use parquet::file::FOOTER_SIZE;
use parquet::file::metadata::ParquetMetaDataReader;
let actual_size = if file_size == 0 {
object_store.stat(path).await?.content_length()
} else {
file_size
};
if actual_size < FOOTER_SIZE as u64 {
return Err("file too small".into());
}
let prefetch: u64 = 64 * 1024;
let start = actual_size.saturating_sub(prefetch);
let buffer = object_store
.read_with(path)
.range(start..actual_size)
.await?
.to_vec();
let buffer_len = buffer.len();
let mut footer = [0; 8];
footer.copy_from_slice(&buffer[buffer_len - FOOTER_SIZE..]);
let footer = ParquetMetaDataReader::decode_footer_tail(&footer)?;
let metadata_len = footer.metadata_length() as u64;
if actual_size - (FOOTER_SIZE as u64) < metadata_len {
return Err("invalid footer/metadata length".into());
}
if (metadata_len as usize) <= buffer_len - FOOTER_SIZE {
let metadata_start = buffer_len - metadata_len as usize - FOOTER_SIZE;
let meta = ParquetMetaDataReader::decode_metadata(
&buffer[metadata_start..buffer_len - FOOTER_SIZE],
)?;
Ok(meta)
} else {
let metadata_start = actual_size - metadata_len - FOOTER_SIZE as u64;
let data = object_store
.read_with(path)
.range(metadata_start..(actual_size - FOOTER_SIZE as u64))
.await?
.to_vec();
let meta = ParquetMetaDataReader::decode_metadata(&data)?;
Ok(meta)
}
}
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use std::str::FromStr;
use common_base::readable_size::ReadableSize;
use store_api::region_request::PathType;
use crate::datanode::objbench::{parse_config, parse_file_dir_components};
#[test]
fn test_parse_dir() {
let meta_path = "data/greptime/public/1024/1024_0000000000/metadata/00020380-009c-426d-953e-b4e34c15af34.parquet";
let c = parse_file_dir_components(meta_path).unwrap();
assert_eq!(
c.file_id.to_string(),
"00020380-009c-426d-953e-b4e34c15af34"
);
assert_eq!(c.catalog, "greptime");
assert_eq!(c.schema, "public");
assert_eq!(c.table_id, 1024);
assert_eq!(c.region_sequence, 0);
assert_eq!(c.path_type, PathType::Metadata);
let c = parse_file_dir_components(
"data/greptime/public/1024/1024_0000000000/data/00020380-009c-426d-953e-b4e34c15af34.parquet",
).unwrap();
assert_eq!(
c.file_id.to_string(),
"00020380-009c-426d-953e-b4e34c15af34"
);
assert_eq!(c.catalog, "greptime");
assert_eq!(c.schema, "public");
assert_eq!(c.table_id, 1024);
assert_eq!(c.region_sequence, 0);
assert_eq!(c.path_type, PathType::Data);
let c = parse_file_dir_components(
"data/greptime/public/1024/1024_0000000000/00020380-009c-426d-953e-b4e34c15af34.parquet",
).unwrap();
assert_eq!(
c.file_id.to_string(),
"00020380-009c-426d-953e-b4e34c15af34"
);
assert_eq!(c.catalog, "greptime");
assert_eq!(c.schema, "public");
assert_eq!(c.table_id, 1024);
assert_eq!(c.region_sequence, 0);
assert_eq!(c.path_type, PathType::Bare);
}
#[test]
fn test_parse_config() {
let path = "../../config/datanode.example.toml";
let (storage, engine) = parse_config(&PathBuf::from_str(path).unwrap()).unwrap();
assert_eq!(storage.data_home, "./greptimedb_data");
assert_eq!(engine.index.staging_size, ReadableSize::gb(2));
}
}

View File

@@ -47,10 +47,7 @@ pub(crate) async fn new_object_store_without_cache(
     Ok(object_store)
 }
 
-pub(crate) async fn new_object_store(
-    store: ObjectStoreConfig,
-    data_home: &str,
-) -> Result<ObjectStore> {
+pub async fn new_object_store(store: ObjectStoreConfig, data_home: &str) -> Result<ObjectStore> {
     let object_store = new_raw_object_store(&store, data_home)
         .await
         .context(error::ObjectStoreSnafu)?;
@@ -59,7 +56,7 @@ pub(crate) async fn new_object_store(
     let object_store = {
         // It's safe to unwrap here because we already checked above.
         let cache_config = store.cache_config().unwrap();
-        if let Some(cache_layer) = build_cache_layer(cache_config).await? {
+        if let Some(cache_layer) = build_cache_layer(cache_config, data_home).await? {
             // Adds cache layer
             object_store.layer(cache_layer)
         } else {
@@ -79,17 +76,22 @@ pub(crate) async fn new_object_store(
 async fn build_cache_layer(
     cache_config: &ObjectStorageCacheConfig,
+    data_home: &str,
 ) -> Result<Option<LruCacheLayer<impl Access>>> {
     // No need to build cache layer if read cache is disabled.
     if !cache_config.enable_read_cache {
         return Ok(None);
     }
 
-    let atomic_temp_dir = join_dir(&cache_config.cache_path, ATOMIC_WRITE_DIR);
+    let cache_base_dir = if cache_config.cache_path.is_empty() {
+        data_home
+    } else {
+        &cache_config.cache_path
+    };
+
+    let atomic_temp_dir = join_dir(cache_base_dir, ATOMIC_WRITE_DIR);
     clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
 
     let cache_store = Fs::default()
-        .root(&cache_config.cache_path)
+        .root(cache_base_dir)
         .atomic_write_dir(&atomic_temp_dir)
         .build()
         .context(error::BuildCacheStoreSnafu)?;

View File

@@ -72,7 +72,7 @@ pub struct Metrics {
 }
 
 impl Metrics {
-    pub(crate) fn new(write_type: WriteType) -> Self {
+    pub fn new(write_type: WriteType) -> Self {
         Self {
             write_type,
             iter_source: Default::default(),
@@ -255,12 +255,12 @@
         &self,
         request: SstWriteRequest,
         write_opts: &WriteOptions,
-        write_type: WriteType,
-    ) -> Result<(SstInfoArray, Metrics)> {
+        metrics: &mut Metrics,
+    ) -> Result<SstInfoArray> {
         let region_id = request.metadata.region_id;
         let cache_manager = request.cache_manager.clone();
-        let (sst_info, metrics) = if let Some(write_cache) = cache_manager.write_cache() {
+        let sst_info = if let Some(write_cache) = cache_manager.write_cache() {
             // Write to the write cache.
             write_cache
                 .write_and_upload_sst(
@@ -273,7 +273,7 @@
                         remote_store: self.object_store.clone(),
                     },
                     write_opts,
-                    write_type,
+                    metrics,
                 )
                 .await?
         } else {
@@ -303,11 +303,11 @@
                 request.index_config,
                 indexer_builder,
                 path_provider,
-                Metrics::new(write_type),
+                metrics,
             )
             .await
             .with_file_cleaner(cleaner);
-            let ssts = match request.source {
+            match request.source {
                 Either::Left(source) => {
                     writer
                         .write_all(source, request.max_sequence, write_opts)
@@ -316,9 +316,7 @@
                 Either::Right(flat_source) => {
                     writer.write_all_flat(flat_source, write_opts).await?
                 }
-            };
-            let metrics = writer.into_metrics();
-            (ssts, metrics)
+            }
         };
 
         // Put parquet metadata to cache manager.
@@ -333,7 +331,7 @@ impl AccessLayer {
             }
         }
 
-        Ok((sst_info, metrics))
+        Ok(sst_info)
     }
 
     /// Puts encoded SST bytes to the write cache (if enabled) and uploads it to the object store.

View File

@@ -169,8 +169,8 @@ impl WriteCache {
         write_request: SstWriteRequest,
         upload_request: SstUploadRequest,
         write_opts: &WriteOptions,
-        write_type: WriteType,
-    ) -> Result<(SstInfoArray, Metrics)> {
+        metrics: &mut Metrics,
+    ) -> Result<SstInfoArray> {
         let region_id = write_request.metadata.region_id;
         let store = self.file_cache.local_store();
@@ -197,7 +197,7 @@
             write_request.index_config,
             indexer,
             path_provider.clone(),
-            Metrics::new(write_type),
+            metrics,
         )
         .await
         .with_file_cleaner(cleaner);
@@ -210,11 +210,10 @@
             }
             either::Right(flat_source) => writer.write_all_flat(flat_source, write_opts).await?,
         };
-        let mut metrics = writer.into_metrics();
 
         // Upload sst file to remote object store.
         if sst_info.is_empty() {
-            return Ok((sst_info, metrics));
+            return Ok(sst_info);
         }
 
         let mut upload_tracker = UploadTracker::new(region_id);
@@ -256,7 +255,7 @@
             return Err(err);
         }
 
-        Ok((sst_info, metrics))
+        Ok(sst_info)
     }
 
     /// Removes a file from the cache by `index_key`.
@@ -559,8 +558,9 @@ mod tests {
         };
 
         // Write to cache and upload sst to mock remote store
-        let (mut sst_infos, _) = write_cache
-            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
+        let mut metrics = Metrics::new(WriteType::Flush);
+        let mut sst_infos = write_cache
+            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
             .await
             .unwrap();
         let sst_info = sst_infos.remove(0);
@@ -655,8 +655,9 @@
             remote_store: mock_store.clone(),
         };
 
-        let (mut sst_infos, _) = write_cache
-            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
+        let mut metrics = Metrics::new(WriteType::Flush);
+        let mut sst_infos = write_cache
+            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
            .await
            .unwrap();
         let sst_info = sst_infos.remove(0);
@@ -735,8 +736,9 @@
             remote_store: mock_store.clone(),
         };
 
+        let mut metrics = Metrics::new(WriteType::Flush);
         write_cache
-            .write_and_upload_sst(write_request, upload_request, &write_opts, WriteType::Flush)
+            .write_and_upload_sst(write_request, upload_request, &write_opts, &mut metrics)
             .await
             .unwrap_err();
 
         let atomic_write_dir = write_cache_dir.path().join(ATOMIC_WRITE_DIR);

View File

@@ -30,7 +30,9 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::region_request::PathType;
 use store_api::storage::RegionId;
 
-use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
+use crate::access_layer::{
+    AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
+};
 use crate::cache::{CacheManager, CacheManagerRef};
 use crate::compaction::picker::{PickerOutput, new_picker};
 use crate::compaction::{CompactionSstReaderBuilder, find_ttl};
@@ -387,7 +389,8 @@ impl Compactor for DefaultCompactor {
                         let reader = builder.build_sst_reader().await?;
                         either::Left(Source::Reader(reader))
                     };
-                    let (sst_infos, metrics) = sst_layer
+                    let mut metrics = Metrics::new(WriteType::Compaction);
+                    let sst_infos = sst_layer
                         .write_sst(
                             SstWriteRequest {
                                 op_type: OperationType::Compact,
@@ -403,7 +406,7 @@
                                 bloom_filter_index_config,
                             },
                             &write_opts,
-                            WriteType::Compaction,
+                            &mut metrics,
                         )
                         .await?;
                     // Convert partition expression once outside the map

View File

@@ -525,21 +525,19 @@ impl RegionFlushTask {
             let source = Either::Left(source);
             let write_request = self.new_write_request(version, max_sequence, source);
 
-            let (ssts_written, metrics) = self
+            let mut metrics = Metrics::new(WriteType::Flush);
+            let ssts_written = self
                 .access_layer
-                .write_sst(write_request, &write_opts, WriteType::Flush)
+                .write_sst(write_request, &write_opts, &mut metrics)
                 .await?;
             if ssts_written.is_empty() {
                 // No data written.
                 continue;
             }
 
-            common_telemetry::debug!(
+            debug!(
                 "Region {} flush one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
-                self.region_id,
-                num_mem_ranges,
-                num_mem_rows,
-                metrics
+                self.region_id, num_mem_ranges, num_mem_rows, metrics
             );
 
             flush_metrics = flush_metrics.merge(metrics);
@@ -591,9 +589,11 @@
             let semaphore = self.flush_semaphore.clone();
             let task = common_runtime::spawn_global(async move {
                 let _permit = semaphore.acquire().await.unwrap();
-                access_layer
-                    .write_sst(write_request, &write_opts, WriteType::Flush)
-                    .await
+                let mut metrics = Metrics::new(WriteType::Flush);
+                let ssts = access_layer
+                    .write_sst(write_request, &write_opts, &mut metrics)
+                    .await?;
+                Ok((ssts, metrics))
             });
             tasks.push(task);
         }

View File

@@ -47,7 +47,7 @@ pub mod schedule;
 pub mod sst;
 mod time_provider;
 pub mod wal;
-mod worker;
+pub mod worker;
 
 #[cfg_attr(doc, aquamarine::aquamarine)]
 /// # Mito developer document

View File

@@ -295,8 +295,8 @@ impl FileHandle {
     }
 
     /// Returns the complete file path of the file.
-    pub fn file_path(&self, file_dir: &str, path_type: PathType) -> String {
-        location::sst_file_path(file_dir, self.file_id(), path_type)
+    pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
+        location::sst_file_path(table_dir, self.file_id(), path_type)
     }
 
     /// Returns the time range of the file.

View File

@@ -791,7 +791,7 @@ mod tests {
     use tokio::sync::mpsc;
 
     use super::*;
-    use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType};
+    use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
     use crate::cache::write_cache::WriteCache;
     use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
     use crate::memtable::time_partition::TimePartitions;
@@ -927,11 +927,11 @@
             fulltext_index_config: Default::default(),
             bloom_filter_index_config: Default::default(),
         };
 
+        let mut metrics = Metrics::new(WriteType::Flush);
         env.access_layer
-            .write_sst(write_request, &WriteOptions::default(), WriteType::Flush)
+            .write_sst(write_request, &WriteOptions::default(), &mut metrics)
             .await
             .unwrap()
-            .0
             .remove(0)
     }

View File

@@ -181,13 +181,14 @@ mod tests {
             ..Default::default()
         };
 
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             NoopIndexBuilder,
             file_path,
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
@@ -243,6 +244,7 @@
             ..Default::default()
         };
         // Prepare data.
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -251,7 +253,7 @@
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
@@ -329,6 +331,7 @@
         // write the sst file and get sst info
         // sst info contains the parquet metadata, which is converted from FileMetaData
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -337,7 +340,7 @@
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
@@ -378,6 +381,7 @@
             ..Default::default()
         };
         // Prepare data.
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -386,7 +390,7 @@
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
         writer
@@ -437,6 +441,7 @@
             ..Default::default()
         };
         // Prepare data.
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -445,7 +450,7 @@
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
         writer
@@ -481,6 +486,7 @@
             ..Default::default()
         };
         // Prepare data.
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
@@ -489,7 +495,7 @@
             FixedPathProvider {
                 region_file_id: handle.file_id(),
             },
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
@@ -639,13 +645,14 @@
             table_dir: "test".to_string(),
             path_type: PathType::Bare,
         };
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             NoopIndexBuilder,
             path_provider,
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
@@ -716,13 +723,14 @@
             bloom_filter_index_config: Default::default(),
         };
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             indexer_builder,
             file_path.clone(),
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
@@ -1092,13 +1100,14 @@
             bloom_filter_index_config: Default::default(),
         };
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             indexer_builder,
             file_path.clone(),
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;
@@ -1148,13 +1157,14 @@
             ..Default::default()
         };
+        let mut metrics = Metrics::new(WriteType::Flush);
         let mut writer = ParquetWriter::new_with_object_store(
             object_store.clone(),
             metadata.clone(),
             IndexConfig::default(),
             NoopIndexBuilder,
             file_path,
-            Metrics::new(WriteType::Flush),
+            &mut metrics,
         )
         .await;

View File

@@ -91,7 +91,7 @@ macro_rules! handle_index_error {
 /// Parquet SST reader builder.
 pub struct ParquetReaderBuilder {
     /// SST directory.
-    file_dir: String,
+    table_dir: String,
     /// Path type for generating file paths.
     path_type: PathType,
     file_handle: FileHandle,
@@ -122,13 +122,13 @@ pub struct ParquetReaderBuilder {
 impl ParquetReaderBuilder {
     /// Returns a new [ParquetReaderBuilder] to read specific SST.
     pub fn new(
-        file_dir: String,
+        table_dir: String,
         path_type: PathType,
         file_handle: FileHandle,
         object_store: ObjectStore,
     ) -> ParquetReaderBuilder {
         ParquetReaderBuilder {
-            file_dir,
+            table_dir,
             path_type,
             file_handle,
             object_store,
@@ -237,7 +237,7 @@ impl ParquetReaderBuilder {
     ) -> Result<(FileRangeContext, RowGroupSelection)> {
         let start = Instant::now();
 
-        let file_path = self.file_handle.file_path(&self.file_dir, self.path_type);
+        let file_path = self.file_handle.file_path(&self.table_dir, self.path_type);
         let file_size = self.file_handle.meta_ref().file_size;
 
         // Loads parquet metadata of the file.
@@ -1227,7 +1227,6 @@ impl ParquetReader {
         self.context.read_format().metadata()
     }
 
-    #[cfg(test)]
     pub fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
         self.context.reader_builder().parquet_meta.clone()
     }

View File

@@ -62,7 +62,7 @@ use crate::sst::{
 };
 
 /// Parquet SST writer.
-pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
+pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
     /// Path provider that creates SST and index file paths according to file id.
     path_provider: P,
     writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
@@ -81,7 +81,7 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
     /// Cleaner to remove temp files on failure.
     file_cleaner: Option<TempFileCleaner>,
     /// Write metrics
-    metrics: Metrics,
+    metrics: &'a mut Metrics,
 }
 
 pub trait WriterFactory {
@@ -107,7 +107,7 @@ impl WriterFactory for ObjectStoreWriterFactory {
     }
 }
 
-impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
+impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
 where
     P: FilePathProvider,
     I: IndexerBuilder,
@@ -118,8 +118,8 @@ where
         index_config: IndexConfig,
         indexer_builder: I,
         path_provider: P,
-        metrics: Metrics,
-    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
+        metrics: &'a mut Metrics,
+    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
         ParquetWriter::new(
             ObjectStoreWriterFactory { object_store },
             metadata,
@@ -137,7 +137,7 @@ where
     }
 }
 
-impl<F, I, P> ParquetWriter<F, I, P>
+impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
 where
     F: WriterFactory,
     I: IndexerBuilder,
@@ -150,8 +150,8 @@ where
         index_config: IndexConfig,
         indexer_builder: I,
         path_provider: P,
-        metrics: Metrics,
-    ) -> ParquetWriter<F, I, P> {
+        metrics: &'a mut Metrics,
+    ) -> ParquetWriter<'a, F, I, P> {
         let init_file = FileId::random();
 
         let indexer = indexer_builder.build(init_file).await;
@@ -487,11 +487,6 @@ where
             Ok(self.writer.as_mut().unwrap())
         }
     }
-
-    /// Consumes write and return the collected metrics.
-    pub fn into_metrics(self) -> Metrics {
-        self.metrics
-    }
 }
 
 #[derive(Default)]

View File

@@ -423,7 +423,7 @@ fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
         % num_workers
 }
 
-async fn write_cache_from_config(
+pub async fn write_cache_from_config(
     config: &MitoConfig,
     puffin_manager_factory: PuffinManagerFactory,
     intermediate_manager: IntermediateManager,