diff --git a/Cargo.lock b/Cargo.lock index 72da8b122d..491fe43e65 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5333,6 +5333,7 @@ dependencies = [ "common-test-util", "futures", "lru 0.9.0", + "metrics", "opendal", "pin-project", "tokio", diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index d38a93e85a..79cf2ac5bd 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -9,6 +9,7 @@ lru = "0.9" async-trait = "0.1" bytes = "1.4" futures = { version = "0.3" } +metrics = "0.20" opendal = { version = "0.30", features = ["layers-tracing", "layers-metrics"] } pin-project = "1.0" tokio.workspace = true diff --git a/src/object-store/src/cache_policy.rs b/src/object-store/src/cache_policy.rs index 8965ebd53c..cb08a26d99 100644 --- a/src/object-store/src/cache_policy.rs +++ b/src/object-store/src/cache_policy.rs @@ -19,12 +19,18 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; use lru::LruCache; +use metrics::increment_counter; use opendal::ops::{OpDelete, OpList, OpRead, OpScan, OpWrite}; use opendal::raw::oio::{Read, Reader, Write}; use opendal::raw::{Accessor, Layer, LayeredAccessor, RpDelete, RpList, RpRead, RpScan, RpWrite}; use opendal::{ErrorKind, Result}; use tokio::sync::Mutex; +use crate::metrics::{ + OBJECT_STORE_LRU_CACHE_ERROR, OBJECT_STORE_LRU_CACHE_ERROR_KIND, OBJECT_STORE_LRU_CACHE_HIT, + OBJECT_STORE_LRU_CACHE_MISS, +}; + pub struct LruCacheLayer { cache: Arc, lru_cache: Arc>>, @@ -89,12 +95,16 @@ impl LayeredAccessor for LruCacheAccessor { match self.cache.read(&cache_path, OpRead::default()).await { Ok((rp, r)) => { + increment_counter!(OBJECT_STORE_LRU_CACHE_HIT); + // update lru when cache hit let mut lru_cache = lru_cache.lock().await; lru_cache.get_or_insert(cache_path.clone(), || ()); Ok(to_output_reader((rp, r))) } Err(err) if err.kind() == ErrorKind::NotFound => { + increment_counter!(OBJECT_STORE_LRU_CACHE_MISS); + let (rp, mut reader) = self.inner.read(&path, args.clone()).await?; let size = rp.clone().into_metadata().content_length(); let (_, mut writer) = self.cache.write(&cache_path, OpWrite::new()).await?; @@ -122,7 +132,10 @@ impl LayeredAccessor for LruCacheAccessor { Err(_) => return self.inner.read(&path, args).await.map(to_output_reader), } } - Err(_) => return self.inner.read(&path, args).await.map(to_output_reader), + Err(err) => { + increment_counter!(OBJECT_STORE_LRU_CACHE_ERROR, OBJECT_STORE_LRU_CACHE_ERROR_KIND => format!("{}", err.kind())); + return self.inner.read(&path, args).await.map(to_output_reader); + } } } diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 7cfbb71211..5e74aa0cec 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -20,5 +20,6 @@ pub use opendal::{ }; pub mod cache_policy; +mod metrics; pub mod test_util; pub mod util; diff --git a/src/object-store/src/metrics.rs b/src/object-store/src/metrics.rs new file mode 100644 index 0000000000..6a8f0ee189 --- /dev/null +++ b/src/object-store/src/metrics.rs @@ -0,0 +1,20 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! object-store metrics + +pub const OBJECT_STORE_LRU_CACHE_HIT: &str = "object_store.lru_cache.hit"; +pub const OBJECT_STORE_LRU_CACHE_MISS: &str = "object_store.lru_cache.miss"; +pub const OBJECT_STORE_LRU_CACHE_ERROR: &str = "object_store.lru_cache.error"; +pub const OBJECT_STORE_LRU_CACHE_ERROR_KIND: &str = "error"; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 820973996f..4efc2ac72c 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -16,7 +16,7 @@ use std::env; use std::sync::Arc; use anyhow::Result; -use common_telemetry::logging; +use common_telemetry::{logging, metric}; use common_test_util::temp_dir::create_temp_dir; use object_store::cache_policy::LruCacheLayer; use object_store::services::{Fs, S3}; @@ -185,6 +185,7 @@ async fn assert_cache_files( #[tokio::test] async fn test_object_store_cache_policy() -> Result<()> { common_telemetry::init_default_ut_logging(); + common_telemetry::init_default_metrics_recorder(); // create file storage let root_dir = create_temp_dir("test_fs_backend"); let store = OperatorBuilder::new( @@ -258,5 +259,11 @@ async fn test_object_store_cache_policy() -> Result<()> { ) .await?; + let handle = metric::try_handle().unwrap(); + let metric_text = handle.render(); + + assert!(metric_text.contains("object_store_lru_cache_hit")); + assert!(metric_text.contains("object_store_lru_cache_miss")); + Ok(()) }