feat(mito): Init the write cache in datanode (#3100)

* feat: add builder to build cache manager

* refactor: make MitoEngine::new async

* refactor: refactor object store creation

* refactor: add helper fn to attaches layers

* feat: fn to build fs store

* feat: add write cache to engine

* feat: config write cache

* style: fix clippy

* test: fix test

* feat: add warning

* chore: add experimental prefix to configs

* test: fix config test

* test: test weighted size

* feat: add switch to enable write cache

* fix: update cache stats by using get

* style: use then
This commit is contained in:
Yingwen
2024-01-09 12:40:22 +08:00
committed by GitHub
parent af0c4c068a
commit 8bd4a36136
23 changed files with 362 additions and 167 deletions

View File

@@ -42,7 +42,7 @@ use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use object_store::util::{join_dir, normalize_dir};
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::grpc::{GrpcServer, GrpcServerConfig};
@@ -60,9 +60,9 @@ use tokio::sync::Notify;
use crate::config::{DatanodeOptions, RegionEngineConfig};
use crate::error::{
CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu,
ParseAddrSnafu, Result, RuntimeResourceSnafu, ShutdownInstanceSnafu, ShutdownServerSnafu,
StartServerSnafu,
BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu, MissingKvBackendSnafu,
MissingNodeIdSnafu, OpenLogStoreSnafu, ParseAddrSnafu, Result, RuntimeResourceSnafu,
ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
@@ -458,20 +458,33 @@ impl DatanodeBuilder {
async fn build_mito_engine(
opts: &DatanodeOptions,
object_store_manager: ObjectStoreManagerRef,
config: MitoConfig,
mut config: MitoConfig,
) -> Result<MitoEngine> {
// Sets write cache path if it is empty.
if config.experimental_write_cache_path.is_empty() {
config.experimental_write_cache_path = join_dir(&opts.storage.data_home, "write_cache");
info!(
"Sets write cache path to {}",
config.experimental_write_cache_path
);
}
let mito_engine = match &opts.wal {
WalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
config,
Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
.await?,
object_store_manager,
),
)
.await
.context(BuildMitoEngineSnafu)?,
WalConfig::Kafka(kafka_config) => MitoEngine::new(
config,
Self::build_kafka_log_store(kafka_config).await?,
object_store_manager,
),
)
.await
.context(BuildMitoEngineSnafu)?,
};
Ok(mito_engine)
}

View File

@@ -282,6 +282,12 @@ pub enum Error {
source: metric_engine::error::Error,
location: Location,
},
#[snafu(display("Failed to build mito engine"))]
BuildMitoEngine {
source: mito2::error::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -352,6 +358,7 @@ impl ErrorExt for Error {
StopRegionEngine { source, .. } => source.status_code(),
FindLogicalRegions { source, .. } => source.status_code(),
BuildMitoEngine { source, .. } => source.status_code(),
}
}

View File

@@ -26,10 +26,10 @@ use std::{env, path};
use common_base::readable_size::ReadableSize;
use common_telemetry::logging::info;
use object_store::layers::{LoggingLayer, LruCacheLayer, RetryLayer, TracingLayer};
use object_store::services::Fs as FsBuilder;
use object_store::util::normalize_dir;
use object_store::{util, HttpClient, ObjectStore, ObjectStoreBuilder};
use object_store::layers::{LruCacheLayer, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -60,16 +60,7 @@ pub(crate) async fn new_object_store(
object_store
};
let store = object_store
.layer(
LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some("debug"))
.expect("input error level must be valid"),
)
.layer(TracingLayer)
.layer(object_store::layers::PrometheusMetricsLayer);
let store = with_instrument_layers(object_store);
Ok(store)
}
@@ -114,11 +105,10 @@ async fn create_object_store_with_cache(
};
if let Some(path) = cache_path {
let path = util::normalize_dir(path);
let atomic_temp_dir = format!("{path}.tmp/");
let atomic_temp_dir = join_dir(path, ".tmp/");
clean_temp_dir(&atomic_temp_dir)?;
let cache_store = FsBuilder::default()
.root(&path)
let cache_store = Fs::default()
.root(path)
.atomic_write_dir(&atomic_temp_dir)
.build()
.context(error::InitBackendSnafu)?;

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_telemetry::logging::info;
use object_store::services::Azblob as AzureBuilder;
use object_store::services::Azblob;
use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
@@ -30,7 +30,7 @@ pub(crate) async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Res
azblob_config.container, &root
);
let mut builder = AzureBuilder::default();
let mut builder = Azblob::default();
let _ = builder
.root(&root)
.container(&azblob_config.container)

View File

@@ -15,7 +15,8 @@
use std::{fs, path};
use common_telemetry::logging::info;
use object_store::services::Fs as FsBuilder;
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use snafu::prelude::*;
@@ -31,10 +32,10 @@ pub(crate) async fn new_fs_object_store(
.context(error::CreateDirSnafu { dir: data_home })?;
info!("The file storage home is: {}", data_home);
let atomic_write_dir = format!("{data_home}.tmp/");
let atomic_write_dir = join_dir(data_home, ".tmp/");
store::clean_temp_dir(&atomic_write_dir)?;
let mut builder = FsBuilder::default();
let mut builder = Fs::default();
let _ = builder.root(data_home).atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder)

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_telemetry::logging::info;
use object_store::services::Gcs as GCSBuilder;
use object_store::services::Gcs;
use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
@@ -29,7 +29,7 @@ pub(crate) async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<Objec
gcs_config.bucket, &root
);
let mut builder = GCSBuilder::default();
let mut builder = Gcs::default();
builder
.root(&root)
.bucket(&gcs_config.bucket)

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_telemetry::logging::info;
use object_store::services::Oss as OSSBuilder;
use object_store::services::Oss;
use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
@@ -29,7 +29,7 @@ pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result<Objec
oss_config.bucket, &root
);
let mut builder = OSSBuilder::default();
let mut builder = Oss::default();
let _ = builder
.root(&root)
.bucket(&oss_config.bucket)

View File

@@ -13,7 +13,7 @@
// limitations under the License.
use common_telemetry::logging::info;
use object_store::services::S3 as S3Builder;
use object_store::services::S3;
use object_store::{util, ObjectStore};
use secrecy::ExposeSecret;
use snafu::prelude::*;
@@ -30,7 +30,7 @@ pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectSt
s3_config.bucket, &root
);
let mut builder = S3Builder::default();
let mut builder = S3::default();
let _ = builder
.root(&root)
.bucket(&s3_config.bucket)

View File

@@ -14,13 +14,15 @@
use std::sync::Arc;
use object_store::services::Fs;
use object_store::util::{join_dir, with_instrument_layers};
use object_store::ObjectStore;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::error::{DeleteSstSnafu, Result};
use crate::error::{CleanDirSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::sst::file::{FileHandle, FileId};
use crate::sst::location;
@@ -119,3 +121,31 @@ pub(crate) struct SstWriteRequest {
pub(crate) cache_manager: CacheManagerRef,
pub(crate) storage: Option<String>,
}
/// Creates a fs object store with atomic write dir.
pub(crate) async fn new_fs_object_store(root: &str) -> Result<ObjectStore> {
let atomic_write_dir = join_dir(root, ".tmp/");
clean_dir(&atomic_write_dir).await?;
let mut builder = Fs::default();
builder.root(root).atomic_write_dir(&atomic_write_dir);
let object_store = ObjectStore::new(builder).context(OpenDalSnafu)?.finish();
// Add layers.
let object_store = with_instrument_layers(object_store);
Ok(object_store)
}
/// Clean the directory.
async fn clean_dir(dir: &str) -> Result<()> {
if tokio::fs::try_exists(dir)
.await
.context(CleanDirSnafu { dir })?
{
tokio::fs::remove_dir_all(dir)
.await
.context(CleanDirSnafu { dir })?;
}
Ok(())
}

View File

@@ -47,9 +47,10 @@ const PAGE_TYPE: &str = "page";
// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
// TODO(yingwen): Builder for cache manager.
/// Manages cached data for the engine.
///
/// All caches are disabled by default.
#[derive(Default)]
pub struct CacheManager {
/// Cache for SST metadata.
sst_meta_cache: Option<SstMetaCache>,
@@ -58,70 +59,15 @@ pub struct CacheManager {
/// Cache for SST pages.
page_cache: Option<PageCache>,
/// A Cache for writing files to object stores.
// TODO(yingwen): Remove this once the cache is ready.
#[allow(unused)]
write_cache: Option<WriteCacheRef>,
}
pub type CacheManagerRef = Arc<CacheManager>;
impl CacheManager {
/// Creates a new manager with specific cache size in bytes.
pub fn new(
sst_meta_cache_size: u64,
vector_cache_size: u64,
page_cache_size: u64,
) -> CacheManager {
let sst_meta_cache = if sst_meta_cache_size == 0 {
None
} else {
let cache = Cache::builder()
.max_capacity(sst_meta_cache_size)
.weigher(meta_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = meta_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[SST_META_TYPE])
.sub(size.into());
})
.build();
Some(cache)
};
let vector_cache = if vector_cache_size == 0 {
None
} else {
let cache = Cache::builder()
.max_capacity(vector_cache_size)
.weigher(vector_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = vector_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[VECTOR_TYPE])
.sub(size.into());
})
.build();
Some(cache)
};
let page_cache = if page_cache_size == 0 {
None
} else {
let cache = Cache::builder()
.max_capacity(page_cache_size)
.weigher(page_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = page_cache_weight(&k, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
})
.build();
Some(cache)
};
CacheManager {
sst_meta_cache,
vector_cache,
page_cache,
write_cache: None,
}
/// Returns a builder to build the cache.
pub fn builder() -> CacheManagerBuilder {
CacheManagerBuilder::default()
}
/// Gets cached [ParquetMetaData].
@@ -201,6 +147,86 @@ impl CacheManager {
}
}
/// Builder to construct a [CacheManager].
#[derive(Default)]
pub struct CacheManagerBuilder {
sst_meta_cache_size: u64,
vector_cache_size: u64,
page_cache_size: u64,
write_cache: Option<WriteCacheRef>,
}
impl CacheManagerBuilder {
/// Sets meta cache size.
pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
self.sst_meta_cache_size = bytes;
self
}
/// Sets vector cache size.
pub fn vector_cache_size(mut self, bytes: u64) -> Self {
self.vector_cache_size = bytes;
self
}
/// Sets page cache size.
pub fn page_cache_size(mut self, bytes: u64) -> Self {
self.page_cache_size = bytes;
self
}
/// Sets write cache.
pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
self.write_cache = cache;
self
}
/// Builds the [CacheManager].
pub fn build(self) -> CacheManager {
let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.sst_meta_cache_size)
.weigher(meta_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = meta_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[SST_META_TYPE])
.sub(size.into());
})
.build()
});
let vector_cache = (self.vector_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.vector_cache_size)
.weigher(vector_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = vector_cache_weight(&k, &v);
CACHE_BYTES
.with_label_values(&[VECTOR_TYPE])
.sub(size.into());
})
.build()
});
let page_cache = (self.page_cache_size != 0).then(|| {
Cache::builder()
.max_capacity(self.page_cache_size)
.weigher(page_cache_weight)
.eviction_listener(|k, v, _cause| {
let size = page_cache_weight(&k, &v);
CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
})
.build()
});
CacheManager {
sst_meta_cache,
vector_cache,
page_cache,
write_cache: self.write_cache,
}
}
}
fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
// We ignore the size of `Arc`.
(k.estimated_size() + parquet_meta_size(v)) as u32
@@ -293,7 +319,7 @@ mod tests {
#[test]
fn test_disable_cache() {
let cache = CacheManager::new(0, 0, 0);
let cache = CacheManager::default();
assert!(cache.sst_meta_cache.is_none());
assert!(cache.vector_cache.is_none());
assert!(cache.page_cache.is_none());
@@ -318,11 +344,13 @@ mod tests {
let pages = Arc::new(PageValue::new(Vec::new()));
cache.put_pages(key.clone(), pages);
assert!(cache.get_pages(&key).is_none());
assert!(cache.write_cache().is_none());
}
#[test]
fn test_parquet_meta_cache() {
let cache = CacheManager::new(2000, 0, 0);
let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
assert!(cache.get_parquet_meta_data(region_id, file_id).is_none());
@@ -335,7 +363,7 @@ mod tests {
#[test]
fn test_repeated_vector_cache() {
let cache = CacheManager::new(0, 4096, 0);
let cache = CacheManager::builder().vector_cache_size(4096).build();
let value = Value::Int64(10);
assert!(cache.get_repeated_vector(&value).is_none());
let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
@@ -346,7 +374,7 @@ mod tests {
#[test]
fn test_page_cache() {
let cache = CacheManager::new(0, 0, 1000);
let cache = CacheManager::builder().page_cache_size(1000).build();
let region_id = RegionId::new(1, 1);
let file_id = FileId::random();
let key = PageKey {

View File

@@ -100,17 +100,11 @@ impl FileCache {
self.memory_index.insert(key, value).await;
}
async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
if self.local_store.is_exist(file_path).await? {
Ok(Some(self.local_store.reader(file_path).await?))
} else {
Ok(None)
}
}
/// Reads a file from the cache.
pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
if !self.memory_index.contains_key(&key) {
// We must use `get()` to update the estimator of the cache.
// See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
if self.memory_index.get(&key).await.is_none() {
CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
return None;
}
@@ -194,6 +188,14 @@ impl FileCache {
pub(crate) fn local_store(&self) -> ObjectStore {
self.local_store.clone()
}
async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
if self.local_store.is_exist(file_path).await? {
Ok(Some(self.local_store.reader(file_path).await?))
} else {
Ok(None)
}
}
}
/// Key of file cache index.
@@ -271,6 +273,10 @@ mod tests {
reader.read_to_string(&mut buf).await.unwrap();
assert_eq!("hello", buf);
// Get weighted size.
cache.memory_index.run_pending_tasks().await;
assert_eq!(5, cache.memory_index.weighted_size());
// Remove the file.
cache.remove(key).await;
assert!(cache.reader(key).await.is_none());
@@ -280,6 +286,7 @@ mod tests {
// The file also not exists.
assert!(!local_store.is_exist(&file_path).await.unwrap());
assert_eq!(0, cache.memory_index.weighted_size());
}
#[tokio::test]
@@ -321,6 +328,7 @@ mod tests {
let region_id = RegionId::new(2000, 0);
// Write N files.
let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
let mut total_size = 0;
for (i, file_id) in file_ids.iter().enumerate() {
let key = (region_id, *file_id);
let file_path = cache.cache_file_path(key);
@@ -336,6 +344,7 @@ mod tests {
},
)
.await;
total_size += bytes.len();
}
// Recover the cache.
@@ -344,6 +353,10 @@ mod tests {
assert!(cache.reader((region_id, file_ids[0])).await.is_none());
cache.recover().await.unwrap();
// Check size.
cache.memory_index.run_pending_tasks().await;
assert_eq!(total_size, cache.memory_index.weighted_size() as usize);
for (i, file_id) in file_ids.iter().enumerate() {
let key = (region_id, *file_id);
let mut reader = cache.reader(key).await.unwrap();

View File

@@ -17,10 +17,12 @@
use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use store_api::metadata::RegionMetadataRef;
use crate::access_layer::new_fs_object_store;
use crate::cache::file_cache::{FileCache, FileCacheRef};
use crate::error::Result;
use crate::read::Source;
@@ -43,20 +45,30 @@ pub type WriteCacheRef = Arc<WriteCache>;
impl WriteCache {
/// Create the cache with a `local_store` to cache files and a
/// `object_store_manager` for all object stores.
pub fn new(
pub async fn new(
local_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
) -> Self {
Self {
file_cache: Arc::new(FileCache::new(local_store, cache_capacity)),
) -> Result<Self> {
let file_cache = FileCache::new(local_store, cache_capacity);
file_cache.recover().await?;
Ok(Self {
file_cache: Arc::new(file_cache),
object_store_manager,
}
})
}
/// Recovers the write cache from local store.
pub async fn recover(&self) -> Result<()> {
self.file_cache.recover().await
/// Creates a write cache based on local fs.
pub async fn new_fs(
cache_dir: &str,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
) -> Result<Self> {
info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
let local_store = new_fs_object_store(cache_dir).await?;
Self::new(local_store, object_store_manager, cache_capacity).await
}
/// Writes SST to the cache and then uploads it to the remote object store.

View File

@@ -19,6 +19,9 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use serde::{Deserialize, Serialize};
use snafu::ensure;
use crate::error::{InvalidConfigSnafu, Result};
/// Default max running background job.
const DEFAULT_MAX_BG_JOB: usize = 4;
@@ -67,6 +70,12 @@ pub struct MitoConfig {
pub vector_cache_size: ReadableSize,
/// Cache size for pages of SST row groups (default 512MB). Setting it to 0 to disable the cache.
pub page_cache_size: ReadableSize,
/// Whether to enable the experimental write cache.
pub enable_experimental_write_cache: bool,
/// Path for write cache.
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub experimental_write_cache_size: ReadableSize,
// Other configs:
/// Buffer size for SST writing.
@@ -95,6 +104,9 @@ impl Default for MitoConfig {
sst_meta_cache_size: ReadableSize::mb(128),
vector_cache_size: ReadableSize::mb(512),
page_cache_size: ReadableSize::mb(512),
enable_experimental_write_cache: false,
experimental_write_cache_path: String::new(),
experimental_write_cache_size: ReadableSize::mb(512),
sst_write_buffer_size: ReadableSize::mb(8),
scan_parallelism: divide_num_cpus(4),
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
@@ -104,7 +116,9 @@ impl Default for MitoConfig {
impl MitoConfig {
/// Sanitize incorrect configurations.
pub(crate) fn sanitize(&mut self) {
///
/// Returns an error if there is a configuration that unable to sanitize.
pub(crate) fn sanitize(&mut self) -> Result<()> {
// Use default value if `num_workers` is 0.
if self.num_workers == 0 {
self.num_workers = divide_num_cpus(2);
@@ -149,6 +163,17 @@ impl MitoConfig {
self.parallel_scan_channel_size
);
}
if self.enable_experimental_write_cache {
ensure!(
!self.experimental_write_cache_path.is_empty(),
InvalidConfigSnafu {
reason: "experimental_write_cache_path should not be empty",
}
);
}
Ok(())
}
}

View File

@@ -77,16 +77,16 @@ pub struct MitoEngine {
impl MitoEngine {
/// Returns a new [MitoEngine] with specific `config`, `log_store` and `object_store`.
pub fn new<S: LogStore>(
pub async fn new<S: LogStore>(
mut config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
) -> MitoEngine {
config.sanitize();
) -> Result<MitoEngine> {
config.sanitize()?;
MitoEngine {
inner: Arc::new(EngineInner::new(config, log_store, object_store_manager)),
}
Ok(MitoEngine {
inner: Arc::new(EngineInner::new(config, log_store, object_store_manager).await?),
})
}
/// Returns true if the specific region exists.
@@ -126,16 +126,16 @@ struct EngineInner {
impl EngineInner {
/// Returns a new [EngineInner] with specific `config`, `log_store` and `object_store`.
fn new<S: LogStore>(
async fn new<S: LogStore>(
config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
) -> EngineInner {
) -> Result<EngineInner> {
let config = Arc::new(config);
EngineInner {
workers: WorkerGroup::start(config.clone(), log_store, object_store_manager),
Ok(EngineInner {
workers: WorkerGroup::start(config.clone(), log_store, object_store_manager).await?,
config,
}
})
}
/// Stop the inner engine.
@@ -314,17 +314,17 @@ impl RegionEngine for MitoEngine {
#[cfg(any(test, feature = "test"))]
impl MitoEngine {
/// Returns a new [MitoEngine] for tests.
pub fn new_for_test<S: LogStore>(
pub async fn new_for_test<S: LogStore>(
mut config: MitoConfig,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<crate::flush::WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> MitoEngine {
config.sanitize();
) -> Result<MitoEngine> {
config.sanitize()?;
let config = Arc::new(config);
MitoEngine {
Ok(MitoEngine {
inner: Arc::new(EngineInner {
workers: WorkerGroup::start_for_test(
config.clone(),
@@ -332,9 +332,10 @@ impl MitoEngine {
object_store_manager,
write_buffer_manager,
listener,
),
)
.await?,
config,
}),
}
})
}
}

View File

@@ -429,35 +429,30 @@ pub enum Error {
#[snafu(display("Failed to build index applier"))]
BuildIndexApplier {
#[snafu(source)]
source: index::inverted_index::error::Error,
location: Location,
},
#[snafu(display("Failed to convert value"))]
ConvertValue {
#[snafu(source)]
source: datatypes::error::Error,
location: Location,
},
#[snafu(display("Failed to apply index"))]
ApplyIndex {
#[snafu(source)]
source: index::inverted_index::error::Error,
location: Location,
},
#[snafu(display("Failed to read puffin metadata"))]
PuffinReadMetadata {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},
#[snafu(display("Failed to read puffin blob"))]
PuffinReadBlob {
#[snafu(source)]
source: puffin::error::Error,
location: Location,
},
@@ -467,6 +462,17 @@ pub enum Error {
blob_type: String,
location: Location,
},
#[snafu(display("Failed to clean dir {dir}"))]
CleanDir {
dir: String,
#[snafu(source)]
error: std::io::Error,
location: Location,
},
#[snafu(display("Invalid config, {reason}"))]
InvalidConfig { reason: String, location: Location },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -555,6 +561,8 @@ impl ErrorExt for Error {
PuffinReadMetadata { source, .. } | PuffinReadBlob { source, .. } => {
source.status_code()
}
CleanDir { .. } => StatusCode::Unexpected,
InvalidConfig { .. } => StatusCode::InvalidArguments,
}
}

View File

@@ -743,7 +743,7 @@ mod tests {
listener: WorkerListener::default(),
engine_config: Arc::new(MitoConfig::default()),
row_group_size: None,
cache_manager: Arc::new(CacheManager::new(0, 0, 0)),
cache_manager: Arc::new(CacheManager::default()),
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler

View File

@@ -342,7 +342,8 @@ mod tests {
assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
assert_eq!([3, 4], mapper.batch_fields());
let cache = CacheManager::new(0, 1024, 0);
// With vector cache.
let cache = CacheManager::builder().vector_cache_size(1024).build();
let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
let record_batch = mapper.convert(&batch, Some(&cache)).unwrap();
let expect = "\

View File

@@ -170,7 +170,12 @@ mod tests {
.unwrap()
.unwrap();
let cache = Some(Arc::new(CacheManager::new(0, 0, 64 * 1024 * 1024)));
// Enable page cache.
let cache = Some(Arc::new(
CacheManager::builder()
.page_cache_size(64 * 1024 * 1024)
.build(),
));
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
.cache(cache.clone());
for _ in 0..3 {

View File

@@ -137,6 +137,8 @@ impl TestEnv {
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new(config, logstore, object_store_manager)
.await
.unwrap()
}
/// Creates a new engine with specific config and existing logstore and object store manager.
@@ -145,6 +147,8 @@ impl TestEnv {
let object_store_manager = self.object_store_manager.as_ref().unwrap().clone();
MitoEngine::new(config, logstore, object_store_manager)
.await
.unwrap()
}
/// Creates a new engine with specific config and manager/listener under this env.
@@ -161,6 +165,8 @@ impl TestEnv {
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener)
.await
.unwrap()
}
pub async fn create_engine_with_multiple_object_stores(
@@ -190,6 +196,8 @@ impl TestEnv {
self.logstore = Some(logstore.clone());
self.object_store_manager = Some(object_store_manager.clone());
MitoEngine::new_for_test(config, logstore, object_store_manager, manager, listener)
.await
.unwrap()
}
/// Reopen the engine.
@@ -201,6 +209,8 @@ impl TestEnv {
self.logstore.clone().unwrap(),
self.object_store_manager.clone().unwrap(),
)
.await
.unwrap()
}
/// Open the engine.
@@ -210,6 +220,8 @@ impl TestEnv {
self.logstore.clone().unwrap(),
self.object_store_manager.clone().unwrap(),
)
.await
.unwrap()
}
/// Only initializes the object store manager, returns the default object store.
@@ -227,6 +239,8 @@ impl TestEnv {
Arc::new(log_store),
Arc::new(object_store_manager),
)
.await
.unwrap()
}
/// Returns the log store and object store manager.

View File

@@ -66,11 +66,7 @@ impl SchedulerEnv {
) -> CompactionScheduler {
let scheduler = self.get_scheduler();
CompactionScheduler::new(
scheduler,
request_sender,
Arc::new(CacheManager::new(0, 0, 0)),
)
CompactionScheduler::new(scheduler, request_sender, Arc::new(CacheManager::default()))
}
/// Creates a new flush scheduler.

View File

@@ -42,6 +42,7 @@ use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{mpsc, oneshot, Mutex};
use crate::cache::write_cache::{WriteCache, WriteCacheRef};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::CompactionScheduler;
use crate::config::MitoConfig;
@@ -111,20 +112,24 @@ impl WorkerGroup {
/// Starts a worker group.
///
/// The number of workers should be power of two.
pub(crate) fn start<S: LogStore>(
pub(crate) async fn start<S: LogStore>(
config: Arc<MitoConfig>,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
) -> WorkerGroup {
) -> Result<WorkerGroup> {
let write_buffer_manager = Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
));
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let cache_manager = Arc::new(CacheManager::new(
config.sst_meta_cache_size.as_bytes(),
config.vector_cache_size.as_bytes(),
config.page_cache_size.as_bytes(),
));
let write_cache = write_cache_from_config(&config, object_store_manager.clone()).await?;
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
.vector_cache_size(config.vector_cache_size.as_bytes())
.page_cache_size(config.page_cache_size.as_bytes())
.write_cache(write_cache)
.build(),
);
let workers = (0..config.num_workers)
.map(|id| {
@@ -142,11 +147,11 @@ impl WorkerGroup {
})
.collect();
WorkerGroup {
Ok(WorkerGroup {
workers,
scheduler,
cache_manager,
}
})
}
/// Stops the worker group.
@@ -204,24 +209,28 @@ impl WorkerGroup {
/// Starts a worker group with `write_buffer_manager` and `listener` for tests.
///
/// The number of workers should be power of two.
pub(crate) fn start_for_test<S: LogStore>(
pub(crate) async fn start_for_test<S: LogStore>(
config: Arc<MitoConfig>,
log_store: Arc<S>,
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: Option<WriteBufferManagerRef>,
listener: Option<crate::engine::listener::EventListenerRef>,
) -> WorkerGroup {
) -> Result<WorkerGroup> {
let write_buffer_manager = write_buffer_manager.unwrap_or_else(|| {
Arc::new(WriteBufferManagerImpl::new(
config.global_write_buffer_size.as_bytes() as usize,
))
});
let scheduler = Arc::new(LocalScheduler::new(config.max_background_jobs));
let cache_manager = Arc::new(CacheManager::new(
config.sst_meta_cache_size.as_bytes(),
config.vector_cache_size.as_bytes(),
config.page_cache_size.as_bytes(),
));
let write_cache = write_cache_from_config(&config, object_store_manager.clone()).await?;
let cache_manager = Arc::new(
CacheManager::builder()
.sst_meta_cache_size(config.sst_meta_cache_size.as_bytes())
.vector_cache_size(config.vector_cache_size.as_bytes())
.page_cache_size(config.page_cache_size.as_bytes())
.write_cache(write_cache)
.build(),
);
let workers = (0..config.num_workers)
.map(|id| {
@@ -239,11 +248,11 @@ impl WorkerGroup {
})
.collect();
WorkerGroup {
Ok(WorkerGroup {
workers,
scheduler,
cache_manager,
}
})
}
}
@@ -251,6 +260,26 @@ fn value_to_index(value: usize, num_workers: usize) -> usize {
value % num_workers
}
async fn write_cache_from_config(
config: &MitoConfig,
object_store_manager: ObjectStoreManagerRef,
) -> Result<Option<WriteCacheRef>> {
if !config.enable_experimental_write_cache {
return Ok(None);
}
// TODO(yingwen): Remove this and document the config once the write cache is ready.
warn!("Write cache is an experimental feature");
let cache = WriteCache::new_fs(
&config.experimental_write_cache_path,
object_store_manager,
config.experimental_write_cache_size,
)
.await?;
Ok(Some(Arc::new(cache)))
}
/// Worker start config.
struct WorkerStarter<S> {
id: WorkerId,

View File

@@ -13,8 +13,13 @@
// limitations under the License.
use futures::TryStreamExt;
use opendal::layers::{LoggingLayer, TracingLayer};
use opendal::{Entry, Lister};
use crate::layers::PrometheusMetricsLayer;
use crate::ObjectStore;
/// Collect all entries from the [Lister].
pub async fn collect(stream: Lister) -> Result<Vec<Entry>, opendal::Error> {
stream.try_collect::<Vec<_>>().await
}
@@ -52,6 +57,20 @@ pub fn join_path(parent: &str, child: &str) -> String {
opendal::raw::normalize_path(&output)
}
/// Attaches instrument layers to the object store.
pub fn with_instrument_layers(object_store: ObjectStore) -> ObjectStore {
object_store
.layer(
LoggingLayer::default()
// Print the expected error only in DEBUG level.
// See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level
.with_error_level(Some("debug"))
.expect("input error level must be valid"),
)
.layer(TracingLayer)
.layer(PrometheusMetricsLayer)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -730,6 +730,9 @@ global_write_buffer_reject_size = "2GiB"
sst_meta_cache_size = "128MiB"
vector_cache_size = "512MiB"
page_cache_size = "512MiB"
enable_experimental_write_cache = false
experimental_write_cache_path = ""
experimental_write_cache_size = "512MiB"
sst_write_buffer_size = "8MiB"
parallel_scan_channel_size = 32