diff --git a/src/metric-engine/src/engine/alter.rs b/src/metric-engine/src/engine/alter.rs index c99942dec6..76066ab97a 100644 --- a/src/metric-engine/src/engine/alter.rs +++ b/src/metric-engine/src/engine/alter.rs @@ -64,15 +64,19 @@ impl MetricEngineInner { /// Return the physical region id behind this logical region async fn alter_logical_region( &self, - region_id: RegionId, + logical_region_id: RegionId, request: RegionAlterRequest, ) -> Result { let physical_region_id = { let state = &self.state.read().unwrap(); - state.get_physical_region_id(region_id).with_context(|| { - error!("Trying to alter an nonexistent region {region_id}"); - LogicalRegionNotFoundSnafu { region_id } - })? + state + .get_physical_region_id(logical_region_id) + .with_context(|| { + error!("Trying to alter an nonexistent region {logical_region_id}"); + LogicalRegionNotFoundSnafu { + region_id: logical_region_id, + } + })? }; // only handle adding column @@ -80,6 +84,12 @@ impl MetricEngineInner { return Ok(physical_region_id); }; + // lock metadata region for this logical region id + let _write_guard = self + .metadata_region + .write_lock_logical_region(logical_region_id) + .await; + let metadata_region_id = to_metadata_region_id(physical_region_id); let mut columns_to_add = vec![]; for col in &columns { @@ -87,7 +97,7 @@ impl MetricEngineInner { .metadata_region .column_semantic_type( metadata_region_id, - region_id, + logical_region_id, &col.column_metadata.column_schema.name, ) .await? @@ -102,7 +112,7 @@ impl MetricEngineInner { self.add_columns_to_physical_data_region( data_region_id, metadata_region_id, - region_id, + logical_region_id, columns_to_add, ) .await?; @@ -110,10 +120,16 @@ impl MetricEngineInner { // register columns to logical region for col in columns { self.metadata_region - .add_column(metadata_region_id, region_id, &col.column_metadata) + .add_column(metadata_region_id, logical_region_id, &col.column_metadata) .await?; } + // invalid logical column cache + self.state + .write() + .unwrap() + .invalid_logical_column_cache(logical_region_id); + Ok(physical_region_id) } diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index 815ffc1143..97b049e01d 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -134,17 +134,26 @@ impl MetricEngineInner { .await?; let logical_region_num = logical_regions.len(); - let mut state = self.state.write().unwrap(); - // recover physical column names - let physical_column_names = physical_columns - .into_iter() - .map(|col| col.column_schema.name) - .collect(); - state.add_physical_region(physical_region_id, physical_column_names); - // recover logical regions - for logical_region_id in logical_regions { - state.add_logical_region(physical_region_id, logical_region_id); + { + let mut state = self.state.write().unwrap(); + // recover physical column names + let physical_column_names = physical_columns + .into_iter() + .map(|col| col.column_schema.name) + .collect(); + state.add_physical_region(physical_region_id, physical_column_names); + // recover logical regions + for logical_region_id in &logical_regions { + state.add_logical_region(physical_region_id, *logical_region_id); + } } + + for logical_region_id in logical_regions { + self.metadata_region + .open_logical_region(logical_region_id) + .await; + } + LOGICAL_REGION_COUNT.add(logical_region_num as i64); Ok(()) diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index af01e1bb9e..2ba4c213c6 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -169,11 +169,11 @@ impl MetricEngineInner { ) -> Result> { // project on logical columns let all_logical_columns = self - .load_logical_columns(physical_region_id, logical_region_id) + .load_logical_column_names(physical_region_id, logical_region_id) .await?; let projected_logical_names = origin_projection .iter() - .map(|i| all_logical_columns[*i].column_schema.name.clone()) + .map(|i| all_logical_columns[*i].clone()) .collect::>(); // generate physical projection @@ -200,10 +200,8 @@ impl MetricEngineInner { logical_region_id: RegionId, ) -> Result> { let logical_columns = self - .load_logical_columns(physical_region_id, logical_region_id) - .await? - .into_iter() - .map(|col| col.column_schema.name); + .load_logical_column_names(physical_region_id, logical_region_id) + .await?; let mut projection = Vec::with_capacity(logical_columns.len()); let data_region_id = utils::to_data_region_id(physical_region_id); let physical_metadata = self diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs index 1fa33ec39d..171480c589 100644 --- a/src/metric-engine/src/engine/region_metadata.rs +++ b/src/metric-engine/src/engine/region_metadata.rs @@ -23,13 +23,29 @@ use crate::error::Result; impl MetricEngineInner { /// Load column metadata of a logical region. /// - /// The return value is ordered on [ColumnId]. + /// The return value is ordered on column name. pub async fn load_logical_columns( &self, physical_region_id: RegionId, logical_region_id: RegionId, ) -> Result> { - // load logical and physical columns, and intersect them to get logical column metadata + // First try to load from state cache + if let Some(columns) = self + .state + .read() + .unwrap() + .logical_columns() + .get(&logical_region_id) + { + return Ok(columns.clone()); + } + + // Else load from metadata region and update the cache. + let _read_guard = self + .metadata_region + .read_lock_logical_region(logical_region_id) + .await; + // Load logical and physical columns, and intersect them to get logical column metadata. let mut logical_column_metadata = self .metadata_region .logical_columns(physical_region_id, logical_region_id) @@ -37,11 +53,48 @@ impl MetricEngineInner { .into_iter() .map(|(_, column_metadata)| column_metadata) .collect::>(); - - // sort columns on column id to ensure the order + // Sort columns on column name to ensure the order logical_column_metadata .sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name)); + // Update cache + self.state + .write() + .unwrap() + .add_logical_columns(logical_region_id, logical_column_metadata.clone()); Ok(logical_column_metadata) } + + /// Load logical column names of a logical region. + /// + /// The return value is ordered on column name alphabetically. + pub async fn load_logical_column_names( + &self, + physical_region_id: RegionId, + logical_region_id: RegionId, + ) -> Result> { + // First try to load from state cache + if let Some(columns) = self + .state + .read() + .unwrap() + .logical_columns() + .get(&logical_region_id) + { + return Ok(columns + .iter() + .map(|c| c.column_schema.name.clone()) + .collect()); + } + + // Else load from metadata region + let columns = self + .load_logical_columns(physical_region_id, logical_region_id) + .await? + .into_iter() + .map(|c| c.column_schema.name) + .collect::>(); + + Ok(columns) + } } diff --git a/src/metric-engine/src/engine/state.rs b/src/metric-engine/src/engine/state.rs index 5214ae1fbf..24ab5a31bf 100644 --- a/src/metric-engine/src/engine/state.rs +++ b/src/metric-engine/src/engine/state.rs @@ -17,6 +17,7 @@ use std::collections::{HashMap, HashSet}; use snafu::OptionExt; +use store_api::metadata::ColumnMetadata; use store_api::storage::RegionId; use crate::error::{PhysicalRegionNotFoundSnafu, Result}; @@ -35,6 +36,10 @@ pub(crate) struct MetricEngineState { /// Cache for the columns of physical regions. /// The region id in key is the data region id. physical_columns: HashMap>, + /// Cache for the column metadata of logical regions. + /// The column order is the same with the order in the metadata, which is + /// alphabetically ordered on column name. + logical_columns: HashMap>, } impl MetricEngineState { @@ -80,6 +85,21 @@ impl MetricEngineState { .insert(logical_region_id, physical_region_id); } + /// Add and reorder logical columns. + /// + /// Caller should make sure: + /// 1. there is no duplicate columns + /// 2. the column order is the same with the order in the metadata, which is + /// alphabetically ordered on column name. + pub fn add_logical_columns( + &mut self, + logical_region_id: RegionId, + new_columns: impl IntoIterator, + ) { + let columns = self.logical_columns.entry(logical_region_id).or_default(); + columns.extend(new_columns); + } + pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option { self.logical_regions.get(&logical_region_id).copied() } @@ -88,6 +108,10 @@ impl MetricEngineState { &self.physical_columns } + pub fn logical_columns(&self) -> &HashMap> { + &self.logical_columns + } + pub fn physical_regions(&self) -> &HashMap> { &self.physical_regions } @@ -129,9 +153,15 @@ impl MetricEngineState { .unwrap() // Safety: physical_region_id is got from physical_regions .remove(&logical_region_id); + self.logical_columns.remove(&logical_region_id); + Ok(()) } + pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) { + self.logical_columns.remove(&logical_region_id); + } + pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool { self.logical_regions().contains_key(&logical_region_id) } diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 8e892e7ba7..b02fa3de51 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::collections::HashMap; +use std::sync::Arc; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value}; @@ -21,7 +22,7 @@ use base64::Engine; use common_recordbatch::util::collect; use datafusion::prelude::{col, lit}; use mito2::engine::MitoEngine; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use store_api::metadata::ColumnMetadata; use store_api::metric_engine_consts::{ METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME, @@ -31,11 +32,12 @@ use store_api::metric_engine_consts::{ use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionDeleteRequest, RegionPutRequest}; use store_api::storage::{RegionId, ScanRequest}; +use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock}; use crate::error::{ CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu, - MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseRegionIdSnafu, RegionAlreadyExistsSnafu, - Result, + LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, + ParseRegionIdSnafu, RegionAlreadyExistsSnafu, Result, }; use crate::utils; @@ -56,11 +58,19 @@ const COLUMN_PREFIX: &str = "__column_"; /// itself. pub struct MetadataRegion { mito: MitoEngine, + /// Logical lock for operations that need to be serialized. Like update & read region columns. + /// + /// Region entry will be registered on creating and opening logical region, and deregistered on + /// removing logical region. + logical_region_lock: RwLock>>>, } impl MetadataRegion { pub fn new(mito: MitoEngine) -> Self { - Self { mito } + Self { + mito, + logical_region_lock: RwLock::new(HashMap::new()), + } } /// Add a new table key to metadata. @@ -85,10 +95,21 @@ impl MetadataRegion { } .fail() } else { + self.logical_region_lock + .write() + .await + .insert(logical_region_id, Arc::new(RwLock::new(()))); Ok(()) } } + pub async fn open_logical_region(&self, logical_region_id: RegionId) { + self.logical_region_lock + .write() + .await + .insert(logical_region_id, Arc::new(RwLock::new(()))); + } + /// Add a new column key to metadata. /// /// This method won't check if the column already exists. But @@ -111,6 +132,40 @@ impl MetadataRegion { .await } + /// Retrieve a read lock guard of given logical region id. + pub async fn read_lock_logical_region( + &self, + logical_region_id: RegionId, + ) -> Result> { + let lock = self + .logical_region_lock + .read() + .await + .get(&logical_region_id) + .context(LogicalRegionNotFoundSnafu { + region_id: logical_region_id, + })? + .clone(); + Ok(RwLock::read_owned(lock).await) + } + + /// Retrieve a write lock guard of given logical region id. + pub async fn write_lock_logical_region( + &self, + logical_region_id: RegionId, + ) -> Result> { + let lock = self + .logical_region_lock + .read() + .await + .get(&logical_region_id) + .context(LogicalRegionNotFoundSnafu { + region_id: logical_region_id, + })? + .clone(); + Ok(RwLock::write_owned(lock).await) + } + /// Remove a registered logical region from metadata. /// /// This method doesn't check if the previous key exists. @@ -136,6 +191,11 @@ impl MetadataRegion { column_keys.push(region_key); self.delete(region_id, &column_keys).await?; + self.logical_region_lock + .write() + .await + .remove(&logical_region_id); + Ok(()) }