Compare commits

...

4 Commits

8 changed files with 156 additions and 98 deletions

View File

@@ -103,6 +103,7 @@
| `storage` | -- | -- | The data storage options. | | `storage` | -- | -- | The data storage options. |
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. | | `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. | | `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. | | `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. | | `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. | | `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
@@ -494,6 +495,7 @@
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. | | `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. | | `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. | | `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default. |
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. | | `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. | | `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss` and `Gcs`**. |
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. | | `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss` and `Azblob`**. |

View File

@@ -274,6 +274,9 @@ type = "File"
## @toml2docs:none-default ## @toml2docs:none-default
#+ cache_path = "" #+ cache_path = ""
## Whether to enable read cache. If not set, the read cache will be enabled by default.
enable_read_cache = true
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. ## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
## @toml2docs:none-default ## @toml2docs:none-default
cache_capacity = "5GiB" cache_capacity = "5GiB"

View File

@@ -361,6 +361,9 @@ data_home = "./greptimedb_data"
## - `Oss`: the data is stored in the Aliyun OSS. ## - `Oss`: the data is stored in the Aliyun OSS.
type = "File" type = "File"
## Whether to enable read cache. If not set, the read cache will be enabled by default.
enable_read_cache = true
## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance. ## Read cache configuration for object storage such as 'S3' etc, it's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
## A local file directory, defaults to `{data_home}`. An empty string means disabling. ## A local file directory, defaults to `{data_home}`. An empty string means disabling.
## @toml2docs:none-default ## @toml2docs:none-default

View File

@@ -218,6 +218,7 @@ impl Into<FrontendOptions> for StandaloneOptions {
} }
impl StandaloneOptions { impl StandaloneOptions {
/// Returns the `FrontendOptions` for the standalone instance.
pub fn frontend_options(&self) -> FrontendOptions { pub fn frontend_options(&self) -> FrontendOptions {
let cloned_opts = self.clone(); let cloned_opts = self.clone();
FrontendOptions { FrontendOptions {
@@ -241,6 +242,7 @@ impl StandaloneOptions {
} }
} }
/// Returns the `DatanodeOptions` for the standalone instance.
pub fn datanode_options(&self) -> DatanodeOptions { pub fn datanode_options(&self) -> DatanodeOptions {
let cloned_opts = self.clone(); let cloned_opts = self.clone();
DatanodeOptions { DatanodeOptions {
@@ -256,6 +258,17 @@ impl StandaloneOptions {
..Default::default() ..Default::default()
} }
} }
/// Sanitize the `StandaloneOptions` to ensure the config is valid.
pub fn sanitize(&mut self) {
if self.storage.is_object_storage() {
self.storage
.store
.cache_config_mut()
.unwrap()
.sanitize(&self.storage.data_home);
}
}
} }
pub struct Instance { pub struct Instance {
@@ -396,6 +409,7 @@ impl StartCommand {
.context(error::LoadLayeredConfigSnafu)?; .context(error::LoadLayeredConfigSnafu)?;
self.merge_with_cli_options(global_options, &mut opts.component)?; self.merge_with_cli_options(global_options, &mut opts.component)?;
opts.component.sanitize();
Ok(opts) Ok(opts)
} }
@@ -1147,4 +1161,22 @@ mod tests {
assert_eq!(options.logging, default_options.logging); assert_eq!(options.logging, default_options.logging);
assert_eq!(options.region_engine, default_options.region_engine); assert_eq!(options.region_engine, default_options.region_engine);
} }
#[test]
fn test_cache_config() {
let toml_str = r#"
[storage]
data_home = "test_data_home"
type = "S3"
[storage.cache_config]
enable_read_cache = true
"#;
let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
opts.sanitize();
assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
assert_eq!(
opts.storage.store.cache_config().unwrap().cache_path,
"test_data_home"
);
}
} }

View File

@@ -33,8 +33,6 @@ use servers::grpc::GrpcOptions;
use servers::heartbeat_options::HeartbeatOptions; use servers::heartbeat_options::HeartbeatOptions;
use servers::http::HttpOptions; use servers::http::HttpOptions;
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Storage engine config /// Storage engine config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)] #[serde(default)]
@@ -105,6 +103,14 @@ impl DatanodeOptions {
/// Sanitize the `DatanodeOptions` to ensure the config is valid. /// Sanitize the `DatanodeOptions` to ensure the config is valid.
pub fn sanitize(&mut self) { pub fn sanitize(&mut self) {
sanitize_workload_types(&mut self.workload_types); sanitize_workload_types(&mut self.workload_types);
if self.storage.is_object_storage() {
self.storage
.store
.cache_config_mut()
.unwrap()
.sanitize(&self.storage.data_home);
}
} }
} }
@@ -239,4 +245,22 @@ mod tests {
_ => panic!("Expected S3 config"), _ => panic!("Expected S3 config"),
} }
} }
#[test]
fn test_cache_config() {
let toml_str = r#"
[storage]
data_home = "test_data_home"
type = "S3"
[storage.cache_config]
enable_read_cache = true
"#;
let mut opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
opts.sanitize();
assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
assert_eq!(
opts.storage.store.cache_config().unwrap().cache_path,
"test_data_home"
);
}
} }

View File

@@ -14,10 +14,10 @@
//! object storage utilities //! object storage utilities
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use common_telemetry::info; use common_telemetry::info;
use object_store::config::ObjectStorageCacheConfig;
use object_store::factory::new_raw_object_store; use object_store::factory::new_raw_object_store;
use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::layers::{LruCacheLayer, RetryLayer};
use object_store::services::Fs; use object_store::services::Fs;
@@ -27,8 +27,8 @@ use object_store::{
}; };
use snafu::prelude::*; use snafu::prelude::*;
use crate::config::{DEFAULT_OBJECT_STORE_CACHE_SIZE, ObjectStoreConfig}; use crate::config::ObjectStoreConfig;
use crate::error::{self, CreateDirSnafu, Result}; use crate::error::{self, Result};
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore { fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
object_store.layer( object_store.layer(
@@ -66,11 +66,15 @@ pub(crate) async fn new_object_store(
.context(error::ObjectStoreSnafu)?; .context(error::ObjectStoreSnafu)?;
// Enable retry layer and cache layer for non-fs object storages // Enable retry layer and cache layer for non-fs object storages
let object_store = if store.is_object_storage() { let object_store = if store.is_object_storage() {
let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? { let object_store = {
// Adds cache layer // It's safe to unwrap here because we already checked above.
object_store.layer(cache_layer) let cache_config = store.cache_config().unwrap();
} else { if let Some(cache_layer) = build_cache_layer(cache_config).await? {
object_store // Adds cache layer
object_store.layer(cache_layer)
} else {
object_store
}
}; };
// Adds retry layer // Adds retry layer
@@ -84,94 +88,37 @@ pub(crate) async fn new_object_store(
} }
async fn build_cache_layer( async fn build_cache_layer(
store_config: &ObjectStoreConfig, cache_config: &ObjectStorageCacheConfig,
data_home: &str,
) -> Result<Option<LruCacheLayer<impl Access>>> { ) -> Result<Option<LruCacheLayer<impl Access>>> {
let (name, mut cache_path, cache_capacity) = match store_config { // No need to build cache layer if read cache is disabled.
ObjectStoreConfig::S3(s3_config) => { if !cache_config.enable_read_cache {
let path = s3_config.cache.cache_path.clone(); return Ok(None);
let name = &s3_config.name;
let capacity = s3_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(name, path, capacity)
}
ObjectStoreConfig::Oss(oss_config) => {
let path = oss_config.cache.cache_path.clone();
let name = &oss_config.name;
let capacity = oss_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(name, path, capacity)
}
ObjectStoreConfig::Azblob(azblob_config) => {
let path = azblob_config.cache.cache_path.clone();
let name = &azblob_config.name;
let capacity = azblob_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(name, path, capacity)
}
ObjectStoreConfig::Gcs(gcs_config) => {
let path = gcs_config.cache.cache_path.clone();
let name = &gcs_config.name;
let capacity = gcs_config
.cache
.cache_capacity
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
(name, path, capacity)
}
_ => unreachable!("Already checked above"),
};
// Enable object cache by default
// Set the cache_path to be `${data_home}` by default
// if it's not present
if cache_path.is_none() {
let read_cache_path = data_home.to_string();
tokio::fs::create_dir_all(Path::new(&read_cache_path))
.await
.context(CreateDirSnafu {
dir: &read_cache_path,
})?;
info!(
"The object storage cache path is not set for '{}', using the default path: '{}'",
name, &read_cache_path
);
cache_path = Some(read_cache_path);
} }
if let Some(path) = cache_path.as_ref() let atomic_temp_dir = join_dir(&cache_config.cache_path, ATOMIC_WRITE_DIR);
&& !path.trim().is_empty() clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
{
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
// Compatible code. Remove this after a major release. // Compatible code. Remove this after a major release.
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR); let old_atomic_temp_dir = join_dir(&cache_config.cache_path, OLD_ATOMIC_WRITE_DIR);
clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?; clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
let cache_store = Fs::default() let cache_store = Fs::default()
.root(path) .root(&cache_config.cache_path)
.atomic_write_dir(&atomic_temp_dir) .atomic_write_dir(&atomic_temp_dir)
.build() .build()
.context(error::BuildCacheStoreSnafu)?; .context(error::BuildCacheStoreSnafu)?;
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) let cache_layer = LruCacheLayer::new(
.context(error::BuildCacheStoreSnafu)?; Arc::new(cache_store),
cache_layer.recover_cache(false).await; cache_config.cache_capacity.0 as usize,
info!( )
"Enabled local object storage cache, path: {}, capacity: {}.", .context(error::BuildCacheStoreSnafu)?;
path, cache_capacity cache_layer.recover_cache(false).await;
);
Ok(Some(cache_layer)) info!(
} else { "Enabled local object storage cache, path: {}, capacity: {}.",
Ok(None) cache_config.cache_path, cache_config.cache_capacity
} );
Ok(Some(cache_layer))
} }

View File

@@ -18,6 +18,8 @@ use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString}; use common_base::secrets::{ExposeSecret, SecretString};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
/// Object storage config /// Object storage config
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")] #[serde(tag = "type")]
@@ -69,6 +71,28 @@ impl ObjectStoreConfig {
name name
} }
/// Returns the object storage cache configuration.
pub fn cache_config(&self) -> Option<&ObjectStorageCacheConfig> {
match self {
Self::File(_) => None,
Self::S3(s3) => Some(&s3.cache),
Self::Oss(oss) => Some(&oss.cache),
Self::Azblob(az) => Some(&az.cache),
Self::Gcs(gcs) => Some(&gcs.cache),
}
}
/// Returns the mutable object storage cache configuration.
pub fn cache_config_mut(&mut self) -> Option<&mut ObjectStorageCacheConfig> {
match self {
Self::File(_) => None,
Self::S3(s3) => Some(&mut s3.cache),
Self::Oss(oss) => Some(&mut oss.cache),
Self::Azblob(az) => Some(&mut az.cache),
Self::Gcs(gcs) => Some(&mut gcs.cache),
}
}
} }
#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)] #[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
@@ -301,13 +325,36 @@ impl Default for HttpClientConfig {
} }
} }
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)] #[serde(default)]
pub struct ObjectStorageCacheConfig { pub struct ObjectStorageCacheConfig {
/// Whether to enable read cache. If not set, the read cache will be enabled by default.
pub enable_read_cache: bool,
/// The local file cache directory /// The local file cache directory
pub cache_path: Option<String>, pub cache_path: String,
/// The cache capacity in bytes /// The cache capacity in bytes
pub cache_capacity: Option<ReadableSize>, pub cache_capacity: ReadableSize,
}
impl Default for ObjectStorageCacheConfig {
fn default() -> Self {
Self {
enable_read_cache: true,
// The cache directory is set to the value of data_home in the build_cache_layer process.
cache_path: String::default(),
cache_capacity: DEFAULT_OBJECT_STORE_CACHE_SIZE,
}
}
}
impl ObjectStorageCacheConfig {
/// Sanitize the `ObjectStorageCacheConfig` to ensure the config is valid.
pub fn sanitize(&mut self, data_home: &str) {
// If `cache_path` is unset, default to use `${data_home}` as the local read cache directory.
if self.cache_path.is_empty() {
self.cache_path = data_home.to_string();
}
}
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -229,10 +229,10 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
let mut s3_config = s3_test_config(); let mut s3_config = s3_test_config();
if *store_type == StorageType::S3WithCache { if *store_type == StorageType::S3WithCache {
s3_config.cache.cache_path = Some("/tmp/greptimedb_cache".to_string()); s3_config.cache.cache_path = "/tmp/greptimedb_cache".to_string();
} else { } else {
// An empty string means disabling. // An empty string means disabling.
s3_config.cache.cache_path = Some("".to_string()); s3_config.cache.cache_path = "".to_string();
} }
let mut builder = S3::default() let mut builder = S3::default()