refactor: sanitize cache config in DatanodeOptions and StandaloneOptions

Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
zyy17
2025-09-10 16:13:32 -07:00
parent b14daa7313
commit 5302650728
4 changed files with 109 additions and 65 deletions

View File

@@ -218,6 +218,7 @@ impl Into<FrontendOptions> for StandaloneOptions {
} }
impl StandaloneOptions { impl StandaloneOptions {
/// Returns the `FrontendOptions` for the standalone instance.
pub fn frontend_options(&self) -> FrontendOptions { pub fn frontend_options(&self) -> FrontendOptions {
let cloned_opts = self.clone(); let cloned_opts = self.clone();
FrontendOptions { FrontendOptions {
@@ -241,6 +242,7 @@ impl StandaloneOptions {
} }
} }
/// Returns the `DatanodeOptions` for the standalone instance.
pub fn datanode_options(&self) -> DatanodeOptions { pub fn datanode_options(&self) -> DatanodeOptions {
let cloned_opts = self.clone(); let cloned_opts = self.clone();
DatanodeOptions { DatanodeOptions {
@@ -256,6 +258,17 @@ impl StandaloneOptions {
..Default::default() ..Default::default()
} }
} }
/// Sanitize the `StandaloneOptions` to ensure the config is valid.
///
/// If the configured store exposes an object storage cache configuration,
/// that configuration is sanitized against `storage.data_home`. Stores
/// without a cache configuration are left untouched.
pub fn sanitize(&mut self) {
    // Match on the optional cache config directly instead of checking the
    // storage type first and unwrapping afterwards: this keeps a single
    // source of truth and removes the panic path.
    if let Some(cache_config) = self.storage.store.cache_config_mut() {
        cache_config.sanitize(&self.storage.data_home);
    }
}
} }
pub struct Instance { pub struct Instance {
@@ -396,6 +409,7 @@ impl StartCommand {
.context(error::LoadLayeredConfigSnafu)?; .context(error::LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts.component)?; self.merge_with_cli_options(global_options, &mut opts.component)?;
opts.component.sanitize();
Ok(opts) Ok(opts)
} }
@@ -1147,4 +1161,22 @@ mod tests {
assert_eq!(options.logging, default_options.logging); assert_eq!(options.logging, default_options.logging);
assert_eq!(options.region_engine, default_options.region_engine); assert_eq!(options.region_engine, default_options.region_engine);
} }
/// Parses S3 storage options that set `data_home` but no cache path, runs
/// `sanitize()`, and expects the read cache to stay enabled with
/// `cache_path` equal to the configured `data_home`.
#[test]
fn test_cache_config() {
    let toml_str = r#"
[storage]
data_home = "test_data_home"
type = "S3"
[storage.cache_config]
enable_read_cache = true
"#;
    let mut opts = toml::from_str::<StandaloneOptions>(toml_str).unwrap();
    opts.sanitize();

    // Look the cache config up once and check both fields on it.
    let cache_config = opts.storage.store.cache_config().unwrap();
    assert!(cache_config.enable_read_cache);
    assert_eq!(cache_config.cache_path, "test_data_home");
}
} }

View File

@@ -103,6 +103,14 @@ impl DatanodeOptions {
/// Sanitize the `DatanodeOptions` to ensure the config is valid.
///
/// Sanitizes the workload types first. Then, if the configured store exposes
/// an object storage cache configuration, that configuration is sanitized
/// against `storage.data_home`. Stores without a cache configuration are
/// left untouched.
pub fn sanitize(&mut self) {
    sanitize_workload_types(&mut self.workload_types);

    // Match on the optional cache config directly instead of checking the
    // storage type first and unwrapping afterwards: this keeps a single
    // source of truth and removes the panic path.
    if let Some(cache_config) = self.storage.store.cache_config_mut() {
        cache_config.sanitize(&self.storage.data_home);
    }
}
} }
@@ -237,4 +245,22 @@ mod tests {
_ => panic!("Expected S3 config"), _ => panic!("Expected S3 config"),
} }
} }
/// Parses S3 storage options that set `data_home` but no cache path, runs
/// `sanitize()`, and expects the read cache to stay enabled with
/// `cache_path` equal to the configured `data_home`.
#[test]
fn test_cache_config() {
    let toml_str = r#"
[storage]
data_home = "test_data_home"
type = "S3"
[storage.cache_config]
enable_read_cache = true
"#;
    let mut opts = toml::from_str::<DatanodeOptions>(toml_str).unwrap();
    opts.sanitize();

    // Look the cache config up once and check both fields on it.
    let cache_config = opts.storage.store.cache_config().unwrap();
    assert!(cache_config.enable_read_cache);
    assert_eq!(cache_config.cache_path, "test_data_home");
}
} }

View File

@@ -14,10 +14,10 @@
//! object storage utilities //! object storage utilities
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use common_telemetry::info; use common_telemetry::info;
use object_store::config::ObjectStorageCacheConfig;
use object_store::factory::new_raw_object_store; use object_store::factory::new_raw_object_store;
use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::layers::{LruCacheLayer, RetryLayer};
use object_store::services::Fs; use object_store::services::Fs;
@@ -28,7 +28,7 @@ use object_store::{
use snafu::prelude::*; use snafu::prelude::*;
use crate::config::ObjectStoreConfig; use crate::config::ObjectStoreConfig;
use crate::error::{self, CreateDirSnafu, Result}; use crate::error::{self, Result};
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore { fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
object_store.layer( object_store.layer(
@@ -66,11 +66,15 @@ pub(crate) async fn new_object_store(
.context(error::ObjectStoreSnafu)?; .context(error::ObjectStoreSnafu)?;
// Enable retry layer and cache layer for non-fs object storages // Enable retry layer and cache layer for non-fs object storages
let object_store = if store.is_object_storage() { let object_store = if store.is_object_storage() {
let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? { let object_store = {
// Adds cache layer // It's safe to unwrap here because we already checked above.
object_store.layer(cache_layer) let cache_config = store.cache_config().unwrap();
} else { if let Some(cache_layer) = build_cache_layer(cache_config).await? {
object_store // Adds cache layer
object_store.layer(cache_layer)
} else {
object_store
}
}; };
// Adds retry layer // Adds retry layer
@@ -84,65 +88,37 @@ pub(crate) async fn new_object_store(
} }
async fn build_cache_layer( async fn build_cache_layer(
store_config: &ObjectStoreConfig, cache_config: &ObjectStorageCacheConfig,
data_home: &str,
) -> Result<Option<LruCacheLayer<impl Access>>> { ) -> Result<Option<LruCacheLayer<impl Access>>> {
// No need to build cache layer if read cache is disabled. // No need to build cache layer if read cache is disabled.
if !store_config.enable_read_cache() { if !cache_config.enable_read_cache {
return Ok(None); return Ok(None);
} }
let (name, mut cache_path, cache_capacity) = { let atomic_temp_dir = join_dir(&cache_config.cache_path, ATOMIC_WRITE_DIR);
// It's safe to unwrap here because we already checked above. clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
let cache_config = store_config.cache_config().unwrap();
(
store_config.config_name(),
cache_config.cache_path.clone(),
cache_config.cache_capacity,
)
};
// If `cache_path` is unset, default to use `${data_home}` as the local read cache directory. // Compatible code. Remove this after a major release.
if cache_path.is_empty() { let old_atomic_temp_dir = join_dir(&cache_config.cache_path, OLD_ATOMIC_WRITE_DIR);
let read_cache_path = data_home.to_string(); clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
tokio::fs::create_dir_all(Path::new(&read_cache_path))
.await
.context(CreateDirSnafu {
dir: &read_cache_path,
})?;
info!( let cache_store = Fs::default()
"The object storage cache path is not set for '{}', using the default path: '{}'", .root(&cache_config.cache_path)
name, &read_cache_path .atomic_write_dir(&atomic_temp_dir)
); .build()
.context(error::BuildCacheStoreSnafu)?;
cache_path = read_cache_path; let cache_layer = LruCacheLayer::new(
} Arc::new(cache_store),
cache_config.cache_capacity.0 as usize,
)
.context(error::BuildCacheStoreSnafu)?;
cache_layer.recover_cache(false).await;
if !cache_path.trim().is_empty() { info!(
let atomic_temp_dir = join_dir(&cache_path, ATOMIC_WRITE_DIR); "Enabled local object storage cache, path: {}, capacity: {}.",
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?; cache_config.cache_path, cache_config.cache_capacity
);
// Compatible code. Remove this after a major release. Ok(Some(cache_layer))
let old_atomic_temp_dir = join_dir(&cache_path, OLD_ATOMIC_WRITE_DIR);
clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
let cache_store = Fs::default()
.root(&cache_path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::BuildCacheStoreSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
.context(error::BuildCacheStoreSnafu)?;
cache_layer.recover_cache(false).await;
info!(
"Enabled local object storage cache, path: {}, capacity: {}.",
cache_path, cache_capacity
);
Ok(Some(cache_layer))
} else {
Ok(None)
}
} }

View File

@@ -83,14 +83,14 @@ impl ObjectStoreConfig {
} }
} }
/// Returns whether to enable read cache. If not set, the read cache will be enabled by default. /// Returns the mutable object storage cache configuration.
pub fn enable_read_cache(&self) -> bool { pub fn cache_config_mut(&mut self) -> Option<&mut ObjectStorageCacheConfig> {
match self { match self {
Self::File(_) => false, Self::File(_) => None,
Self::S3(s3) => s3.cache.enable_read_cache, Self::S3(s3) => Some(&mut s3.cache),
Self::Oss(oss) => oss.cache.enable_read_cache, Self::Oss(oss) => Some(&mut oss.cache),
Self::Azblob(az) => az.cache.enable_read_cache, Self::Azblob(az) => Some(&mut az.cache),
Self::Gcs(gcs) => gcs.cache.enable_read_cache, Self::Gcs(gcs) => Some(&mut gcs.cache),
} }
} }
} }
@@ -347,6 +347,16 @@ impl Default for ObjectStorageCacheConfig {
} }
} }
impl ObjectStorageCacheConfig {
    /// Fills in defaults so the cache configuration is usable.
    ///
    /// An empty `cache_path` is replaced with `data_home`; a non-empty
    /// `cache_path` is kept exactly as configured.
    pub fn sanitize(&mut self, data_home: &str) {
        // An explicitly configured cache path always wins.
        if !self.cache_path.is_empty() {
            return;
        }

        // `cache_path` is unset: use `${data_home}` as the local read cache directory.
        self.cache_path = data_home.to_owned();
    }
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;