From c62ba79759d62f0223168952dceed7031a69489b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 9 Nov 2023 14:04:04 +0800 Subject: [PATCH] feat: create logical region to metric engine (#2706) * wip: create logical region Signed-off-by: Ruihang Xia * change metadata region to handle table id rather than table name Signed-off-by: Ruihang Xia * implement create logic region Signed-off-by: Ruihang Xia * add some tests Signed-off-by: Ruihang Xia * implement alter logical region Signed-off-by: Ruihang Xia * test, style and document Signed-off-by: Ruihang Xia * don't change default constraint Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 2 + src/datatypes/src/schema/column_schema.rs | 10 + src/metric-engine/Cargo.toml | 2 + src/metric-engine/src/data_region.rs | 208 ++++++++++++++ src/metric-engine/src/engine.rs | 318 +++++++++++++++++++++- src/metric-engine/src/error.rs | 57 +++- src/metric-engine/src/lib.rs | 2 + src/metric-engine/src/metadata_region.rs | 111 ++++---- src/metric-engine/src/metrics.rs | 36 +++ src/metric-engine/src/test_util.rs | 13 +- 10 files changed, 686 insertions(+), 73 deletions(-) create mode 100644 src/metric-engine/src/data_region.rs create mode 100644 src/metric-engine/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index acd943183f..24199dd7f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4794,8 +4794,10 @@ dependencies = [ "common-time", "datafusion", "datatypes", + "lazy_static", "mito2", "object-store", + "prometheus", "serde_json", "snafu", "store-api", diff --git a/src/datatypes/src/schema/column_schema.rs b/src/datatypes/src/schema/column_schema.rs index 8651318118..04f715fbc1 100644 --- a/src/datatypes/src/schema/column_schema.rs +++ b/src/datatypes/src/schema/column_schema.rs @@ -121,6 +121,10 @@ impl ColumnSchema { self } + /// Set default constraint. + /// + /// If a default constraint exists for the column, this method will + /// validate it against the column's data type and nullability. pub fn with_default_constraint( mut self, default_constraint: Option, @@ -133,6 +137,12 @@ impl ColumnSchema { Ok(self) } + /// Set the nullablity to `true` of the column. + pub fn with_nullable_set(mut self) -> Self { + self.is_nullable = true; + self + } + /// Creates a new [`ColumnSchema`] with given metadata. pub fn with_metadata(mut self, metadata: Metadata) -> Self { self.metadata = metadata; diff --git a/src/metric-engine/Cargo.toml b/src/metric-engine/Cargo.toml index 65b1bdd786..acaf161d14 100644 --- a/src/metric-engine/Cargo.toml +++ b/src/metric-engine/Cargo.toml @@ -16,8 +16,10 @@ common-telemetry.workspace = true common-time.workspace = true datafusion.workspace = true datatypes.workspace = true +lazy_static = "1.4" mito2.workspace = true object-store.workspace = true +prometheus.workspace = true serde_json.workspace = true snafu.workspace = true store-api.workspace = true diff --git a/src/metric-engine/src/data_region.rs b/src/metric-engine/src/data_region.rs new file mode 100644 index 0000000000..dce8a1151c --- /dev/null +++ b/src/metric-engine/src/data_region.rs @@ -0,0 +1,208 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use api::v1::SemanticType; +use common_telemetry::tracing::warn; +use mito2::engine::MitoEngine; +use snafu::ResultExt; +use store_api::metadata::ColumnMetadata; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{AddColumn, AlterKind, RegionAlterRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::error::{ + ColumnTypeMismatchSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu, Result, +}; +use crate::metrics::MITO_DDL_DURATION; +use crate::utils; + +/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It +/// will handle all the data related operations across physical tables. Thus +/// every operation should be associated to a [RegionId], which is the physical +/// table id + region sequence. This handler will transform the region group by +/// itself. +pub struct DataRegion { + mito: MitoEngine, +} + +impl DataRegion { + pub fn new(mito: MitoEngine) -> Self { + Self { mito } + } + + /// Submit an alter request to underlying physical region. + /// + /// This method will change the semantic type of those given columns. + /// [SemanticType::Tag] will become [SemanticType::Field]. The procedure framework + /// ensures there is no concurrent conflict. + /// + /// Invoker don't need to set up or verify the column id. This method will adjust + /// it using underlying schema. + /// + /// This method will also set the nullable marker to true. + pub async fn add_columns( + &self, + region_id: RegionId, + columns: Vec, + ) -> Result<()> { + let region_id = utils::to_data_region_id(region_id); + + // retrieve underlying version + let region_metadata = self + .mito + .get_metadata(region_id) + .await + .context(MitoReadOperationSnafu)?; + let version = region_metadata.schema_version; + + // find the max column id + let new_column_id_start = 1 + region_metadata + .column_metadatas + .iter() + .map(|c| c.column_id) + .max() + .unwrap_or(0); + + // overwrite semantic type + let columns = columns + .into_iter() + .enumerate() + .map(|(delta, mut c)| { + if c.semantic_type == SemanticType::Tag { + c.semantic_type = SemanticType::Field; + if !c.column_schema.data_type.is_string() { + return ColumnTypeMismatchSnafu { + column_type: c.column_schema.data_type, + } + .fail(); + } + } else { + warn!( + "Column {} in region {region_id} is not a tag", + c.column_schema.name + ); + }; + + c.column_id = new_column_id_start + delta as u32; + + c.column_schema = c.column_schema.with_nullable_set(); + + Ok(AddColumn { + column_metadata: c, + location: None, + }) + }) + .collect::>()?; + + // assemble alter request + let alter_request = RegionRequest::Alter(RegionAlterRequest { + schema_version: version, + kind: AlterKind::AddColumns { columns }, + }); + + // submit alter request + { + let _timer = MITO_DDL_DURATION.start_timer(); + self.mito + .handle_request(region_id, alter_request) + .await + .context(MitoWriteOperationSnafu)?; + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + + use super::*; + use crate::test_util::TestEnv; + + #[tokio::test] + async fn test_add_columns() { + common_telemetry::init_default_ut_logging(); + + let env = TestEnv::new().await; + env.init_metric_region().await; + + let current_version = env + .mito() + .get_metadata(utils::to_data_region_id(env.default_region_id())) + .await + .unwrap() + .schema_version; + assert_eq!(current_version, 0); + + let new_columns = vec![ + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "tag2", + ConcreteDataType::string_datatype(), + false, + ), + }, + ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new( + "tag3", + ConcreteDataType::string_datatype(), + false, + ), + }, + ]; + env.data_region() + .add_columns(env.default_region_id(), new_columns) + .await + .unwrap(); + + let new_metadata = env + .mito() + .get_metadata(utils::to_data_region_id(env.default_region_id())) + .await + .unwrap(); + let column_names = new_metadata + .column_metadatas + .iter() + .map(|c| &c.column_schema.name) + .collect::>(); + let expected = vec!["greptime_timestamp", "__metric", "__tsid", "tag2", "tag3"]; + assert_eq!(column_names, expected); + } + + // Only string is allowed for tag column + #[tokio::test] + async fn test_add_invalid_column() { + common_telemetry::init_default_ut_logging(); + + let env = TestEnv::new().await; + env.init_metric_region().await; + + let new_columns = vec![ColumnMetadata { + column_id: 0, + semantic_type: SemanticType::Tag, + column_schema: ColumnSchema::new("tag2", ConcreteDataType::int64_datatype(), false), + }]; + let result = env + .data_region() + .add_columns(env.default_region_id(), new_columns) + .await; + assert!(result.is_err()); + } +} diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 86c41aeced..fd080d6ed6 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use api::v1::SemanticType; @@ -20,20 +20,31 @@ use async_trait::async_trait; use common_error::ext::BoxedError; use common_query::Output; use common_recordbatch::SendableRecordBatchStream; +use common_telemetry::{error, info}; use common_time::Timestamp; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; use datatypes::value::Value; use mito2::engine::{MitoEngine, MITO_ENGINE_NAME}; use object_store::util::join_dir; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use store_api::metadata::{ColumnMetadata, RegionMetadataRef}; use store_api::region_engine::{RegionEngine, RegionRole}; -use store_api::region_request::{RegionCreateRequest, RegionRequest}; +use store_api::region_request::{ + AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest, +}; use store_api::storage::consts::ReservedColumnId; -use store_api::storage::{RegionGroup, RegionId, ScanRequest}; +use store_api::storage::{RegionGroup, RegionId, ScanRequest, TableId}; +use tokio::sync::RwLock; -use crate::error::{CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, Result}; +use crate::data_region::DataRegion; +use crate::error::{ + ConflictRegionOptionSnafu, CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, + LogicalTableNotFoundSnafu, MissingRegionOptionSnafu, PhysicalRegionNotFoundSnafu, + PhysicalTableNotFoundSnafu, Result, +}; +use crate::metadata_region::MetadataRegion; +use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT}; use crate::utils; /// region group value for data region inside a metric region @@ -59,6 +70,33 @@ pub const DATA_REGION_SUBDIR: &str = "data"; pub const METRIC_ENGINE_NAME: &str = "metric"; +/// Metadata key present in the `CREATE TABLE ... WITH ()` clause. This key is +/// used to identify the table is a physical metric table. E.g.: +/// ```sql +/// CREATE TABLE physical_table ( +/// ... +/// ) +/// ENGINE = metric +/// WITH ( +/// physical_metric_table, +/// ); +/// ``` +pub const PHYSICAL_TABLE_METADATA_KEY: &str = "physical_metric_table"; + +/// Metadata key present in the `CREATE TABLE ... WITH ()` clause. This key is +/// used to identify a logical table and associate it with a corresponding physical +/// table . E.g.: +/// ```sql +/// CREATE TABLE logical_table ( +/// ... +/// ) +/// ENGINE = metric +/// WITH ( +/// on_physical_table = "physical_table", +/// ); +/// ``` +pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table"; + pub struct MetricEngine { inner: Arc, } @@ -140,18 +178,34 @@ impl RegionEngine for MetricEngine { impl MetricEngine { pub fn new(mito: MitoEngine) -> Self { + let metadata_region = MetadataRegion::new(mito.clone()); + let data_region = DataRegion::new(mito.clone()); Self { - inner: Arc::new(MetricEngineInner { mito }), + inner: Arc::new(MetricEngineInner { + mito, + metadata_region, + data_region, + physical_tables: RwLock::default(), + physical_columns: RwLock::default(), + }), } } } struct MetricEngineInner { mito: MitoEngine, + metadata_region: MetadataRegion, + data_region: DataRegion, + // TODO(ruihang): handle different catalog/schema + /// Map from physical table name to table id. + physical_tables: RwLock>, + /// Cache for the columns of physical regions. + /// The region id in key is the data region id. + physical_columns: RwLock>>, } impl MetricEngineInner { - /// Initialize a metric region at given region id. + /// Dispatch region creation request to physical region creation or logical pub async fn create_region( &self, region_id: RegionId, @@ -159,8 +213,31 @@ impl MetricEngineInner { ) -> Result<()> { Self::verify_region_create_request(&request)?; + if request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) { + self.create_physical_region(region_id, request).await + } else if request.options.contains_key(LOGICAL_TABLE_METADATA_KEY) { + self.create_logical_region(region_id, request).await + } else { + MissingRegionOptionSnafu {}.fail() + } + } + + /// Initialize a physical metric region at given region id. + async fn create_physical_region( + &self, + region_id: RegionId, + request: RegionCreateRequest, + ) -> Result<()> { let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id); + // TODO: workaround for now, should find another way to retrieve the + // table name. + let physical_table_name = request + .options + .get(PHYSICAL_TABLE_METADATA_KEY) + .ok_or(MissingRegionOptionSnafu {}.build())? + .to_string(); + // create metadata region let create_metadata_region_request = self.create_request_for_metadata_region(&request.region_dir); @@ -186,11 +263,133 @@ impl MetricEngineInner { region_type: DATA_REGION_SUBDIR, })?; + info!("Created physical metric region {region_id:?} with table name {physical_table_name}"); + PHYSICAL_REGION_COUNT.inc(); + + // remember this table + self.physical_tables + .write() + .await + .insert(physical_table_name, region_id.table_id()); + + Ok(()) + } + + /// Create a logical region. + /// + /// Physical table and logical table can have multiple regions, and their + /// region number should be the same. Thus we can infer the physical region + /// id by simply replace the table id part in the given region id, which + /// represent the "logical region" to request. + /// + /// This method will alter the data region to add columns if necessary. + /// + /// If the logical region to create already exists, this method will do nothing. + async fn create_logical_region( + &self, + region_id: RegionId, + request: RegionCreateRequest, + ) -> Result<()> { + // transform IDs + let physical_table_name = request + .options + .get(LOGICAL_TABLE_METADATA_KEY) + .ok_or(MissingRegionOptionSnafu {}.build())?; + let physical_table_id = *self + .physical_tables + .read() + .await + .get(physical_table_name) + .with_context(|| PhysicalTableNotFoundSnafu { + physical_table: physical_table_name, + })?; + let logical_table_id = region_id.table_id(); + let physical_region_id = RegionId::new(physical_table_id, region_id.region_number()); + let (data_region_id, metadata_region_id) = Self::transform_region_id(physical_region_id); + + // check if the logical table already exist + if self + .metadata_region + .is_table_exist(metadata_region_id, logical_table_id) + .await? + { + info!("Create a existing logical region {region_id}. Skipped"); + return Ok(()); + } + + // find new columns to add + let physical_columns = self.physical_columns.read().await; + let physical_columns = + physical_columns + .get(&data_region_id) + .with_context(|| PhysicalRegionNotFoundSnafu { + region_id: data_region_id, + })?; + let mut new_columns = vec![]; + for col in &request.column_metadatas { + if !physical_columns.contains(&col.column_schema.name) { + new_columns.push(col.clone()); + } + } + + self.add_columns_to_physical_data_region( + data_region_id, + metadata_region_id, + logical_table_id, + new_columns, + ) + .await?; + + Ok(()) + } + + async fn add_columns_to_physical_data_region( + &self, + data_region_id: RegionId, + metadata_region_id: RegionId, + logical_table_id: TableId, + new_columns: Vec, + ) -> Result<()> { + // alter data region + self.data_region + .add_columns(data_region_id, new_columns.clone()) + .await?; + + // register columns to metadata region + for col in &new_columns { + self.metadata_region + .add_column( + metadata_region_id, + logical_table_id, + &col.column_schema.name, + col.semantic_type, + ) + .await?; + } + + let mut physical_columns = self.physical_columns.write().await; + // safety: previous step has checked this + let mut column_set = physical_columns.get_mut(&data_region_id).unwrap(); + for col in &new_columns { + column_set.insert(col.column_schema.name.clone()); + } + info!("Create table {logical_table_id} leads to adding columns {new_columns:?} to physical region {data_region_id}"); + PHYSICAL_COLUMN_COUNT.add(new_columns.len() as _); + + // register table to metadata region + self.metadata_region + .add_table(metadata_region_id, logical_table_id) + .await?; + info!("Created new logical table {logical_table_id} on physical region {data_region_id}"); + LOGICAL_REGION_COUNT.inc(); + Ok(()) } /// Check if /// - internal columns are not occupied + /// - required table option is present ([PHYSICAL_TABLE_METADATA_KEY] or + /// [LOGICAL_TABLE_METADATA_KEY]) fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> { let name_to_index = request .column_metadatas @@ -213,6 +412,18 @@ impl MetricEngineInner { } ); + // check if required table option is present + ensure!( + request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) + || request.options.contains_key(LOGICAL_TABLE_METADATA_KEY), + MissingRegionOptionSnafu {} + ); + ensure!( + !(request.options.contains_key(PHYSICAL_TABLE_METADATA_KEY) + && request.options.contains_key(LOGICAL_TABLE_METADATA_KEY)), + ConflictRegionOptionSnafu {} + ); + Ok(()) } @@ -334,8 +545,65 @@ impl MetricEngineInner { } } +impl MetricEngineInner { + pub async fn alter_logic_region( + &self, + region_id: RegionId, + request: RegionAlterRequest, + ) -> Result<()> { + // only handle adding column + let AlterKind::AddColumns { columns } = request.kind else { + return Ok(()); + }; + let logical_table_id = region_id.table_id(); + + // check if the table exists + let metadata_region_id = utils::to_metadata_region_id(region_id); + if !self + .metadata_region + .is_table_exist(metadata_region_id, logical_table_id) + .await? + { + error!("Trying to alter an nonexistent table {logical_table_id}"); + return LogicalTableNotFoundSnafu { + table_id: logical_table_id, + } + .fail(); + } + + let mut columns_to_add = vec![]; + for col in columns { + if self + .metadata_region + .column_semantic_type( + metadata_region_id, + logical_table_id, + &col.column_metadata.column_schema.name, + ) + .await? + .is_none() + { + columns_to_add.push(col.column_metadata); + } + } + + let data_region_id = utils::to_data_region_id(region_id); + self.add_columns_to_physical_data_region( + data_region_id, + metadata_region_id, + logical_table_id, + columns_to_add, + ) + .await?; + + Ok(()) + } +} + #[cfg(test)] mod tests { + use std::hash::Hash; + use super::*; use crate::test_util::TestEnv; @@ -400,9 +668,40 @@ mod tests { region_dir: "test_dir".to_string(), engine: METRIC_ENGINE_NAME.to_string(), primary_key: vec![], + options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] + .into_iter() + .collect(), + }; + let result = MetricEngineInner::verify_region_create_request(&request); + assert!(result.is_ok()); + } + + #[test] + fn test_verify_region_create_request_options() { + let mut request = RegionCreateRequest { + column_metadatas: vec![], + region_dir: "test_dir".to_string(), + engine: METRIC_ENGINE_NAME.to_string(), + primary_key: vec![], options: HashMap::new(), }; let result = MetricEngineInner::verify_region_create_request(&request); + assert!(result.is_err()); + + let mut options = HashMap::new(); + options.insert(PHYSICAL_TABLE_METADATA_KEY.to_string(), "value".to_string()); + request.options = options.clone(); + let result = MetricEngineInner::verify_region_create_request(&request); + assert!(result.is_ok()); + + options.insert(LOGICAL_TABLE_METADATA_KEY.to_string(), "value".to_string()); + request.options = options.clone(); + let result = MetricEngineInner::verify_region_create_request(&request); + assert!(result.is_err()); + + options.remove(PHYSICAL_TABLE_METADATA_KEY).unwrap(); + request.options = options; + let result = MetricEngineInner::verify_region_create_request(&request); assert!(result.is_ok()); } @@ -436,8 +735,9 @@ mod tests { }; let env = TestEnv::new().await; - let engine = MetricEngineInner { mito: env.mito() }; - let data_region_request = engine.create_request_for_data_region(&request); + let engine = MetricEngine::new(env.mito()); + let engine_inner = engine.inner; + let data_region_request = engine_inner.create_request_for_data_region(&request); assert_eq!( data_region_request.region_dir, diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index 6ec8c95b53..3a6c0ff29f 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -17,7 +17,9 @@ use std::any::Any; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; use common_macro::stack_trace_debug; +use datatypes::prelude::ConcreteDataType; use snafu::{Location, Snafu}; +use store_api::storage::{RegionId, TableId}; #[derive(Snafu)] #[snafu(visibility(pub))] @@ -33,9 +35,9 @@ pub enum Error { location: Location, }, - #[snafu(display("Table `{}` already exists", table_name))] + #[snafu(display("Table `{}` already exists", table_id))] TableAlreadyExists { - table_name: String, + table_id: TableId, location: Location, }, @@ -54,6 +56,14 @@ pub enum Error { location: Location, }, + #[snafu(display("Failed to parse table id from {}", raw))] + ParseTableId { + raw: String, + #[snafu(source)] + error: ::Err, + location: Location, + }, + #[snafu(display("Mito read operation fails"))] MitoReadOperation { source: BoxedError, @@ -74,6 +84,37 @@ pub enum Error { #[snafu(display("Internal column {} is reserved", column))] InternalColumnOccupied { column: String, location: Location }, + + #[snafu(display("Required table option is missing"))] + MissingRegionOption { location: Location }, + + #[snafu(display("Region options are conflicted"))] + ConflictRegionOption { location: Location }, + + // TODO: remove this + #[snafu(display("Physical table {} not found", physical_table))] + PhysicalTableNotFound { + physical_table: String, + location: Location, + }, + + #[snafu(display("Physical region {} not found", region_id))] + PhysicalRegionNotFound { + region_id: RegionId, + location: Location, + }, + + #[snafu(display("Logical table {} not found", table_id))] + LogicalTableNotFound { + table_id: TableId, + location: Location, + }, + + #[snafu(display("Column type mismatch. Expect string, got {:?}", column_type))] + ColumnTypeMismatch { + column_type: ConcreteDataType, + location: Location, + }, } pub type Result = std::result::Result; @@ -83,11 +124,19 @@ impl ErrorExt for Error { use Error::*; match self { - InternalColumnOccupied { .. } => StatusCode::InvalidArguments, + InternalColumnOccupied { .. } + | MissingRegionOption { .. } + | ConflictRegionOption { .. } + | ColumnTypeMismatch { .. } => StatusCode::InvalidArguments, MissingInternalColumn { .. } | DeserializeSemanticType { .. } - | DecodeColumnValue { .. } => StatusCode::Unexpected, + | DecodeColumnValue { .. } + | ParseTableId { .. } => StatusCode::Unexpected, + + PhysicalTableNotFound { .. } | LogicalTableNotFound { .. } => StatusCode::TableNotFound, + + PhysicalRegionNotFound { .. } => StatusCode::RegionNotFound, CreateMitoRegion { source, .. } | MitoReadOperation { source, .. } diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index 14d9aa22bb..044a8bd24c 100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod data_region; #[allow(unused)] pub mod engine; pub mod error; #[allow(unused)] mod metadata_region; +mod metrics; #[cfg(test)] mod test_util; mod utils; diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 0493553ed8..2f3fa5061a 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -23,7 +23,7 @@ use mito2::engine::MitoEngine; use snafu::ResultExt; use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionPutRequest, RegionReadRequest}; -use store_api::storage::{RegionId, ScanRequest}; +use store_api::storage::{RegionId, ScanRequest, TableId}; use crate::engine::{ METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, @@ -31,14 +31,15 @@ use crate::engine::{ }; use crate::error::{ CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, - MitoReadOperationSnafu, MitoWriteOperationSnafu, Result, TableAlreadyExistsSnafu, + MitoReadOperationSnafu, MitoWriteOperationSnafu, ParseTableIdSnafu, Result, + TableAlreadyExistsSnafu, }; use crate::utils; /// The other two fields key and value will be used as a k-v storage. -/// It contains two group of key: -/// - `__table_` is used for marking table existence. It doesn't have value. -/// - `__column__` is used for marking column existence, +/// It contains two group of key (TABLE_ID refers to the logical table's id): +/// - `__table_` is used for marking table existence. It doesn't have value. +/// - `__column__` is used for marking column existence, /// the value is column's semantic type. To avoid the key conflict, this column key /// will be encoded by base64([STANDARD_NO_PAD]). /// @@ -47,6 +48,10 @@ use crate::utils; /// every operation should be associated to a [RegionId], which is the physical /// table id + region sequence. This handler will transform the region group by /// itself. +/// +/// Notice that all the `region_id` in the public interfaces refers to the +/// physical region id of metadata region. While the `table_id` refers to +/// the logical table id. pub struct MetadataRegion { mito: MitoEngine, } @@ -60,16 +65,16 @@ impl MetadataRegion { /// /// This method will check if the table key already exists, if so, it will return /// a [TableAlreadyExistsSnafu] error. - pub async fn add_table(&self, region_id: RegionId, table_name: &str) -> Result<()> { + pub async fn add_table(&self, region_id: RegionId, table_id: TableId) -> Result<()> { let region_id = utils::to_metadata_region_id(region_id); - let table_key = Self::concat_table_key(table_name); + let table_key = Self::concat_table_key(table_id); let put_success = self .put_conditionally(region_id, table_key, String::new()) .await?; if !put_success { - TableAlreadyExistsSnafu { table_name }.fail() + TableAlreadyExistsSnafu { table_id }.fail() } else { Ok(()) } @@ -82,12 +87,12 @@ impl MetadataRegion { pub async fn add_column( &self, region_id: RegionId, - table_name: &str, + table_id: TableId, column_name: &str, semantic_type: SemanticType, ) -> Result { let region_id = utils::to_metadata_region_id(region_id); - let column_key = Self::concat_column_key(table_name, column_name); + let column_key = Self::concat_column_key(table_id, column_name); self.put_conditionally( region_id, @@ -98,9 +103,9 @@ impl MetadataRegion { } /// Check if the given table exists. - pub async fn is_table_exist(&self, region_id: RegionId, table_name: &str) -> Result { + pub async fn is_table_exist(&self, region_id: RegionId, table_id: TableId) -> Result { let region_id = utils::to_metadata_region_id(region_id); - let table_key = Self::concat_table_key(table_name); + let table_key = Self::concat_table_key(table_id); self.exist(region_id, &table_key).await } @@ -108,11 +113,11 @@ impl MetadataRegion { pub async fn column_semantic_type( &self, region_id: RegionId, - table_name: &str, + table_id: TableId, column_name: &str, ) -> Result> { let region_id = utils::to_metadata_region_id(region_id); - let column_key = Self::concat_column_key(table_name, column_name); + let column_key = Self::concat_column_key(table_id, column_name); let semantic_type = self.get(region_id, &column_key).await?; semantic_type .map(|s| Self::deserialize_semantic_type(&s)) @@ -122,14 +127,14 @@ impl MetadataRegion { // utils to concat and parse key/value impl MetadataRegion { - pub fn concat_table_key(table_name: &str) -> String { - format!("__table_{}", table_name) + pub fn concat_table_key(table_id: TableId) -> String { + format!("__table_{}", table_id) } - pub fn concat_column_key(table_name: &str, column_name: &str) -> String { - let encoded_table_name = STANDARD_NO_PAD.encode(table_name); + /// Column name will be encoded by base64([STANDARD_NO_PAD]) + pub fn concat_column_key(table_id: TableId, column_name: &str) -> String { let encoded_column_name = STANDARD_NO_PAD.encode(column_name); - format!("__column_{}_{}", encoded_table_name, encoded_column_name) + format!("__column_{}_{}", table_id, encoded_column_name) } pub fn parse_table_key(key: &str) -> Option<&str> { @@ -137,23 +142,21 @@ impl MetadataRegion { } /// Parse column key to (table_name, column_name) - pub fn parse_column_key(key: &str) -> Result> { + pub fn parse_column_key(key: &str) -> Result> { if let Some(stripped) = key.strip_prefix("__column_") { let mut iter = stripped.split('_'); - let encoded_table_name = iter.next().unwrap(); - let encoded_column_name = iter.next().unwrap(); - let table_name = STANDARD_NO_PAD - .decode(encoded_table_name) - .context(DecodeColumnValueSnafu)?; + let table_id_raw = iter.next().unwrap(); + let table_id = table_id_raw + .parse() + .with_context(|_| ParseTableIdSnafu { raw: table_id_raw })?; + + let encoded_column_name = iter.next().unwrap(); let column_name = STANDARD_NO_PAD .decode(encoded_column_name) .context(DecodeColumnValueSnafu)?; - Ok(Some(( - String::from_utf8(table_name).unwrap(), - String::from_utf8(column_name).unwrap(), - ))) + Ok(Some((table_id, String::from_utf8(column_name).unwrap()))) } else { Ok(None) } @@ -307,44 +310,40 @@ mod test { #[test] fn test_concat_table_key() { - let table_name = "my_table"; - let expected = "__table_my_table".to_string(); - assert_eq!(MetadataRegion::concat_table_key(table_name), expected); + let table_id = 12934; + let expected = "__table_12934".to_string(); + assert_eq!(MetadataRegion::concat_table_key(table_id), expected); } #[test] fn test_concat_column_key() { - let table_name = "my_table"; + let table_id = 91959; let column_name = "my_column"; - let expected = "__column_bXlfdGFibGU_bXlfY29sdW1u".to_string(); + let expected = "__column_91959_bXlfY29sdW1u".to_string(); assert_eq!( - MetadataRegion::concat_column_key(table_name, column_name), + MetadataRegion::concat_column_key(table_id, column_name), expected ); } #[test] fn test_parse_table_key() { - let encoded = MetadataRegion::concat_column_key("my_table", "my_column"); - assert_eq!(encoded, "__column_bXlfdGFibGU_bXlfY29sdW1u"); + let table_id = 93585; + let encoded = MetadataRegion::concat_column_key(table_id, "my_column"); + assert_eq!(encoded, "__column_93585_bXlfY29sdW1u"); let decoded = MetadataRegion::parse_column_key(&encoded).unwrap(); - assert_eq!( - decoded, - Some(("my_table".to_string(), "my_column".to_string())) - ); + assert_eq!(decoded, Some((table_id, "my_column".to_string()))); } #[test] fn test_parse_valid_column_key() { - let encoded = MetadataRegion::concat_column_key("my_table", "my_column"); - assert_eq!(encoded, "__column_bXlfdGFibGU_bXlfY29sdW1u"); + let table_id = 73952; + let encoded = MetadataRegion::concat_column_key(table_id, "my_column"); + assert_eq!(encoded, "__column_73952_bXlfY29sdW1u"); let decoded = MetadataRegion::parse_column_key(&encoded).unwrap(); - assert_eq!( - decoded, - Some(("my_table".to_string(), "my_column".to_string())) - ); + assert_eq!(decoded, Some((table_id, "my_column".to_string()))); } #[test] @@ -486,19 +485,19 @@ mod test { let region_id = to_metadata_region_id(env.default_region_id()); // add one table - let table_name = "table1"; + let table_id = 77889; metadata_region - .add_table(region_id, table_name) + .add_table(region_id, table_id) .await .unwrap(); assert!(metadata_region - .is_table_exist(region_id, table_name) + .is_table_exist(region_id, table_id) .await .unwrap()); // add it again assert!(metadata_region - .add_table(region_id, table_name) + .add_table(region_id, table_id) .await .is_err()); } @@ -510,27 +509,27 @@ mod test { let metadata_region = env.metadata_region(); let region_id = to_metadata_region_id(env.default_region_id()); - let table_name = "table1"; + let table_id = 23638; let column_name = "column1"; let semantic_type = SemanticType::Tag; metadata_region - .add_column(region_id, table_name, column_name, semantic_type) + .add_column(region_id, table_id, column_name, semantic_type) .await .unwrap(); let actual_semantic_type = metadata_region - .column_semantic_type(region_id, table_name, column_name) + .column_semantic_type(region_id, table_id, column_name) .await .unwrap(); assert_eq!(actual_semantic_type, Some(semantic_type)); // duplicate column won't be updated let is_updated = metadata_region - .add_column(region_id, table_name, column_name, SemanticType::Field) + .add_column(region_id, table_id, column_name, SemanticType::Field) .await .unwrap(); assert!(!is_updated); let actual_semantic_type = metadata_region - .column_semantic_type(region_id, table_name, column_name) + .column_semantic_type(region_id, table_id, column_name) .await .unwrap(); assert_eq!(actual_semantic_type, Some(semantic_type)); diff --git a/src/metric-engine/src/metrics.rs b/src/metric-engine/src/metrics.rs new file mode 100644 index 0000000000..df65604c76 --- /dev/null +++ b/src/metric-engine/src/metrics.rs @@ -0,0 +1,36 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Internal metrics for observability. + +use lazy_static::lazy_static; +use prometheus::*; + +lazy_static! { + /// Gauge for opened regions + pub static ref PHYSICAL_REGION_COUNT: IntGauge = + register_int_gauge!("metric_physical_region_count", "metric engine physical region count").unwrap(); + + /// Gauge of columns across all opened regions + pub static ref PHYSICAL_COLUMN_COUNT: IntGauge = + register_int_gauge!("metric_physical_column_count", "metric engine physical column count").unwrap(); + + /// Gauge for opened logical regions + pub static ref LOGICAL_REGION_COUNT: IntGauge = + register_int_gauge!("metric_logical_region_count", "metric engine logical region count").unwrap(); + + /// Gauge for opened logical regions + pub static ref MITO_DDL_DURATION: Histogram = + register_histogram!("metric_engine_mito_ddl", "metric engine mito ddl").unwrap(); +} diff --git a/src/metric-engine/src/test_util.rs b/src/metric-engine/src/test_util.rs index fc5c5dfede..daec23a259 100644 --- a/src/metric-engine/src/test_util.rs +++ b/src/metric-engine/src/test_util.rs @@ -14,8 +14,6 @@ //! Utilities for testing. -use std::collections::HashMap; - use api::v1::SemanticType; use datatypes::prelude::ConcreteDataType; use datatypes::schema::ColumnSchema; @@ -28,7 +26,8 @@ use store_api::region_engine::RegionEngine; use store_api::region_request::{RegionCreateRequest, RegionRequest}; use store_api::storage::RegionId; -use crate::engine::{MetricEngine, METRIC_ENGINE_NAME}; +use crate::data_region::DataRegion; +use crate::engine::{MetricEngine, METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY}; use crate::metadata_region::MetadataRegion; /// Env to test metric engine. @@ -80,7 +79,9 @@ impl TestEnv { ), }], primary_key: vec![], - options: HashMap::new(), + options: [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())] + .into_iter() + .collect(), region_dir: "test_metric_region".to_string(), }; @@ -95,6 +96,10 @@ impl TestEnv { MetadataRegion::new(self.mito()) } + pub fn data_region(&self) -> DataRegion { + DataRegion::new(self.mito()) + } + /// `RegionId::new(1, 2)` pub fn default_region_id(&self) -> RegionId { RegionId::new(1, 2)