diff --git a/Cargo.lock b/Cargo.lock index 88679b9262..8f4c4ae8a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7275,6 +7275,7 @@ dependencies = [ "lazy_static", "mito-codec", "mito2", + "moka", "mur3", "object-store", "prometheus", diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 18abcb0ddd..04cfcf0284 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -29,6 +29,7 @@ itertools.workspace = true lazy_static = "1.4" mito-codec.workspace = true mito2.workspace = true +moka.workspace = true mur3 = "0.1" object-store.workspace = true prometheus.workspace = true diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index d4fcb4e5b2..91881b5624 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::any::Any; +use std::sync::Arc; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; @@ -304,6 +305,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Get value from cache"))] + CacheGet { + source: Arc, + #[snafu(implicit)] + location: Location, + }, } pub type Result = std::result::Result; @@ -362,6 +370,8 @@ impl ErrorExt for Error { StartRepeatedTask { source, .. } => source.status_code(), MetricManifestInfo { .. } => StatusCode::Internal, + + CacheGet { source, .. } => source.status_code(), } } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 5e6532517a..1d577e2b0b 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -13,19 +13,23 @@ // limitations under the License. use std::collections::hash_map::Entry; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use std::time::Duration; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; use async_stream::try_stream; use base64::engine::general_purpose::STANDARD_NO_PAD; use base64::Engine; +use common_base::readable_size::ReadableSize; use common_recordbatch::{RecordBatch, SendableRecordBatchStream}; use datafusion::prelude::{col, lit}; use futures_util::stream::BoxStream; use futures_util::TryStreamExt; use mito2::engine::MitoEngine; +use moka::future::Cache; +use moka::policy::EvictionPolicy; use snafu::{OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{ @@ -39,9 +43,9 @@ use store_api::storage::{RegionId, ScanRequest}; use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; use crate::error::{ - CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu, - LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, - ParseRegionIdSnafu, Result, + CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, + DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, + MitoWriteOperationSnafu, ParseRegionIdSnafu, Result, }; use crate::utils; @@ -62,6 +66,7 @@ const COLUMN_PREFIX: &str = "__column_"; /// itself. pub struct MetadataRegion { pub(crate) mito: MitoEngine, + cache: Cache, /// Logical lock for operations that need to be serialized. Like update & read region columns. /// /// Region entry will be registered on creating and opening logical region, and deregistered on @@ -69,10 +74,25 @@ pub struct MetadataRegion { logical_region_lock: RwLock>>>, } +#[derive(Clone)] +struct RegionMetadataCacheEntry { + key_values: Arc>, + size: usize, +} + +const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes(); + impl MetadataRegion { pub fn new(mito: MitoEngine) -> Self { + let cache = Cache::builder() + .max_capacity(MAX_CACHE_SIZE) + .eviction_policy(EvictionPolicy::lru()) + .time_to_live(Duration::from_secs(60)) + .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32) + .build(); Self { mito, + cache, logical_region_lock: RwLock::new(HashMap::new()), } } @@ -351,21 +371,60 @@ impl MetadataRegion { } } - pub async fn get_all_with_prefix( - &self, - region_id: RegionId, - prefix: &str, - ) -> Result> { - let scan_req = MetadataRegion::build_prefix_read_request(prefix, false); + fn build_read_request() -> ScanRequest { + let projection = vec![ + METADATA_SCHEMA_KEY_COLUMN_INDEX, + METADATA_SCHEMA_VALUE_COLUMN_INDEX, + ]; + ScanRequest { + projection: Some(projection), + ..Default::default() + } + } + + async fn load_all(&self, region_id: RegionId) -> Result { + let scan_req = MetadataRegion::build_read_request(); let record_batch_stream = self .mito .scan_to_stream(region_id, scan_req) .await .context(MitoReadOperationSnafu)?; - decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value) - .try_collect::>() + let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value) + .try_collect::>() + .await?; + let mut size = 0; + for (k, v) in kv.iter() { + size += k.len(); + size += v.len(); + } + let kv = Arc::new(kv); + Ok(RegionMetadataCacheEntry { + key_values: kv, + size, + }) + } + + async fn get_all_with_prefix( + &self, + region_id: RegionId, + prefix: &str, + ) -> Result> { + let region_metadata = self + .cache + .try_get_with(region_id, self.load_all(region_id)) .await + .context(CacheGetSnafu)?; + + let range = region_metadata.key_values.range(prefix.to_string()..); + let mut result = HashMap::new(); + for (k, v) in range { + if !k.starts_with(prefix) { + break; + } + result.insert(k.to_string(), v.to_string()); + } + Ok(result) } pub async fn get_all_key_with_prefix( @@ -396,6 +455,8 @@ impl MetadataRegion { ) .await .context(MitoWriteOperationSnafu)?; + self.cache.invalidate(®ion_id).await; + Ok(()) } @@ -517,6 +578,7 @@ impl MetadataRegion { ) .await .context(MitoWriteOperationSnafu)?; + self.cache.invalidate(®ion_id).await; Ok(()) }