feat(metric-engine): add metadata region cache

feat: use lru

Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
WenyXu
2025-08-05 16:24:05 +00:00
parent f44816cc15
commit b363c044f9
4 changed files with 86 additions and 12 deletions

1
Cargo.lock generated
View File

@@ -7275,6 +7275,7 @@ dependencies = [
"lazy_static",
"mito-codec",
"mito2",
"moka",
"mur3",
"object-store",
"prometheus",

View File

@@ -29,6 +29,7 @@ itertools.workspace = true
lazy_static = "1.4"
mito-codec.workspace = true
mito2.workspace = true
moka.workspace = true
mur3 = "0.1"
object-store.workspace = true
prometheus.workspace = true

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::any::Any;
use std::sync::Arc;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
@@ -304,6 +305,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Get value from cache"))]
CacheGet {
source: Arc<Error>,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -362,6 +370,8 @@ impl ErrorExt for Error {
StartRepeatedTask { source, .. } => source.status_code(),
MetricManifestInfo { .. } => StatusCode::Internal,
CacheGet { source, .. } => source.status_code(),
}
}

View File

@@ -13,19 +13,23 @@
// limitations under the License.
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use async_stream::try_stream;
use base64::engine::general_purpose::STANDARD_NO_PAD;
use base64::Engine;
use common_base::readable_size::ReadableSize;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use datafusion::prelude::{col, lit};
use futures_util::stream::BoxStream;
use futures_util::TryStreamExt;
use mito2::engine::MitoEngine;
use moka::future::Cache;
use moka::policy::EvictionPolicy;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
@@ -39,9 +43,9 @@ use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use crate::error::{
CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu,
LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu,
ParseRegionIdSnafu, Result,
CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
};
use crate::utils;
@@ -62,6 +66,7 @@ const COLUMN_PREFIX: &str = "__column_";
/// itself.
pub struct MetadataRegion {
pub(crate) mito: MitoEngine,
cache: Cache<RegionId, RegionMetadataCacheEntry>,
/// Logical lock for operations that need to be serialized. Like update & read region columns.
///
/// Region entry will be registered on creating and opening logical region, and deregistered on
@@ -69,10 +74,25 @@ pub struct MetadataRegion {
logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
#[derive(Clone)]
struct RegionMetadataCacheEntry {
key_values: Arc<BTreeMap<String, String>>,
size: usize,
}
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
impl MetadataRegion {
pub fn new(mito: MitoEngine) -> Self {
let cache = Cache::builder()
.max_capacity(MAX_CACHE_SIZE)
.eviction_policy(EvictionPolicy::lru())
.time_to_live(Duration::from_secs(60))
.weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
.build();
Self {
mito,
cache,
logical_region_lock: RwLock::new(HashMap::new()),
}
}
@@ -351,21 +371,60 @@ impl MetadataRegion {
}
}
pub async fn get_all_with_prefix(
&self,
region_id: RegionId,
prefix: &str,
) -> Result<HashMap<String, String>> {
let scan_req = MetadataRegion::build_prefix_read_request(prefix, false);
fn build_read_request() -> ScanRequest {
let projection = vec![
METADATA_SCHEMA_KEY_COLUMN_INDEX,
METADATA_SCHEMA_VALUE_COLUMN_INDEX,
];
ScanRequest {
projection: Some(projection),
..Default::default()
}
}
async fn load_all(&self, region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
let scan_req = MetadataRegion::build_read_request();
let record_batch_stream = self
.mito
.scan_to_stream(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
.try_collect::<HashMap<_, _>>()
let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
.try_collect::<BTreeMap<_, _>>()
.await?;
let mut size = 0;
for (k, v) in kv.iter() {
size += k.len();
size += v.len();
}
let kv = Arc::new(kv);
Ok(RegionMetadataCacheEntry {
key_values: kv,
size,
})
}
async fn get_all_with_prefix(
&self,
region_id: RegionId,
prefix: &str,
) -> Result<HashMap<String, String>> {
let region_metadata = self
.cache
.try_get_with(region_id, self.load_all(region_id))
.await
.context(CacheGetSnafu)?;
let range = region_metadata.key_values.range(prefix.to_string()..);
let mut result = HashMap::new();
for (k, v) in range {
if !k.starts_with(prefix) {
break;
}
result.insert(k.to_string(), v.to_string());
}
Ok(result)
}
pub async fn get_all_key_with_prefix(
@@ -396,6 +455,8 @@ impl MetadataRegion {
)
.await
.context(MitoWriteOperationSnafu)?;
self.cache.invalidate(&region_id).await;
Ok(())
}
@@ -517,6 +578,7 @@ impl MetadataRegion {
)
.await
.context(MitoWriteOperationSnafu)?;
self.cache.invalidate(&region_id).await;
Ok(())
}