mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-29 19:30:37 +00:00
refactor: remove the unessary Option type for ObjectStorageCacheConfig
Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
@@ -33,8 +33,6 @@ use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
|
||||
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
|
||||
|
||||
/// Storage engine config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
|
||||
@@ -27,7 +27,7 @@ use object_store::{
|
||||
};
|
||||
use snafu::prelude::*;
|
||||
|
||||
use crate::config::{DEFAULT_OBJECT_STORE_CACHE_SIZE, ObjectStoreConfig};
|
||||
use crate::config::ObjectStoreConfig;
|
||||
use crate::error::{self, CreateDirSnafu, Result};
|
||||
|
||||
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
|
||||
@@ -87,52 +87,23 @@ async fn build_cache_layer(
|
||||
store_config: &ObjectStoreConfig,
|
||||
data_home: &str,
|
||||
) -> Result<Option<LruCacheLayer<impl Access>>> {
|
||||
// No need to build cache layer if read cache is disabled.
|
||||
if !store_config.enable_read_cache() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let (name, mut cache_path, cache_capacity) = match store_config {
|
||||
ObjectStoreConfig::S3(s3_config) => {
|
||||
let path = s3_config.cache.cache_path.clone();
|
||||
let name = &s3_config.name;
|
||||
let capacity = s3_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Oss(oss_config) => {
|
||||
let path = oss_config.cache.cache_path.clone();
|
||||
let name = &oss_config.name;
|
||||
let capacity = oss_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Azblob(azblob_config) => {
|
||||
let path = azblob_config.cache.cache_path.clone();
|
||||
let name = &azblob_config.name;
|
||||
let capacity = azblob_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Gcs(gcs_config) => {
|
||||
let path = gcs_config.cache.cache_path.clone();
|
||||
let name = &gcs_config.name;
|
||||
let capacity = gcs_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
_ => unreachable!("Already checked above"),
|
||||
let (name, mut cache_path, cache_capacity) = {
|
||||
// It's safe to unwrap here because we already checked above.
|
||||
let cache_config = store_config.cache_config().unwrap();
|
||||
(
|
||||
store_config.config_name(),
|
||||
cache_config.cache_path.clone(),
|
||||
cache_config.cache_capacity,
|
||||
)
|
||||
};
|
||||
|
||||
// If `cache_path` is unset or an empty string, default to use `${data_home}` as the local read cache directory.
|
||||
if cache_path.as_ref().is_none_or(|p| p.is_empty()) {
|
||||
// If `cache_path` is unset, default to use `${data_home}` as the local read cache directory.
|
||||
if cache_path.is_empty() {
|
||||
let read_cache_path = data_home.to_string();
|
||||
tokio::fs::create_dir_all(Path::new(&read_cache_path))
|
||||
.await
|
||||
@@ -145,21 +116,19 @@ async fn build_cache_layer(
|
||||
name, &read_cache_path
|
||||
);
|
||||
|
||||
cache_path = Some(read_cache_path);
|
||||
cache_path = read_cache_path;
|
||||
}
|
||||
|
||||
if let Some(path) = cache_path.as_ref()
|
||||
&& !path.trim().is_empty()
|
||||
{
|
||||
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
|
||||
if !cache_path.trim().is_empty() {
|
||||
let atomic_temp_dir = join_dir(&cache_path, ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
|
||||
|
||||
// Compatible code. Remove this after a major release.
|
||||
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
|
||||
let old_atomic_temp_dir = join_dir(&cache_path, OLD_ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
|
||||
|
||||
let cache_store = Fs::default()
|
||||
.root(path)
|
||||
.root(&cache_path)
|
||||
.atomic_write_dir(&atomic_temp_dir)
|
||||
.build()
|
||||
.context(error::BuildCacheStoreSnafu)?;
|
||||
@@ -169,7 +138,7 @@ async fn build_cache_layer(
|
||||
cache_layer.recover_cache(false).await;
|
||||
info!(
|
||||
"Enabled local object storage cache, path: {}, capacity: {}.",
|
||||
path, cache_capacity
|
||||
cache_path, cache_capacity
|
||||
);
|
||||
|
||||
Ok(Some(cache_layer))
|
||||
|
||||
@@ -18,6 +18,8 @@ use common_base::readable_size::ReadableSize;
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
|
||||
|
||||
/// Object storage config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type")]
|
||||
@@ -70,17 +72,26 @@ impl ObjectStoreConfig {
|
||||
name
|
||||
}
|
||||
|
||||
/// Returns the object storage cache configuration.
|
||||
pub fn cache_config(&self) -> Option<&ObjectStorageCacheConfig> {
|
||||
match self {
|
||||
Self::File(_) => None,
|
||||
Self::S3(s3) => Some(&s3.cache),
|
||||
Self::Oss(oss) => Some(&oss.cache),
|
||||
Self::Azblob(az) => Some(&az.cache),
|
||||
Self::Gcs(gcs) => Some(&gcs.cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns whether to enable read cache. If not set, the read cache will be enabled by default.
|
||||
pub fn enable_read_cache(&self) -> bool {
|
||||
let enable_read_cache = match self {
|
||||
Self::File(_) => Some(false),
|
||||
match self {
|
||||
Self::File(_) => false,
|
||||
Self::S3(s3) => s3.cache.enable_read_cache,
|
||||
Self::Oss(oss) => oss.cache.enable_read_cache,
|
||||
Self::Azblob(az) => az.cache.enable_read_cache,
|
||||
Self::Gcs(gcs) => gcs.cache.enable_read_cache,
|
||||
};
|
||||
|
||||
enable_read_cache.unwrap_or(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -314,15 +325,26 @@ impl Default for HttpClientConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
pub struct ObjectStorageCacheConfig {
|
||||
/// Whether to enable read cache. If not set, the read cache will be enabled by default.
|
||||
pub enable_read_cache: Option<bool>,
|
||||
pub enable_read_cache: bool,
|
||||
/// The local file cache directory
|
||||
pub cache_path: Option<String>,
|
||||
pub cache_path: String,
|
||||
/// The cache capacity in bytes
|
||||
pub cache_capacity: Option<ReadableSize>,
|
||||
pub cache_capacity: ReadableSize,
|
||||
}
|
||||
|
||||
impl Default for ObjectStorageCacheConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enable_read_cache: true,
|
||||
// The cache directory is set to the value of data_home in the build_cache_layer process.
|
||||
cache_path: String::default(),
|
||||
cache_capacity: DEFAULT_OBJECT_STORE_CACHE_SIZE,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -229,10 +229,10 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
|
||||
let mut s3_config = s3_test_config();
|
||||
|
||||
if *store_type == StorageType::S3WithCache {
|
||||
s3_config.cache.cache_path = Some("/tmp/greptimedb_cache".to_string());
|
||||
s3_config.cache.cache_path = "/tmp/greptimedb_cache".to_string();
|
||||
} else {
|
||||
// An empty string means disabling.
|
||||
s3_config.cache.cache_path = Some("".to_string());
|
||||
s3_config.cache.cache_path = "".to_string();
|
||||
}
|
||||
|
||||
let mut builder = S3::default()
|
||||
|
||||
Reference in New Issue
Block a user