diff --git a/Cargo.lock b/Cargo.lock index 66f1404760..73a28f1eed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7493,6 +7493,7 @@ dependencies = [ "lazy_static", "mito-codec", "mito2", + "moka", "mur3", "object-store", "prometheus", diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index ec3f77a32c..5c750499e2 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -29,6 +29,7 @@ itertools.workspace = true lazy_static = "1.4" mito-codec.workspace = true mito2.workspace = true +moka.workspace = true mur3 = "0.1" object-store.workspace = true prometheus.workspace = true diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index d4fcb4e5b2..91881b5624 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::sync::Arc; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; @@ -304,6 +305,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Get value from cache"))] + CacheGet { + source: Arc<Error>, + #[snafu(implicit)] + location: Location, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -362,6 +370,8 @@ impl ErrorExt for Error { StartRepeatedTask { source, .. } => source.status_code(), MetricManifestInfo { .. } => StatusCode::Internal, + + CacheGet { source, .. } => source.status_code(), } } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 5e6532517a..c4e4878d0f 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -13,19 +13,23 @@ // limitations under the License. 
use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use std::time::Duration; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use async_stream::try_stream; use base64::engine::general_purpose::STANDARD_NO_PAD; use base64::Engine; +use common_base::readable_size::ReadableSize; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion::prelude::{col, lit}; use futures_util::stream::BoxStream; use futures_util::TryStreamExt; use mito2::engine::MitoEngine; +use moka::future::Cache; +use moka::policy::EvictionPolicy; use snafu::{OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{ @@ -39,9 +43,9 @@ use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; use crate::error::{ - CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu, - LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, - ParseRegionIdSnafu, Result, + CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, + DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, + MitoWriteOperationSnafu, ParseRegionIdSnafu, Result, }; use crate::utils; @@ -62,6 +66,11 @@ const COLUMN_PREFIX: &str = "__column_"; /// itself. pub struct MetadataRegion { pub(crate) mito: MitoEngine, + /// The cache for contents(key-value pairs) of region metadata. + /// + /// The cache should be invalidated when any new values are put into the metadata region or any + /// values are deleted from the metadata region. + cache: Cache<RegionId, RegionMetadataCacheEntry>, /// Logical lock for operations that need to be serialized. Like update & read region columns. 
/// /// Region entry will be registered on creating and opening logical region, and deregistered on @@ -69,10 +78,30 @@ pub struct MetadataRegion { logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>, } +#[derive(Clone)] +struct RegionMetadataCacheEntry { + key_values: Arc<BTreeMap<String, String>>, + size: usize, +} + +/// The max size of the region metadata cache. +const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes(); +/// The TTL of the region metadata cache. +const CACHE_TTL: Duration = Duration::from_secs(5 * 60); + impl MetadataRegion { pub fn new(mito: MitoEngine) -> Self { + let cache = Cache::builder() + .max_capacity(MAX_CACHE_SIZE) + // Use the LRU eviction policy to minimize frequent mito scans. + // Recently accessed items are retained longer in the cache. + .eviction_policy(EvictionPolicy::lru()) + .time_to_live(CACHE_TTL) + .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32) + .build(); Self { mito, + cache, logical_region_lock: RwLock::new(HashMap::new()), } } @@ -351,21 +380,60 @@ impl MetadataRegion { } } - pub async fn get_all_with_prefix( - &self, - region_id: RegionId, - prefix: &str, - ) -> Result<HashMap<String, String>> { - let scan_req = MetadataRegion::build_prefix_read_request(prefix, false); + fn build_read_request() -> ScanRequest { + let projection = vec![ + METADATA_SCHEMA_KEY_COLUMN_INDEX, + METADATA_SCHEMA_VALUE_COLUMN_INDEX, + ]; + ScanRequest { + projection: Some(projection), + ..Default::default() + } + } + + async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> { + let scan_req = MetadataRegion::build_read_request(); let record_batch_stream = self .mito - .scan_to_stream(region_id, scan_req) + .scan_to_stream(metadata_region_id, scan_req) .await .context(MitoReadOperationSnafu)?; - decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value) - .try_collect::<HashMap<_, _>>() + let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value) + .try_collect::<BTreeMap<_, _>>() + .await?; + let mut size = 0; + for (k, v) in kv.iter() { + size += k.len(); + size += 
v.len(); + } + let kv = Arc::new(kv); + Ok(RegionMetadataCacheEntry { + key_values: kv, + size, + }) + } + + async fn get_all_with_prefix( + &self, + metadata_region_id: RegionId, + prefix: &str, + ) -> Result<HashMap<String, String>> { + let region_metadata = self + .cache + .try_get_with(metadata_region_id, self.load_all(metadata_region_id)) + .await + .context(CacheGetSnafu)?; + + let range = region_metadata.key_values.range(prefix.to_string()..); + let mut result = HashMap::new(); + for (k, v) in range { + if !k.starts_with(prefix) { + break; + } + result.insert(k.to_string(), v.to_string()); + } + Ok(result) } pub async fn get_all_key_with_prefix( @@ -387,15 +455,18 @@ impl MetadataRegion { /// Delete the given keys. For performance consideration, this method /// doesn't check if those keys exist or not. - async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> { + async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> { let delete_request = Self::build_delete_request(keys); self.mito .handle_request( - region_id, + metadata_region_id, store_api::region_request::RegionRequest::Delete(delete_request), ) .await .context(MitoWriteOperationSnafu)?; + // Invalidates the region metadata cache if any values are deleted from the metadata region. 
+ self.cache.invalidate(&metadata_region_id).await; + Ok(()) } @@ -485,7 +556,7 @@ impl MetadataRegion { write_region_id: bool, logical_regions: impl Iterator<Item = (RegionId, Vec<ColumnMetadata>)>, ) -> Result<()> { - let region_id = utils::to_metadata_region_id(physical_region_id); + let metadata_region_id = utils::to_metadata_region_id(physical_region_id); let iter = logical_regions .into_iter() .flat_map(|(logical_region_id, column_metadatas)| { @@ -512,11 +583,13 @@ impl MetadataRegion { let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter()); self.mito .handle_request( - metadata_region_id, + metadata_region_id, store_api::region_request::RegionRequest::Put(put_request), ) .await .context(MitoWriteOperationSnafu)?; + // Invalidates the region metadata cache if any new values are put into the metadata region. + self.cache.invalidate(&metadata_region_id).await; Ok(())
}