Compare commits

...

1 Commits

Author SHA1 Message Date
Ruihang Xia
1b7ab2957b feat: cache logical region's metadata
Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
2024-10-12 16:16:25 +08:00
4 changed files with 105 additions and 18 deletions

View File

@@ -64,15 +64,19 @@ impl MetricEngineInner {
/// Return the physical region id behind this logical region /// Return the physical region id behind this logical region
async fn alter_logical_region( async fn alter_logical_region(
&self, &self,
region_id: RegionId, logical_region_id: RegionId,
request: RegionAlterRequest, request: RegionAlterRequest,
) -> Result<RegionId> { ) -> Result<RegionId> {
let physical_region_id = { let physical_region_id = {
let state = &self.state.read().unwrap(); let state = &self.state.read().unwrap();
state.get_physical_region_id(region_id).with_context(|| { state
error!("Trying to alter an nonexistent region {region_id}"); .get_physical_region_id(logical_region_id)
LogicalRegionNotFoundSnafu { region_id } .with_context(|| {
})? error!("Trying to alter an nonexistent region {logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
})?
}; };
// only handle adding column // only handle adding column
@@ -87,7 +91,7 @@ impl MetricEngineInner {
.metadata_region .metadata_region
.column_semantic_type( .column_semantic_type(
metadata_region_id, metadata_region_id,
region_id, logical_region_id,
&col.column_metadata.column_schema.name, &col.column_metadata.column_schema.name,
) )
.await? .await?
@@ -102,7 +106,7 @@ impl MetricEngineInner {
self.add_columns_to_physical_data_region( self.add_columns_to_physical_data_region(
data_region_id, data_region_id,
metadata_region_id, metadata_region_id,
region_id, logical_region_id,
columns_to_add, columns_to_add,
) )
.await?; .await?;
@@ -110,10 +114,16 @@ impl MetricEngineInner {
// register columns to logical region // register columns to logical region
for col in columns { for col in columns {
self.metadata_region self.metadata_region
.add_column(metadata_region_id, region_id, &col.column_metadata) .add_column(metadata_region_id, logical_region_id, &col.column_metadata)
.await?; .await?;
} }
// invalidate the cached logical columns of this region so the next load picks up the newly added columns
self.state
.write()
.unwrap()
.invalid_logical_column_cache(logical_region_id);
Ok(physical_region_id) Ok(physical_region_id)
} }

View File

@@ -169,11 +169,11 @@ impl MetricEngineInner {
) -> Result<Vec<usize>> { ) -> Result<Vec<usize>> {
// project on logical columns // project on logical columns
let all_logical_columns = self let all_logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id) .load_logical_column_names(physical_region_id, logical_region_id)
.await?; .await?;
let projected_logical_names = origin_projection let projected_logical_names = origin_projection
.iter() .iter()
.map(|i| all_logical_columns[*i].column_schema.name.clone()) .map(|i| all_logical_columns[*i].clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// generate physical projection // generate physical projection
@@ -200,10 +200,8 @@ impl MetricEngineInner {
logical_region_id: RegionId, logical_region_id: RegionId,
) -> Result<Vec<usize>> { ) -> Result<Vec<usize>> {
let logical_columns = self let logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id) .load_logical_column_names(physical_region_id, logical_region_id)
.await? .await?;
.into_iter()
.map(|col| col.column_schema.name);
let mut projection = Vec::with_capacity(logical_columns.len()); let mut projection = Vec::with_capacity(logical_columns.len());
let data_region_id = utils::to_data_region_id(physical_region_id); let data_region_id = utils::to_data_region_id(physical_region_id);
let physical_metadata = self let physical_metadata = self

View File

@@ -23,13 +23,25 @@ use crate::error::Result;
impl MetricEngineInner { impl MetricEngineInner {
/// Load column metadata of a logical region. /// Load column metadata of a logical region.
/// ///
/// The return value is ordered on [ColumnId]. /// The return value is ordered on column name.
pub async fn load_logical_columns( pub async fn load_logical_columns(
&self, &self,
physical_region_id: RegionId, physical_region_id: RegionId,
logical_region_id: RegionId, logical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> { ) -> Result<Vec<ColumnMetadata>> {
// load logical and physical columns, and intersect them to get logical column metadata // First try to load from state cache
if let Some(columns) = self
.state
.read()
.unwrap()
.logical_columns()
.get(&logical_region_id)
{
return Ok(columns.clone());
}
// Else load from metadata region and update the cache.
// Load logical and physical columns, and intersect them to get logical column metadata.
let mut logical_column_metadata = self let mut logical_column_metadata = self
.metadata_region .metadata_region
.logical_columns(physical_region_id, logical_region_id) .logical_columns(physical_region_id, logical_region_id)
@@ -37,11 +49,48 @@ impl MetricEngineInner {
.into_iter() .into_iter()
.map(|(_, column_metadata)| column_metadata) .map(|(_, column_metadata)| column_metadata)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
// Sort columns on column name to ensure the order
// sort columns on column id to ensure the order
logical_column_metadata logical_column_metadata
.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name)); .sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
// Update cache
self.state
.write()
.unwrap()
.add_logical_columns(logical_region_id, logical_column_metadata.clone());
Ok(logical_column_metadata) Ok(logical_column_metadata)
} }
/// Load the column names of a logical region.
///
/// The cached column metadata in the engine state is consulted first; on a
/// cache miss the names are derived from [`Self::load_logical_columns`].
///
/// The returned names are ordered alphabetically on column name.
pub async fn load_logical_column_names(
    &self,
    physical_region_id: RegionId,
    logical_region_id: RegionId,
) -> Result<Vec<String>> {
    // Fast path: copy the names out of the cache while the read lock is held.
    // The lock is released at the end of this block, before any `.await`.
    let cached_names = {
        let state = self.state.read().unwrap();
        state.logical_columns().get(&logical_region_id).map(|cached| {
            cached
                .iter()
                .map(|column| column.column_schema.name.clone())
                .collect::<Vec<_>>()
        })
    };
    if let Some(names) = cached_names {
        return Ok(names);
    }

    // Slow path: load the full column metadata and keep only the names.
    let loaded = self
        .load_logical_columns(physical_region_id, logical_region_id)
        .await?;
    let names = loaded
        .into_iter()
        .map(|column| column.column_schema.name)
        .collect::<Vec<_>>();
    Ok(names)
}
} }

View File

@@ -17,6 +17,7 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use snafu::OptionExt; use snafu::OptionExt;
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId; use store_api::storage::RegionId;
use crate::error::{PhysicalRegionNotFoundSnafu, Result}; use crate::error::{PhysicalRegionNotFoundSnafu, Result};
@@ -35,6 +36,10 @@ pub(crate) struct MetricEngineState {
/// Cache for the columns of physical regions. /// Cache for the columns of physical regions.
/// The region id in key is the data region id. /// The region id in key is the data region id.
physical_columns: HashMap<RegionId, HashSet<String>>, physical_columns: HashMap<RegionId, HashSet<String>>,
/// Cache for the column metadata of logical regions.
/// The column order is the same as the order in the metadata, which is
/// alphabetical by column name.
logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
} }
impl MetricEngineState { impl MetricEngineState {
@@ -80,6 +85,21 @@ impl MetricEngineState {
.insert(logical_region_id, physical_region_id); .insert(logical_region_id, physical_region_id);
} }
/// Add logical columns to the cache of a logical region and reorder the
/// cached columns alphabetically on column name.
///
/// Columns whose name is already cached for this region are dropped, so
/// storing the same column list twice (for example when two concurrent cache
/// misses both load and store the full list) does not leave duplicated
/// entries in the cache.
///
/// When the cache entry is empty and `new_columns` is already sorted on
/// column name without duplicates, the cached list is exactly `new_columns`.
pub fn add_logical_columns(
    &mut self,
    logical_region_id: RegionId,
    new_columns: impl IntoIterator<Item = ColumnMetadata>,
) {
    let columns = self.logical_columns.entry(logical_region_id).or_default();
    columns.extend(new_columns);
    // A stable sort keeps already-cached entries ahead of incoming ones with
    // the same name, so the `dedup_by` below discards the incoming duplicate.
    columns.sort_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
    // `dedup_by` passes (later, earlier) and removes `later` when they match.
    columns.dedup_by(|later, earlier| later.column_schema.name == earlier.column_schema.name);
}
pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> { pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
self.logical_regions.get(&logical_region_id).copied() self.logical_regions.get(&logical_region_id).copied()
} }
@@ -88,6 +108,10 @@ impl MetricEngineState {
&self.physical_columns &self.physical_columns
} }
/// Returns the cache of logical column metadata, keyed by logical region id.
pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
&self.logical_columns
}
pub fn physical_regions(&self) -> &HashMap<RegionId, HashSet<RegionId>> { pub fn physical_regions(&self) -> &HashMap<RegionId, HashSet<RegionId>> {
&self.physical_regions &self.physical_regions
} }
@@ -129,9 +153,15 @@ impl MetricEngineState {
.unwrap() // Safety: physical_region_id is got from physical_regions .unwrap() // Safety: physical_region_id is got from physical_regions
.remove(&logical_region_id); .remove(&logical_region_id);
self.logical_columns.remove(&logical_region_id);
Ok(()) Ok(())
} }
/// Invalidates the cached column metadata of the given logical region by
/// removing its entry from the cache. Does nothing if the region has no
/// cached columns.
pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) {
self.logical_columns.remove(&logical_region_id);
}
pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool { pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
self.logical_regions().contains_key(&logical_region_id) self.logical_regions().contains_key(&logical_region_id)
} }