mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-22 22:20:02 +00:00
Compare commits
4 Commits
e0c1566e92
...
refactor/a
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5302650728 | ||
|
|
b14daa7313 | ||
|
|
4c1870332c | ||
|
|
512d2926cb |
@@ -103,6 +103,7 @@
|
||||
| `storage` | -- | -- | The data storage options. |
|
||||
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
|
||||
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
|
||||
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3'. It's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
|
||||
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss`, or `Gcs`**. |
|
||||
@@ -494,6 +495,7 @@
|
||||
| `storage.data_home` | String | `./greptimedb_data` | The working home directory. |
|
||||
| `storage.type` | String | `File` | The storage type used to store the data.<br/>- `File`: the data is stored in the local file system.<br/>- `S3`: the data is stored in the S3 object storage.<br/>- `Gcs`: the data is stored in the Google Cloud Storage.<br/>- `Azblob`: the data is stored in the Azure Blob Storage.<br/>- `Oss`: the data is stored in the Aliyun OSS. |
|
||||
| `storage.cache_path` | String | Unset | Read cache configuration for object storage such as 'S3'. It's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.<br/>A local file directory, defaults to `{data_home}`. An empty string means disabling. |
|
||||
| `storage.enable_read_cache` | Bool | `true` | Whether to enable read cache. If not set, the read cache will be enabled by default. |
|
||||
| `storage.cache_capacity` | String | Unset | The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger. |
|
||||
| `storage.bucket` | String | Unset | The S3 bucket name.<br/>**It's only used when the storage type is `S3`, `Oss`, or `Gcs`**. |
|
||||
| `storage.root` | String | Unset | The S3 data will be stored in the specified prefix, for example, `s3://${bucket}/${root}`.<br/>**It's only used when the storage type is `S3`, `Oss`, or `Azblob`**. |
|
||||
|
||||
@@ -274,6 +274,9 @@ type = "File"
|
||||
## @toml2docs:none-default
|
||||
#+ cache_path = ""
|
||||
|
||||
## Whether to enable read cache. If not set, the read cache will be enabled by default.
|
||||
enable_read_cache = true
|
||||
|
||||
## The local file cache capacity in bytes. If your disk space is sufficient, it is recommended to set it larger.
|
||||
## @toml2docs:none-default
|
||||
cache_capacity = "5GiB"
|
||||
|
||||
@@ -361,6 +361,9 @@ data_home = "./greptimedb_data"
|
||||
## - `Oss`: the data is stored in the Aliyun OSS.
|
||||
type = "File"
|
||||
|
||||
## Whether to enable read cache. If not set, the read cache will be enabled by default.
|
||||
enable_read_cache = true
|
||||
|
||||
## Read cache configuration for object storage such as 'S3'. It's configured by default when using object storage. It is recommended to configure it when using object storage for better performance.
|
||||
## A local file directory, defaults to `{data_home}`. An empty string means disabling.
|
||||
## @toml2docs:none-default
|
||||
|
||||
@@ -218,6 +218,7 @@ impl Into<FrontendOptions> for StandaloneOptions {
|
||||
}
|
||||
|
||||
impl StandaloneOptions {
|
||||
/// Returns the `FrontendOptions` for the standalone instance.
|
||||
pub fn frontend_options(&self) -> FrontendOptions {
|
||||
let cloned_opts = self.clone();
|
||||
FrontendOptions {
|
||||
@@ -241,6 +242,7 @@ impl StandaloneOptions {
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the `DatanodeOptions` for the standalone instance.
|
||||
pub fn datanode_options(&self) -> DatanodeOptions {
|
||||
let cloned_opts = self.clone();
|
||||
DatanodeOptions {
|
||||
@@ -256,6 +258,17 @@ impl StandaloneOptions {
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Sanitize the `StandaloneOptions` to ensure the config is valid.
|
||||
pub fn sanitize(&mut self) {
|
||||
if self.storage.is_object_storage() {
|
||||
self.storage
|
||||
.store
|
||||
.cache_config_mut()
|
||||
.unwrap()
|
||||
.sanitize(&self.storage.data_home);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Instance {
|
||||
@@ -396,6 +409,7 @@ impl StartCommand {
|
||||
.context(error::LoadLayeredConfigSnafu)?;
|
||||
|
||||
self.merge_with_cli_options(global_options, &mut opts.component)?;
|
||||
opts.component.sanitize();
|
||||
|
||||
Ok(opts)
|
||||
}
|
||||
@@ -1147,4 +1161,22 @@ mod tests {
|
||||
assert_eq!(options.logging, default_options.logging);
|
||||
assert_eq!(options.region_engine, default_options.region_engine);
|
||||
}
|
||||
|
||||
// Checks that `StandaloneOptions::sanitize()` fills an unset `cache_path` with
// `storage.data_home` for an `S3` store, and that the read cache is reported as enabled.
// NOTE(review): `enable_read_cache = true` equals the value set by
// `ObjectStorageCacheConfig::default()`, so the first assertion passes whether or not
// the `[storage.cache_config]` table is actually applied — TODO confirm the key mapping.
#[test]
|
||||
fn test_cache_config() {
|
||||
let toml_str = r#"
|
||||
[storage]
|
||||
data_home = "test_data_home"
|
||||
type = "S3"
|
||||
[storage.cache_config]
|
||||
enable_read_cache = true
|
||||
"#;
|
||||
let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
|
||||
opts.sanitize();
|
||||
assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
|
||||
assert_eq!(
|
||||
opts.storage.store.cache_config().unwrap().cache_path,
|
||||
"test_data_home"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,8 +33,6 @@ use servers::grpc::GrpcOptions;
|
||||
use servers::heartbeat_options::HeartbeatOptions;
|
||||
use servers::http::HttpOptions;
|
||||
|
||||
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
|
||||
|
||||
/// Storage engine config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
@@ -105,6 +103,14 @@ impl DatanodeOptions {
|
||||
/// Sanitize the `DatanodeOptions` to ensure the config is valid.
|
||||
pub fn sanitize(&mut self) {
|
||||
sanitize_workload_types(&mut self.workload_types);
|
||||
|
||||
if self.storage.is_object_storage() {
|
||||
self.storage
|
||||
.store
|
||||
.cache_config_mut()
|
||||
.unwrap()
|
||||
.sanitize(&self.storage.data_home);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -239,4 +245,22 @@ mod tests {
|
||||
_ => panic!("Expected S3 config"),
|
||||
}
|
||||
}
|
||||
|
||||
// Checks that `DatanodeOptions::sanitize()` fills an unset `cache_path` with
// `storage.data_home` for an `S3` store, and that the read cache is reported as enabled.
// NOTE(review): `enable_read_cache = true` equals the value set by
// `ObjectStorageCacheConfig::default()`, so the first assertion passes whether or not
// the `[storage.cache_config]` table is actually applied — TODO confirm the key mapping.
#[test]
|
||||
fn test_cache_config() {
|
||||
let toml_str = r#"
|
||||
[storage]
|
||||
data_home = "test_data_home"
|
||||
type = "S3"
|
||||
[storage.cache_config]
|
||||
enable_read_cache = true
|
||||
"#;
|
||||
let mut opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
|
||||
opts.sanitize();
|
||||
assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
|
||||
assert_eq!(
|
||||
opts.storage.store.cache_config().unwrap().cache_path,
|
||||
"test_data_home"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,10 +14,10 @@
|
||||
|
||||
//! object storage utilities
|
||||
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_telemetry::info;
|
||||
use object_store::config::ObjectStorageCacheConfig;
|
||||
use object_store::factory::new_raw_object_store;
|
||||
use object_store::layers::{LruCacheLayer, RetryLayer};
|
||||
use object_store::services::Fs;
|
||||
@@ -27,8 +27,8 @@ use object_store::{
|
||||
};
|
||||
use snafu::prelude::*;
|
||||
|
||||
use crate::config::{DEFAULT_OBJECT_STORE_CACHE_SIZE, ObjectStoreConfig};
|
||||
use crate::error::{self, CreateDirSnafu, Result};
|
||||
use crate::config::ObjectStoreConfig;
|
||||
use crate::error::{self, Result};
|
||||
|
||||
fn with_retry_layers(object_store: ObjectStore) -> ObjectStore {
|
||||
object_store.layer(
|
||||
@@ -66,11 +66,15 @@ pub(crate) async fn new_object_store(
|
||||
.context(error::ObjectStoreSnafu)?;
|
||||
// Enable retry layer and cache layer for non-fs object storages
|
||||
let object_store = if store.is_object_storage() {
|
||||
let object_store = if let Some(cache_layer) = build_cache_layer(&store, data_home).await? {
|
||||
// Adds cache layer
|
||||
object_store.layer(cache_layer)
|
||||
} else {
|
||||
object_store
|
||||
let object_store = {
|
||||
// It's safe to unwrap here because we already checked above.
|
||||
let cache_config = store.cache_config().unwrap();
|
||||
if let Some(cache_layer) = build_cache_layer(cache_config).await? {
|
||||
// Adds cache layer
|
||||
object_store.layer(cache_layer)
|
||||
} else {
|
||||
object_store
|
||||
}
|
||||
};
|
||||
|
||||
// Adds retry layer
|
||||
@@ -84,94 +88,37 @@ pub(crate) async fn new_object_store(
|
||||
}
|
||||
|
||||
async fn build_cache_layer(
|
||||
store_config: &ObjectStoreConfig,
|
||||
data_home: &str,
|
||||
cache_config: &ObjectStorageCacheConfig,
|
||||
) -> Result<Option<LruCacheLayer<impl Access>>> {
|
||||
let (name, mut cache_path, cache_capacity) = match store_config {
|
||||
ObjectStoreConfig::S3(s3_config) => {
|
||||
let path = s3_config.cache.cache_path.clone();
|
||||
let name = &s3_config.name;
|
||||
let capacity = s3_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Oss(oss_config) => {
|
||||
let path = oss_config.cache.cache_path.clone();
|
||||
let name = &oss_config.name;
|
||||
let capacity = oss_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Azblob(azblob_config) => {
|
||||
let path = azblob_config.cache.cache_path.clone();
|
||||
let name = &azblob_config.name;
|
||||
let capacity = azblob_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
ObjectStoreConfig::Gcs(gcs_config) => {
|
||||
let path = gcs_config.cache.cache_path.clone();
|
||||
let name = &gcs_config.name;
|
||||
let capacity = gcs_config
|
||||
.cache
|
||||
.cache_capacity
|
||||
.unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE);
|
||||
(name, path, capacity)
|
||||
}
|
||||
_ => unreachable!("Already checked above"),
|
||||
};
|
||||
|
||||
// Enable object cache by default
|
||||
// Set the cache_path to be `${data_home}` by default
|
||||
// if it's not present
|
||||
if cache_path.is_none() {
|
||||
let read_cache_path = data_home.to_string();
|
||||
tokio::fs::create_dir_all(Path::new(&read_cache_path))
|
||||
.await
|
||||
.context(CreateDirSnafu {
|
||||
dir: &read_cache_path,
|
||||
})?;
|
||||
|
||||
info!(
|
||||
"The object storage cache path is not set for '{}', using the default path: '{}'",
|
||||
name, &read_cache_path
|
||||
);
|
||||
|
||||
cache_path = Some(read_cache_path);
|
||||
// No need to build cache layer if read cache is disabled.
|
||||
if !cache_config.enable_read_cache {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
if let Some(path) = cache_path.as_ref()
|
||||
&& !path.trim().is_empty()
|
||||
{
|
||||
let atomic_temp_dir = join_dir(path, ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
|
||||
let atomic_temp_dir = join_dir(&cache_config.cache_path, ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&atomic_temp_dir).context(error::ObjectStoreSnafu)?;
|
||||
|
||||
// Compatible code. Remove this after a major release.
|
||||
let old_atomic_temp_dir = join_dir(path, OLD_ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
|
||||
// Compatible code. Remove this after a major release.
|
||||
let old_atomic_temp_dir = join_dir(&cache_config.cache_path, OLD_ATOMIC_WRITE_DIR);
|
||||
clean_temp_dir(&old_atomic_temp_dir).context(error::ObjectStoreSnafu)?;
|
||||
|
||||
let cache_store = Fs::default()
|
||||
.root(path)
|
||||
.atomic_write_dir(&atomic_temp_dir)
|
||||
.build()
|
||||
.context(error::BuildCacheStoreSnafu)?;
|
||||
let cache_store = Fs::default()
|
||||
.root(&cache_config.cache_path)
|
||||
.atomic_write_dir(&atomic_temp_dir)
|
||||
.build()
|
||||
.context(error::BuildCacheStoreSnafu)?;
|
||||
|
||||
let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize)
|
||||
.context(error::BuildCacheStoreSnafu)?;
|
||||
cache_layer.recover_cache(false).await;
|
||||
info!(
|
||||
"Enabled local object storage cache, path: {}, capacity: {}.",
|
||||
path, cache_capacity
|
||||
);
|
||||
let cache_layer = LruCacheLayer::new(
|
||||
Arc::new(cache_store),
|
||||
cache_config.cache_capacity.0 as usize,
|
||||
)
|
||||
.context(error::BuildCacheStoreSnafu)?;
|
||||
cache_layer.recover_cache(false).await;
|
||||
|
||||
Ok(Some(cache_layer))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
info!(
|
||||
"Enabled local object storage cache, path: {}, capacity: {}.",
|
||||
cache_config.cache_path, cache_config.cache_capacity
|
||||
);
|
||||
|
||||
Ok(Some(cache_layer))
|
||||
}
|
||||
|
||||
@@ -18,6 +18,8 @@ use common_base::readable_size::ReadableSize;
|
||||
use common_base::secrets::{ExposeSecret, SecretString};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
|
||||
|
||||
/// Object storage config
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type")]
|
||||
@@ -69,6 +71,28 @@ impl ObjectStoreConfig {
|
||||
|
||||
name
|
||||
}
|
||||
|
||||
/// Returns the object storage cache configuration.
|
||||
pub fn cache_config(&self) -> Option<&ObjectStorageCacheConfig> {
|
||||
match self {
|
||||
Self::File(_) => None,
|
||||
Self::S3(s3) => Some(&s3.cache),
|
||||
Self::Oss(oss) => Some(&oss.cache),
|
||||
Self::Azblob(az) => Some(&az.cache),
|
||||
Self::Gcs(gcs) => Some(&gcs.cache),
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the mutable object storage cache configuration.
|
||||
pub fn cache_config_mut(&mut self) -> Option<&mut ObjectStorageCacheConfig> {
|
||||
match self {
|
||||
Self::File(_) => None,
|
||||
Self::S3(s3) => Some(&mut s3.cache),
|
||||
Self::Oss(oss) => Some(&mut oss.cache),
|
||||
Self::Azblob(az) => Some(&mut az.cache),
|
||||
Self::Gcs(gcs) => Some(&mut gcs.cache),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
|
||||
@@ -301,13 +325,36 @@ impl Default for HttpClientConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(default)]
|
||||
pub struct ObjectStorageCacheConfig {
|
||||
/// Whether to enable read cache. If not set, the read cache will be enabled by default.
|
||||
pub enable_read_cache: bool,
|
||||
/// The local file cache directory
|
||||
pub cache_path: Option<String>,
|
||||
pub cache_path: String,
|
||||
/// The cache capacity in bytes
|
||||
pub cache_capacity: Option<ReadableSize>,
|
||||
pub cache_capacity: ReadableSize,
|
||||
}
|
||||
|
||||
impl Default for ObjectStorageCacheConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enable_read_cache: true,
|
||||
// The cache directory is set to the value of data_home in the build_cache_layer process.
|
||||
cache_path: String::default(),
|
||||
cache_capacity: DEFAULT_OBJECT_STORE_CACHE_SIZE,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ObjectStorageCacheConfig {
|
||||
/// Sanitize the `ObjectStorageCacheConfig` to ensure the config is valid.
|
||||
pub fn sanitize(&mut self, data_home: &str) {
|
||||
// If `cache_path` is unset, default to use `${data_home}` as the local read cache directory.
|
||||
if self.cache_path.is_empty() {
|
||||
self.cache_path = data_home.to_string();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
||||
@@ -229,10 +229,10 @@ pub fn get_test_store_config(store_type: &StorageType) -> (ObjectStoreConfig, Te
|
||||
let mut s3_config = s3_test_config();
|
||||
|
||||
if *store_type == StorageType::S3WithCache {
|
||||
s3_config.cache.cache_path = Some("/tmp/greptimedb_cache".to_string());
|
||||
s3_config.cache.cache_path = "/tmp/greptimedb_cache".to_string();
|
||||
} else {
|
||||
// An empty string means disabling.
|
||||
s3_config.cache.cache_path = Some("".to_string());
|
||||
s3_config.cache.cache_path = "".to_string();
|
||||
}
|
||||
|
||||
let mut builder = S3::default()
|
||||
|
||||
Reference in New Issue
Block a user