From 8bd4a36136b995e74c29660cfbaaf9d4882766ad Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 9 Jan 2024 12:40:22 +0800 Subject: [PATCH] feat(mito): Init the write cache in datanode (#3100) * feat: add builder to build cache manager * refactor: make MitoEngine::new async * refactor: refactor object store creation * refactor: add helper fn to attaches layers * feat: fn to build fs store * feat: add write cache to engine * feat: config write cache * style: fix clippy * test: fix test * feat: add warning * chore: add experimental prefix to configs * test: fix config test * test: test weighted size * feat: add switch to enable write cache * fix: update cache stats by using get * style: use then --- src/datanode/src/datanode.rs | 27 +++- src/datanode/src/error.rs | 7 + src/datanode/src/store.rs | 26 ++-- src/datanode/src/store/azblob.rs | 4 +- src/datanode/src/store/fs.rs | 7 +- src/datanode/src/store/gcs.rs | 4 +- src/datanode/src/store/oss.rs | 4 +- src/datanode/src/store/s3.rs | 4 +- src/mito2/src/access_layer.rs | 32 ++++- src/mito2/src/cache.rs | 156 +++++++++++++--------- src/mito2/src/cache/file_cache.rs | 31 +++-- src/mito2/src/cache/write_cache.rs | 28 ++-- src/mito2/src/config.rs | 27 +++- src/mito2/src/engine.rs | 35 ++--- src/mito2/src/error.rs | 18 ++- src/mito2/src/flush.rs | 2 +- src/mito2/src/read/projection.rs | 3 +- src/mito2/src/sst/parquet.rs | 7 +- src/mito2/src/test_util.rs | 14 ++ src/mito2/src/test_util/scheduler_util.rs | 6 +- src/mito2/src/worker.rs | 65 ++++++--- src/object-store/src/util.rs | 19 +++ tests-integration/tests/http.rs | 3 + 23 files changed, 362 insertions(+), 167 deletions(-) diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 69a59d0acc..3f0990cb49 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -42,7 +42,7 @@ use metric_engine::engine::MetricEngine; use mito2::config::MitoConfig; use mito2::engine::MitoEngine; use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef}; -use object_store::util::normalize_dir; +use object_store::util::{join_dir, normalize_dir}; use query::QueryEngineFactory; use servers::export_metrics::ExportMetricsTask; use servers::grpc::{GrpcServer, GrpcServerConfig}; @@ -60,9 +60,9 @@ use tokio::sync::Notify; use crate::config::{DatanodeOptions, RegionEngineConfig}; use crate::error::{ - CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, - ParseAddrSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, ShutdownServerSnafu, - StartServerSnafu, + BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, + MissingNodeIdSnafu, OpenLogStoreSnafu, ParseAddrSnafu, Result, RuntimeResourceSnafu, + ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu, }; use crate::event_listener::{ new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef, @@ -458,20 +458,33 @@ impl DatanodeBuilder { async fn build_mito_engine( opts: &DatanodeOptions, object_store_manager: ObjectStoreManagerRef, - config: MitoConfig, + mut config: MitoConfig, ) -> Result { + // Sets write cache path if it is empty. + if config.experimental_write_cache_path.is_empty() { + config.experimental_write_cache_path = join_dir(&opts.storage.data_home, "write_cache"); + info!( + "Sets write cache path to {}", + config.experimental_write_cache_path + ); + } + let mito_engine = match &opts.wal { WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new( config, Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config) .await?, object_store_manager, - ), + ) + .await + .context(BuildMitoEngineSnafu)?, WalConfig::Kafka(kafka_config) => MitoEngine::new( config, Self::build_kafka_log_store(kafka_config).await?, object_store_manager, - ), + ) + .await + .context(BuildMitoEngineSnafu)?, }; Ok(mito_engine) } diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 94724ffb95..d7873812cf 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -282,6 +282,12 @@ pub enum Error { source: metric_engine::error::Error, location: Location, }, + + #[snafu(display("Failed to build mito engine"))] + BuildMitoEngine { + source: mito2::error::Error, + location: Location, + }, } pub type Result = std::result::Result; @@ -352,6 +358,7 @@ impl ErrorExt for Error { StopRegionEngine { source, .. } => source.status_code(), FindLogicalRegions { source, .. } => source.status_code(), + BuildMitoEngine { source, .. } => source.status_code(), } } diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 6b6eac2a09..e748aa5a21 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -26,10 +26,10 @@ use std::{env, path}; use common_base::readable_size::ReadableSize; use common_telemetry::logging::info; -use object_store::layers::{LoggingLayer, LruCacheLayer, RetryLayer, TracingLayer}; -use object_store::services::Fs as FsBuilder; -use object_store::util::normalize_dir; -use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder}; +use object_store::layers::{LruCacheLayer, RetryLayer}; +use object_store::services::Fs; +use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; +use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; @@ -60,16 +60,7 @@ pub(crate) async fn new_object_store( object_store }; - let store = object_store - .layer( - LoggingLayer::default() - // Print the expected error only in DEBUG level. - // See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level - .with_error_level(Some("debug")) - .expect("input error level must be valid"), - ) - .layer(TracingLayer) - .layer(object_store::layers::PrometheusMetricsLayer); + let store = with_instrument_layers(object_store); Ok(store) } @@ -114,11 +105,10 @@ async fn create_object_store_with_cache( }; if let Some(path) = cache_path { - let path = util::normalize_dir(path); - let atomic_temp_dir = format!("{path}.tmp/"); + let atomic_temp_dir = join_dir(path, ".tmp/"); clean_temp_dir(&atomic_temp_dir)?; - let cache_store = FsBuilder::default() - .root(&path) + let cache_store = Fs::default() + .root(path) .atomic_write_dir(&atomic_temp_dir) .build() .context(error::InitBackendSnafu)?; diff --git a/src/datanode/src/store/azblob.rs b/src/datanode/src/store/azblob.rs index 53ea66a83d..dedd473a72 100644 --- a/src/datanode/src/store/azblob.rs +++ b/src/datanode/src/store/azblob.rs @@ -13,7 +13,7 @@ // limitations under the License. use common_telemetry::logging::info; -use object_store::services::Azblob as AzureBuilder; +use object_store::services::Azblob; use object_store::{util, ObjectStore}; use secrecy::ExposeSecret; use snafu::prelude::*; @@ -30,7 +30,7 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res azblob_config.container, &root ); - let mut builder = AzureBuilder::default(); + let mut builder = Azblob::default(); let _ = builder .root(&root) .container(&azblob_config.container) diff --git a/src/datanode/src/store/fs.rs b/src/datanode/src/store/fs.rs index 51f1c79a83..e3a8513c6d 100644 --- a/src/datanode/src/store/fs.rs +++ b/src/datanode/src/store/fs.rs @@ -15,7 +15,8 @@ use std::{fs, path}; use common_telemetry::logging::info; -use object_store::services::Fs as FsBuilder; +use object_store::services::Fs; +use object_store::util::join_dir; use object_store::ObjectStore; use snafu::prelude::*; @@ -31,10 +32,10 @@ pub(crate) async fn new_fs_object_store( .context(error::CreateDirSnafu { dir: data_home })?; info!("The file storage home is: {}", data_home); - let atomic_write_dir = format!("{data_home}.tmp/"); + let atomic_write_dir = join_dir(data_home, ".tmp/"); store::clean_temp_dir(&atomic_write_dir)?; - let mut builder = FsBuilder::default(); + let mut builder = Fs::default(); let _ = builder.root(data_home).atomic_write_dir(&atomic_write_dir); let object_store = ObjectStore::new(builder) diff --git a/src/datanode/src/store/gcs.rs b/src/datanode/src/store/gcs.rs index 57e4a68c6d..1bf0919b3c 100644 --- a/src/datanode/src/store/gcs.rs +++ b/src/datanode/src/store/gcs.rs @@ -13,7 +13,7 @@ // limitations under the License. use common_telemetry::logging::info; -use object_store::services::Gcs as GCSBuilder; +use object_store::services::Gcs; use object_store::{util, ObjectStore}; use secrecy::ExposeSecret; use snafu::prelude::*; @@ -29,7 +29,7 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result Result Result, } + +/// Creates a fs object store with atomic write dir. +pub(crate) async fn new_fs_object_store(root: &str) -> Result { + let atomic_write_dir = join_dir(root, ".tmp/"); + clean_dir(&atomic_write_dir).await?; + + let mut builder = Fs::default(); + builder.root(root).atomic_write_dir(&atomic_write_dir); + let object_store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish(); + + // Add layers. + let object_store = with_instrument_layers(object_store); + Ok(object_store) +} + +/// Clean the directory. +async fn clean_dir(dir: &str) -> Result<()> { + if tokio::fs::try_exists(dir) + .await + .context(CleanDirSnafu { dir })? + { + tokio::fs::remove_dir_all(dir) + .await + .context(CleanDirSnafu { dir })?; + } + + Ok(()) +} diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 62c91b2b41..9372d57b3a 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -47,9 +47,10 @@ const PAGE_TYPE: &str = "page"; // Metrics type key for files on the local store. const FILE_TYPE: &str = "file"; -// TODO(yingwen): Builder for cache manager. - /// Manages cached data for the engine. +/// +/// All caches are disabled by default. +#[derive(Default)] pub struct CacheManager { /// Cache for SST metadata. sst_meta_cache: Option, @@ -58,70 +59,15 @@ pub struct CacheManager { /// Cache for SST pages. page_cache: Option, /// A Cache for writing files to object stores. - // TODO(yingwen): Remove this once the cache is ready. - #[allow(unused)] write_cache: Option, } pub type CacheManagerRef = Arc; impl CacheManager { - /// Creates a new manager with specific cache size in bytes. - pub fn new( - sst_meta_cache_size: u64, - vector_cache_size: u64, - page_cache_size: u64, - ) -> CacheManager { - let sst_meta_cache = if sst_meta_cache_size == 0 { - None - } else { - let cache = Cache::builder() - .max_capacity(sst_meta_cache_size) - .weigher(meta_cache_weight) - .eviction_listener(|k, v, _cause| { - let size = meta_cache_weight(&k, &v); - CACHE_BYTES - .with_label_values(&[SST_META_TYPE]) - .sub(size.into()); - }) - .build(); - Some(cache) - }; - let vector_cache = if vector_cache_size == 0 { - None - } else { - let cache = Cache::builder() - .max_capacity(vector_cache_size) - .weigher(vector_cache_weight) - .eviction_listener(|k, v, _cause| { - let size = vector_cache_weight(&k, &v); - CACHE_BYTES - .with_label_values(&[VECTOR_TYPE]) - .sub(size.into()); - }) - .build(); - Some(cache) - }; - let page_cache = if page_cache_size == 0 { - None - } else { - let cache = Cache::builder() - .max_capacity(page_cache_size) - .weigher(page_cache_weight) - .eviction_listener(|k, v, _cause| { - let size = page_cache_weight(&k, &v); - CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); - }) - .build(); - Some(cache) - }; - - CacheManager { - sst_meta_cache, - vector_cache, - page_cache, - write_cache: None, - } + /// Returns a builder to build the cache. + pub fn builder() -> CacheManagerBuilder { + CacheManagerBuilder::default() } /// Gets cached [ParquetMetaData]. @@ -201,6 +147,86 @@ impl CacheManager { } } +/// Builder to construct a [CacheManager]. +#[derive(Default)] +pub struct CacheManagerBuilder { + sst_meta_cache_size: u64, + vector_cache_size: u64, + page_cache_size: u64, + write_cache: Option, +} + +impl CacheManagerBuilder { + /// Sets meta cache size. + pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self { + self.sst_meta_cache_size = bytes; + self + } + + /// Sets vector cache size. + pub fn vector_cache_size(mut self, bytes: u64) -> Self { + self.vector_cache_size = bytes; + self + } + + /// Sets page cache size. + pub fn page_cache_size(mut self, bytes: u64) -> Self { + self.page_cache_size = bytes; + self + } + + /// Sets write cache. + pub fn write_cache(mut self, cache: Option) -> Self { + self.write_cache = cache; + self + } + + /// Builds the [CacheManager]. + pub fn build(self) -> CacheManager { + let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| { + Cache::builder() + .max_capacity(self.sst_meta_cache_size) + .weigher(meta_cache_weight) + .eviction_listener(|k, v, _cause| { + let size = meta_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[SST_META_TYPE]) + .sub(size.into()); + }) + .build() + }); + let vector_cache = (self.vector_cache_size != 0).then(|| { + Cache::builder() + .max_capacity(self.vector_cache_size) + .weigher(vector_cache_weight) + .eviction_listener(|k, v, _cause| { + let size = vector_cache_weight(&k, &v); + CACHE_BYTES + .with_label_values(&[VECTOR_TYPE]) + .sub(size.into()); + }) + .build() + }); + let page_cache = (self.page_cache_size != 0).then(|| { + Cache::builder() + .max_capacity(self.page_cache_size) + .weigher(page_cache_weight) + .eviction_listener(|k, v, _cause| { + let size = page_cache_weight(&k, &v); + CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into()); + }) + .build() + }); + + CacheManager { + sst_meta_cache, + vector_cache, + page_cache, + write_cache: self.write_cache, + } + } +} + fn meta_cache_weight(k: &SstMetaKey, v: &Arc) -> u32 { // We ignore the size of `Arc`. (k.estimated_size() + parquet_meta_size(v)) as u32 @@ -293,7 +319,7 @@ mod tests { #[test] fn test_disable_cache() { - let cache = CacheManager::new(0, 0, 0); + let cache = CacheManager::default(); assert!(cache.sst_meta_cache.is_none()); assert!(cache.vector_cache.is_none()); assert!(cache.page_cache.is_none()); @@ -318,11 +344,13 @@ mod tests { let pages = Arc::new(PageValue::new(Vec::new())); cache.put_pages(key.clone(), pages); assert!(cache.get_pages(&key).is_none()); + + assert!(cache.write_cache().is_none()); } #[test] fn test_parquet_meta_cache() { - let cache = CacheManager::new(2000, 0, 0); + let cache = CacheManager::builder().sst_meta_cache_size(2000).build(); let region_id = RegionId::new(1, 1); let file_id = FileId::random(); assert!(cache.get_parquet_meta_data(region_id, file_id).is_none()); @@ -335,7 +363,7 @@ mod tests { #[test] fn test_repeated_vector_cache() { - let cache = CacheManager::new(0, 4096, 0); + let cache = CacheManager::builder().vector_cache_size(4096).build(); let value = Value::Int64(10); assert!(cache.get_repeated_vector(&value).is_none()); let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10])); @@ -346,7 +374,7 @@ mod tests { #[test] fn test_page_cache() { - let cache = CacheManager::new(0, 0, 1000); + let cache = CacheManager::builder().page_cache_size(1000).build(); let region_id = RegionId::new(1, 1); let file_id = FileId::random(); let key = PageKey { diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs index 86624a78f3..fb0c3ec109 100644 --- a/src/mito2/src/cache/file_cache.rs +++ b/src/mito2/src/cache/file_cache.rs @@ -100,17 +100,11 @@ impl FileCache { self.memory_index.insert(key, value).await; } - async fn get_reader(&self, file_path: &str) -> object_store::Result> { - if self.local_store.is_exist(file_path).await? { - Ok(Some(self.local_store.reader(file_path).await?)) - } else { - Ok(None) - } - } - /// Reads a file from the cache. pub(crate) async fn reader(&self, key: IndexKey) -> Option { - if !self.memory_index.contains_key(&key) { + // We must use `get()` to update the estimator of the cache. + // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key + if self.memory_index.get(&key).await.is_none() { CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); return None; } @@ -194,6 +188,14 @@ impl FileCache { pub(crate) fn local_store(&self) -> ObjectStore { self.local_store.clone() } + + async fn get_reader(&self, file_path: &str) -> object_store::Result> { + if self.local_store.is_exist(file_path).await? { + Ok(Some(self.local_store.reader(file_path).await?)) + } else { + Ok(None) + } + } } /// Key of file cache index. @@ -271,6 +273,10 @@ mod tests { reader.read_to_string(&mut buf).await.unwrap(); assert_eq!("hello", buf); + // Get weighted size. + cache.memory_index.run_pending_tasks().await; + assert_eq!(5, cache.memory_index.weighted_size()); + // Remove the file. cache.remove(key).await; assert!(cache.reader(key).await.is_none()); @@ -280,6 +286,7 @@ mod tests { // The file also not exists. assert!(!local_store.is_exist(&file_path).await.unwrap()); + assert_eq!(0, cache.memory_index.weighted_size()); } #[tokio::test] @@ -321,6 +328,7 @@ mod tests { let region_id = RegionId::new(2000, 0); // Write N files. let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect(); + let mut total_size = 0; for (i, file_id) in file_ids.iter().enumerate() { let key = (region_id, *file_id); let file_path = cache.cache_file_path(key); @@ -336,6 +344,7 @@ mod tests { }, ) .await; + total_size += bytes.len(); } // Recover the cache. @@ -344,6 +353,10 @@ mod tests { assert!(cache.reader((region_id, file_ids[0])).await.is_none()); cache.recover().await.unwrap(); + // Check size. + cache.memory_index.run_pending_tasks().await; + assert_eq!(total_size, cache.memory_index.weighted_size() as usize); + for (i, file_id) in file_ids.iter().enumerate() { let key = (region_id, *file_id); let mut reader = cache.reader(key).await.unwrap(); diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index b640ba8966..9775f3d791 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -17,10 +17,12 @@ use std::sync::Arc; use common_base::readable_size::ReadableSize; +use common_telemetry::info; use object_store::manager::ObjectStoreManagerRef; use object_store::ObjectStore; use store_api::metadata::RegionMetadataRef; +use crate::access_layer::new_fs_object_store; use crate::cache::file_cache::{FileCache, FileCacheRef}; use crate::error::Result; use crate::read::Source; @@ -43,20 +45,30 @@ pub type WriteCacheRef = Arc; impl WriteCache { /// Create the cache with a `local_store` to cache files and a /// `object_store_manager` for all object stores. - pub fn new( + pub async fn new( local_store: ObjectStore, object_store_manager: ObjectStoreManagerRef, cache_capacity: ReadableSize, - ) -> Self { - Self { - file_cache: Arc::new(FileCache::new(local_store, cache_capacity)), + ) -> Result { + let file_cache = FileCache::new(local_store, cache_capacity); + file_cache.recover().await?; + + Ok(Self { + file_cache: Arc::new(file_cache), object_store_manager, - } + }) } - /// Recovers the write cache from local store. - pub async fn recover(&self) -> Result<()> { - self.file_cache.recover().await + /// Creates a write cache based on local fs. + pub async fn new_fs( + cache_dir: &str, + object_store_manager: ObjectStoreManagerRef, + cache_capacity: ReadableSize, + ) -> Result { + info!("Init write cache on {cache_dir}, capacity: {cache_capacity}"); + + let local_store = new_fs_object_store(cache_dir).await?; + Self::new(local_store, object_store_manager, cache_capacity).await } /// Writes SST to the cache and then uploads it to the remote object store. diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 2e779b7602..b56c16addf 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -19,6 +19,9 @@ use std::time::Duration; use common_base::readable_size::ReadableSize; use common_telemetry::warn; use serde::{Deserialize, Serialize}; +use snafu::ensure; + +use crate::error::{InvalidConfigSnafu, Result}; /// Default max running background job. const DEFAULT_MAX_BG_JOB: usize = 4; @@ -67,6 +70,12 @@ pub struct MitoConfig { pub vector_cache_size: ReadableSize, /// Cache size for pages of SST row groups (default 512MB). Setting it to 0 to disable the cache. pub page_cache_size: ReadableSize, + /// Whether to enable the experimental write cache. + pub enable_experimental_write_cache: bool, + /// Path for write cache. + pub experimental_write_cache_path: String, + /// Capacity for write cache. + pub experimental_write_cache_size: ReadableSize, // Other configs: /// Buffer size for SST writing. @@ -95,6 +104,9 @@ impl Default for MitoConfig { sst_meta_cache_size: ReadableSize::mb(128), vector_cache_size: ReadableSize::mb(512), page_cache_size: ReadableSize::mb(512), + enable_experimental_write_cache: false, + experimental_write_cache_path: String::new(), + experimental_write_cache_size: ReadableSize::mb(512), sst_write_buffer_size: ReadableSize::mb(8), scan_parallelism: divide_num_cpus(4), parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, @@ -104,7 +116,9 @@ impl Default for MitoConfig { impl MitoConfig { /// Sanitize incorrect configurations. - pub(crate) fn sanitize(&mut self) { + /// + /// Returns an error if there is a configuration that unable to sanitize. + pub(crate) fn sanitize(&mut self) -> Result<()> { // Use default value if `num_workers` is 0. if self.num_workers == 0 { self.num_workers = divide_num_cpus(2); @@ -149,6 +163,17 @@ impl MitoConfig { self.parallel_scan_channel_size ); } + + if self.enable_experimental_write_cache { + ensure!( + !self.experimental_write_cache_path.is_empty(), + InvalidConfigSnafu { + reason: "experimental_write_cache_path should not be empty", + } + ); + } + + Ok(()) } } diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 2a718d88ea..577d5131d0 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -77,16 +77,16 @@ pub struct MitoEngine { impl MitoEngine { /// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`. - pub fn new( + pub async fn new( mut config: MitoConfig, log_store: Arc, object_store_manager: ObjectStoreManagerRef, - ) -> MitoEngine { - config.sanitize(); + ) -> Result { + config.sanitize()?; - MitoEngine { - inner: Arc::new(EngineInner::new(config, log_store, object_store_manager)), - } + Ok(MitoEngine { + inner: Arc::new(EngineInner::new(config, log_store, object_store_manager).await?), + }) } /// Returns true if the specific region exists. @@ -126,16 +126,16 @@ struct EngineInner { impl EngineInner { /// Returns a new [EngineInner] with specific `config`, `log_store` and `object_store`. - fn new( + async fn new( config: MitoConfig, log_store: Arc, object_store_manager: ObjectStoreManagerRef, - ) -> EngineInner { + ) -> Result { let config = Arc::new(config); - EngineInner { - workers: WorkerGroup::start(config.clone(), log_store, object_store_manager), + Ok(EngineInner { + workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?, config, - } + }) } /// Stop the inner engine. @@ -314,17 +314,17 @@ impl RegionEngine for MitoEngine { #[cfg(any(test, feature = "test"))] impl MitoEngine { /// Returns a new [MitoEngine] for tests. - pub fn new_for_test( + pub async fn new_for_test( mut config: MitoConfig, log_store: Arc, object_store_manager: ObjectStoreManagerRef, write_buffer_manager: Option, listener: Option, - ) -> MitoEngine { - config.sanitize(); + ) -> Result { + config.sanitize()?; let config = Arc::new(config); - MitoEngine { + Ok(MitoEngine { inner: Arc::new(EngineInner { workers: WorkerGroup::start_for_test( config.clone(), @@ -332,9 +332,10 @@ impl MitoEngine { object_store_manager, write_buffer_manager, listener, - ), + ) + .await?, config, }), - } + }) } } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 044a4be584..b630680728 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -429,35 +429,30 @@ pub enum Error { #[snafu(display("Failed to build index applier"))] BuildIndexApplier { - #[snafu(source)] source: index::inverted_index::error::Error, location: Location, }, #[snafu(display("Failed to convert value"))] ConvertValue { - #[snafu(source)] source: datatypes::error::Error, location: Location, }, #[snafu(display("Failed to apply index"))] ApplyIndex { - #[snafu(source)] source: index::inverted_index::error::Error, location: Location, }, #[snafu(display("Failed to read puffin metadata"))] PuffinReadMetadata { - #[snafu(source)] source: puffin::error::Error, location: Location, }, #[snafu(display("Failed to read puffin blob"))] PuffinReadBlob { - #[snafu(source)] source: puffin::error::Error, location: Location, }, @@ -467,6 +462,17 @@ pub enum Error { blob_type: String, location: Location, }, + + #[snafu(display("Failed to clean dir {dir}"))] + CleanDir { + dir: String, + #[snafu(source)] + error: std::io::Error, + location: Location, + }, + + #[snafu(display("Invalid config, {reason}"))] + InvalidConfig { reason: String, location: Location }, } pub type Result = std::result::Result; @@ -555,6 +561,8 @@ impl ErrorExt for Error { PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => { source.status_code() } + CleanDir { .. } => StatusCode::Unexpected, + InvalidConfig { .. } => StatusCode::InvalidArguments, } } diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 49c68e489f..381fd3b8c8 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -743,7 +743,7 @@ mod tests { listener: WorkerListener::default(), engine_config: Arc::new(MitoConfig::default()), row_group_size: None, - cache_manager: Arc::new(CacheManager::new(0, 0, 0)), + cache_manager: Arc::new(CacheManager::default()), }; task.push_sender(OptionOutputTx::from(output_tx)); scheduler diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index c5e6feefcb..5f4fd67edc 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -342,7 +342,8 @@ mod tests { assert_eq!([0, 1, 2, 3, 4], mapper.column_ids()); assert_eq!([3, 4], mapper.batch_fields()); - let cache = CacheManager::new(0, 1024, 0); + // With vector cache. + let cache = CacheManager::builder().vector_cache_size(1024).build(); let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3); let record_batch = mapper.convert(&batch, Some(&cache)).unwrap(); let expect = "\ diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 20259672e3..5b6a088729 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -170,7 +170,12 @@ mod tests { .unwrap() .unwrap(); - let cache = Some(Arc::new(CacheManager::new(0, 0, 64 * 1024 * 1024))); + // Enable page cache. + let cache = Some(Arc::new( + CacheManager::builder() + .page_cache_size(64 * 1024 * 1024) + .build(), + )); let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store) .cache(cache.clone()); for _ in 0..3 { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index b2006098b6..73795744ff 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -137,6 +137,8 @@ impl TestEnv { self.logstore = Some(logstore.clone()); self.object_store_manager = Some(object_store_manager.clone()); MitoEngine::new(config, logstore, object_store_manager) + .await + .unwrap() } /// Creates a new engine with specific config and existing logstore and object store manager. @@ -145,6 +147,8 @@ impl TestEnv { let object_store_manager = self.object_store_manager.as_ref().unwrap().clone(); MitoEngine::new(config, logstore, object_store_manager) + .await + .unwrap() } /// Creates a new engine with specific config and manager/listener under this env. @@ -161,6 +165,8 @@ impl TestEnv { self.logstore = Some(logstore.clone()); self.object_store_manager = Some(object_store_manager.clone()); MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener) + .await + .unwrap() } pub async fn create_engine_with_multiple_object_stores( @@ -190,6 +196,8 @@ impl TestEnv { self.logstore = Some(logstore.clone()); self.object_store_manager = Some(object_store_manager.clone()); MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener) + .await + .unwrap() } /// Reopen the engine. @@ -201,6 +209,8 @@ impl TestEnv { self.logstore.clone().unwrap(), self.object_store_manager.clone().unwrap(), ) + .await + .unwrap() } /// Open the engine. @@ -210,6 +220,8 @@ impl TestEnv { self.logstore.clone().unwrap(), self.object_store_manager.clone().unwrap(), ) + .await + .unwrap() } /// Only initializes the object store manager, returns the default object store. @@ -227,6 +239,8 @@ impl TestEnv { Arc::new(log_store), Arc::new(object_store_manager), ) + .await + .unwrap() } /// Returns the log store and object store manager. diff --git a/src/mito2/src/test_util/scheduler_util.rs b/src/mito2/src/test_util/scheduler_util.rs index 3cf69c8456..445151f12f 100644 --- a/src/mito2/src/test_util/scheduler_util.rs +++ b/src/mito2/src/test_util/scheduler_util.rs @@ -66,11 +66,7 @@ impl SchedulerEnv { ) -> CompactionScheduler { let scheduler = self.get_scheduler(); - CompactionScheduler::new( - scheduler, - request_sender, - Arc::new(CacheManager::new(0, 0, 0)), - ) + CompactionScheduler::new(scheduler, request_sender, Arc::new(CacheManager::default())) } /// Creates a new flush scheduler. diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 40bb14a401..09cb59aa1b 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -42,6 +42,7 @@ use store_api::storage::RegionId; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, oneshot, Mutex}; +use crate::cache::write_cache::{WriteCache, WriteCacheRef}; use crate::cache::{CacheManager, CacheManagerRef}; use crate::compaction::CompactionScheduler; use crate::config::MitoConfig; @@ -111,20 +112,24 @@ impl WorkerGroup { /// Starts a worker group. /// /// The number of workers should be power of two. - pub(crate) fn start( + pub(crate) async fn start( config: Arc, log_store: Arc, object_store_manager: ObjectStoreManagerRef, - ) -> WorkerGroup { + ) -> Result { let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new( config.global_write_buffer_size.as_bytes() as usize, )); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); - let cache_manager = Arc::new(CacheManager::new( - config.sst_meta_cache_size.as_bytes(), - config.vector_cache_size.as_bytes(), - config.page_cache_size.as_bytes(), - )); + let write_cache = write_cache_from_config(&config, object_store_manager.clone()).await?; + let cache_manager = Arc::new( + CacheManager::builder() + .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes()) + .vector_cache_size(config.vector_cache_size.as_bytes()) + .page_cache_size(config.page_cache_size.as_bytes()) + .write_cache(write_cache) + .build(), + ); let workers = (0..config.num_workers) .map(|id| { @@ -142,11 +147,11 @@ impl WorkerGroup { }) .collect(); - WorkerGroup { + Ok(WorkerGroup { workers, scheduler, cache_manager, - } + }) } /// Stops the worker group. @@ -204,24 +209,28 @@ impl WorkerGroup { /// Starts a worker group with `write_buffer_manager` and `listener` for tests. /// /// The number of workers should be power of two. - pub(crate) fn start_for_test( + pub(crate) async fn start_for_test( config: Arc, log_store: Arc, object_store_manager: ObjectStoreManagerRef, write_buffer_manager: Option, listener: Option, - ) -> WorkerGroup { + ) -> Result { let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| { Arc::new(WriteBufferManagerImpl::new( config.global_write_buffer_size.as_bytes() as usize, )) }); let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs)); - let cache_manager = Arc::new(CacheManager::new( - config.sst_meta_cache_size.as_bytes(), - config.vector_cache_size.as_bytes(), - config.page_cache_size.as_bytes(), - )); + let write_cache = write_cache_from_config(&config, object_store_manager.clone()).await?; + let cache_manager = Arc::new( + CacheManager::builder() + .sst_meta_cache_size(config.sst_meta_cache_size.as_bytes()) + .vector_cache_size(config.vector_cache_size.as_bytes()) + .page_cache_size(config.page_cache_size.as_bytes()) + .write_cache(write_cache) + .build(), + ); let workers = (0..config.num_workers) .map(|id| { @@ -239,11 +248,11 @@ impl WorkerGroup { }) .collect(); - WorkerGroup { + Ok(WorkerGroup { workers, scheduler, cache_manager, - } + }) } } @@ -251,6 +260,26 @@ fn value_to_index(value: usize, num_workers: usize) -> usize { value % num_workers } +async fn write_cache_from_config( + config: &MitoConfig, + object_store_manager: ObjectStoreManagerRef, +) -> Result> { + if !config.enable_experimental_write_cache { + return Ok(None); + } + + // TODO(yingwen): Remove this and document the config once the write cache is ready. + warn!("Write cache is an experimental feature"); + + let cache = WriteCache::new_fs( + &config.experimental_write_cache_path, + object_store_manager, + config.experimental_write_cache_size, + ) + .await?; + Ok(Some(Arc::new(cache))) +} + /// Worker start config. struct WorkerStarter { id: WorkerId, diff --git a/src/object-store/src/util.rs b/src/object-store/src/util.rs index 3ff71f8fce..a357fb4fa2 100644 --- a/src/object-store/src/util.rs +++ b/src/object-store/src/util.rs @@ -13,8 +13,13 @@ // limitations under the License. use futures::TryStreamExt; +use opendal::layers::{LoggingLayer, TracingLayer}; use opendal::{Entry, Lister}; +use crate::layers::PrometheusMetricsLayer; +use crate::ObjectStore; + +/// Collect all entries from the [Lister]. pub async fn collect(stream: Lister) -> Result, opendal::Error> { stream.try_collect::>().await } @@ -52,6 +57,20 @@ pub fn join_path(parent: &str, child: &str) -> String { opendal::raw::normalize_path(&output) } +/// Attaches instrument layers to the object store. +pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore { + object_store + .layer( + LoggingLayer::default() + // Print the expected error only in DEBUG level. + // See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level + .with_error_level(Some("debug")) + .expect("input error level must be valid"), + ) + .layer(TracingLayer) + .layer(PrometheusMetricsLayer) +} + #[cfg(test)] mod tests { use super::*; diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 8495c0b0f0..ade16f8bae 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -730,6 +730,9 @@ global_write_buffer_reject_size = "2GiB" sst_meta_cache_size = "128MiB" vector_cache_size = "512MiB" page_cache_size = "512MiB" +enable_experimental_write_cache = false +experimental_write_cache_path = "" +experimental_write_cache_size = "512MiB" sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32