feat: cache logical region's metadata (#4827)

* feat: cache logical region's metadata

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: implement logical region locking for metadata operations

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix: correct typo in comment for MetadataRegion struct

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-10-14 16:44:13 +08:00
committed by GitHub
parent 3b2ce31a19
commit 06e565d25a
6 changed files with 198 additions and 32 deletions

View File

@@ -64,15 +64,19 @@ impl MetricEngineInner {
/// Return the physical region id behind this logical region
async fn alter_logical_region(
&self,
region_id: RegionId,
logical_region_id: RegionId,
request: RegionAlterRequest,
) -> Result<RegionId> {
let physical_region_id = {
let state = &self.state.read().unwrap();
state.get_physical_region_id(region_id).with_context(|| {
error!("Trying to alter an nonexistent region {region_id}");
LogicalRegionNotFoundSnafu { region_id }
})?
state
.get_physical_region_id(logical_region_id)
.with_context(|| {
error!("Trying to alter an nonexistent region {logical_region_id}");
LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
}
})?
};
// only handle adding column
@@ -80,6 +84,12 @@ impl MetricEngineInner {
return Ok(physical_region_id);
};
// lock metadata region for this logical region id
let _write_guard = self
.metadata_region
.write_lock_logical_region(logical_region_id)
.await;
let metadata_region_id = to_metadata_region_id(physical_region_id);
let mut columns_to_add = vec![];
for col in &columns {
@@ -87,7 +97,7 @@ impl MetricEngineInner {
.metadata_region
.column_semantic_type(
metadata_region_id,
region_id,
logical_region_id,
&col.column_metadata.column_schema.name,
)
.await?
@@ -102,7 +112,7 @@ impl MetricEngineInner {
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
region_id,
logical_region_id,
columns_to_add,
)
.await?;
@@ -110,10 +120,16 @@ impl MetricEngineInner {
// register columns to logical region
for col in columns {
self.metadata_region
.add_column(metadata_region_id, region_id, &col.column_metadata)
.add_column(metadata_region_id, logical_region_id, &col.column_metadata)
.await?;
}
// invalid logical column cache
self.state
.write()
.unwrap()
.invalid_logical_column_cache(logical_region_id);
Ok(physical_region_id)
}

View File

@@ -134,17 +134,26 @@ impl MetricEngineInner {
.await?;
let logical_region_num = logical_regions.len();
let mut state = self.state.write().unwrap();
// recover physical column names
let physical_column_names = physical_columns
.into_iter()
.map(|col| col.column_schema.name)
.collect();
state.add_physical_region(physical_region_id, physical_column_names);
// recover logical regions
for logical_region_id in logical_regions {
state.add_logical_region(physical_region_id, logical_region_id);
{
let mut state = self.state.write().unwrap();
// recover physical column names
let physical_column_names = physical_columns
.into_iter()
.map(|col| col.column_schema.name)
.collect();
state.add_physical_region(physical_region_id, physical_column_names);
// recover logical regions
for logical_region_id in &logical_regions {
state.add_logical_region(physical_region_id, *logical_region_id);
}
}
for logical_region_id in logical_regions {
self.metadata_region
.open_logical_region(logical_region_id)
.await;
}
LOGICAL_REGION_COUNT.add(logical_region_num as i64);
Ok(())

View File

@@ -169,11 +169,11 @@ impl MetricEngineInner {
) -> Result<Vec<usize>> {
// project on logical columns
let all_logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.load_logical_column_names(physical_region_id, logical_region_id)
.await?;
let projected_logical_names = origin_projection
.iter()
.map(|i| all_logical_columns[*i].column_schema.name.clone())
.map(|i| all_logical_columns[*i].clone())
.collect::<Vec<_>>();
// generate physical projection
@@ -200,10 +200,8 @@ impl MetricEngineInner {
logical_region_id: RegionId,
) -> Result<Vec<usize>> {
let logical_columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.map(|col| col.column_schema.name);
.load_logical_column_names(physical_region_id, logical_region_id)
.await?;
let mut projection = Vec::with_capacity(logical_columns.len());
let data_region_id = utils::to_data_region_id(physical_region_id);
let physical_metadata = self

View File

@@ -23,13 +23,29 @@ use crate::error::Result;
impl MetricEngineInner {
/// Load column metadata of a logical region.
///
/// The return value is ordered on [ColumnId].
/// The return value is ordered on column name.
pub async fn load_logical_columns(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<ColumnMetadata>> {
// load logical and physical columns, and intersect them to get logical column metadata
// First try to load from state cache
if let Some(columns) = self
.state
.read()
.unwrap()
.logical_columns()
.get(&logical_region_id)
{
return Ok(columns.clone());
}
// Else load from metadata region and update the cache.
let _read_guard = self
.metadata_region
.read_lock_logical_region(logical_region_id)
.await;
// Load logical and physical columns, and intersect them to get logical column metadata.
let mut logical_column_metadata = self
.metadata_region
.logical_columns(physical_region_id, logical_region_id)
@@ -37,11 +53,48 @@ impl MetricEngineInner {
.into_iter()
.map(|(_, column_metadata)| column_metadata)
.collect::<Vec<_>>();
// sort columns on column id to ensure the order
// Sort columns on column name to ensure the order
logical_column_metadata
.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
// Update cache
self.state
.write()
.unwrap()
.add_logical_columns(logical_region_id, logical_column_metadata.clone());
Ok(logical_column_metadata)
}
/// Load logical column names of a logical region.
///
/// The return value is ordered on column name alphabetically.
pub async fn load_logical_column_names(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<Vec<String>> {
// First try to load from state cache
if let Some(columns) = self
.state
.read()
.unwrap()
.logical_columns()
.get(&logical_region_id)
{
return Ok(columns
.iter()
.map(|c| c.column_schema.name.clone())
.collect());
}
// Else load from metadata region
let columns = self
.load_logical_columns(physical_region_id, logical_region_id)
.await?
.into_iter()
.map(|c| c.column_schema.name)
.collect::<Vec<_>>();
Ok(columns)
}
}

View File

@@ -17,6 +17,7 @@
use std::collections::{HashMap, HashSet};
use snafu::OptionExt;
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use crate::error::{PhysicalRegionNotFoundSnafu, Result};
@@ -35,6 +36,10 @@ pub(crate) struct MetricEngineState {
/// Cache for the columns of physical regions.
/// The region id in key is the data region id.
physical_columns: HashMap<RegionId, HashSet<String>>,
/// Cache for the column metadata of logical regions.
/// The column order is the same with the order in the metadata, which is
/// alphabetically ordered on column name.
logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
}
impl MetricEngineState {
@@ -80,6 +85,21 @@ impl MetricEngineState {
.insert(logical_region_id, physical_region_id);
}
/// Add and reorder logical columns.
///
/// Caller should make sure:
/// 1. there is no duplicate columns
/// 2. the column order is the same with the order in the metadata, which is
/// alphabetically ordered on column name.
pub fn add_logical_columns(
&mut self,
logical_region_id: RegionId,
new_columns: impl IntoIterator<Item = ColumnMetadata>,
) {
let columns = self.logical_columns.entry(logical_region_id).or_default();
columns.extend(new_columns);
}
pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
self.logical_regions.get(&logical_region_id).copied()
}
@@ -88,6 +108,10 @@ impl MetricEngineState {
&self.physical_columns
}
pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
&self.logical_columns
}
pub fn physical_regions(&self) -> &HashMap<RegionId, HashSet<RegionId>> {
&self.physical_regions
}
@@ -129,9 +153,15 @@ impl MetricEngineState {
.unwrap() // Safety: physical_region_id is got from physical_regions
.remove(&logical_region_id);
self.logical_columns.remove(&logical_region_id);
Ok(())
}
pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) {
self.logical_columns.remove(&logical_region_id);
}
pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
self.logical_regions().contains_key(&logical_region_id)
}

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
@@ -21,7 +22,7 @@ use base64::Engine;
use common_recordbatch::util::collect;
use datafusion::prelude::{col, lit};
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
@@ -31,11 +32,12 @@ use store_api::metric_engine_consts::{
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
use crate::error::{
CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu,
MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseRegionIdSnafu, RegionAlreadyExistsSnafu,
Result,
LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu,
ParseRegionIdSnafu, RegionAlreadyExistsSnafu, Result,
};
use crate::utils;
@@ -56,11 +58,19 @@ const COLUMN_PREFIX: &str = "__column_";
/// itself.
pub struct MetadataRegion {
mito: MitoEngine,
/// Logical lock for operations that need to be serialized. Like update & read region columns.
///
/// Region entry will be registered on creating and opening logical region, and deregistered on
/// removing logical region.
logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
impl MetadataRegion {
pub fn new(mito: MitoEngine) -> Self {
Self { mito }
Self {
mito,
logical_region_lock: RwLock::new(HashMap::new()),
}
}
/// Add a new table key to metadata.
@@ -85,10 +95,21 @@ impl MetadataRegion {
}
.fail()
} else {
self.logical_region_lock
.write()
.await
.insert(logical_region_id, Arc::new(RwLock::new(())));
Ok(())
}
}
pub async fn open_logical_region(&self, logical_region_id: RegionId) {
self.logical_region_lock
.write()
.await
.insert(logical_region_id, Arc::new(RwLock::new(())));
}
/// Add a new column key to metadata.
///
/// This method won't check if the column already exists. But
@@ -111,6 +132,40 @@ impl MetadataRegion {
.await
}
/// Retrieve a read lock guard of given logical region id.
pub async fn read_lock_logical_region(
&self,
logical_region_id: RegionId,
) -> Result<OwnedRwLockReadGuard<()>> {
let lock = self
.logical_region_lock
.read()
.await
.get(&logical_region_id)
.context(LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
})?
.clone();
Ok(RwLock::read_owned(lock).await)
}
/// Retrieve a write lock guard of given logical region id.
pub async fn write_lock_logical_region(
&self,
logical_region_id: RegionId,
) -> Result<OwnedRwLockWriteGuard<()>> {
let lock = self
.logical_region_lock
.read()
.await
.get(&logical_region_id)
.context(LogicalRegionNotFoundSnafu {
region_id: logical_region_id,
})?
.clone();
Ok(RwLock::write_owned(lock).await)
}
/// Remove a registered logical region from metadata.
///
/// This method doesn't check if the previous key exists.
@@ -136,6 +191,11 @@ impl MetadataRegion {
column_keys.push(region_key);
self.delete(region_id, &column_keys).await?;
self.logical_region_lock
.write()
.await
.remove(&logical_region_id);
Ok(())
}