diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index a28e4f0426..e4868b0475 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -43,7 +43,7 @@ index.workspace = true lazy_static = "1.4" log-store = { workspace = true, optional = true } memcomparable = "0.2" -moka = { workspace = true, features = ["sync"] } +moka = { workspace = true, features = ["sync", "future"] } num_cpus = "1.13" object-store.workspace = true parquet = { workspace = true, features = ["async"] } diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 529e5d3d4e..cc02a2d037 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -15,6 +15,9 @@ //! Cache for the engine. mod cache_size; +// TODO(yingwen): Remove this after the write cache is ready. +#[allow(unused)] +pub(crate) mod file_cache; #[cfg(test)] pub(crate) mod test_util; @@ -38,6 +41,8 @@ const SST_META_TYPE: &str = "sst_meta"; const VECTOR_TYPE: &str = "vector"; // Metrics type key for pages. const PAGE_TYPE: &str = "page"; +// Metrics type key for files on the local store. +const FILE_TYPE: &str = "file"; /// Manages cached data for the engine. pub struct CacheManager { diff --git a/src/mito2/src/cache/file_cache.rs b/src/mito2/src/cache/file_cache.rs new file mode 100644 index 0000000000..25fd5d6d62 --- /dev/null +++ b/src/mito2/src/cache/file_cache.rs @@ -0,0 +1,407 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A cache for files. + +use std::time::Instant; + +use common_base::readable_size::ReadableSize; +use common_telemetry::{info, warn}; +use futures::{FutureExt, TryStreamExt}; +use moka::future::Cache; +use moka::notification::RemovalCause; +use object_store::util::{join_dir, join_path}; +use object_store::{ErrorKind, Metakey, ObjectStore, Reader}; +use snafu::ResultExt; +use store_api::storage::RegionId; + +use crate::cache::FILE_TYPE; +use crate::error::{OpenDalSnafu, Result}; +use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS}; +use crate::sst::file::FileId; + +/// Subdirectory of cached files. +const FILE_DIR: &str = "files"; + +/// A file cache manages files on local store and evict files based +/// on size. +#[derive(Debug)] +pub(crate) struct FileCache { + /// Local store to cache files. + local_store: ObjectStore, + /// Cached file directory under cache home. + file_dir: String, + /// Index to track cached files. + /// + /// File id is enough to identity a file uniquely. + memory_index: Cache, +} + +impl FileCache { + /// Creates a new file cache. + pub(crate) fn new( + local_store: ObjectStore, + cache_home: String, + capacity: ReadableSize, + ) -> FileCache { + // Stores files under `cache_home/{FILE_DIR}`. + let file_dir = cache_file_dir(&cache_home); + let cache_store = local_store.clone(); + let cache_file_dir = file_dir.clone(); + let memory_index = Cache::builder() + .weigher(|_key, value: &IndexValue| -> u32 { + // We only measure space on local store. + value.file_size + }) + .max_capacity(capacity.as_bytes()) + .async_eviction_listener(move |key, value, cause| { + let store = cache_store.clone(); + let file_path = cache_file_path(&cache_file_dir, *key); + async move { + if let RemovalCause::Replaced = cause { + // The cache is replaced by another file. This is unexpected, we don't remove the same + // file but updates the metrics as the file is already replaced by users. + CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); + warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.0); + return; + } + + match store.delete(&file_path).await { + Ok(()) => { + CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into()); + } + Err(e) => { + warn!(e; "Failed to delete cached file {} for region {}", file_path, key.0); + } + } + } + .boxed() + }) + .build(); + FileCache { + local_store, + file_dir, + memory_index, + } + } + + /// Puts a file into the cache index. + /// + /// The `WriteCache` should ensure the file is in the correct path. + pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) { + CACHE_BYTES + .with_label_values(&[FILE_TYPE]) + .add(value.file_size.into()); + self.memory_index.insert(key, value).await; + } + + /// Reads a file from the cache. + pub(crate) async fn reader(&self, key: IndexKey) -> Option { + if !self.memory_index.contains_key(&key) { + CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + return None; + } + + let file_path = self.cache_file_path(key); + match self.local_store.reader(&file_path).await { + Ok(reader) => { + CACHE_HIT.with_label_values(&[FILE_TYPE]).inc(); + Some(reader) + } + Err(e) => { + if e.kind() != ErrorKind::NotFound { + warn!("Failed to get file for key {:?}, err: {}", key, e); + } + // We removes the file from the index. + self.memory_index.remove(&key).await; + CACHE_MISS.with_label_values(&[FILE_TYPE]).inc(); + None + } + } + } + + /// Removes a file from the cache explicitly. + pub(crate) async fn remove(&self, key: IndexKey) { + let file_path = self.cache_file_path(key); + if let Err(e) = self.local_store.delete(&file_path).await { + warn!(e; "Failed to delete a cached file {}", file_path); + } + } + + /// Recovers the index from local store. + pub(crate) async fn recover(&self) -> Result<()> { + let now = Instant::now(); + + let mut lister = self + .local_store + .lister_with(&self.file_dir) + .metakey(Metakey::ContentLength) + .await + .context(OpenDalSnafu)?; + let (mut total_size, mut total_keys) = (0, 0); + while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? { + let meta = entry.metadata(); + if !meta.is_file() { + continue; + } + let Some(key) = parse_index_key(entry.name()) else { + continue; + }; + let file_size = meta.content_length() as u32; + self.memory_index + .insert(key, IndexValue { file_size }) + .await; + total_size += file_size; + total_keys += 1; + } + // The metrics is a signed int gauge so we can updates it finally. + CACHE_BYTES + .with_label_values(&[FILE_TYPE]) + .add(total_size.into()); + + info!( + "Recovered file cache, num_keys: {}, num_bytes: {}, cost: {:?}", + total_keys, + total_size, + now.elapsed() + ); + + Ok(()) + } + + /// Returns the cache file path for the key. + pub(crate) fn cache_file_path(&self, key: IndexKey) -> String { + cache_file_path(&self.file_dir, key) + } + + /// Returns the local store of the file cache. + pub(crate) fn local_store(&self) -> ObjectStore { + self.local_store.clone() + } +} + +/// Key of file cache index. +pub(crate) type IndexKey = (RegionId, FileId); + +/// An entity that describes the file in the file cache. +/// +/// It should only keep minimal information needed by the cache. +#[derive(Debug, Clone)] +pub(crate) struct IndexValue { + /// Size of the file in bytes. + file_size: u32, +} + +/// Returns the directory to store files. +fn cache_file_dir(cache_home: &str) -> String { + join_dir(cache_home, FILE_DIR) +} + +/// Generates the path to the cached file. +/// +/// The file name format is `{region_id}.{file_id}` +fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String { + join_path(cache_file_dir, &format!("{}.{}", key.0.as_u64(), key.1)) +} + +/// Parse index key from the file name. +fn parse_index_key(name: &str) -> Option { + let mut splited = name.splitn(2, '.'); + let region_id = splited.next().and_then(|s| { + let id = s.parse::().ok()?; + Some(RegionId::from_u64(id)) + })?; + let file_id = splited.next().and_then(|s| FileId::parse_str(s).ok())?; + + Some((region_id, file_id)) +} + +#[cfg(test)] +mod tests { + use common_test_util::temp_dir::create_temp_dir; + use futures::AsyncReadExt; + use object_store::services::Fs; + + use super::*; + + fn new_fs_store(path: &str) -> ObjectStore { + let mut builder = Fs::default(); + builder.root(path); + ObjectStore::new(builder).unwrap().finish() + } + + #[tokio::test] + async fn test_file_cache_basic() { + let dir = create_temp_dir(""); + let local_store = new_fs_store(dir.path().to_str().unwrap()); + let cache_home = "cache".to_string(); + + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + let region_id = RegionId::new(2000, 0); + let file_id = FileId::random(); + let key = (region_id, file_id); + let file_path = cache.cache_file_path(key); + + // Get an empty file. + assert!(cache.reader(key).await.is_none()); + + // Write a file. + local_store + .write(&file_path, b"hello".as_slice()) + .await + .unwrap(); + // Add to the cache. + cache + .put((region_id, file_id), IndexValue { file_size: 5 }) + .await; + + // Read file content. + let mut reader = cache.reader(key).await.unwrap(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).await.unwrap(); + assert_eq!("hello", buf); + + // Remove the file. + cache.remove(key).await; + assert!(cache.reader(key).await.is_none()); + + // Ensure all pending tasks of the moka cache is done before assertion. + cache.memory_index.run_pending_tasks().await; + + // The file also not exists. + assert!(!local_store.is_exist(&file_path).await.unwrap()); + } + + #[tokio::test] + async fn test_file_cache_file_removed() { + let dir = create_temp_dir(""); + let local_store = new_fs_store(dir.path().to_str().unwrap()); + let cache_home = "cache".to_string(); + + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + let region_id = RegionId::new(2000, 0); + let file_id = FileId::random(); + let key = (region_id, file_id); + let file_path = cache.cache_file_path(key); + + // Write a file. + local_store + .write(&file_path, b"hello".as_slice()) + .await + .unwrap(); + // Add to the cache. + cache + .put((region_id, file_id), IndexValue { file_size: 5 }) + .await; + + // Remove the file but keep the index. + local_store.delete(&file_path).await.unwrap(); + + // Reader is none. + assert!(cache.reader(key).await.is_none()); + // Key is removed. + assert!(!cache.memory_index.contains_key(&key)); + } + + #[tokio::test] + async fn test_file_cache_recover() { + let dir = create_temp_dir(""); + let local_store = new_fs_store(dir.path().to_str().unwrap()); + let cache_home = "cache".to_string(); + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + + let region_id = RegionId::new(2000, 0); + // Write N files. + let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect(); + for (i, file_id) in file_ids.iter().enumerate() { + let key = (region_id, *file_id); + let file_path = cache.cache_file_path(key); + let bytes = i.to_string().into_bytes(); + local_store.write(&file_path, bytes.clone()).await.unwrap(); + + // Add to the cache. + cache + .put( + (region_id, *file_id), + IndexValue { + file_size: bytes.len() as u32, + }, + ) + .await; + } + + // Recover the cache. + let cache = FileCache::new( + local_store.clone(), + cache_home.clone(), + ReadableSize::mb(10), + ); + // No entry before recovery. + assert!(cache.reader((region_id, file_ids[0])).await.is_none()); + cache.recover().await.unwrap(); + + for (i, file_id) in file_ids.iter().enumerate() { + let key = (region_id, *file_id); + let mut reader = cache.reader(key).await.unwrap(); + let mut buf = String::new(); + reader.read_to_string(&mut buf).await.unwrap(); + assert_eq!(i.to_string(), buf); + } + } + + #[test] + fn test_cache_file_path() { + let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap(); + assert_eq!( + "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095", + cache_file_path("test_dir", (RegionId::new(1234, 5), file_id)) + ); + assert_eq!( + "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095", + cache_file_path("test_dir/", (RegionId::new(1234, 5), file_id)) + ); + } + + #[test] + fn test_parse_file_name() { + let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap(); + let region_id = RegionId::new(1234, 5); + assert_eq!( + (region_id, file_id), + parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").unwrap() + ); + assert!(parse_index_key("").is_none()); + assert!(parse_index_key(".").is_none()); + assert!(parse_index_key("5299989643269").is_none()); + assert!(parse_index_key("5299989643269.").is_none()); + assert!(parse_index_key(".5299989643269").is_none()); + assert!(parse_index_key("5299989643269.").is_none()); + assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none()); + assert!( + parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none() + ); + } +}