mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2025-12-23 14:40:01 +00:00
Compare commits
1 Commits
transform-
...
cache-logi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1b7ab2957b |
@@ -64,14 +64,18 @@ impl MetricEngineInner {
|
|||||||
/// Return the physical region id behind this logical region
|
/// Return the physical region id behind this logical region
|
||||||
async fn alter_logical_region(
|
async fn alter_logical_region(
|
||||||
&self,
|
&self,
|
||||||
region_id: RegionId,
|
logical_region_id: RegionId,
|
||||||
request: RegionAlterRequest,
|
request: RegionAlterRequest,
|
||||||
) -> Result<RegionId> {
|
) -> Result<RegionId> {
|
||||||
let physical_region_id = {
|
let physical_region_id = {
|
||||||
let state = &self.state.read().unwrap();
|
let state = &self.state.read().unwrap();
|
||||||
state.get_physical_region_id(region_id).with_context(|| {
|
state
|
||||||
error!("Trying to alter an nonexistent region {region_id}");
|
.get_physical_region_id(logical_region_id)
|
||||||
LogicalRegionNotFoundSnafu { region_id }
|
.with_context(|| {
|
||||||
|
error!("Trying to alter an nonexistent region {logical_region_id}");
|
||||||
|
LogicalRegionNotFoundSnafu {
|
||||||
|
region_id: logical_region_id,
|
||||||
|
}
|
||||||
})?
|
})?
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -87,7 +91,7 @@ impl MetricEngineInner {
|
|||||||
.metadata_region
|
.metadata_region
|
||||||
.column_semantic_type(
|
.column_semantic_type(
|
||||||
metadata_region_id,
|
metadata_region_id,
|
||||||
region_id,
|
logical_region_id,
|
||||||
&col.column_metadata.column_schema.name,
|
&col.column_metadata.column_schema.name,
|
||||||
)
|
)
|
||||||
.await?
|
.await?
|
||||||
@@ -102,7 +106,7 @@ impl MetricEngineInner {
|
|||||||
self.add_columns_to_physical_data_region(
|
self.add_columns_to_physical_data_region(
|
||||||
data_region_id,
|
data_region_id,
|
||||||
metadata_region_id,
|
metadata_region_id,
|
||||||
region_id,
|
logical_region_id,
|
||||||
columns_to_add,
|
columns_to_add,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -110,10 +114,16 @@ impl MetricEngineInner {
|
|||||||
// register columns to logical region
|
// register columns to logical region
|
||||||
for col in columns {
|
for col in columns {
|
||||||
self.metadata_region
|
self.metadata_region
|
||||||
.add_column(metadata_region_id, region_id, &col.column_metadata)
|
.add_column(metadata_region_id, logical_region_id, &col.column_metadata)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// invalid logical column cache
|
||||||
|
self.state
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.invalid_logical_column_cache(logical_region_id);
|
||||||
|
|
||||||
Ok(physical_region_id)
|
Ok(physical_region_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -169,11 +169,11 @@ impl MetricEngineInner {
|
|||||||
) -> Result<Vec<usize>> {
|
) -> Result<Vec<usize>> {
|
||||||
// project on logical columns
|
// project on logical columns
|
||||||
let all_logical_columns = self
|
let all_logical_columns = self
|
||||||
.load_logical_columns(physical_region_id, logical_region_id)
|
.load_logical_column_names(physical_region_id, logical_region_id)
|
||||||
.await?;
|
.await?;
|
||||||
let projected_logical_names = origin_projection
|
let projected_logical_names = origin_projection
|
||||||
.iter()
|
.iter()
|
||||||
.map(|i| all_logical_columns[*i].column_schema.name.clone())
|
.map(|i| all_logical_columns[*i].clone())
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
// generate physical projection
|
// generate physical projection
|
||||||
@@ -200,10 +200,8 @@ impl MetricEngineInner {
|
|||||||
logical_region_id: RegionId,
|
logical_region_id: RegionId,
|
||||||
) -> Result<Vec<usize>> {
|
) -> Result<Vec<usize>> {
|
||||||
let logical_columns = self
|
let logical_columns = self
|
||||||
.load_logical_columns(physical_region_id, logical_region_id)
|
.load_logical_column_names(physical_region_id, logical_region_id)
|
||||||
.await?
|
.await?;
|
||||||
.into_iter()
|
|
||||||
.map(|col| col.column_schema.name);
|
|
||||||
let mut projection = Vec::with_capacity(logical_columns.len());
|
let mut projection = Vec::with_capacity(logical_columns.len());
|
||||||
let data_region_id = utils::to_data_region_id(physical_region_id);
|
let data_region_id = utils::to_data_region_id(physical_region_id);
|
||||||
let physical_metadata = self
|
let physical_metadata = self
|
||||||
|
|||||||
@@ -23,13 +23,25 @@ use crate::error::Result;
|
|||||||
impl MetricEngineInner {
|
impl MetricEngineInner {
|
||||||
/// Load column metadata of a logical region.
|
/// Load column metadata of a logical region.
|
||||||
///
|
///
|
||||||
/// The return value is ordered on [ColumnId].
|
/// The return value is ordered on column name.
|
||||||
pub async fn load_logical_columns(
|
pub async fn load_logical_columns(
|
||||||
&self,
|
&self,
|
||||||
physical_region_id: RegionId,
|
physical_region_id: RegionId,
|
||||||
logical_region_id: RegionId,
|
logical_region_id: RegionId,
|
||||||
) -> Result<Vec<ColumnMetadata>> {
|
) -> Result<Vec<ColumnMetadata>> {
|
||||||
// load logical and physical columns, and intersect them to get logical column metadata
|
// First try to load from state cache
|
||||||
|
if let Some(columns) = self
|
||||||
|
.state
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.logical_columns()
|
||||||
|
.get(&logical_region_id)
|
||||||
|
{
|
||||||
|
return Ok(columns.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Else load from metadata region and update the cache.
|
||||||
|
// Load logical and physical columns, and intersect them to get logical column metadata.
|
||||||
let mut logical_column_metadata = self
|
let mut logical_column_metadata = self
|
||||||
.metadata_region
|
.metadata_region
|
||||||
.logical_columns(physical_region_id, logical_region_id)
|
.logical_columns(physical_region_id, logical_region_id)
|
||||||
@@ -37,11 +49,48 @@ impl MetricEngineInner {
|
|||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|(_, column_metadata)| column_metadata)
|
.map(|(_, column_metadata)| column_metadata)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
// Sort columns on column name to ensure the order
|
||||||
// sort columns on column id to ensure the order
|
|
||||||
logical_column_metadata
|
logical_column_metadata
|
||||||
.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
|
.sort_unstable_by(|c1, c2| c1.column_schema.name.cmp(&c2.column_schema.name));
|
||||||
|
// Update cache
|
||||||
|
self.state
|
||||||
|
.write()
|
||||||
|
.unwrap()
|
||||||
|
.add_logical_columns(logical_region_id, logical_column_metadata.clone());
|
||||||
|
|
||||||
Ok(logical_column_metadata)
|
Ok(logical_column_metadata)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Load logical column names of a logical region.
|
||||||
|
///
|
||||||
|
/// The return value is ordered on column name alphabetically.
|
||||||
|
pub async fn load_logical_column_names(
|
||||||
|
&self,
|
||||||
|
physical_region_id: RegionId,
|
||||||
|
logical_region_id: RegionId,
|
||||||
|
) -> Result<Vec<String>> {
|
||||||
|
// First try to load from state cache
|
||||||
|
if let Some(columns) = self
|
||||||
|
.state
|
||||||
|
.read()
|
||||||
|
.unwrap()
|
||||||
|
.logical_columns()
|
||||||
|
.get(&logical_region_id)
|
||||||
|
{
|
||||||
|
return Ok(columns
|
||||||
|
.iter()
|
||||||
|
.map(|c| c.column_schema.name.clone())
|
||||||
|
.collect());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Else load from metadata region
|
||||||
|
let columns = self
|
||||||
|
.load_logical_columns(physical_region_id, logical_region_id)
|
||||||
|
.await?
|
||||||
|
.into_iter()
|
||||||
|
.map(|c| c.column_schema.name)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
|
Ok(columns)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,6 +17,7 @@
|
|||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
use snafu::OptionExt;
|
use snafu::OptionExt;
|
||||||
|
use store_api::metadata::ColumnMetadata;
|
||||||
use store_api::storage::RegionId;
|
use store_api::storage::RegionId;
|
||||||
|
|
||||||
use crate::error::{PhysicalRegionNotFoundSnafu, Result};
|
use crate::error::{PhysicalRegionNotFoundSnafu, Result};
|
||||||
@@ -35,6 +36,10 @@ pub(crate) struct MetricEngineState {
|
|||||||
/// Cache for the columns of physical regions.
|
/// Cache for the columns of physical regions.
|
||||||
/// The region id in key is the data region id.
|
/// The region id in key is the data region id.
|
||||||
physical_columns: HashMap<RegionId, HashSet<String>>,
|
physical_columns: HashMap<RegionId, HashSet<String>>,
|
||||||
|
/// Cache for the column metadata of logical regions.
|
||||||
|
/// The column order is the same with the order in the metadata, which is
|
||||||
|
/// alphabetically ordered on column name.
|
||||||
|
logical_columns: HashMap<RegionId, Vec<ColumnMetadata>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MetricEngineState {
|
impl MetricEngineState {
|
||||||
@@ -80,6 +85,21 @@ impl MetricEngineState {
|
|||||||
.insert(logical_region_id, physical_region_id);
|
.insert(logical_region_id, physical_region_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add and reorder logical columns.
|
||||||
|
///
|
||||||
|
/// Caller should make sure:
|
||||||
|
/// 1. there is no duplicate columns
|
||||||
|
/// 2. the column order is the same with the order in the metadata, which is
|
||||||
|
/// alphabetically ordered on column name.
|
||||||
|
pub fn add_logical_columns(
|
||||||
|
&mut self,
|
||||||
|
logical_region_id: RegionId,
|
||||||
|
new_columns: impl IntoIterator<Item = ColumnMetadata>,
|
||||||
|
) {
|
||||||
|
let columns = self.logical_columns.entry(logical_region_id).or_default();
|
||||||
|
columns.extend(new_columns);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
|
pub fn get_physical_region_id(&self, logical_region_id: RegionId) -> Option<RegionId> {
|
||||||
self.logical_regions.get(&logical_region_id).copied()
|
self.logical_regions.get(&logical_region_id).copied()
|
||||||
}
|
}
|
||||||
@@ -88,6 +108,10 @@ impl MetricEngineState {
|
|||||||
&self.physical_columns
|
&self.physical_columns
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn logical_columns(&self) -> &HashMap<RegionId, Vec<ColumnMetadata>> {
|
||||||
|
&self.logical_columns
|
||||||
|
}
|
||||||
|
|
||||||
pub fn physical_regions(&self) -> &HashMap<RegionId, HashSet<RegionId>> {
|
pub fn physical_regions(&self) -> &HashMap<RegionId, HashSet<RegionId>> {
|
||||||
&self.physical_regions
|
&self.physical_regions
|
||||||
}
|
}
|
||||||
@@ -129,9 +153,15 @@ impl MetricEngineState {
|
|||||||
.unwrap() // Safety: physical_region_id is got from physical_regions
|
.unwrap() // Safety: physical_region_id is got from physical_regions
|
||||||
.remove(&logical_region_id);
|
.remove(&logical_region_id);
|
||||||
|
|
||||||
|
self.logical_columns.remove(&logical_region_id);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn invalid_logical_column_cache(&mut self, logical_region_id: RegionId) {
|
||||||
|
self.logical_columns.remove(&logical_region_id);
|
||||||
|
}
|
||||||
|
|
||||||
pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
|
pub fn is_logical_region_exist(&self, logical_region_id: RegionId) -> bool {
|
||||||
self.logical_regions().contains_key(&logical_region_id)
|
self.logical_regions().contains_key(&logical_region_id)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user