mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-10 07:12:54 +00:00
feat(metric-engine): add metadata region cache (#6657)
* feat(metric-engine): add metadata region cache Signed-off-by: WenyXu <wenymedia@gmail.com> * feat: use lru Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: rename Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: rename Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: add comments Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: default ttl Signed-off-by: WenyXu <wenymedia@gmail.com> * chore: longer ttl Signed-off-by: WenyXu <wenymedia@gmail.com> --------- Signed-off-by: WenyXu <wenymedia@gmail.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7493,6 +7493,7 @@ dependencies = [
|
||||
"lazy_static",
|
||||
"mito-codec",
|
||||
"mito2",
|
||||
"moka",
|
||||
"mur3",
|
||||
"object-store",
|
||||
"prometheus",
|
||||
|
||||
@@ -29,6 +29,7 @@ itertools.workspace = true
|
||||
lazy_static = "1.4"
|
||||
mito-codec.workspace = true
|
||||
mito2.workspace = true
|
||||
moka.workspace = true
|
||||
mur3 = "0.1"
|
||||
object-store.workspace = true
|
||||
prometheus.workspace = true
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::any::Any;
|
||||
use std::sync::Arc;
|
||||
|
||||
use common_error::ext::{BoxedError, ErrorExt};
|
||||
use common_error::status_code::StatusCode;
|
||||
@@ -304,6 +305,13 @@ pub enum Error {
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
|
||||
#[snafu(display("Get value from cache"))]
|
||||
CacheGet {
|
||||
source: Arc<Error>,
|
||||
#[snafu(implicit)]
|
||||
location: Location,
|
||||
},
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
@@ -362,6 +370,8 @@ impl ErrorExt for Error {
|
||||
StartRepeatedTask { source, .. } => source.status_code(),
|
||||
|
||||
MetricManifestInfo { .. } => StatusCode::Internal,
|
||||
|
||||
CacheGet { source, .. } => source.status_code(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,19 +13,23 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use api::v1::value::ValueData;
|
||||
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
|
||||
use async_stream::try_stream;
|
||||
use base64::engine::general_purpose::STANDARD_NO_PAD;
|
||||
use base64::Engine;
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
|
||||
use datafusion::prelude::{col, lit};
|
||||
use futures_util::stream::BoxStream;
|
||||
use futures_util::TryStreamExt;
|
||||
use mito2::engine::MitoEngine;
|
||||
use moka::future::Cache;
|
||||
use moka::policy::EvictionPolicy;
|
||||
use snafu::{OptionExt, ResultExt};
|
||||
use store_api::metadata::ColumnMetadata;
|
||||
use store_api::metric_engine_consts::{
|
||||
@@ -39,9 +43,9 @@ use store_api::storage::{RegionId, ScanRequest};
|
||||
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
|
||||
|
||||
use crate::error::{
|
||||
CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu,
|
||||
LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu,
|
||||
ParseRegionIdSnafu, Result,
|
||||
CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
|
||||
DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
|
||||
MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
|
||||
};
|
||||
use crate::utils;
|
||||
|
||||
@@ -62,6 +66,11 @@ const COLUMN_PREFIX: &str = "__column_";
|
||||
/// itself.
|
||||
pub struct MetadataRegion {
|
||||
pub(crate) mito: MitoEngine,
|
||||
/// The cache for contents(key-value pairs) of region metadata.
|
||||
///
|
||||
/// The cache should be invalidated when any new values are put into the metadata region or any
|
||||
/// values are deleted from the metadata region.
|
||||
cache: Cache<RegionId, RegionMetadataCacheEntry>,
|
||||
/// Logical lock for operations that need to be serialized. Like update & read region columns.
|
||||
///
|
||||
/// Region entry will be registered on creating and opening logical region, and deregistered on
|
||||
@@ -69,10 +78,30 @@ pub struct MetadataRegion {
|
||||
logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct RegionMetadataCacheEntry {
|
||||
key_values: Arc<BTreeMap<String, String>>,
|
||||
size: usize,
|
||||
}
|
||||
|
||||
/// The max size of the region metadata cache.
|
||||
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
|
||||
/// The TTL of the region metadata cache.
|
||||
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
|
||||
|
||||
impl MetadataRegion {
|
||||
pub fn new(mito: MitoEngine) -> Self {
|
||||
let cache = Cache::builder()
|
||||
.max_capacity(MAX_CACHE_SIZE)
|
||||
// Use the LRU eviction policy to minimize frequent mito scans.
|
||||
// Recently accessed items are retained longer in the cache.
|
||||
.eviction_policy(EvictionPolicy::lru())
|
||||
.time_to_live(CACHE_TTL)
|
||||
.weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
|
||||
.build();
|
||||
Self {
|
||||
mito,
|
||||
cache,
|
||||
logical_region_lock: RwLock::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
@@ -351,21 +380,60 @@ impl MetadataRegion {
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_all_with_prefix(
|
||||
&self,
|
||||
region_id: RegionId,
|
||||
prefix: &str,
|
||||
) -> Result<HashMap<String, String>> {
|
||||
let scan_req = MetadataRegion::build_prefix_read_request(prefix, false);
|
||||
fn build_read_request() -> ScanRequest {
|
||||
let projection = vec![
|
||||
METADATA_SCHEMA_KEY_COLUMN_INDEX,
|
||||
METADATA_SCHEMA_VALUE_COLUMN_INDEX,
|
||||
];
|
||||
ScanRequest {
|
||||
projection: Some(projection),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
|
||||
let scan_req = MetadataRegion::build_read_request();
|
||||
let record_batch_stream = self
|
||||
.mito
|
||||
.scan_to_stream(region_id, scan_req)
|
||||
.scan_to_stream(metadata_region_id, scan_req)
|
||||
.await
|
||||
.context(MitoReadOperationSnafu)?;
|
||||
|
||||
decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
|
||||
.try_collect::<HashMap<_, _>>()
|
||||
let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
|
||||
.try_collect::<BTreeMap<_, _>>()
|
||||
.await?;
|
||||
let mut size = 0;
|
||||
for (k, v) in kv.iter() {
|
||||
size += k.len();
|
||||
size += v.len();
|
||||
}
|
||||
let kv = Arc::new(kv);
|
||||
Ok(RegionMetadataCacheEntry {
|
||||
key_values: kv,
|
||||
size,
|
||||
})
|
||||
}
|
||||
|
||||
async fn get_all_with_prefix(
|
||||
&self,
|
||||
metadata_region_id: RegionId,
|
||||
prefix: &str,
|
||||
) -> Result<HashMap<String, String>> {
|
||||
let region_metadata = self
|
||||
.cache
|
||||
.try_get_with(metadata_region_id, self.load_all(metadata_region_id))
|
||||
.await
|
||||
.context(CacheGetSnafu)?;
|
||||
|
||||
let range = region_metadata.key_values.range(prefix.to_string()..);
|
||||
let mut result = HashMap::new();
|
||||
for (k, v) in range {
|
||||
if !k.starts_with(prefix) {
|
||||
break;
|
||||
}
|
||||
result.insert(k.to_string(), v.to_string());
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn get_all_key_with_prefix(
|
||||
@@ -387,15 +455,18 @@ impl MetadataRegion {
|
||||
|
||||
/// Delete the given keys. For performance consideration, this method
|
||||
/// doesn't check if those keys exist or not.
|
||||
async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> {
|
||||
async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
|
||||
let delete_request = Self::build_delete_request(keys);
|
||||
self.mito
|
||||
.handle_request(
|
||||
region_id,
|
||||
metadata_region_id,
|
||||
store_api::region_request::RegionRequest::Delete(delete_request),
|
||||
)
|
||||
.await
|
||||
.context(MitoWriteOperationSnafu)?;
|
||||
// Invalidates the region metadata cache if any values are deleted from the metadata region.
|
||||
self.cache.invalidate(&metadata_region_id).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -485,7 +556,7 @@ impl MetadataRegion {
|
||||
write_region_id: bool,
|
||||
logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
|
||||
) -> Result<()> {
|
||||
let region_id = utils::to_metadata_region_id(physical_region_id);
|
||||
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
|
||||
let iter = logical_regions
|
||||
.into_iter()
|
||||
.flat_map(|(logical_region_id, column_metadatas)| {
|
||||
@@ -512,11 +583,13 @@ impl MetadataRegion {
|
||||
let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
|
||||
self.mito
|
||||
.handle_request(
|
||||
region_id,
|
||||
metadata_region_id,
|
||||
store_api::region_request::RegionRequest::Put(put_request),
|
||||
)
|
||||
.await
|
||||
.context(MitoWriteOperationSnafu)?;
|
||||
// Invalidates the region metadata cache if any new values are put into the metadata region.
|
||||
self.cache.invalidate(&metadata_region_id).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user