feat: alter logical region in metric region (#2726)

* add test for add and alter logical region

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove table id

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: JeremyHi <jiachun_feng@proton.me>

* CR sugg.

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* extract internal states into a struct

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unused deps

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* Apply suggestions from code review

Co-authored-by: dennis zhuang <killme2008@gmail.com>

* tweak fn name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Co-authored-by: dennis zhuang <killme2008@gmail.com>
Co-authored-by: JeremyHi <jiachun_feng@proton.me>
This commit is contained in:
Ruihang Xia
2023-11-15 11:52:52 +08:00
committed by GitHub
parent f92b55c745
commit a691cff0c4
7 changed files with 469 additions and 223 deletions

View File

@@ -134,18 +134,17 @@ mod test {
#[tokio::test]
async fn test_add_columns() {
common_telemetry::init_default_ut_logging();
let env = TestEnv::new().await;
env.init_metric_region().await;
let current_version = env
.mito()
.get_metadata(utils::to_data_region_id(env.default_region_id()))
.get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
.await
.unwrap()
.schema_version;
assert_eq!(current_version, 0);
// TestEnv will create a logical region which changes the version to 1.
assert_eq!(current_version, 1);
let new_columns = vec![
ColumnMetadata {
@@ -168,13 +167,13 @@ mod test {
},
];
env.data_region()
.add_columns(env.default_region_id(), new_columns)
.add_columns(env.default_physical_region_id(), new_columns)
.await
.unwrap();
let new_metadata = env
.mito()
.get_metadata(utils::to_data_region_id(env.default_region_id()))
.get_metadata(utils::to_data_region_id(env.default_physical_region_id()))
.await
.unwrap();
let column_names = new_metadata
@@ -182,15 +181,21 @@ mod test {
.iter()
.map(|c| &c.column_schema.name)
.collect::<Vec<_>>();
let expected = vec!["greptime_timestamp", "__metric", "__tsid", "tag2", "tag3"];
let expected = vec![
"greptime_timestamp",
"greptime_value",
"__metric",
"__tsid",
"job",
"tag2",
"tag3",
];
assert_eq!(column_names, expected);
}
// Only string is allowed for tag column
#[tokio::test]
async fn test_add_invalid_column() {
common_telemetry::init_default_ut_logging();
let env = TestEnv::new().await;
env.init_metric_region().await;
@@ -201,7 +206,7 @@ mod test {
}];
let result = env
.data_region()
.add_columns(env.default_region_id(), new_columns)
.add_columns(env.default_physical_region_id(), new_columns)
.await;
assert!(result.is_err());
}

View File

@@ -34,18 +34,20 @@ use store_api::region_request::{
AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionGroup, RegionId, ScanRequest, TableId};
use store_api::storage::{RegionGroup, RegionId, ScanRequest};
use tokio::sync::RwLock;
use crate::data_region::DataRegion;
use crate::error::{
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu,
LogicalTableNotFoundSnafu, MissingRegionOptionSnafu, PhysicalRegionNotFoundSnafu,
PhysicalTableNotFoundSnafu, Result,
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, ForbiddenPhysicalAlterSnafu,
InternalColumnOccupiedSnafu, LogicalRegionNotFoundSnafu, MissingRegionOptionSnafu,
ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::metadata_region::MetadataRegion;
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils;
use crate::metrics::{
FORBIDDEN_OPERATION_COUNT, LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT,
};
use crate::utils::{self, to_data_region_id};
/// region group value for data region inside a metric region
pub const METRIC_DATA_REGION_GROUP: RegionGroup = 0;
@@ -95,8 +97,10 @@ pub const PHYSICAL_TABLE_METADATA_KEY: &str = "physical_metric_table";
/// on_physical_table = "physical_table",
/// );
/// ```
/// And this key will be translated to corresponding physical **REGION** id in metasrv.
pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table";
#[derive(Clone)]
pub struct MetricEngine {
inner: Arc<MetricEngineInner>,
}
@@ -127,7 +131,11 @@ impl RegionEngine for MetricEngine {
RegionRequest::Drop(_) => todo!(),
RegionRequest::Open(_) => todo!(),
RegionRequest::Close(_) => todo!(),
RegionRequest::Alter(_) => todo!(),
RegionRequest::Alter(alter) => self
.inner
.alter_region(region_id, alter)
.await
.map(|_| Output::AffectedRows(0)),
RegionRequest::Flush(_) => todo!(),
RegionRequest::Compact(_) => todo!(),
RegionRequest::Truncate(_) => todo!(),
@@ -185,23 +193,75 @@ impl MetricEngine {
mito,
metadata_region,
data_region,
physical_tables: RwLock::default(),
physical_columns: RwLock::default(),
state: RwLock::default(),
}),
}
}
}
/// Internal states of metric engine
#[derive(Default)]
struct MetricEngineState {
/// Mapping from physical region id to its logical region ids
/// `logical_regions` records a reverse mapping from logical region id to
/// physical region id
physical_regions: HashMap<RegionId, HashSet<RegionId>>,
/// Mapping from logical region id to physical region id.
logical_regions: HashMap<RegionId, RegionId>,
/// Cache for the columns of physical regions.
/// The region id in key is the data region id.
physical_columns: HashMap<RegionId, HashSet<String>>,
}
impl MetricEngineState {
pub fn add_physical_region(
&mut self,
physical_region_id: RegionId,
physical_columns: HashSet<String>,
) {
let physical_region_id = to_data_region_id(physical_region_id);
self.physical_regions
.insert(physical_region_id, HashSet::new());
self.physical_columns
.insert(physical_region_id, physical_columns);
}
/// # Panic
/// if the physical region does not exist
pub fn add_physical_columns(
&mut self,
physical_region_id: RegionId,
physical_columns: impl IntoIterator<Item = String>,
) {
let physical_region_id = to_data_region_id(physical_region_id);
let columns = self.physical_columns.get_mut(&physical_region_id).unwrap();
for col in physical_columns {
columns.insert(col);
}
}
/// # Panic
/// if the physical region does not exist
pub fn add_logical_region(
&mut self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) {
let physical_region_id = to_data_region_id(physical_region_id);
self.physical_regions
.get_mut(&physical_region_id)
.unwrap()
.insert(logical_region_id);
self.logical_regions
.insert(logical_region_id, physical_region_id);
}
}
struct MetricEngineInner {
mito: MitoEngine,
metadata_region: MetadataRegion,
data_region: DataRegion,
// TODO(ruihang): handle different catalog/schema
/// Map from physical table name to table id.
physical_tables: RwLock<HashMap<String, TableId>>,
/// Cache for the columns of physical regions.
/// The region id in key is the data region id.
physical_columns: RwLock<HashMap<RegionId, HashSet<String>>>,
state: RwLock<MetricEngineState>,
}
impl MetricEngineInner {
@@ -230,14 +290,6 @@ impl MetricEngineInner {
) -> Result<()> {
let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
// TODO: workaround for now, should find another way to retrieve the
// table name.
let physical_table_name = request
.options
.get(PHYSICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?
.to_string();
// create metadata region
let create_metadata_region_request =
self.create_request_for_metadata_region(&request.region_dir);
@@ -253,6 +305,11 @@ impl MetricEngineInner {
// create data region
let create_data_region_request = self.create_request_for_data_region(&request);
let physical_column_set = create_data_region_request
.column_metadatas
.iter()
.map(|metadata| metadata.column_schema.name.clone())
.collect::<HashSet<_>>();
self.mito
.handle_request(
data_region_id,
@@ -263,14 +320,14 @@ impl MetricEngineInner {
region_type: DATA_REGION_SUBDIR,
})?;
info!("Created physical metric region {region_id:?} with table name {physical_table_name}");
info!("Created physical metric region {region_id:?}");
PHYSICAL_REGION_COUNT.inc();
// remember this table
self.physical_tables
self.state
.write()
.await
.insert(physical_table_name, region_id.table_id());
.add_physical_region(data_region_id, physical_column_set);
Ok(())
}
@@ -287,59 +344,71 @@ impl MetricEngineInner {
/// If the logical region to create already exists, this method will do nothing.
async fn create_logical_region(
&self,
region_id: RegionId,
logical_region_id: RegionId,
request: RegionCreateRequest,
) -> Result<()> {
// transform IDs
let physical_table_name = request
let physical_region_id_raw = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
let physical_table_id = *self
.physical_tables
.read()
.await
.get(physical_table_name)
.with_context(|| PhysicalTableNotFoundSnafu {
physical_table: physical_table_name,
})?;
let logical_table_id = region_id.table_id();
let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
let physical_region_id: RegionId = physical_region_id_raw
.parse::<u64>()
.with_context(|_| ParseRegionIdSnafu {
raw: physical_region_id_raw,
})?
.into();
let (data_region_id, metadata_region_id) = Self::transform_region_id(physical_region_id);
// check if the logical table already exist
// check if the logical region already exist
if self
.metadata_region
.is_table_exist(metadata_region_id, logical_table_id)
.is_logical_region_exists(metadata_region_id, logical_region_id)
.await?
{
info!("Create a existing logical region {region_id}. Skipped");
info!("Create a existing logical region {logical_region_id}. Skipped");
return Ok(());
}
// find new columns to add
let physical_columns = self.physical_columns.read().await;
let physical_columns =
physical_columns
.get(&data_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?;
let mut new_columns = vec![];
for col in &request.column_metadatas {
if !physical_columns.contains(&col.column_schema.name) {
new_columns.push(col.clone());
{
let physical_columns = &self.state.read().await.physical_columns;
let physical_columns = physical_columns.get(&data_region_id).with_context(|| {
PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
}
})?;
for col in &request.column_metadatas {
if !physical_columns.contains(&col.column_schema.name) {
new_columns.push(col.clone());
}
}
}
info!("Found new columns {new_columns:?} to add to physical region {data_region_id}");
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_table_id,
logical_region_id,
new_columns,
)
.await?;
// register logical region to metadata region
self.metadata_region
.add_logical_region(metadata_region_id, logical_region_id)
.await?;
// update the mapping
// Safety: previous steps ensure the physical region exist
self.state
.write()
.await
.add_logical_region(physical_region_id, logical_region_id);
info!("Created new logical region {logical_region_id} on physical region {data_region_id}");
LOGICAL_REGION_COUNT.inc();
Ok(())
}
@@ -347,7 +416,7 @@ impl MetricEngineInner {
&self,
data_region_id: RegionId,
metadata_region_id: RegionId,
logical_table_id: TableId,
logical_region_id: RegionId,
new_columns: Vec<ColumnMetadata>,
) -> Result<()> {
// alter data region
@@ -360,29 +429,23 @@ impl MetricEngineInner {
self.metadata_region
.add_column(
metadata_region_id,
logical_table_id,
logical_region_id,
&col.column_schema.name,
col.semantic_type,
)
.await?;
}
let mut physical_columns = self.physical_columns.write().await;
// safety: previous step has checked this
let mut column_set = physical_columns.get_mut(&data_region_id).unwrap();
for col in &new_columns {
column_set.insert(col.column_schema.name.clone());
}
info!("Create table {logical_table_id} leads to adding columns {new_columns:?} to physical region {data_region_id}");
self.state.write().await.add_physical_columns(
data_region_id,
new_columns
.iter()
.map(|meta| meta.column_schema.name.clone()),
);
info!("Create region {logical_region_id} leads to adding columns {new_columns:?} to physical region {data_region_id}");
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
// register table to metadata region
self.metadata_region
.add_table(metadata_region_id, logical_table_id)
.await?;
info!("Created new logical table {logical_table_id} on physical region {data_region_id}");
LOGICAL_REGION_COUNT.inc();
Ok(())
}
@@ -546,38 +609,51 @@ impl MetricEngineInner {
}
impl MetricEngineInner {
pub async fn alter_logic_region(
/// Dispatch region alter request
pub async fn alter_region(
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<()> {
let is_altering_logical_region = self
.state
.read()
.await
.physical_regions
.contains_key(&region_id);
if is_altering_logical_region {
self.alter_physical_region(region_id, request).await
} else {
self.alter_logical_region(region_id, request).await
}
}
async fn alter_logical_region(
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<()> {
let physical_region_id = {
let logical_regions = &self.state.read().await.logical_regions;
*logical_regions.get(&region_id).with_context(|| {
error!("Trying to alter an nonexistent region {region_id}");
LogicalRegionNotFoundSnafu { region_id }
})?
};
// only handle adding column
let AlterKind::AddColumns { columns } = request.kind else {
return Ok(());
};
let logical_table_id = region_id.table_id();
// check if the table exists
let metadata_region_id = utils::to_metadata_region_id(region_id);
if !self
.metadata_region
.is_table_exist(metadata_region_id, logical_table_id)
.await?
{
error!("Trying to alter an nonexistent table {logical_table_id}");
return LogicalTableNotFoundSnafu {
table_id: logical_table_id,
}
.fail();
}
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let mut columns_to_add = vec![];
for col in columns {
if self
.metadata_region
.column_semantic_type(
metadata_region_id,
logical_table_id,
region_id,
&col.column_metadata.column_schema.name,
)
.await?
@@ -587,23 +663,36 @@ impl MetricEngineInner {
}
}
let data_region_id = utils::to_data_region_id(region_id);
let data_region_id = utils::to_data_region_id(physical_region_id);
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_table_id,
region_id,
columns_to_add,
)
.await?;
Ok(())
}
async fn alter_physical_region(
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<()> {
info!("Metric region received alter request {request:?} on physical region {region_id:?}");
FORBIDDEN_OPERATION_COUNT.inc();
ForbiddenPhysicalAlterSnafu.fail()
}
}
#[cfg(test)]
mod tests {
use std::hash::Hash;
use store_api::region_request::AddColumn;
use super::*;
use crate::test_util::TestEnv;
@@ -749,4 +838,63 @@ mod tests {
vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()]
);
}
#[tokio::test]
async fn test_alter_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let engine = env.metric();
let engine_inner = engine.inner;
// alter physical region
let physical_region_id = env.default_physical_region_id();
let request = RegionAlterRequest {
schema_version: 0,
kind: AlterKind::AddColumns {
columns: vec![AddColumn {
column_metadata: ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag1",
ConcreteDataType::string_datatype(),
false,
),
},
location: None,
}],
},
};
let result = engine_inner
.alter_physical_region(physical_region_id, request.clone())
.await;
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Alter request to physical region is forbidden".to_string()
);
// alter logical region
let metadata_region = env.metadata_region();
let logical_region_id = env.default_logical_region_id();
let is_column_exist = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "tag1")
.await
.unwrap()
.is_some();
assert!(!is_column_exist);
let region_id = env.default_logical_region_id();
engine_inner
.alter_logical_region(region_id, request)
.await
.unwrap();
let semantic_type = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "tag1")
.await
.unwrap()
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
}
}

View File

@@ -19,7 +19,7 @@ use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
use store_api::storage::{RegionId, TableId};
use store_api::storage::RegionId;
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -35,9 +35,9 @@ pub enum Error {
location: Location,
},
#[snafu(display("Table `{}` already exists", table_id))]
TableAlreadyExists {
table_id: TableId,
#[snafu(display("Region `{}` already exists", region_id))]
RegionAlreadyExists {
region_id: RegionId,
location: Location,
},
@@ -56,11 +56,11 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse table id from {}", raw))]
ParseTableId {
#[snafu(display("Failed to parse region id from {}", raw))]
ParseRegionId {
raw: String,
#[snafu(source)]
error: <TableId as std::str::FromStr>::Err,
error: <u64 as std::str::FromStr>::Err,
location: Location,
},
@@ -91,22 +91,15 @@ pub enum Error {
#[snafu(display("Region options are conflicted"))]
ConflictRegionOption { location: Location },
// TODO: remove this
#[snafu(display("Physical table {} not found", physical_table))]
PhysicalTableNotFound {
physical_table: String,
location: Location,
},
#[snafu(display("Physical region {} not found", region_id))]
PhysicalRegionNotFound {
region_id: RegionId,
location: Location,
},
#[snafu(display("Logical table {} not found", table_id))]
LogicalTableNotFound {
table_id: TableId,
#[snafu(display("Logical region {} not found", region_id))]
LogicalRegionNotFound {
region_id: RegionId,
location: Location,
},
@@ -115,6 +108,9 @@ pub enum Error {
column_type: ConcreteDataType,
location: Location,
},
#[snafu(display("Alter request to physical region is forbidden"))]
ForbiddenPhysicalAlter { location: Location },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -129,14 +125,16 @@ impl ErrorExt for Error {
| ConflictRegionOption { .. }
| ColumnTypeMismatch { .. } => StatusCode::InvalidArguments,
ForbiddenPhysicalAlter { .. } => StatusCode::Unsupported,
MissingInternalColumn { .. }
| DeserializeSemanticType { .. }
| DecodeColumnValue { .. }
| ParseTableId { .. } => StatusCode::Unexpected,
| ParseRegionId { .. } => StatusCode::Unexpected,
PhysicalTableNotFound { .. } | LogicalTableNotFound { .. } => StatusCode::TableNotFound,
PhysicalRegionNotFound { .. } => StatusCode::RegionNotFound,
PhysicalRegionNotFound { .. } | LogicalRegionNotFound { .. } => {
StatusCode::RegionNotFound
}
CreateMitoRegion { source, .. }
| MitoReadOperation { source, .. }
@@ -144,7 +142,7 @@ impl ErrorExt for Error {
CollectRecordBatchStream { source, .. } => source.status_code(),
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
RegionAlreadyExists { .. } => StatusCode::RegionAlreadyExists,
}
}

View File

@@ -31,15 +31,15 @@ use crate::engine::{
};
use crate::error::{
CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu,
MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseTableIdSnafu, Result,
TableAlreadyExistsSnafu,
MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseRegionIdSnafu, RegionAlreadyExistsSnafu,
Result,
};
use crate::utils;
/// The other two fields key and value will be used as a k-v storage.
/// It contains two group of key (TABLE_ID refers to the logical table's id):
/// - `__table_<TABLE_ID>` is used for marking table existence. It doesn't have value.
/// - `__column_<TABLE_ID>_<COLUMN_NAME>` is used for marking column existence,
/// It contains two group of key:
/// - `__region_<LOGICAL_REGION_ID>` is used for marking table existence. It doesn't have value.
/// - `__column_<LOGICAL_REGION_ID>_<COLUMN_NAME>` is used for marking column existence,
/// the value is column's semantic type. To avoid the key conflict, this column key
/// will be encoded by base64([STANDARD_NO_PAD]).
///
@@ -48,10 +48,6 @@ use crate::utils;
/// every operation should be associated to a [RegionId], which is the physical
/// table id + region sequence. This handler will transform the region group by
/// itself.
///
/// Notice that all the `region_id` in the public interfaces refers to the
/// physical region id of metadata region. While the `table_id` refers to
/// the logical table id.
pub struct MetadataRegion {
mito: MitoEngine,
}
@@ -65,16 +61,23 @@ impl MetadataRegion {
///
/// This method will check if the table key already exists, if so, it will return
/// a [TableAlreadyExistsSnafu] error.
pub async fn add_table(&self, region_id: RegionId, table_id: TableId) -> Result<()> {
let region_id = utils::to_metadata_region_id(region_id);
let table_key = Self::concat_table_key(table_id);
pub async fn add_logical_region(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<()> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let region_key = Self::concat_region_key(logical_region_id);
let put_success = self
.put_conditionally(region_id, table_key, String::new())
.put_if_absent(region_id, region_key, String::new())
.await?;
if !put_success {
TableAlreadyExistsSnafu { table_id }.fail()
RegionAlreadyExistsSnafu {
region_id: logical_region_id,
}
.fail()
} else {
Ok(())
}
@@ -86,15 +89,15 @@ impl MetadataRegion {
/// will return if the column is successfully added.
pub async fn add_column(
&self,
region_id: RegionId,
table_id: TableId,
physical_region_id: RegionId,
logical_region_id: RegionId,
column_name: &str,
semantic_type: SemanticType,
) -> Result<bool> {
let region_id = utils::to_metadata_region_id(region_id);
let column_key = Self::concat_column_key(table_id, column_name);
let region_id = utils::to_metadata_region_id(physical_region_id);
let column_key = Self::concat_column_key(logical_region_id, column_name);
self.put_conditionally(
self.put_if_absent(
region_id,
column_key,
Self::serialize_semantic_type(semantic_type),
@@ -102,22 +105,26 @@ impl MetadataRegion {
.await
}
/// Check if the given table exists.
pub async fn is_table_exist(&self, region_id: RegionId, table_id: TableId) -> Result<bool> {
let region_id = utils::to_metadata_region_id(region_id);
let table_key = Self::concat_table_key(table_id);
self.exist(region_id, &table_key).await
/// Check if the given logical region exists.
pub async fn is_logical_region_exists(
&self,
physical_region_id: RegionId,
logical_region_id: RegionId,
) -> Result<bool> {
let region_id = utils::to_metadata_region_id(physical_region_id);
let region_key = Self::concat_region_key(logical_region_id);
self.exists(region_id, &region_key).await
}
/// Check if the given column exists. Return the semantic type if exists.
pub async fn column_semantic_type(
&self,
region_id: RegionId,
table_id: TableId,
physical_region_id: RegionId,
logical_region_id: RegionId,
column_name: &str,
) -> Result<Option<SemanticType>> {
let region_id = utils::to_metadata_region_id(region_id);
let column_key = Self::concat_column_key(table_id, column_name);
let region_id = utils::to_metadata_region_id(physical_region_id);
let column_key = Self::concat_column_key(logical_region_id, column_name);
let semantic_type = self.get(region_id, &column_key).await?;
semantic_type
.map(|s| Self::deserialize_semantic_type(&s))
@@ -127,36 +134,37 @@ impl MetadataRegion {
// utils to concat and parse key/value
impl MetadataRegion {
pub fn concat_table_key(table_id: TableId) -> String {
format!("__table_{}", table_id)
pub fn concat_region_key(region_id: RegionId) -> String {
format!("__region_{}", region_id.as_u64())
}
/// Column name will be encoded by base64([STANDARD_NO_PAD])
pub fn concat_column_key(table_id: TableId, column_name: &str) -> String {
pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
format!("__column_{}_{}", table_id, encoded_column_name)
format!("__column_{}_{}", region_id.as_u64(), encoded_column_name)
}
pub fn parse_table_key(key: &str) -> Option<&str> {
key.strip_prefix("__table_")
pub fn parse_region_key(key: &str) -> Option<&str> {
key.strip_prefix("__region_")
}
/// Parse column key to (table_name, column_name)
pub fn parse_column_key(key: &str) -> Result<Option<(TableId, String)>> {
/// Parse column key to (logical_region_id, column_name)
pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
if let Some(stripped) = key.strip_prefix("__column_") {
let mut iter = stripped.split('_');
let table_id_raw = iter.next().unwrap();
let table_id = table_id_raw
.parse()
.with_context(|_| ParseTableIdSnafu { raw: table_id_raw })?;
let region_id_raw = iter.next().unwrap();
let region_id = region_id_raw
.parse::<u64>()
.with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
.into();
let encoded_column_name = iter.next().unwrap();
let column_name = STANDARD_NO_PAD
.decode(encoded_column_name)
.context(DecodeColumnValueSnafu)?;
Ok(Some((table_id, String::from_utf8(column_name).unwrap())))
Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
} else {
Ok(None)
}
@@ -179,13 +187,13 @@ impl MetadataRegion {
impl MetadataRegion {
/// Put if not exist, return if this put operation is successful (error other
/// than "key already exist" will be wrapped in [Err]).
pub async fn put_conditionally(
pub async fn put_if_absent(
&self,
region_id: RegionId,
key: String,
value: String,
) -> Result<bool> {
if self.exist(region_id, &key).await? {
if self.exists(region_id, &key).await? {
return Ok(false);
}
@@ -203,7 +211,7 @@ impl MetadataRegion {
/// Check if the given key exists.
///
/// Notice that due to mito doesn't support transaction, TOCTTOU is possible.
pub async fn exist(&self, region_id: RegionId, key: &str) -> Result<bool> {
pub async fn exists(&self, region_id: RegionId, key: &str) -> Result<bool> {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
.mito
@@ -310,40 +318,40 @@ mod test {
#[test]
fn test_concat_table_key() {
let table_id = 12934;
let expected = "__table_12934".to_string();
assert_eq!(MetadataRegion::concat_table_key(table_id), expected);
let region_id = RegionId::new(1234, 7844);
let expected = "__region_5299989651108".to_string();
assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
}
#[test]
fn test_concat_column_key() {
let table_id = 91959;
let region_id = RegionId::new(8489, 9184);
let column_name = "my_column";
let expected = "__column_91959_bXlfY29sdW1u".to_string();
let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
assert_eq!(
MetadataRegion::concat_column_key(table_id, column_name),
MetadataRegion::concat_column_key(region_id, column_name),
expected
);
}
#[test]
fn test_parse_table_key() {
let table_id = 93585;
let encoded = MetadataRegion::concat_column_key(table_id, "my_column");
assert_eq!(encoded, "__column_93585_bXlfY29sdW1u");
let region_id = RegionId::new(87474, 10607);
let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");
let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
assert_eq!(decoded, Some((table_id, "my_column".to_string())));
assert_eq!(decoded, Some((region_id, "my_column".to_string())));
}
#[test]
fn test_parse_valid_column_key() {
let table_id = 73952;
let encoded = MetadataRegion::concat_column_key(table_id, "my_column");
assert_eq!(encoded, "__column_73952_bXlfY29sdW1u");
let region_id = RegionId::new(176, 910);
let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");
let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
assert_eq!(decoded, Some((table_id, "my_column".to_string())));
assert_eq!(decoded, Some((region_id, "my_column".to_string())));
}
#[test]
@@ -396,13 +404,13 @@ mod test {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
let region_id = to_metadata_region_id(env.default_physical_region_id());
// Test inserting a new key-value pair
let key = "test_key".to_string();
let value = "test_value".to_string();
let result = metadata_region
.put_conditionally(region_id, key.clone(), value.clone())
.put_if_absent(region_id, key.clone(), value.clone())
.await;
assert!(result.is_ok());
assert!(result.unwrap());
@@ -419,7 +427,7 @@ mod test {
// Test inserting the same key-value pair again
let result = metadata_region
.put_conditionally(region_id, key.clone(), value.clone())
.put_if_absent(region_id, key.clone(), value.clone())
.await;
assert!(result.is_ok());
assert!(!result.unwrap(),);
@@ -430,11 +438,11 @@ mod test {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
let region_id = to_metadata_region_id(env.default_physical_region_id());
// Test checking for a non-existent key
let key = "test_key".to_string();
let result = metadata_region.exist(region_id, &key).await;
let result = metadata_region.exists(region_id, &key).await;
assert!(result.is_ok());
assert!(!result.unwrap());
@@ -446,7 +454,7 @@ mod test {
.handle_request(region_id, RegionRequest::Put(put_request))
.await
.unwrap();
let result = metadata_region.exist(region_id, &key).await;
let result = metadata_region.exists(region_id, &key).await;
assert!(result.is_ok());
assert!(result.unwrap(),);
}
@@ -456,7 +464,7 @@ mod test {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
let region_id = to_metadata_region_id(env.default_physical_region_id());
// Test getting a non-existent key
let key = "test_key".to_string();
@@ -478,26 +486,26 @@ mod test {
}
#[tokio::test]
async fn test_add_table() {
async fn test_add_logical_region() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
// add one table
let table_id = 77889;
let logical_region_id = RegionId::new(196, 2333);
metadata_region
.add_table(region_id, table_id)
.add_logical_region(physical_region_id, logical_region_id)
.await
.unwrap();
assert!(metadata_region
.is_table_exist(region_id, table_id)
.is_logical_region_exists(physical_region_id, logical_region_id)
.await
.unwrap());
// add it again
assert!(metadata_region
.add_table(region_id, table_id)
.add_logical_region(physical_region_id, logical_region_id)
.await
.is_err());
}
@@ -507,29 +515,39 @@ mod test {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
let table_id = 23638;
let logical_region_id = RegionId::new(868, 8390);
let column_name = "column1";
let semantic_type = SemanticType::Tag;
metadata_region
.add_column(region_id, table_id, column_name, semantic_type)
.add_column(
physical_region_id,
logical_region_id,
column_name,
semantic_type,
)
.await
.unwrap();
let actual_semantic_type = metadata_region
.column_semantic_type(region_id, table_id, column_name)
.column_semantic_type(physical_region_id, logical_region_id, column_name)
.await
.unwrap();
assert_eq!(actual_semantic_type, Some(semantic_type));
// duplicate column won't be updated
let is_updated = metadata_region
.add_column(region_id, table_id, column_name, SemanticType::Field)
.add_column(
physical_region_id,
logical_region_id,
column_name,
SemanticType::Field,
)
.await
.unwrap();
assert!(!is_updated);
let actual_semantic_type = metadata_region
.column_semantic_type(region_id, table_id, column_name)
.column_semantic_type(physical_region_id, logical_region_id, column_name)
.await
.unwrap();
assert_eq!(actual_semantic_type, Some(semantic_type));

View File

@@ -20,17 +20,21 @@ use prometheus::*;
lazy_static! {
/// Gauge for opened regions
pub static ref PHYSICAL_REGION_COUNT: IntGauge =
register_int_gauge!("metric_physical_region_count", "metric engine physical region count").unwrap();
register_int_gauge!("metric_engine_physical_region_count", "metric engine physical region count").unwrap();
/// Gauge of columns across all opened regions
pub static ref PHYSICAL_COLUMN_COUNT: IntGauge =
register_int_gauge!("metric_physical_column_count", "metric engine physical column count").unwrap();
register_int_gauge!("metric_engine_physical_column_count", "metric engine physical column count").unwrap();
/// Gauge for opened logical regions
pub static ref LOGICAL_REGION_COUNT: IntGauge =
register_int_gauge!("metric_logical_region_count", "metric engine logical region count").unwrap();
register_int_gauge!("metric_engine_logical_region_count", "metric engine logical region count").unwrap();
/// Gauge for opened logical regions
/// Histogram for opened logical regions
pub static ref MITO_DDL_DURATION: Histogram =
register_histogram!("metric_engine_mito_ddl", "metric engine mito ddl").unwrap();
/// Counter for forbidden operations
pub static ref FORBIDDEN_OPERATION_COUNT: IntCounter =
register_int_counter!("metric_engine_forbidden_request", "metric forbidden request").unwrap();
}

View File

@@ -27,18 +27,22 @@ use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::data_region::DataRegion;
use crate::engine::{MetricEngine, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use crate::engine::{
MetricEngine, LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY,
};
use crate::metadata_region::MetadataRegion;
/// Env to test metric engine.
pub struct TestEnv {
mito_env: MitoTestEnv,
mito: MitoEngine,
metric: MetricEngine,
}
impl TestEnv {
/// Returns a new env with empty prefix for test.
pub async fn new() -> Self {
common_telemetry::init_default_ut_logging();
Self::with_prefix("").await
}
@@ -46,7 +50,12 @@ impl TestEnv {
pub async fn with_prefix(prefix: &str) -> Self {
let mut mito_env = MitoTestEnv::with_prefix(prefix);
let mito = mito_env.create_engine(MitoConfig::default()).await;
Self { mito_env, mito }
let metric = MetricEngine::new(mito.clone());
Self {
mito_env,
mito,
metric,
}
}
pub fn data_home(&self) -> String {
@@ -60,24 +69,38 @@ impl TestEnv {
}
pub fn metric(&self) -> MetricEngine {
MetricEngine::new(self.mito())
self.metric.clone()
}
/// Create regions in [MetricEngine] under [`default_region_id`](TestEnv::default_region_id)
/// Create regions in [MetricEngine] under [`default_region_id`]
/// and region dir `"test_metric_region"`.
///
/// This method will create one logical region with three columns `(ts, val, job)`
/// under [`default_logical_region_id`].
pub async fn init_metric_region(&self) {
let region_id = self.default_region_id();
let region_id = self.default_physical_region_id();
let region_create_request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
}],
column_metadatas: vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
},
ColumnMetadata {
column_id: 1,
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new(
"greptime_value",
ConcreteDataType::float64_datatype(),
false,
),
},
],
primary_key: vec![],
options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
@@ -85,7 +108,54 @@ impl TestEnv {
region_dir: "test_metric_region".to_string(),
};
// create regions
// create physical region
self.metric()
.handle_request(region_id, RegionRequest::Create(region_create_request))
.await
.unwrap();
// create logical region
let region_id = self.default_logical_region_id();
let region_create_request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
},
ColumnMetadata {
column_id: 1,
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new(
"greptime_value",
ConcreteDataType::float64_datatype(),
false,
),
},
ColumnMetadata {
column_id: 2,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"job",
ConcreteDataType::string_datatype(),
false,
),
},
],
primary_key: vec![2],
options: [(
LOGICAL_TABLE_METADATA_KEY.to_string(),
self.default_physical_region_id().as_u64().to_string(),
)]
.into_iter()
.collect(),
region_dir: "test_metric_region_logical".to_string(),
};
self.metric()
.handle_request(region_id, RegionRequest::Create(region_create_request))
.await
@@ -100,10 +170,15 @@ impl TestEnv {
DataRegion::new(self.mito())
}
/// `RegionId::new(1, 2)`
pub fn default_region_id(&self) -> RegionId {
/// Default physical region id `RegionId::new(1, 2)`
pub fn default_physical_region_id(&self) -> RegionId {
RegionId::new(1, 2)
}
/// Default logical region id `RegionId::new(3, 2)`
pub fn default_logical_region_id(&self) -> RegionId {
RegionId::new(3, 2)
}
}
#[cfg(test)]
@@ -115,11 +190,9 @@ mod test {
#[tokio::test]
async fn create_metadata_region() {
common_telemetry::init_default_ut_logging();
let env = TestEnv::new().await;
env.init_metric_region().await;
let region_id = to_metadata_region_id(env.default_region_id());
let region_id = to_metadata_region_id(env.default_physical_region_id());
let region_dir = join_dir(&env.data_home(), "test_metric_region");
// `join_dir` doesn't suit windows path

View File

@@ -202,7 +202,7 @@ pub struct RegionOpenRequest {
pub struct RegionCloseRequest {}
/// Alter metadata of a region.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
/// The version of the schema before applying the alteration.
pub schema_version: u64,
@@ -255,7 +255,7 @@ impl TryFrom<AlterRequest> for RegionAlterRequest {
}
/// Kind of the alteration.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AlterKind {
/// Add columns to the region.
AddColumns {
@@ -342,7 +342,7 @@ impl TryFrom<alter_request::Kind> for AlterKind {
}
/// Adds a column.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
/// Metadata of the column to add.
pub column_metadata: ColumnMetadata,
@@ -408,7 +408,7 @@ impl TryFrom<v1::region::AddColumn> for AddColumn {
}
/// Location to add a column.
#[derive(Debug, PartialEq, Eq)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
/// Add the column to the first position of columns.
First,