diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index c99942dec6..73518f003c 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -64,15 +64,19 @@ impl MetricEngineInner { /// Return the physical region id behind this logical region async fn alter_logical_region( &self, - region_id: RegionId, + logical_region_id: RegionId, request: RegionAlterRequest, ) -> Result { let physical_region_id = { let state = &self.state.read().unwrap(); - state.get_physical_region_id(region_id).with_context(|| { - error!("Trying to alter an nonexistent region {region_id}"); - LogicalRegionNotFoundSnafu { region_id } - })? + state + .get_physical_region_id(logical_region_id) + .with_context(|| { + error!("Trying to alter an nonexistent region {logical_region_id}"); + LogicalRegionNotFoundSnafu { + region_id: logical_region_id, + } + })? }; // only handle adding column @@ -87,7 +91,7 @@ impl MetricEngineInner { .metadata_region .column_semantic_type( metadata_region_id, - region_id, + logical_region_id, &col.column_metadata.column_schema.name, ) .await? @@ -102,7 +106,7 @@ impl MetricEngineInner { self.add_columns_to_physical_data_region( data_region_id, metadata_region_id, - region_id, + logical_region_id, columns_to_add, ) .await?; @@ -110,10 +114,16 @@ impl MetricEngineInner { // register columns to logical region for col in columns { self.metadata_region - .add_column(metadata_region_id, region_id, &col.column_metadata) + .add_column(metadata_region_id, logical_region_id, &col.column_metadata) .await?; } + // invalid logical column cache + self.state + .write() + .unwrap() + .invalid_logical_column_cache(logical_region_id); + Ok(physical_region_id) } diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index af01e1bb9e..2ba4c213c6 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -169,11 +169,11 @@ impl MetricEngineInner { ) -> Result> { // project on logical columns let all_logical_columns = self - .load_logical_columns(physical_region_id, logical_region_id) + .load_logical_column_names(physical_region_id, logical_region_id) .await?; let projected_logical_names = origin_projection .iter() - .map(|i| all_logical_columns[*i].column_schema.name.clone()) + .map(|i| all_logical_columns[*i].clone()) .collect::>(); // generate physical projection @@ -200,10 +200,8 @@ impl MetricEngineInner { logical_region_id: RegionId, ) -> Result> { let logical_columns = self - .load_logical_columns(physical_region_id, logical_region_id) - .await? - .into_iter() - .map(|col| col.column_schema.name); + .load_logical_column_names(physical_region_id, logical_region_id) + .await?; let mut projection = Vec::with_capacity(logical_columns.len()); let data_region_id = utils::to_data_region_id(physical_region_id); let physical_metadata = self diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs index 1fa33ec39d..25059bdbc3 100644 --- a/src/metric-engine/src/engine/region_metadata.rs +++ b/src/metric-engine/src/engine/region_metadata.rs @@ -23,13 +23,25 @@ use crate::error::Result; impl MetricEngineInner { /// Load column metadata of a logical region. /// - /// The return value is ordered on [ColumnId]. + /// The return value is ordered on column name. pub async fn load_logical_columns( &self, physical_region_id: RegionId, logical_region_id: RegionId, ) -> Result> { - // load logical and physical columns, and intersect them to get logical column metadata + // First try to load from state cache + if let Some(columns) = self + .state + .read() + .unwrap() + .logical_columns() + .get(&logical_region_id) + { + return Ok(columns.clone()); + } + + // Else load from metadata region and update the cache. + // Load logical and physical columns, and intersect them to get logical column metadata. let mut logical_column_metadata = self .metadata_region .logical_columns(physical_region_id, logical_region_id) @@ -37,11 +49,48 @@ impl MetricEngineInner { .into_iter() .map(|(_, column_metadata)| column_metadata) .collect::>(); - - // sort columns on column id to ensure the order + // Sort columns on column name to ensure the order logical_column_metadata .sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name)); + // Update cache + self.state + .write() + .unwrap() + .add_logical_columns(logical_region_id, logical_column_metadata.clone()); Ok(logical_column_metadata) } + + /// Load logical column names of a logical region. + /// + /// The return value is ordered on column name alphabetically. + pub async fn load_logical_column_names( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) -> Result> { + // First try to load from state cache + if let Some(columns) = self + .state + .read() + .unwrap() + .logical_columns() + .get(&logical_region_id) + { + return Ok(columns + .iter() + .map(|c| c.column_schema.name.clone()) + .collect()); + } + + // Else load from metadata region + let columns = self + .load_logical_columns(physical_region_id, logical_region_id) + .await? + .into_iter() + .map(|c| c.column_schema.name) + .collect::>(); + + Ok(columns) + } } diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 5214ae1fbf..24ab5a31bf 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet}; use snafu::OptionExt; +use store_api::metadata::ColumnMetadata; use store_api::storage::RegionId; use crate::error::{PhysicalRegionNotFoundSnafu, Result}; @@ -35,6 +36,10 @@ pub(crate) struct MetricEngineState { /// Cache for the columns of physical regions. /// The region id in key is the data region id. physical_columns: HashMap>, + /// Cache for the column metadata of logical regions. + /// The column order is the same with the order in the metadata, which is + /// alphabetically ordered on column name. + logical_columns: HashMap>, } impl MetricEngineState { @@ -80,6 +85,21 @@ impl MetricEngineState { .insert(logical_region_id, physical_region_id); } + /// Add and reorder logical columns. + /// + /// Caller should make sure: + /// 1. there is no duplicate columns + /// 2. the column order is the same with the order in the metadata, which is + /// alphabetically ordered on column name. + pub fn add_logical_columns( + &mut self, + logical_region_id: RegionId, + new_columns: impl IntoIterator, + ) { + let columns = self.logical_columns.entry(logical_region_id).or_default(); + columns.extend(new_columns); + } + pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option { self.logical_regions.get(&logical_region_id).copied() } @@ -88,6 +108,10 @@ impl MetricEngineState { &self.physical_columns } + pub fn logical_columns(&self) -> &HashMap> { + &self.logical_columns + } + pub fn physical_regions(&self) -> &HashMap> { &self.physical_regions } @@ -129,9 +153,15 @@ impl MetricEngineState { .unwrap() // Safety: physical_region_id is got from physical_regions .remove(&logical_region_id); + self.logical_columns.remove(&logical_region_id); + Ok(()) } + pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) { + self.logical_columns.remove(&logical_region_id); + } + pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool { self.logical_regions().contains_key(&logical_region_id) }