feat: Add metrics for cache hit/miss for object store cache (#1405)

* Add the cache hit/miss counter

* Verify the cache metrics are included

* Resolve comments

* Rename the error kind label name to be consistent with other metrics

* Rename the object store metric names

* Avoid using glob imports

* Format the code

* chore: Update src/object-store/src/metrics.rs mod doc

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Near
2023-04-18 14:08:19 +08:00
committed by GitHub
parent 0c88bb09e3
commit c6f024a171
6 changed files with 45 additions and 2 deletions

1
Cargo.lock generated
View File

@@ -5333,6 +5333,7 @@ dependencies = [
"common-test-util",
"futures",
"lru 0.9.0",
"metrics",
"opendal",
"pin-project",
"tokio",

View File

@@ -9,6 +9,7 @@ lru = "0.9"
async-trait = "0.1"
bytes = "1.4"
futures = { version = "0.3" }
metrics = "0.20"
opendal = { version = "0.30", features = ["layers-tracing", "layers-metrics"] }
pin-project = "1.0"
tokio.workspace = true

View File

@@ -19,12 +19,18 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
use lru::LruCache;
use metrics::increment_counter;
use opendal::ops::{OpDelete, OpList, OpRead, OpScan, OpWrite};
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{Accessor, Layer, LayeredAccessor, RpDelete, RpList, RpRead, RpScan, RpWrite};
use opendal::{ErrorKind, Result};
use tokio::sync::Mutex;
use crate::metrics::{
OBJECT_STORE_LRU_CACHE_ERROR, OBJECT_STORE_LRU_CACHE_ERROR_KIND, OBJECT_STORE_LRU_CACHE_HIT,
OBJECT_STORE_LRU_CACHE_MISS,
};
pub struct LruCacheLayer<C> {
cache: Arc<C>,
lru_cache: Arc<Mutex<LruCache<String, ()>>>,
@@ -89,12 +95,16 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
match self.cache.read(&cache_path, OpRead::default()).await {
Ok((rp, r)) => {
increment_counter!(OBJECT_STORE_LRU_CACHE_HIT);
// update lru when cache hit
let mut lru_cache = lru_cache.lock().await;
lru_cache.get_or_insert(cache_path.clone(), || ());
Ok(to_output_reader((rp, r)))
}
Err(err) if err.kind() == ErrorKind::NotFound => {
increment_counter!(OBJECT_STORE_LRU_CACHE_MISS);
let (rp, mut reader) = self.inner.read(&path, args.clone()).await?;
let size = rp.clone().into_metadata().content_length();
let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?;
@@ -122,7 +132,10 @@ impl<I: Accessor, C: Accessor> LayeredAccessor for LruCacheAccessor<I, C> {
Err(_) => return self.inner.read(&path, args).await.map(to_output_reader),
}
}
Err(_) => return self.inner.read(&path, args).await.map(to_output_reader),
Err(err) => {
increment_counter!(OBJECT_STORE_LRU_CACHE_ERROR, OBJECT_STORE_LRU_CACHE_ERROR_KIND => format!("{}", err.kind()));
return self.inner.read(&path, args).await.map(to_output_reader);
}
}
}

View File

@@ -20,5 +20,6 @@ pub use opendal::{
};
pub mod cache_policy;
mod metrics;
pub mod test_util;
pub mod util;

View File

@@ -0,0 +1,20 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! object-store metrics
//!
//! Names of the counters (and their label key) recorded by the LRU cache
//! layer on its read path. The names are dot-separated here; the
//! cache-policy test checks the rendered metrics text for the underscore
//! forms `object_store_lru_cache_hit` and `object_store_lru_cache_miss`.

/// Counter incremented when a read is served from the cache, i.e. the read
/// against the cache accessor succeeds.
pub const OBJECT_STORE_LRU_CACHE_HIT: &str = "object_store.lru_cache.hit";
/// Counter incremented when the read against the cache accessor fails with
/// `ErrorKind::NotFound`, before the object is read from the inner accessor.
pub const OBJECT_STORE_LRU_CACHE_MISS: &str = "object_store.lru_cache.miss";
/// Counter incremented when the read against the cache accessor fails with
/// any error other than `ErrorKind::NotFound`; the read then falls back to
/// the inner accessor.
pub const OBJECT_STORE_LRU_CACHE_ERROR: &str = "object_store.lru_cache.error";
/// Label key attached to [`OBJECT_STORE_LRU_CACHE_ERROR`]; its value is the
/// formatted `ErrorKind` of the failed cache read.
pub const OBJECT_STORE_LRU_CACHE_ERROR_KIND: &str = "error";

View File

@@ -16,7 +16,7 @@ use std::env;
use std::sync::Arc;
use anyhow::Result;
use common_telemetry::logging;
use common_telemetry::{logging, metric};
use common_test_util::temp_dir::create_temp_dir;
use object_store::cache_policy::LruCacheLayer;
use object_store::services::{Fs, S3};
@@ -185,6 +185,7 @@ async fn assert_cache_files(
#[tokio::test]
async fn test_object_store_cache_policy() -> Result<()> {
common_telemetry::init_default_ut_logging();
common_telemetry::init_default_metrics_recorder();
// create file storage
let root_dir = create_temp_dir("test_fs_backend");
let store = OperatorBuilder::new(
@@ -258,5 +259,11 @@ async fn test_object_store_cache_policy() -> Result<()> {
)
.await?;
let handle = metric::try_handle().unwrap();
let metric_text = handle.render();
assert!(metric_text.contains("object_store_lru_cache_hit"));
assert!(metric_text.contains("object_store_lru_cache_miss"));
Ok(())
}