From 530265072837f8dee9fd60886028a55f9417a88b Mon Sep 17 00:00:00 2001 From: zyy17 Date: Wed, 10 Sep 2025 16:13:32 -0700 Subject: [PATCH] refactor: sanitize cache config in `DatanodeOptions` and `StandaloneOptions` Signed-off-by: zyy17 --- src/cmd/src/standalone.rs | 32 ++++++++++++ src/datanode/src/config.rs | 26 ++++++++++ src/datanode/src/store.rs | 92 +++++++++++++--------------------- src/object-store/src/config.rs | 24 ++++++--- 4 files changed, 109 insertions(+), 65 deletions(-) diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index 9ee7d6b728..cd469e312f 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -218,6 +218,7 @@ impl Into for StandaloneOptions { } impl StandaloneOptions { + /// Returns the `FrontendOptions` for the standalone instance. pub fn frontend_options(&self) -> FrontendOptions { let cloned_opts = self.clone(); FrontendOptions { @@ -241,6 +242,7 @@ impl StandaloneOptions { } } + /// Returns the `DatanodeOptions` for the standalone instance. pub fn datanode_options(&self) -> DatanodeOptions { let cloned_opts = self.clone(); DatanodeOptions { @@ -256,6 +258,17 @@ impl StandaloneOptions { ..Default::default() } } + + /// Sanitize the `StandaloneOptions` to ensure the config is valid. + pub fn sanitize(&mut self) { + if self.storage.is_object_storage() { + self.storage + .store + .cache_config_mut() + .unwrap() + .sanitize(&self.storage.data_home); + } + } } pub struct Instance { @@ -396,6 +409,7 @@ impl StartCommand { .context(error::LoadLayeredConfigSnafu)?; self.merge_with_cli_options(global_options, &mut opts.component)?; + opts.component.sanitize(); Ok(opts) } @@ -1147,4 +1161,22 @@ mod tests { assert_eq!(options.logging, default_options.logging); assert_eq!(options.region_engine, default_options.region_engine); } + + #[test] + fn test_cache_config() { + let toml_str = r#" + [storage] + data_home = "test_data_home" + type = "S3" + [storage.cache_config] + enable_read_cache = true + "#; + let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap(); + opts.sanitize(); + assert!(opts.storage.store.cache_config().unwrap().enable_read_cache); + assert_eq!( + opts.storage.store.cache_config().unwrap().cache_path, + "test_data_home" + ); + } } diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 1cd43ec951..dc77789db5 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -103,6 +103,14 @@ impl DatanodeOptions { /// Sanitize the `DatanodeOptions` to ensure the config is valid. pub fn sanitize(&mut self) { sanitize_workload_types(&mut self.workload_types); + + if self.storage.is_object_storage() { + self.storage + .store + .cache_config_mut() + .unwrap() + .sanitize(&self.storage.data_home); + } } } @@ -237,4 +245,22 @@ mod tests { _ => panic!("Expected S3 config"), } } + + #[test] + fn test_cache_config() { + let toml_str = r#" + [storage] + data_home = "test_data_home" + type = "S3" + [storage.cache_config] + enable_read_cache = true + "#; + let mut opts: DatanodeOptions = toml::from_str(toml_str).unwrap(); + opts.sanitize(); + assert!(opts.storage.store.cache_config().unwrap().enable_read_cache); + assert_eq!( + opts.storage.store.cache_config().unwrap().cache_path, + "test_data_home" + ); + } } diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 1d4bedd3a0..8a64735c94 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -14,10 +14,10 @@ //! object storage utilities -use std::path::Path; use std::sync::Arc; use common_telemetry::info; +use object_store::config::ObjectStorageCacheConfig; use object_store::factory::new_raw_object_store; use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::services::Fs; @@ -28,7 +28,7 @@ use object_store::{ use snafu::prelude::*; use crate::config::ObjectStoreConfig; -use crate::error::{self, CreateDirSnafu, Result}; +use crate::error::{self, Result}; fn with_retry_layers(object_store: ObjectStore) -> ObjectStore { object_store.layer( @@ -66,11 +66,15 @@ pub(crate) async fn new_object_store( .context(error::ObjectStoreSnafu)?; // Enable retry layer and cache layer for non-fs object storages let object_store = if store.is_object_storage() { - let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? { - // Adds cache layer - object_store.layer(cache_layer) - } else { - object_store + let object_store = { + // It's safe to unwrap here because we already checked above. + let cache_config = store.cache_config().unwrap(); + if let Some(cache_layer) = build_cache_layer(cache_config).await? { + // Adds cache layer + object_store.layer(cache_layer) + } else { + object_store + } }; // Adds retry layer @@ -84,65 +88,37 @@ pub(crate) async fn new_object_store( } async fn build_cache_layer( - store_config: &ObjectStoreConfig, - data_home: &str, + cache_config: &ObjectStorageCacheConfig, ) -> Result>> { // No need to build cache layer if read cache is disabled. - if !store_config.enable_read_cache() { + if !cache_config.enable_read_cache { return Ok(None); } - let (name, mut cache_path, cache_capacity) = { - // It's safe to unwrap here because we already checked above. - let cache_config = store_config.cache_config().unwrap(); - ( - store_config.config_name(), - cache_config.cache_path.clone(), - cache_config.cache_capacity, - ) - }; + let atomic_temp_dir = join_dir(&cache_config.cache_path, ATOMIC_WRITE_DIR); + clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?; - // If `cache_path` is unset, default to use `${data_home}` as the local read cache directory. - if cache_path.is_empty() { - let read_cache_path = data_home.to_string(); - tokio::fs::create_dir_all(Path::new(&read_cache_path)) - .await - .context(CreateDirSnafu { - dir: &read_cache_path, - })?; + // Compatible code. Remove this after a major release. + let old_atomic_temp_dir = join_dir(&cache_config.cache_path, OLD_ATOMIC_WRITE_DIR); + clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?; - info!( - "The object storage cache path is not set for '{}', using the default path: '{}'", - name, &read_cache_path - ); + let cache_store = Fs::default() + .root(&cache_config.cache_path) + .atomic_write_dir(&atomic_temp_dir) + .build() + .context(error::BuildCacheStoreSnafu)?; - cache_path = read_cache_path; - } + let cache_layer = LruCacheLayer::new( + Arc::new(cache_store), + cache_config.cache_capacity.0 as usize, + ) + .context(error::BuildCacheStoreSnafu)?; + cache_layer.recover_cache(false).await; - if !cache_path.trim().is_empty() { - let atomic_temp_dir = join_dir(&cache_path, ATOMIC_WRITE_DIR); - clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?; + info!( + "Enabled local object storage cache, path: {}, capacity: {}.", + cache_config.cache_path, cache_config.cache_capacity + ); - // Compatible code. Remove this after a major release. - let old_atomic_temp_dir = join_dir(&cache_path, OLD_ATOMIC_WRITE_DIR); - clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?; - - let cache_store = Fs::default() - .root(&cache_path) - .atomic_write_dir(&atomic_temp_dir) - .build() - .context(error::BuildCacheStoreSnafu)?; - - let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) - .context(error::BuildCacheStoreSnafu)?; - cache_layer.recover_cache(false).await; - info!( - "Enabled local object storage cache, path: {}, capacity: {}.", - cache_path, cache_capacity - ); - - Ok(Some(cache_layer)) - } else { - Ok(None) - } + Ok(Some(cache_layer)) } diff --git a/src/object-store/src/config.rs b/src/object-store/src/config.rs index fb0465afd1..fa16800ee6 100644 --- a/src/object-store/src/config.rs +++ b/src/object-store/src/config.rs @@ -83,14 +83,14 @@ impl ObjectStoreConfig { } } - /// Returns whether to enable read cache. If not set, the read cache will be enabled by default. - pub fn enable_read_cache(&self) -> bool { + /// Returns the mutable object storage cache configuration. + pub fn cache_config_mut(&mut self) -> Option<&mut ObjectStorageCacheConfig> { match self { - Self::File(_) => false, - Self::S3(s3) => s3.cache.enable_read_cache, - Self::Oss(oss) => oss.cache.enable_read_cache, - Self::Azblob(az) => az.cache.enable_read_cache, - Self::Gcs(gcs) => gcs.cache.enable_read_cache, + Self::File(_) => None, + Self::S3(s3) => Some(&mut s3.cache), + Self::Oss(oss) => Some(&mut oss.cache), + Self::Azblob(az) => Some(&mut az.cache), + Self::Gcs(gcs) => Some(&mut gcs.cache), } } } @@ -347,6 +347,16 @@ impl Default for ObjectStorageCacheConfig { } } +impl ObjectStorageCacheConfig { + /// Sanitize the `ObjectStorageCacheConfig` to ensure the config is valid. + pub fn sanitize(&mut self, data_home: &str) { + // If `cache_path` is unset, default to use `${data_home}` as the local read cache directory. + if self.cache_path.is_empty() { + self.cache_path = data_home.to_string(); + } + } +} + #[cfg(test)] mod tests { use super::*;