fix: correct write cache's metric labels (#5227)

* refactor: remove unused field in WriteCache

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refactor: unify read and write cache path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* update config and fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unnecessary methods and adapt test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change the default path

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove remote-home

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-12-25 15:26:21 +08:00
committed by Yingwen
parent 1111a8bd57
commit 6ca7a305ae
12 changed files with 49 additions and 67 deletions

View File

@@ -28,7 +28,7 @@ use common_telemetry::{info, warn};
use object_store::layers::{LruCacheLayer, RetryInterceptor, RetryLayer};
use object_store::services::Fs;
use object_store::util::{join_dir, normalize_dir, with_instrument_layers};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder, OBJECT_CACHE_DIR};
use object_store::{Access, Error, HttpClient, ObjectStore, ObjectStoreBuilder};
use snafu::prelude::*;
use crate::config::{HttpClientConfig, ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE};
@@ -147,12 +147,10 @@ async fn build_cache_layer(
};
// Enable object cache by default
// Set the cache_path to be `${data_home}/object_cache/read/{name}` by default
// Set the cache_path to be `${data_home}` by default
// if it's not present
if cache_path.is_none() {
let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
let read_cache_path = join_dir(&object_cache_path, "read");
let read_cache_path = join_dir(&read_cache_path, &name.to_lowercase());
let read_cache_path = data_home.to_string();
tokio::fs::create_dir_all(Path::new(&read_cache_path))
.await
.context(CreateDirSnafu {

View File

@@ -37,8 +37,10 @@ use crate::sst::file::FileId;
use crate::sst::parquet::helper::fetch_byte_ranges;
use crate::sst::parquet::metadata::MetadataLoader;
/// Subdirectory of cached files.
const FILE_DIR: &str = "files/";
/// Subdirectory of cached files for write.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const FILE_DIR: &str = "cache/object/write/";
/// A file cache manages files on local store and evict files based
/// on size.

View File

@@ -20,7 +20,6 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, info};
use futures::AsyncWriteExt;
use object_store::manager::ObjectStoreManagerRef;
use object_store::ObjectStore;
use snafu::ResultExt;
@@ -44,10 +43,6 @@ use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
pub struct WriteCache {
/// Local file cache.
file_cache: FileCacheRef,
/// Object store manager.
#[allow(unused)]
/// TODO: Remove unused after implementing async write cache
object_store_manager: ObjectStoreManagerRef,
/// Puffin manager factory for index.
puffin_manager_factory: PuffinManagerFactory,
/// Intermediate manager for index.
@@ -61,7 +56,6 @@ impl WriteCache {
/// `object_store_manager` for all object stores.
pub async fn new(
local_store: ObjectStore,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
puffin_manager_factory: PuffinManagerFactory,
@@ -72,7 +66,6 @@ impl WriteCache {
Ok(Self {
file_cache,
object_store_manager,
puffin_manager_factory,
intermediate_manager,
})
@@ -81,7 +74,6 @@ impl WriteCache {
/// Creates a write cache based on local fs.
pub async fn new_fs(
cache_dir: &str,
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
puffin_manager_factory: PuffinManagerFactory,
@@ -92,7 +84,6 @@ impl WriteCache {
let local_store = new_fs_cache_store(cache_dir).await?;
Self::new(
local_store,
object_store_manager,
cache_capacity,
ttl,
puffin_manager_factory,

View File

@@ -20,8 +20,6 @@ use std::time::Duration;
use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use object_store::util::join_dir;
use object_store::OBJECT_CACHE_DIR;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
@@ -97,7 +95,7 @@ pub struct MitoConfig {
pub selector_result_cache_size: ReadableSize,
/// Whether to enable the experimental write cache.
pub enable_experimental_write_cache: bool,
/// File system path for write cache, defaults to `{data_home}/object_cache/write`.
/// File system path for write cache dir's root, defaults to `{data_home}`.
pub experimental_write_cache_path: String,
/// Capacity for write cache.
pub experimental_write_cache_size: ReadableSize,
@@ -234,8 +232,7 @@ impl MitoConfig {
// Sets write cache path if it is empty.
if self.experimental_write_cache_path.trim().is_empty() {
let object_cache_path = join_dir(data_home, OBJECT_CACHE_DIR);
self.experimental_write_cache_path = join_dir(&object_cache_path, "write");
self.experimental_write_cache_path = data_home.to_string();
}
self.index.sanitize(data_home, &self.inverted_index)?;

View File

@@ -644,16 +644,9 @@ impl TestEnv {
.unwrap();
let object_store_manager = self.get_object_store_manager().unwrap();
let write_cache = WriteCache::new(
local_store,
object_store_manager,
capacity,
None,
puffin_mgr,
intm_mgr,
)
.await
.unwrap();
let write_cache = WriteCache::new(local_store, capacity, None, puffin_mgr, intm_mgr)
.await
.unwrap();
Arc::new(write_cache)
}

View File

@@ -157,7 +157,6 @@ impl WorkerGroup {
let purge_scheduler = Arc::new(LocalScheduler::new(config.max_background_purges));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
puffin_manager_factory.clone(),
intermediate_manager.clone(),
)
@@ -303,7 +302,6 @@ impl WorkerGroup {
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
puffin_manager_factory.clone(),
intermediate_manager.clone(),
)
@@ -364,7 +362,6 @@ fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
async fn write_cache_from_config(
config: &MitoConfig,
object_store_manager: ObjectStoreManagerRef,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
@@ -383,7 +380,6 @@ async fn write_cache_from_config(
let cache = WriteCache::new_fs(
&config.experimental_write_cache_path,
object_store_manager,
config.experimental_write_cache_size,
config.experimental_write_cache_ttl,
puffin_manager_factory,

View File

@@ -25,14 +25,14 @@ mod prometheus {
static PROMETHEUS_LAYER: OnceLock<Mutex<PrometheusLayer>> = OnceLock::new();
/// This logic tries to extract the parent path from the object storage operation.
/// The function also relies on the assumption that the region path is built from the
/// pattern `<data|index>/catalog/schema/table_id/...` OR `cache/object/<read|write>/...`
///
/// We'll get the data/catalog/schema from path.
pub fn build_prometheus_metrics_layer(with_path_label: bool) -> PrometheusLayer {
PROMETHEUS_LAYER
.get_or_init(|| {
// This logic tries to extract the parent path from the object storage operation.
// The function also relies on the assumption that the region path is built from the
// pattern `<data|index>/catalog/schema/table_id/....`
//
// We'll get the data/catalog/schema from path.
let path_level = if with_path_label { 3 } else { 0 };
let layer = PrometheusLayer::builder()

View File

@@ -117,9 +117,7 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let result = self.inner.write(path, args).await;
self.read_cache
.invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
.await;
self.read_cache.invalidate_entries_with_prefix(path);
result
}
@@ -127,9 +125,7 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
async fn delete(&self, path: &str, args: OpDelete) -> Result<RpDelete> {
let result = self.inner.delete(path, args).await;
self.read_cache
.invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)))
.await;
self.read_cache.invalidate_entries_with_prefix(path);
result
}
@@ -146,8 +142,7 @@ impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
let result = self.inner.blocking_write(path, args);
self.read_cache
.blocking_invalidate_entries_with_prefix(format!("{:x}", md5::compute(path)));
self.read_cache.invalidate_entries_with_prefix(path);
result
}

View File

@@ -20,7 +20,7 @@ use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result};
use opendal::{EntryMode, Error as OpendalError, ErrorKind, Metakey, OperatorBuilder, Result};
use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
@@ -28,6 +28,10 @@ use crate::metrics::{
};
const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
/// Subdirectory of cached files for read.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const READ_CACHE_DIR: &str = "cache/object/read";
/// Cache value for read file
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
@@ -56,12 +60,20 @@ fn can_cache(path: &str) -> bool {
/// Generate a unique cache key for the read path and range.
fn read_cache_key(path: &str, args: &OpRead) -> String {
format!(
"{:x}.cache-{}",
"{READ_CACHE_DIR}/{:x}.cache-{}",
md5::compute(path),
args.range().to_header()
)
}
/// Returns the root directory of the read cache.
///
/// The result is `READ_CACHE_DIR` wrapped in a leading and a trailing `/`.
fn read_cache_root() -> String {
    ["/", READ_CACHE_DIR, "/"].concat()
}
/// Builds the key prefix used to match read-cache entries for `path`.
///
/// The prefix is `READ_CACHE_DIR`, a `/`, and the lowercase hex form of
/// `md5::compute(path)`.
fn read_cache_key_prefix(path: &str) -> String {
    let digest = md5::compute(path);
    format!("{READ_CACHE_DIR}/{digest:x}")
}
/// Local read cache for files in object storage
#[derive(Debug)]
pub(crate) struct ReadCache<C> {
@@ -125,16 +137,9 @@ impl<C: Access> ReadCache<C> {
(self.mem_cache.entry_count(), self.mem_cache.weighted_size())
}
/// Invalidate all cache items whose key starts with `prefix`.
pub(crate) async fn invalidate_entries_with_prefix(&self, prefix: String) {
// Safety: always ok when building cache with `support_invalidation_closures`.
self.mem_cache
.invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
.ok();
}
/// Blocking version of `invalidate_entries_with_prefix`.
pub(crate) fn blocking_invalidate_entries_with_prefix(&self, prefix: String) {
/// Invalidate all cache items that belong to the specific path.
pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) {
let prefix = read_cache_key_prefix(path);
// Safety: always ok when building cache with `support_invalidation_closures`.
self.mem_cache
.invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
@@ -145,8 +150,9 @@ impl<C: Access> ReadCache<C> {
/// Return entry count and total approximate entry size in bytes.
pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
let op = OperatorBuilder::new(self.file_cache.clone()).finish();
let root = read_cache_root();
let mut entries = op
.list_with("/")
.list_with(&root)
.metakey(Metakey::ContentLength | Metakey::ContentType)
.concurrent(RECOVER_CACHE_LIST_CONCURRENT)
.await?;
@@ -157,7 +163,7 @@ impl<C: Access> ReadCache<C> {
OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
// ignore root path
if entry.path() != "/" {
if entry.metadata().mode() == EntryMode::FILE {
self.mem_cache
.insert(read_key.to_string(), ReadResult::Success(size as u32))
.await;

View File

@@ -27,6 +27,9 @@ use opendal::raw::{Access, OpList, OpRead};
use opendal::services::{Azblob, Gcs, Oss};
use opendal::{EntryMode, OperatorBuilder};
/// Duplicate of the constant in `src/layers/lru_cache/read_cache.rs`
const READ_CACHE_DIR: &str = "cache/object/read";
async fn test_object_crud(store: &ObjectStore) -> Result<()> {
// Create object handler.
// Write data into object;
@@ -267,7 +270,8 @@ async fn test_file_backend_with_lru_cache() -> Result<()> {
async fn assert_lru_cache<C: Access>(cache_layer: &LruCacheLayer<C>, file_names: &[&str]) {
for file_name in file_names {
assert!(cache_layer.contains_file(file_name).await, "{file_name}");
let file_path = format!("{READ_CACHE_DIR}/{file_name}");
assert!(cache_layer.contains_file(&file_path).await, "{file_path:?}");
}
}