feat: create logical region to metric engine (#2706)

* wip: create logical region

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* change metadata region to handle table id rather than table name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement create logic region

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add some tests

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement alter logical region

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test, style and document

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* don't change default constraint

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-11-09 14:04:04 +08:00
committed by GitHub
parent 9d029f7337
commit c62ba79759
10 changed files with 686 additions and 73 deletions

2
Cargo.lock generated
View File

@@ -4794,8 +4794,10 @@ dependencies = [
"common-time",
"datafusion",
"datatypes",
"lazy_static",
"mito2",
"object-store",
"prometheus",
"serde_json",
"snafu",
"store-api",

View File

@@ -121,6 +121,10 @@ impl ColumnSchema {
self
}
/// Set default constraint.
///
/// If a default constraint exists for the column, this method will
/// validate it against the column's data type and nullability.
pub fn with_default_constraint(
mut self,
default_constraint: Option<ColumnDefaultConstraint>,
@@ -133,6 +137,12 @@ impl ColumnSchema {
Ok(self)
}
/// Set the nullability of the column to `true`.
pub fn with_nullable_set(mut self) -> Self {
self.is_nullable = true;
self
}
/// Creates a new [`ColumnSchema`] with given metadata.
pub fn with_metadata(mut self, metadata: Metadata) -> Self {
self.metadata = metadata;

View File

@@ -16,8 +16,10 @@ common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
datatypes.workspace = true
lazy_static = "1.4"
mito2.workspace = true
object-store.workspace = true
prometheus.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true

View File

@@ -0,0 +1,208 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::SemanticType;
use common_telemetry::tracing::warn;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AddColumn, AlterKind, RegionAlterRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::error::{
ColumnTypeMismatchSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, Result,
};
use crate::metrics::MITO_DDL_DURATION;
use crate::utils;
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the data related operations across physical tables. Thus
/// every operation should be associated to a [RegionId], which is the physical
/// table id + region sequence. This handler will transform the region group by
/// itself.
pub struct DataRegion {
    // Underlying mito engine that hosts the physical data regions.
    mito: MitoEngine,
}
impl DataRegion {
    /// Creates a new [DataRegion] backed by the given mito engine.
    pub fn new(mito: MitoEngine) -> Self {
        Self { mito }
    }

    /// Submit an alter request to the underlying physical region.
    ///
    /// This method will change the semantic type of the given columns:
    /// [SemanticType::Tag] will become [SemanticType::Field]. The procedure
    /// framework ensures there is no concurrent conflict.
    ///
    /// Invokers don't need to set up or verify the column id — this method
    /// reassigns ids based on the underlying schema.
    ///
    /// This method will also set the nullable marker to true.
    ///
    /// # Errors
    /// Returns [ColumnTypeMismatchSnafu] if a tag column is not of string type,
    /// and wraps mito read/write failures in the corresponding error variants.
    pub async fn add_columns(
        &self,
        region_id: RegionId,
        columns: Vec<ColumnMetadata>,
    ) -> Result<()> {
        let region_id = utils::to_data_region_id(region_id);

        // Fetch the current physical region metadata to learn its schema
        // version and the largest column id already in use.
        let metadata = self
            .mito
            .get_metadata(region_id)
            .await
            .context(MitoReadOperationSnafu)?;
        let schema_version = metadata.schema_version;
        let max_column_id = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .max()
            .unwrap_or(0);

        // Rewrite each incoming column: tags are demoted to fields (only
        // string-typed tags are accepted), column ids are reassigned after
        // the current maximum, and every column is forced to be nullable.
        let mut columns_to_add = Vec::with_capacity(columns.len());
        for (offset, mut column) in columns.into_iter().enumerate() {
            if column.semantic_type == SemanticType::Tag {
                column.semantic_type = SemanticType::Field;
                if !column.column_schema.data_type.is_string() {
                    return ColumnTypeMismatchSnafu {
                        column_type: column.column_schema.data_type,
                    }
                    .fail();
                }
            } else {
                warn!(
                    "Column {} in region {region_id} is not a tag",
                    column.column_schema.name
                );
            }
            column.column_id = 1 + max_column_id + offset as u32;
            column.column_schema = column.column_schema.with_nullable_set();
            columns_to_add.push(AddColumn {
                column_metadata: column,
                location: None,
            });
        }

        // Assemble the alter request against the observed schema version.
        let alter_request = RegionRequest::Alter(RegionAlterRequest {
            schema_version,
            kind: AlterKind::AddColumns {
                columns: columns_to_add,
            },
        });

        // Submit it to mito, timing the DDL round-trip.
        {
            let _timer = MITO_DDL_DURATION.start_timer();
            self.mito
                .handle_request(region_id, alter_request)
                .await
                .context(MitoWriteOperationSnafu)?;
        }

        Ok(())
    }
}
#[cfg(test)]
mod test {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;

    // Adding two string tag columns should succeed: they are appended after
    // the pre-existing physical columns and the schema version advances from 0.
    #[tokio::test]
    async fn test_add_columns() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        // Freshly created physical region starts at schema version 0.
        let current_version = env
            .mito()
            .get_metadata(utils::to_data_region_id(env.default_region_id()))
            .await
            .unwrap()
            .schema_version;
        assert_eq!(current_version, 0);
        // column_id 0 is a placeholder — add_columns reassigns real ids.
        let new_columns = vec![
            ColumnMetadata {
                column_id: 0,
                semantic_type: SemanticType::Tag,
                column_schema: ColumnSchema::new(
                    "tag2",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
            },
            ColumnMetadata {
                column_id: 0,
                semantic_type: SemanticType::Tag,
                column_schema: ColumnSchema::new(
                    "tag3",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
            },
        ];
        env.data_region()
            .add_columns(env.default_region_id(), new_columns)
            .await
            .unwrap();
        // New columns must appear after the region's built-in columns.
        let new_metadata = env
            .mito()
            .get_metadata(utils::to_data_region_id(env.default_region_id()))
            .await
            .unwrap();
        let column_names = new_metadata
            .column_metadatas
            .iter()
            .map(|c| &c.column_schema.name)
            .collect::<Vec<_>>();
        let expected = vec!["greptime_timestamp", "__metric", "__tsid", "tag2", "tag3"];
        assert_eq!(column_names, expected);
    }

    // Only string is allowed for tag column
    #[tokio::test]
    async fn test_add_invalid_column() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        // An int64 tag column must be rejected with ColumnTypeMismatch.
        let new_columns = vec![ColumnMetadata {
            column_id: 0,
            semantic_type: SemanticType::Tag,
            column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false),
        }];
        let result = env
            .data_region()
            .add_columns(env.default_region_id(), new_columns)
            .await;
        assert!(result.is_err());
    }
}

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use api::v1::SemanticType;
@@ -20,20 +20,31 @@ use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info};
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use mito2::engine::{MitoEngine, MITO_ENGINE_NAME};
use object_store::util::join_dir;
use snafu::{ensure, ResultExt};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::region_request::{
AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionGroup, RegionId, ScanRequest};
use store_api::storage::{RegionGroup, RegionId, ScanRequest, TableId};
use tokio::sync::RwLock;
use crate::error::{CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, Result};
use crate::data_region::DataRegion;
use crate::error::{
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu,
LogicalTableNotFoundSnafu, MissingRegionOptionSnafu, PhysicalRegionNotFoundSnafu,
PhysicalTableNotFoundSnafu, Result,
};
use crate::metadata_region::MetadataRegion;
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils;
/// region group value for data region inside a metric region
@@ -59,6 +70,33 @@ pub const DATA_REGION_SUBDIR: &str = "data";
pub const METRIC_ENGINE_NAME: &str = "metric";
/// Metadata key present in the `CREATE TABLE ... WITH ()` clause. This key is
/// used to identify the table is a physical metric table. E.g.:
/// ```sql
/// CREATE TABLE physical_table (
/// ...
/// )
/// ENGINE = metric
/// WITH (
/// physical_metric_table,
/// );
/// ```
pub const PHYSICAL_TABLE_METADATA_KEY: &str = "physical_metric_table";
/// Metadata key present in the `CREATE TABLE ... WITH ()` clause. This key is
/// used to identify a logical table and associate it with a corresponding physical
/// table. E.g.:
/// ```sql
/// CREATE TABLE logical_table (
/// ...
/// )
/// ENGINE = metric
/// WITH (
/// on_physical_table = "physical_table",
/// );
/// ```
pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table";
pub struct MetricEngine {
inner: Arc<MetricEngineInner>,
}
@@ -140,18 +178,34 @@ impl RegionEngine for MetricEngine {
impl MetricEngine {
pub fn new(mito: MitoEngine) -> Self {
let metadata_region = MetadataRegion::new(mito.clone());
let data_region = DataRegion::new(mito.clone());
Self {
inner: Arc::new(MetricEngineInner { mito }),
inner: Arc::new(MetricEngineInner {
mito,
metadata_region,
data_region,
physical_tables: RwLock::default(),
physical_columns: RwLock::default(),
}),
}
}
}
struct MetricEngineInner {
mito: MitoEngine,
metadata_region: MetadataRegion,
data_region: DataRegion,
// TODO(ruihang): handle different catalog/schema
/// Map from physical table name to table id.
physical_tables: RwLock<HashMap<String, TableId>>,
/// Cache for the columns of physical regions.
/// The region id in key is the data region id.
physical_columns: RwLock<HashMap<RegionId, HashSet<String>>>,
}
impl MetricEngineInner {
/// Initialize a metric region at given region id.
/// Dispatch region creation request to physical region creation or logical
pub async fn create_region(
&self,
region_id: RegionId,
@@ -159,8 +213,31 @@ impl MetricEngineInner {
) -> Result<()> {
Self::verify_region_create_request(&request)?;
if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) {
self.create_physical_region(region_id, request).await
} else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) {
self.create_logical_region(region_id, request).await
} else {
MissingRegionOptionSnafu {}.fail()
}
}
/// Initialize a physical metric region at given region id.
async fn create_physical_region(
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> Result<()> {
let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
// TODO: workaround for now, should find another way to retrieve the
// table name.
let physical_table_name = request
.options
.get(PHYSICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?
.to_string();
// create metadata region
let create_metadata_region_request =
self.create_request_for_metadata_region(&request.region_dir);
@@ -186,11 +263,133 @@ impl MetricEngineInner {
region_type: DATA_REGION_SUBDIR,
})?;
info!("Created physical metric region {region_id:?} with table name {physical_table_name}");
PHYSICAL_REGION_COUNT.inc();
// remember this table
self.physical_tables
.write()
.await
.insert(physical_table_name, region_id.table_id());
Ok(())
}
/// Create a logical region.
///
/// Physical table and logical table can have multiple regions, and their
/// region number should be the same. Thus we can infer the physical region
/// id by simply replacing the table id part in the given region id, which
/// represents the "logical region" to request.
///
/// This method will alter the data region to add columns if necessary.
///
/// If the logical region to create already exists, this method will do nothing.
async fn create_logical_region(
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> Result<()> {
// transform IDs
let physical_table_name = request
.options
.get(LOGICAL_TABLE_METADATA_KEY)
.ok_or(MissingRegionOptionSnafu {}.build())?;
let physical_table_id = *self
.physical_tables
.read()
.await
.get(physical_table_name)
.with_context(|| PhysicalTableNotFoundSnafu {
physical_table: physical_table_name,
})?;
let logical_table_id = region_id.table_id();
let physical_region_id = RegionId::new(physical_table_id, region_id.region_number());
let (data_region_id, metadata_region_id) = Self::transform_region_id(physical_region_id);
// check if the logical table already exist
if self
.metadata_region
.is_table_exist(metadata_region_id, logical_table_id)
.await?
{
info!("Create a existing logical region {region_id}. Skipped");
return Ok(());
}
// find new columns to add
let physical_columns = self.physical_columns.read().await;
let physical_columns =
physical_columns
.get(&data_region_id)
.with_context(|| PhysicalRegionNotFoundSnafu {
region_id: data_region_id,
})?;
let mut new_columns = vec![];
for col in &request.column_metadatas {
if !physical_columns.contains(&col.column_schema.name) {
new_columns.push(col.clone());
}
}
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_table_id,
new_columns,
)
.await?;
Ok(())
}
async fn add_columns_to_physical_data_region(
&self,
data_region_id: RegionId,
metadata_region_id: RegionId,
logical_table_id: TableId,
new_columns: Vec<ColumnMetadata>,
) -> Result<()> {
// alter data region
self.data_region
.add_columns(data_region_id, new_columns.clone())
.await?;
// register columns to metadata region
for col in &new_columns {
self.metadata_region
.add_column(
metadata_region_id,
logical_table_id,
&col.column_schema.name,
col.semantic_type,
)
.await?;
}
let mut physical_columns = self.physical_columns.write().await;
// safety: previous step has checked this
let mut column_set = physical_columns.get_mut(&data_region_id).unwrap();
for col in &new_columns {
column_set.insert(col.column_schema.name.clone());
}
info!("Create table {logical_table_id} leads to adding columns {new_columns:?} to physical region {data_region_id}");
PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _);
// register table to metadata region
self.metadata_region
.add_table(metadata_region_id, logical_table_id)
.await?;
info!("Created new logical table {logical_table_id} on physical region {data_region_id}");
LOGICAL_REGION_COUNT.inc();
Ok(())
}
/// Check if
/// - internal columns are not occupied
/// - required table option is present ([PHYSICAL_TABLE_METADATA_KEY] or
/// [LOGICAL_TABLE_METADATA_KEY])
fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
let name_to_index = request
.column_metadatas
@@ -213,6 +412,18 @@ impl MetricEngineInner {
}
);
// check if required table option is present
ensure!(
request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
|| request.options.contains_key(LOGICAL_TABLE_METADATA_KEY),
MissingRegionOptionSnafu {}
);
ensure!(
!(request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
&& request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)),
ConflictRegionOptionSnafu {}
);
Ok(())
}
@@ -334,8 +545,65 @@ impl MetricEngineInner {
}
}
impl MetricEngineInner {
pub async fn alter_logic_region(
&self,
region_id: RegionId,
request: RegionAlterRequest,
) -> Result<()> {
// only handle adding column
let AlterKind::AddColumns { columns } = request.kind else {
return Ok(());
};
let logical_table_id = region_id.table_id();
// check if the table exists
let metadata_region_id = utils::to_metadata_region_id(region_id);
if !self
.metadata_region
.is_table_exist(metadata_region_id, logical_table_id)
.await?
{
error!("Trying to alter an nonexistent table {logical_table_id}");
return LogicalTableNotFoundSnafu {
table_id: logical_table_id,
}
.fail();
}
let mut columns_to_add = vec![];
for col in columns {
if self
.metadata_region
.column_semantic_type(
metadata_region_id,
logical_table_id,
&col.column_metadata.column_schema.name,
)
.await?
.is_none()
{
columns_to_add.push(col.column_metadata);
}
}
let data_region_id = utils::to_data_region_id(region_id);
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
logical_table_id,
columns_to_add,
)
.await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::hash::Hash;
use super::*;
use crate::test_util::TestEnv;
@@ -400,9 +668,40 @@ mod tests {
region_dir: "test_dir".to_string(),
engine: METRIC_ENGINE_NAME.to_string(),
primary_key: vec![],
options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect(),
};
let result = MetricEngineInner::verify_region_create_request(&request);
assert!(result.is_ok());
}
#[test]
fn test_verify_region_create_request_options() {
let mut request = RegionCreateRequest {
column_metadatas: vec![],
region_dir: "test_dir".to_string(),
engine: METRIC_ENGINE_NAME.to_string(),
primary_key: vec![],
options: HashMap::new(),
};
let result = MetricEngineInner::verify_region_create_request(&request);
assert!(result.is_err());
let mut options = HashMap::new();
options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
request.options = options.clone();
let result = MetricEngineInner::verify_region_create_request(&request);
assert!(result.is_ok());
options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string());
request.options = options.clone();
let result = MetricEngineInner::verify_region_create_request(&request);
assert!(result.is_err());
options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap();
request.options = options;
let result = MetricEngineInner::verify_region_create_request(&request);
assert!(result.is_ok());
}
@@ -436,8 +735,9 @@ mod tests {
};
let env = TestEnv::new().await;
let engine = MetricEngineInner { mito: env.mito() };
let data_region_request = engine.create_request_for_data_region(&request);
let engine = MetricEngine::new(env.mito());
let engine_inner = engine.inner;
let data_region_request = engine_inner.create_request_for_data_region(&request);
assert_eq!(
data_region_request.region_dir,

View File

@@ -17,7 +17,9 @@ use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use datatypes::prelude::ConcreteDataType;
use snafu::{Location, Snafu};
use store_api::storage::{RegionId, TableId};
#[derive(Snafu)]
#[snafu(visibility(pub))]
@@ -33,9 +35,9 @@ pub enum Error {
location: Location,
},
#[snafu(display("Table `{}` already exists", table_name))]
#[snafu(display("Table `{}` already exists", table_id))]
TableAlreadyExists {
table_name: String,
table_id: TableId,
location: Location,
},
@@ -54,6 +56,14 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to parse table id from {}", raw))]
ParseTableId {
raw: String,
#[snafu(source)]
error: <TableId as std::str::FromStr>::Err,
location: Location,
},
#[snafu(display("Mito read operation fails"))]
MitoReadOperation {
source: BoxedError,
@@ -74,6 +84,37 @@ pub enum Error {
#[snafu(display("Internal column {} is reserved", column))]
InternalColumnOccupied { column: String, location: Location },
#[snafu(display("Required table option is missing"))]
MissingRegionOption { location: Location },
#[snafu(display("Region options are conflicted"))]
ConflictRegionOption { location: Location },
// TODO: remove this
#[snafu(display("Physical table {} not found", physical_table))]
PhysicalTableNotFound {
physical_table: String,
location: Location,
},
#[snafu(display("Physical region {} not found", region_id))]
PhysicalRegionNotFound {
region_id: RegionId,
location: Location,
},
#[snafu(display("Logical table {} not found", table_id))]
LogicalTableNotFound {
table_id: TableId,
location: Location,
},
#[snafu(display("Column type mismatch. Expect string, got {:?}", column_type))]
ColumnTypeMismatch {
column_type: ConcreteDataType,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -83,11 +124,19 @@ impl ErrorExt for Error {
use Error::*;
match self {
InternalColumnOccupied { .. } => StatusCode::InvalidArguments,
InternalColumnOccupied { .. }
| MissingRegionOption { .. }
| ConflictRegionOption { .. }
| ColumnTypeMismatch { .. } => StatusCode::InvalidArguments,
MissingInternalColumn { .. }
| DeserializeSemanticType { .. }
| DecodeColumnValue { .. } => StatusCode::Unexpected,
| DecodeColumnValue { .. }
| ParseTableId { .. } => StatusCode::Unexpected,
PhysicalTableNotFound { .. } | LogicalTableNotFound { .. } => StatusCode::TableNotFound,
PhysicalRegionNotFound { .. } => StatusCode::RegionNotFound,
CreateMitoRegion { source, .. }
| MitoReadOperation { source, .. }

View File

@@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod data_region;
#[allow(unused)]
pub mod engine;
pub mod error;
#[allow(unused)]
mod metadata_region;
mod metrics;
#[cfg(test)]
mod test_util;
mod utils;

View File

@@ -23,7 +23,7 @@ use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionPutRequest, RegionReadRequest};
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::{RegionId, ScanRequest, TableId};
use crate::engine::{
METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
@@ -31,14 +31,15 @@ use crate::engine::{
};
use crate::error::{
CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu,
MitoReadOperationSnafu, MitoWriteOperationSnafu, Result, TableAlreadyExistsSnafu,
MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseTableIdSnafu, Result,
TableAlreadyExistsSnafu,
};
use crate::utils;
/// The other two fields key and value will be used as a k-v storage.
/// It contains two group of key:
/// - `__table_<TABLE_NAME>` is used for marking table existence. It doesn't have value.
/// - `__column_<TABLE_NAME>_<COLUMN_NAME>` is used for marking column existence,
/// It contains two group of key (TABLE_ID refers to the logical table's id):
/// - `__table_<TABLE_ID>` is used for marking table existence. It doesn't have value.
/// - `__column_<TABLE_ID>_<COLUMN_NAME>` is used for marking column existence,
/// the value is column's semantic type. To avoid the key conflict, this column key
/// will be encoded by base64([STANDARD_NO_PAD]).
///
@@ -47,6 +48,10 @@ use crate::utils;
/// every operation should be associated to a [RegionId], which is the physical
/// table id + region sequence. This handler will transform the region group by
/// itself.
///
/// Notice that all the `region_id` in the public interfaces refers to the
/// physical region id of metadata region. While the `table_id` refers to
/// the logical table id.
pub struct MetadataRegion {
mito: MitoEngine,
}
@@ -60,16 +65,16 @@ impl MetadataRegion {
///
/// This method will check if the table key already exists, if so, it will return
/// a [TableAlreadyExistsSnafu] error.
pub async fn add_table(&self, region_id: RegionId, table_name: &str) -> Result<()> {
pub async fn add_table(&self, region_id: RegionId, table_id: TableId) -> Result<()> {
let region_id = utils::to_metadata_region_id(region_id);
let table_key = Self::concat_table_key(table_name);
let table_key = Self::concat_table_key(table_id);
let put_success = self
.put_conditionally(region_id, table_key, String::new())
.await?;
if !put_success {
TableAlreadyExistsSnafu { table_name }.fail()
TableAlreadyExistsSnafu { table_id }.fail()
} else {
Ok(())
}
@@ -82,12 +87,12 @@ impl MetadataRegion {
pub async fn add_column(
&self,
region_id: RegionId,
table_name: &str,
table_id: TableId,
column_name: &str,
semantic_type: SemanticType,
) -> Result<bool> {
let region_id = utils::to_metadata_region_id(region_id);
let column_key = Self::concat_column_key(table_name, column_name);
let column_key = Self::concat_column_key(table_id, column_name);
self.put_conditionally(
region_id,
@@ -98,9 +103,9 @@ impl MetadataRegion {
}
/// Check if the given table exists.
pub async fn is_table_exist(&self, region_id: RegionId, table_name: &str) -> Result<bool> {
pub async fn is_table_exist(&self, region_id: RegionId, table_id: TableId) -> Result<bool> {
let region_id = utils::to_metadata_region_id(region_id);
let table_key = Self::concat_table_key(table_name);
let table_key = Self::concat_table_key(table_id);
self.exist(region_id, &table_key).await
}
@@ -108,11 +113,11 @@ impl MetadataRegion {
pub async fn column_semantic_type(
&self,
region_id: RegionId,
table_name: &str,
table_id: TableId,
column_name: &str,
) -> Result<Option<SemanticType>> {
let region_id = utils::to_metadata_region_id(region_id);
let column_key = Self::concat_column_key(table_name, column_name);
let column_key = Self::concat_column_key(table_id, column_name);
let semantic_type = self.get(region_id, &column_key).await?;
semantic_type
.map(|s| Self::deserialize_semantic_type(&s))
@@ -122,14 +127,14 @@ impl MetadataRegion {
// utils to concat and parse key/value
impl MetadataRegion {
pub fn concat_table_key(table_name: &str) -> String {
format!("__table_{}", table_name)
pub fn concat_table_key(table_id: TableId) -> String {
format!("__table_{}", table_id)
}
pub fn concat_column_key(table_name: &str, column_name: &str) -> String {
let encoded_table_name = STANDARD_NO_PAD.encode(table_name);
/// Column name will be encoded by base64([STANDARD_NO_PAD])
pub fn concat_column_key(table_id: TableId, column_name: &str) -> String {
let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
format!("__column_{}_{}", encoded_table_name, encoded_column_name)
format!("__column_{}_{}", table_id, encoded_column_name)
}
pub fn parse_table_key(key: &str) -> Option<&str> {
@@ -137,23 +142,21 @@ impl MetadataRegion {
}
/// Parse column key to (table_id, column_name)
pub fn parse_column_key(key: &str) -> Result<Option<(String, String)>> {
pub fn parse_column_key(key: &str) -> Result<Option<(TableId, String)>> {
if let Some(stripped) = key.strip_prefix("__column_") {
let mut iter = stripped.split('_');
let encoded_table_name = iter.next().unwrap();
let encoded_column_name = iter.next().unwrap();
let table_name = STANDARD_NO_PAD
.decode(encoded_table_name)
.context(DecodeColumnValueSnafu)?;
let table_id_raw = iter.next().unwrap();
let table_id = table_id_raw
.parse()
.with_context(|_| ParseTableIdSnafu { raw: table_id_raw })?;
let encoded_column_name = iter.next().unwrap();
let column_name = STANDARD_NO_PAD
.decode(encoded_column_name)
.context(DecodeColumnValueSnafu)?;
Ok(Some((
String::from_utf8(table_name).unwrap(),
String::from_utf8(column_name).unwrap(),
)))
Ok(Some((table_id, String::from_utf8(column_name).unwrap())))
} else {
Ok(None)
}
@@ -307,44 +310,40 @@ mod test {
#[test]
fn test_concat_table_key() {
let table_name = "my_table";
let expected = "__table_my_table".to_string();
assert_eq!(MetadataRegion::concat_table_key(table_name), expected);
let table_id = 12934;
let expected = "__table_12934".to_string();
assert_eq!(MetadataRegion::concat_table_key(table_id), expected);
}
#[test]
fn test_concat_column_key() {
let table_name = "my_table";
let table_id = 91959;
let column_name = "my_column";
let expected = "__column_bXlfdGFibGU_bXlfY29sdW1u".to_string();
let expected = "__column_91959_bXlfY29sdW1u".to_string();
assert_eq!(
MetadataRegion::concat_column_key(table_name, column_name),
MetadataRegion::concat_column_key(table_id, column_name),
expected
);
}
#[test]
fn test_parse_table_key() {
let encoded = MetadataRegion::concat_column_key("my_table", "my_column");
assert_eq!(encoded, "__column_bXlfdGFibGU_bXlfY29sdW1u");
let table_id = 93585;
let encoded = MetadataRegion::concat_column_key(table_id, "my_column");
assert_eq!(encoded, "__column_93585_bXlfY29sdW1u");
let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
assert_eq!(
decoded,
Some(("my_table".to_string(), "my_column".to_string()))
);
assert_eq!(decoded, Some((table_id, "my_column".to_string())));
}
#[test]
fn test_parse_valid_column_key() {
let encoded = MetadataRegion::concat_column_key("my_table", "my_column");
assert_eq!(encoded, "__column_bXlfdGFibGU_bXlfY29sdW1u");
let table_id = 73952;
let encoded = MetadataRegion::concat_column_key(table_id, "my_column");
assert_eq!(encoded, "__column_73952_bXlfY29sdW1u");
let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
assert_eq!(
decoded,
Some(("my_table".to_string(), "my_column".to_string()))
);
assert_eq!(decoded, Some((table_id, "my_column".to_string())));
}
#[test]
@@ -486,19 +485,19 @@ mod test {
let region_id = to_metadata_region_id(env.default_region_id());
// add one table
let table_name = "table1";
let table_id = 77889;
metadata_region
.add_table(region_id, table_name)
.add_table(region_id, table_id)
.await
.unwrap();
assert!(metadata_region
.is_table_exist(region_id, table_name)
.is_table_exist(region_id, table_id)
.await
.unwrap());
// add it again
assert!(metadata_region
.add_table(region_id, table_name)
.add_table(region_id, table_id)
.await
.is_err());
}
@@ -510,27 +509,27 @@ mod test {
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
let table_name = "table1";
let table_id = 23638;
let column_name = "column1";
let semantic_type = SemanticType::Tag;
metadata_region
.add_column(region_id, table_name, column_name, semantic_type)
.add_column(region_id, table_id, column_name, semantic_type)
.await
.unwrap();
let actual_semantic_type = metadata_region
.column_semantic_type(region_id, table_name, column_name)
.column_semantic_type(region_id, table_id, column_name)
.await
.unwrap();
assert_eq!(actual_semantic_type, Some(semantic_type));
// duplicate column won't be updated
let is_updated = metadata_region
.add_column(region_id, table_name, column_name, SemanticType::Field)
.add_column(region_id, table_id, column_name, SemanticType::Field)
.await
.unwrap();
assert!(!is_updated);
let actual_semantic_type = metadata_region
.column_semantic_type(region_id, table_name, column_name)
.column_semantic_type(region_id, table_id, column_name)
.await
.unwrap();
assert_eq!(actual_semantic_type, Some(semantic_type));

View File

@@ -0,0 +1,36 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Internal metrics for observability.
use lazy_static::lazy_static;
use prometheus::*;
lazy_static! {
    /// Gauge for opened physical regions
    pub static ref PHYSICAL_REGION_COUNT: IntGauge =
        register_int_gauge!("metric_physical_region_count", "metric engine physical region count").unwrap();
    /// Gauge of columns across all opened physical regions
    pub static ref PHYSICAL_COLUMN_COUNT: IntGauge =
        register_int_gauge!("metric_physical_column_count", "metric engine physical column count").unwrap();
    /// Gauge for opened logical regions
    pub static ref LOGICAL_REGION_COUNT: IntGauge =
        register_int_gauge!("metric_logical_region_count", "metric engine logical region count").unwrap();
    /// Histogram of time spent on DDL requests forwarded to the underlying mito engine
    pub static ref MITO_DDL_DURATION: Histogram =
        register_histogram!("metric_engine_mito_ddl", "metric engine mito ddl").unwrap();
}

View File

@@ -14,8 +14,6 @@
//! Utilities for testing.
use std::collections::HashMap;
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
@@ -28,7 +26,8 @@ use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::engine::{MetricEngine, METRIC_ENGINE_NAME};
use crate::data_region::DataRegion;
use crate::engine::{MetricEngine, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
use crate::metadata_region::MetadataRegion;
/// Env to test metric engine.
@@ -80,7 +79,9 @@ impl TestEnv {
),
}],
primary_key: vec![],
options: HashMap::new(),
options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
.into_iter()
.collect(),
region_dir: "test_metric_region".to_string(),
};
@@ -95,6 +96,10 @@ impl TestEnv {
MetadataRegion::new(self.mito())
}
pub fn data_region(&self) -> DataRegion {
DataRegion::new(self.mito())
}
/// `RegionId::new(1, 2)`
pub fn default_region_id(&self) -> RegionId {
RegionId::new(1, 2)