mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-25 23:49:58 +00:00
refactor!: add enable_read_cache config to support disable read cache explicitly (#6834)
* refactor: add `enable_read_cache` config to support disable read cache explicitly Signed-off-by: zyy17 <zyylsxm@gmail.com> * refactor: if `cache_path` is empty and `enable_read_cache` is true, set the default cache dir Signed-off-by: zyy17 <zyylsxm@gmail.com> * refactor: remove the unessary Option type for `ObjectStorageCacheConfig` Signed-off-by: zyy17 <zyylsxm@gmail.com> * refactor: sanitize cache config in `DatanodeOptions` and `StandaloneOptions` Signed-off-by: zyy17 <zyylsxm@gmail.com> * chore: code review comment Signed-off-by: zyy17 <zyylsxm@gmail.com> * chore: apply code review comments Signed-off-by: zyy17 <zyylsxm@gmail.com> --------- Signed-off-by: zyy17 <zyylsxm@gmail.com>
This commit is contained in:
@@ -103,6 +103,7 @@
|
||||
| `storage` | -- | -- | The data storage options. |
|
||||
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
|
||||
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
|
||||
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
|
||||
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
|
||||
@@ -494,6 +495,7 @@
|
||||
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
|
||||
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
|
||||
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage. |
|
||||
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
|
||||
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |
|
||||
|
||||
@@ -274,6 +274,9 @@ type = "File"
|
||||
## @toml2docs:none-default
|
||||
#+ cache_path = ""
|
||||
|
||||
## Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage.
|
||||
#+ enable_read_cache = true
|
||||
|
||||
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
|
||||
## @toml2docs:none-default
|
||||
cache_capacity = "5GiB"
|
||||
|
||||
@@ -361,6 +361,9 @@ data_home = "./greptimedb_data"
|
||||
## - `Oss`: the data is stored in the Aliyun OSS.
|
||||
type = "File"
|
||||
|
||||
## Whether to enable read cache. If not set, the read cache will be enabled by default when using object storage.
|
||||
#+ enable_read_cache = true
|
||||
|
||||
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
|
||||
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
|
||||
## @toml2docs:none-default
|
||||
|
||||
@@ -247,6 +247,7 @@ impl StartCommand {
|
||||
.context(error::LoadLayeredConfigSnafu)?;
|
||||
|
||||
self.merge_with_cli_options(global_options, &mut opts.component)?;
|
||||
opts.component.sanitize();
|
||||
|
||||
Ok(opts)
|
||||
}
|
||||
@@ -866,4 +867,22 @@ mod tests {
|
||||
assert_eq!(options.logging, default_options.logging);
|
||||
assert_eq!(options.region_engine, default_options.region_engine);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cache_config() {
|
||||
let toml_str = r#"
|
||||
[storage]
|
||||
data_home = "test_data_home"
|
||||
type = "S3"
|
||||
[storage.cache_config]
|
||||
enable_read_cache = true
|
||||
"#;
|
||||
let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
|
||||
opts.sanitize();
|
||||
assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
|
||||
assert_eq!(
|
||||
opts.storage.store.cache_config().unwrap().cache_path,
|
||||
"test_data_home"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,8 +33,6 @@ use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
|
||||
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
|
||||
|
||||
/// Storage engine config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
@@ -105,6 +103,14 @@ impl DatanodeOptions {
|
||||
/// Sanitize the `DatanodeOptions` to ensure the config is valid.
|
||||
pub fn sanitize(&mut self) {
|
||||
sanitize_workload_types(&mut self.workload_types);
|
||||
|
||||
if self.storage.is_object_storage() {
|
||||
self.storage
|
||||
.store
|
||||
.cache_config_mut()
|
||||
.unwrap()
|
||||
.sanitize(&self.storage.data_home);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -242,4 +248,22 @@ mod tests {
|
||||
_ => panic!("Expected S3 config"),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cache_config() {
|
||||
let toml_str = r#"
|
||||
[storage]
|
||||
data_home = "test_data_home"
|
||||
type = "S3"
|
||||
[storage.cache_config]
|
||||
enable_read_cache = true
|
||||
"#;
|
||||
let mut opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
|
||||
opts.sanitize();
|
||||
assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
|
||||
assert_eq!(
|
||||
opts.storage.store.cache_config().unwrap().cache_path,
|
||||
"test_data_home"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,21 +14,19 @@
|
||||
|
||||
//! object storage utilities
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::info;
|
||||
use object_store::config::ObjectStorageCacheConfig;
|
||||
use object_store::factory::new_raw_object_store;
|
||||
use object_store::layers::LruCacheLayer;
|
||||
use object_store::services::Fs;
|
||||
use object_store::util::{clean_temp_dir, join_dir, with_instrument_layers, with_retry_layers};
|
||||
use object_store::{
|
||||
ATOMIC_WRITE_DIR, Access, OLD_ATOMIC_WRITE_DIR, ObjectStore, ObjectStoreBuilder,
|
||||
};
|
||||
use object_store::{ATOMIC_WRITE_DIR, Access, ObjectStore, ObjectStoreBuilder};
|
||||
use snafu::prelude::*;
|
||||
|
||||
use crate::config::{DEFAULT_OBJECT_STORE_CACHE_SIZE, ObjectStoreConfig};
|
||||
use crate::error::{self, CreateDirSnafu, Result};
|
||||
use crate::config::ObjectStoreConfig;
|
||||
use crate::error::{self, Result};
|
||||
|
||||
pub(crate) async fn new_object_store_without_cache(
|
||||
store: &ObjectStoreConfig,
|
||||
@@ -58,11 +56,15 @@ pub(crate) async fn new_object_store(
|
||||
.context(error::ObjectStoreSnafu)?;
|
||||
// Enable retry layer and cache layer for non-fs object storages
|
||||
let object_store = if store.is_object_storage() {
|
||||
let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
|
||||
let object_store = {
|
||||
// It's safe to unwrap here because we already checked above.
|
||||
let cache_config = store.cache_config().unwrap();
|
||||
if let Some(cache_layer) = build_cache_layer(cache_config).await? {
|
||||
// Adds cache layer
|
||||
object_store.layer(cache_layer)
|
||||
} else {
|
||||
object_store
|
||||
}
|
||||
};
|
||||
|
||||
// Adds retry layer
|
||||
@@ -76,94 +78,33 @@ pub(crate) async fn new_object_store(
|
||||
}
|
||||
|
||||
async fn build_cache_layer(
|
||||
store_config: &ObjectStoreConfig,
|
||||
data_home: &str,
|
||||
cache_config: &ObjectStorageCacheConfig,
|
||||
) -> Result<Option<LruCacheLayer<impl Access>>> {
|
||||
let (name, mut cache_path, cache_capacity) = match store_config {
|
||||
ObjectStoreConfig::S3(s3_config) => {
|
||||
let path = s3_config.cache.cache_path.clone();
|
||||
let name = &s3_config.name;
|
||||
let capacity = s3_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Oss(oss_config) => {
|
||||
let path = oss_config.cache.cache_path.clone();
|
||||
let name = &oss_config.name;
|
||||
let capacity = oss_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Azblob(azblob_config) => {
|
||||
let path = azblob_config.cache.cache_path.clone();
|
||||
let name = &azblob_config.name;
|
||||
let capacity = azblob_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Gcs(gcs_config) => {
|
||||
let path = gcs_config.cache.cache_path.clone();
|
||||
let name = &gcs_config.name;
|
||||
let capacity = gcs_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
_ => unreachable!("Already checked above"),
|
||||
};
|
||||
|
||||
// Enable object cache by default
|
||||
// Set the cache_path to be `${data_home}` by default
|
||||
// if it's not present
|
||||
if cache_path.is_none() {
|
||||
let read_cache_path = data_home.to_string();
|
||||
tokio::fs::create_dir_all(Path::new(&read_cache_path))
|
||||
.await
|
||||
.context(CreateDirSnafu {
|
||||
dir: &read_cache_path,
|
||||
})?;
|
||||
|
||||
info!(
|
||||
"The object storage cache path is not set for '{}', using the default path: '{}'",
|
||||
name, &read_cache_path
|
||||
);
|
||||
|
||||
cache_path = Some(read_cache_path);
|
||||
// No need to build cache layer if read cache is disabled.
|
||||
if !cache_config.enable_read_cache {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if let Some(path) = cache_path.as_ref()
|
||||
&& !path.trim().is_empty()
|
||||
{
|
||||
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
|
||||
let atomic_temp_dir = join_dir(&cache_config.cache_path, ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
|
||||
|
||||
// Compatible code. Remove this after a major release.
|
||||
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
|
||||
|
||||
let cache_store = Fs::default()
|
||||
.root(path)
|
||||
.root(&cache_config.cache_path)
|
||||
.atomic_write_dir(&atomic_temp_dir)
|
||||
.build()
|
||||
.context(error::BuildCacheStoreSnafu)?;
|
||||
|
||||
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
|
||||
let cache_layer = LruCacheLayer::new(
|
||||
Arc::new(cache_store),
|
||||
cache_config.cache_capacity.0 as usize,
|
||||
)
|
||||
.context(error::BuildCacheStoreSnafu)?;
|
||||
cache_layer.recover_cache(false).await;
|
||||
|
||||
info!(
|
||||
"Enabled local object storage cache, path: {}, capacity: {}.",
|
||||
path, cache_capacity
|
||||
cache_config.cache_path, cache_config.cache_capacity
|
||||
);
|
||||
|
||||
Ok(Some(cache_layer))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::util;
|
||||
|
||||
const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
|
||||
|
||||
/// Object storage config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type")]
|
||||
@@ -72,6 +74,28 @@ impl ObjectStoreConfig {
|
||||
|
||||
name
|
||||
}
|
||||
|
||||
/// Returns the object storage cache configuration.
|
||||
pub fn cache_config(&self) -> Option<&ObjectStorageCacheConfig> {
|
||||
match self {
|
||||
Self::File(_) => None,
|
||||
Self::S3(s3) => Some(&s3.cache),
|
||||
Self::Oss(oss) => Some(&oss.cache),
|
||||
Self::Azblob(az) => Some(&az.cache),
|
||||
Self::Gcs(gcs) => Some(&gcs.cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the mutable object storage cache configuration.
|
||||
pub fn cache_config_mut(&mut self) -> Option<&mut ObjectStorageCacheConfig> {
|
||||
match self {
|
||||
Self::File(_) => None,
|
||||
Self::S3(s3) => Some(&mut s3.cache),
|
||||
Self::Oss(oss) => Some(&mut oss.cache),
|
||||
Self::Azblob(az) => Some(&mut az.cache),
|
||||
Self::Gcs(gcs) => Some(&mut gcs.cache),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
|
||||
@@ -279,13 +303,36 @@ impl Default for HttpClientConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
pub struct ObjectStorageCacheConfig {
|
||||
/// Whether to enable read cache. If not set, the read cache will be enabled by default.
|
||||
pub enable_read_cache: bool,
|
||||
/// The local file cache directory
|
||||
pub cache_path: Option<String>,
|
||||
pub cache_path: String,
|
||||
/// The cache capacity in bytes
|
||||
pub cache_capacity: Option<ReadableSize>,
|
||||
pub cache_capacity: ReadableSize,
|
||||
}
|
||||
|
||||
impl Default for ObjectStorageCacheConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enable_read_cache: true,
|
||||
// The cache directory is set to the value of data_home in the build_cache_layer process.
|
||||
cache_path: String::default(),
|
||||
cache_capacity: DEFAULT_OBJECT_STORE_CACHE_SIZE,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ObjectStorageCacheConfig {
|
||||
/// Sanitize the `ObjectStorageCacheConfig` to ensure the config is valid.
|
||||
pub fn sanitize(&mut self, data_home: &str) {
|
||||
// If `cache_path` is unset, default to use `${data_home}` as the local read cache directory.
|
||||
if self.cache_path.is_empty() {
|
||||
self.cache_path = data_home.to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -155,4 +155,15 @@ impl StandaloneOptions {
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Sanitize the `StandaloneOptions` to ensure the config is valid.
|
||||
pub fn sanitize(&mut self) {
|
||||
if self.storage.is_object_storage() {
|
||||
self.storage
|
||||
.store
|
||||
.cache_config_mut()
|
||||
.unwrap()
|
||||
.sanitize(&self.storage.data_home);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -212,10 +212,9 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
|
||||
let mut s3_config = s3_test_config();
|
||||
|
||||
if *store_type == StorageType::S3WithCache {
|
||||
s3_config.cache.cache_path = Some("/tmp/greptimedb_cache".to_string());
|
||||
s3_config.cache.cache_path = "/tmp/greptimedb_cache".to_string();
|
||||
} else {
|
||||
// An empty string means disabling.
|
||||
s3_config.cache.cache_path = Some("".to_string());
|
||||
s3_config.cache.enable_read_cache = false;
|
||||
}
|
||||
|
||||
let builder = S3::from(&s3_config.connection);
|
||||
|
||||
@@ -1594,6 +1594,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
|
||||
"max_background_flushes =",
|
||||
"max_background_compactions =",
|
||||
"max_background_purges =",
|
||||
"enable_read_cache =",
|
||||
];
|
||||
|
||||
input
|
||||
|
||||
Reference in New Issue
Block a user