diff --git a/src/datanode/src/config.rs b/src/datanode/src/config.rs index 9829c1a982..1cd43ec951 100644 --- a/src/datanode/src/config.rs +++ b/src/datanode/src/config.rs @@ -33,8 +33,6 @@ use servers::grpc::GrpcOptions; use servers::heartbeat_options::HeartbeatOptions; use servers::http::HttpOptions; -pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5); - /// Storage engine config #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(default)] diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 491408836e..1d4bedd3a0 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -27,7 +27,7 @@ use object_store::{ }; use snafu::prelude::*; -use crate::config::{DEFAULT_OBJECT_STORE_CACHE_SIZE, ObjectStoreConfig}; +use crate::config::ObjectStoreConfig; use crate::error::{self, CreateDirSnafu, Result}; fn with_retry_layers(object_store: ObjectStore) -> ObjectStore { @@ -87,52 +87,23 @@ async fn build_cache_layer( store_config: &ObjectStoreConfig, data_home: &str, ) -> Result>> { + // No need to build cache layer if read cache is disabled. if !store_config.enable_read_cache() { return Ok(None); } - let (name, mut cache_path, cache_capacity) = match store_config { - ObjectStoreConfig::S3(s3_config) => { - let path = s3_config.cache.cache_path.clone(); - let name = &s3_config.name; - let capacity = s3_config - .cache - .cache_capacity - .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); - (name, path, capacity) - } - ObjectStoreConfig::Oss(oss_config) => { - let path = oss_config.cache.cache_path.clone(); - let name = &oss_config.name; - let capacity = oss_config - .cache - .cache_capacity - .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); - (name, path, capacity) - } - ObjectStoreConfig::Azblob(azblob_config) => { - let path = azblob_config.cache.cache_path.clone(); - let name = &azblob_config.name; - let capacity = azblob_config - .cache - .cache_capacity - .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); - (name, path, capacity) - } - ObjectStoreConfig::Gcs(gcs_config) => { - let path = gcs_config.cache.cache_path.clone(); - let name = &gcs_config.name; - let capacity = gcs_config - .cache - .cache_capacity - .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); - (name, path, capacity) - } - _ => unreachable!("Already checked above"), + let (name, mut cache_path, cache_capacity) = { + // It's safe to unwrap here because we already checked above. + let cache_config = store_config.cache_config().unwrap(); + ( + store_config.config_name(), + cache_config.cache_path.clone(), + cache_config.cache_capacity, + ) }; - // If `cache_path` is unset or an empty string, default to use `${data_home}` as the local read cache directory. - if cache_path.as_ref().is_none_or(|p| p.is_empty()) { + // If `cache_path` is unset, default to use `${data_home}` as the local read cache directory. + if cache_path.is_empty() { let read_cache_path = data_home.to_string(); tokio::fs::create_dir_all(Path::new(&read_cache_path)) .await @@ -145,21 +116,19 @@ async fn build_cache_layer( name, &read_cache_path ); - cache_path = Some(read_cache_path); + cache_path = read_cache_path; } - if let Some(path) = cache_path.as_ref() - && !path.trim().is_empty() - { - let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR); + if !cache_path.trim().is_empty() { + let atomic_temp_dir = join_dir(&cache_path, ATOMIC_WRITE_DIR); clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?; // Compatible code. Remove this after a major release. - let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR); + let old_atomic_temp_dir = join_dir(&cache_path, OLD_ATOMIC_WRITE_DIR); clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?; let cache_store = Fs::default() - .root(path) + .root(&cache_path) .atomic_write_dir(&atomic_temp_dir) .build() .context(error::BuildCacheStoreSnafu)?; @@ -169,7 +138,7 @@ async fn build_cache_layer( cache_layer.recover_cache(false).await; info!( "Enabled local object storage cache, path: {}, capacity: {}.", - path, cache_capacity + cache_path, cache_capacity ); Ok(Some(cache_layer)) diff --git a/src/object-store/src/config.rs b/src/object-store/src/config.rs index 893c98aa4c..fb0465afd1 100644 --- a/src/object-store/src/config.rs +++ b/src/object-store/src/config.rs @@ -18,6 +18,8 @@ use common_base::readable_size::ReadableSize; use common_base::secrets::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; +const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5); + /// Object storage config #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(tag = "type")] @@ -70,17 +72,26 @@ impl ObjectStoreConfig { name } + /// Returns the object storage cache configuration. + pub fn cache_config(&self) -> Option<&ObjectStorageCacheConfig> { + match self { + Self::File(_) => None, + Self::S3(s3) => Some(&s3.cache), + Self::Oss(oss) => Some(&oss.cache), + Self::Azblob(az) => Some(&az.cache), + Self::Gcs(gcs) => Some(&gcs.cache), + } + } + /// Returns whether to enable read cache. If not set, the read cache will be enabled by default. pub fn enable_read_cache(&self) -> bool { - let enable_read_cache = match self { - Self::File(_) => Some(false), + match self { + Self::File(_) => false, Self::S3(s3) => s3.cache.enable_read_cache, Self::Oss(oss) => oss.cache.enable_read_cache, Self::Azblob(az) => az.cache.enable_read_cache, Self::Gcs(gcs) => gcs.cache.enable_read_cache, - }; - - enable_read_cache.unwrap_or(true) + } } } @@ -314,15 +325,26 @@ impl Default for HttpClientConfig { } } -#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(default)] pub struct ObjectStorageCacheConfig { /// Whether to enable read cache. If not set, the read cache will be enabled by default. - pub enable_read_cache: Option, + pub enable_read_cache: bool, /// The local file cache directory - pub cache_path: Option, + pub cache_path: String, /// The cache capacity in bytes - pub cache_capacity: Option, + pub cache_capacity: ReadableSize, +} + +impl Default for ObjectStorageCacheConfig { + fn default() -> Self { + Self { + enable_read_cache: true, + // The cache directory is set to the value of data_home in the build_cache_layer process. + cache_path: String::default(), + cache_capacity: DEFAULT_OBJECT_STORE_CACHE_SIZE, + } + } } #[cfg(test)] diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index b483be1c43..016e871525 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -229,10 +229,10 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te let mut s3_config = s3_test_config(); if *store_type == StorageType::S3WithCache { - s3_config.cache.cache_path = Some("/tmp/greptimedb_cache".to_string()); + s3_config.cache.cache_path = "/tmp/greptimedb_cache".to_string(); } else { // An empty string means disabling. - s3_config.cache.cache_path = Some("".to_string()); + s3_config.cache.cache_path = "".to_string(); } let mut builder = S3::default()