feat: write logical region to metric engine (#2759)

* transform write request

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add tests for put request

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use table_id instead of metric_name

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* CR sugg.

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* define random state as const

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-11-17 15:44:11 +08:00
committed by GitHub
parent 3ab494764f
commit ac4b6cd7f0
10 changed files with 446 additions and 43 deletions

1
Cargo.lock generated
View File

@@ -4772,6 +4772,7 @@ dependencies = [
name = "metric-engine"
version = "0.4.3"
dependencies = [
"ahash 0.8.6",
"api",
"async-trait",
"base64 0.21.5",

View File

@@ -63,6 +63,7 @@ edition = "2021"
license = "Apache-2.0"
[workspace.dependencies]
ahash = { version = "0.8", features = ["compile-time-rng"] }
aquamarine = "0.3"
arrow = { version = "47.0" }
arrow-array = "47.0"

View File

@@ -5,6 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
ahash.workspace = true
api.workspace = true
async-trait.workspace = true
base64.workspace = true

View File

@@ -13,12 +13,15 @@
// limitations under the License.
use api::v1::SemanticType;
use common_query::Output;
use common_telemetry::tracing::warn;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{AddColumn, AlterKind, RegionAlterRequest, RegionRequest};
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::RegionId;
use crate::error::{
@@ -122,6 +125,18 @@ impl DataRegion {
Ok(())
}
/// Writes a put request into the data region that backs `region_id`.
///
/// The incoming id is mapped through `utils::to_data_region_id`, the request is wrapped
/// as a `RegionRequest::Put`, and the result of the mito engine is returned. Errors from
/// mito are wrapped with `MitoWriteOperationSnafu`.
pub async fn write_data(
    &self,
    region_id: RegionId,
    request: RegionPutRequest,
) -> Result<Output> {
    let data_region_id = utils::to_data_region_id(region_id);
    let put_request = RegionRequest::Put(request);
    let outcome = self.mito.handle_request(data_region_id, put_request).await;
    outcome.context(MitoWriteOperationSnafu)
}
}
#[cfg(test)]
@@ -184,7 +199,7 @@ mod test {
let expected = vec![
"greptime_timestamp",
"greptime_value",
"__metric",
"__table_id",
"__tsid",
"job",
"tag2",

View File

@@ -13,9 +13,13 @@
// limitations under the License.
use std::collections::{HashMap, HashSet};
use std::hash::{BuildHasher, Hash, Hasher};
use std::sync::Arc;
use api::v1::SemanticType;
use ahash::{AHasher, RandomState};
use api::helper::to_column_data_type;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema as PbColumnSchema, Row, Rows, SemanticType};
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
@@ -31,23 +35,23 @@ use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{
AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest,
AlterKind, RegionAlterRequest, RegionCreateRequest, RegionPutRequest, RegionRequest,
};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionGroup, RegionId, ScanRequest};
use store_api::storage::{RegionGroup, RegionId, ScanRequest, TableId};
use tokio::sync::RwLock;
use crate::data_region::DataRegion;
use crate::error::{
ConflictRegionOptionSnafu, CreateMitoRegionSnafu, ForbiddenPhysicalAlterSnafu,
InternalColumnOccupiedSnafu, LogicalRegionNotFoundSnafu, MissingRegionOptionSnafu,
ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result,
ColumnNotFoundSnafu, ConflictRegionOptionSnafu, CreateMitoRegionSnafu,
ForbiddenPhysicalAlterSnafu, InternalColumnOccupiedSnafu, LogicalRegionNotFoundSnafu,
MissingRegionOptionSnafu, ParseRegionIdSnafu, PhysicalRegionNotFoundSnafu, Result,
};
use crate::metadata_region::MetadataRegion;
use crate::metrics::{
FORBIDDEN_OPERATION_COUNT, LOGICAL_REGION_COUNT, PHYSICAL_COLUMN_COUNT, PHYSICAL_REGION_COUNT,
};
use crate::utils::{self, to_data_region_id};
use crate::utils::{to_data_region_id, to_metadata_region_id};
/// region group value for data region inside a metric region
pub const METRIC_DATA_REGION_GROUP: RegionGroup = 0;
@@ -64,7 +68,7 @@ pub const METADATA_SCHEMA_KEY_COLUMN_INDEX: usize = 1;
pub const METADATA_SCHEMA_VALUE_COLUMN_INDEX: usize = 2;
/// Column name of internal column `__table_id` that stores the logical table id
pub const DATA_SCHEMA_METRIC_NAME_COLUMN_NAME: &str = "__metric";
pub const DATA_SCHEMA_TABLE_ID_COLUMN_NAME: &str = "__table_id";
pub const DATA_SCHEMA_TSID_COLUMN_NAME: &str = "__tsid";
pub const METADATA_REGION_SUBDIR: &str = "metadata";
@@ -100,6 +104,9 @@ pub const PHYSICAL_TABLE_METADATA_KEY: &str = "physical_metric_table";
/// And this key will be translated to corresponding physical **REGION** id in metasrv.
pub const LOGICAL_TABLE_METADATA_KEY: &str = "on_physical_table";
/// Fixed random state for generating tsid
const RANDOM_STATE: ahash::RandomState = ahash::RandomState::with_seeds(1, 2, 3, 4);
#[derive(Clone)]
pub struct MetricEngine {
inner: Arc<MetricEngineInner>,
@@ -121,7 +128,7 @@ impl RegionEngine for MetricEngine {
request: RegionRequest,
) -> std::result::Result<Output, BoxedError> {
let result = match request {
RegionRequest::Put(_) => todo!(),
RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self
.inner
@@ -399,6 +406,16 @@ impl MetricEngineInner {
self.metadata_region
.add_logical_region(metadata_region_id, logical_region_id)
.await?;
for col in &request.column_metadatas {
self.metadata_region
.add_column(
metadata_region_id,
logical_region_id,
&col.column_schema.name,
col.semantic_type,
)
.await?;
}
// update the mapping
// Safety: previous steps ensure the physical region exist
@@ -463,9 +480,9 @@ impl MetricEngineInner {
// check if internal columns are not occupied
ensure!(
!name_to_index.contains_key(DATA_SCHEMA_METRIC_NAME_COLUMN_NAME),
!name_to_index.contains_key(DATA_SCHEMA_TABLE_ID_COLUMN_NAME),
InternalColumnOccupiedSnafu {
column: DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
column: DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
}
);
ensure!(
@@ -495,8 +512,8 @@ impl MetricEngineInner {
/// Return value: (data_region_id, metadata_region_id)
fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
(
utils::to_data_region_id(region_id),
utils::to_metadata_region_id(region_id),
to_data_region_id(region_id),
to_metadata_region_id(region_id),
)
}
@@ -581,12 +598,25 @@ impl MetricEngineInner {
});
// add internal columns
let [table_id_col, tsid_col] = Self::internal_column_metadata();
data_region_request.column_metadatas.push(table_id_col);
data_region_request.column_metadatas.push(tsid_col);
data_region_request.primary_key =
vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()];
data_region_request
}
/// Generate internal column metadata.
///
/// Return `[table_id_col, tsid_col]`
fn internal_column_metadata() -> [ColumnMetadata; 2] {
let metric_name_col = ColumnMetadata {
column_id: ReservedColumnId::metric_name(),
column_id: ReservedColumnId::table_id(),
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
ConcreteDataType::string_datatype(),
DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
ConcreteDataType::uint32_datatype(),
false,
),
};
@@ -595,16 +625,11 @@ impl MetricEngineInner {
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_TSID_COLUMN_NAME,
ConcreteDataType::int64_datatype(),
ConcreteDataType::uint64_datatype(),
false,
),
};
data_region_request.column_metadatas.push(metric_name_col);
data_region_request.column_metadatas.push(tsid_col);
data_region_request.primary_key =
vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()];
data_region_request
[metric_name_col, tsid_col]
}
}
@@ -646,9 +671,9 @@ impl MetricEngineInner {
return Ok(());
};
let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
let metadata_region_id = to_metadata_region_id(physical_region_id);
let mut columns_to_add = vec![];
for col in columns {
for col in &columns {
if self
.metadata_region
.column_semantic_type(
@@ -659,11 +684,12 @@ impl MetricEngineInner {
.await?
.is_none()
{
columns_to_add.push(col.column_metadata);
columns_to_add.push(col.column_metadata.clone());
}
}
let data_region_id = utils::to_data_region_id(physical_region_id);
// alter data region
let data_region_id = to_data_region_id(physical_region_id);
self.add_columns_to_physical_data_region(
data_region_id,
metadata_region_id,
@@ -672,6 +698,18 @@ impl MetricEngineInner {
)
.await?;
// register columns to logical region
for col in columns {
self.metadata_region
.add_column(
metadata_region_id,
region_id,
&col.column_metadata.column_schema.name,
col.column_metadata.semantic_type,
)
.await?;
}
Ok(())
}
@@ -687,14 +725,191 @@ impl MetricEngineInner {
}
}
impl MetricEngineInner {
/// Dispatch region put request.
///
/// A put whose `region_id` is registered in `physical_regions` is refused: the attempt
/// is logged, `FORBIDDEN_OPERATION_COUNT` is incremented and `ForbiddenPhysicalAlterSnafu`
/// is returned. Every other request is forwarded to `put_logical_region`.
pub async fn put_region(
    &self,
    region_id: RegionId,
    request: RegionPutRequest,
) -> Result<Output> {
    // The read guard is a temporary of this statement, so it is released before any
    // further await below.
    let targets_physical_region = self
        .state
        .read()
        .await
        .physical_regions
        .contains_key(&region_id);

    if !targets_physical_region {
        return self.put_logical_region(region_id, request).await;
    }

    info!(
        "Metric region received put request {request:?} on physical region {region_id:?}"
    );
    FORBIDDEN_OPERATION_COUNT.inc();
    ForbiddenPhysicalAlterSnafu.fail()
}
/// Handles a put request addressed to a logical region.
///
/// Steps: resolve the physical region registered for `logical_region_id` (failing with
/// `LogicalRegionNotFoundSnafu` when there is none), verify the request against the
/// metadata region, rewrite the rows via `modify_rows`, and finally write them through
/// the data region.
async fn put_logical_region(
    &self,
    logical_region_id: RegionId,
    mut request: RegionPutRequest,
) -> Result<Output> {
    // Copy the id out so the read guard is dropped at the end of this statement.
    let registered_physical_region = self
        .state
        .read()
        .await
        .logical_regions
        .get(&logical_region_id)
        .copied();
    let physical_region_id =
        registered_physical_region.with_context(|| LogicalRegionNotFoundSnafu {
            region_id: logical_region_id,
        })?;
    let data_region_id = to_data_region_id(physical_region_id);

    self.verify_put_request(logical_region_id, physical_region_id, &request)
        .await?;

    // write to data region
    // TODO: retrieve table name
    let table_id = logical_region_id.table_id();
    self.modify_rows(table_id, &mut request.rows)?;
    self.data_region.write_data(data_region_id, request).await
}
/// Verifies a put request for a logical region against its corresponding metadata region.
///
/// Two checks are made, in this order:
/// - the logical region must be known to the metadata region, otherwise
///   `LogicalRegionNotFoundSnafu` is returned (and the attempt is logged);
/// - every column in the request schema must have a semantic type recorded for the
///   logical region, otherwise `ColumnNotFoundSnafu` is returned for the first miss.
async fn verify_put_request(
    &self,
    logical_region_id: RegionId,
    physical_region_id: RegionId,
    request: &RegionPutRequest,
) -> Result<()> {
    let metadata_region_id = to_metadata_region_id(physical_region_id);

    // check if the region exists
    let region_exists = self
        .metadata_region
        .is_logical_region_exists(metadata_region_id, logical_region_id)
        .await?;
    if !region_exists {
        error!("Trying to write to an nonexistent region {logical_region_id}");
        return LogicalRegionNotFoundSnafu {
            region_id: logical_region_id,
        }
        .fail();
    }

    // check if the columns exist
    for column in &request.rows.schema {
        let semantic_type = self
            .metadata_region
            .column_semantic_type(metadata_region_id, logical_region_id, &column.column_name)
            .await?;
        if semantic_type.is_none() {
            return ColumnNotFoundSnafu {
                name: column.column_name.clone(),
                region_id: logical_region_id,
            }
            .fail();
        }
    }

    Ok(())
}
/// Perform metric engine specific logic to incoming rows.
/// - Change the semantic type of tag columns to field
/// - Add table_id column
/// - Generate tsid
///
/// `rows` is rewritten in place: the schema gains the `__table_id` and `__tsid` tag
/// columns (in that order) and every row gains the two matching values. This function
/// currently always returns `Ok(())`.
fn modify_rows(&self, table_id: TableId, rows: &mut Rows) -> Result<()> {
    // Walk the schema once: remember which columns were tags (index + name, both are
    // needed for the tsid hash) and demote them to fields in place. The indices stay
    // valid because the internal columns are only appended after the existing ones.
    let mut tag_col_indices = Vec::new();
    for (idx, col) in rows.schema.iter_mut().enumerate() {
        if col.semantic_type == SemanticType::Tag as i32 {
            tag_col_indices.push((idx, col.column_name.clone()));
            col.semantic_type = SemanticType::Field as i32;
        }
    }

    // add table_id column
    rows.schema.push(PbColumnSchema {
        column_name: DATA_SCHEMA_TABLE_ID_COLUMN_NAME.to_string(),
        datatype: to_column_data_type(&ConcreteDataType::uint32_datatype())
            .unwrap()
            .into(),
        semantic_type: SemanticType::Tag as _,
    });
    // add tsid column
    rows.schema.push(PbColumnSchema {
        column_name: DATA_SCHEMA_TSID_COLUMN_NAME.to_string(),
        datatype: to_column_data_type(&ConcreteDataType::uint64_datatype())
            .unwrap()
            .into(),
        semantic_type: SemanticType::Tag as _,
    });

    // fill internal columns; the fixed seeds keep the generated tsid stable across calls
    let mut random_state = RANDOM_STATE;
    for row in &mut rows.rows {
        Self::fill_internal_columns(&mut random_state, table_id, &tag_col_indices, row);
    }

    Ok(())
}
/// Fills internal columns of a row with the table id and a hash of tag values.
///
/// For every `(index, name)` in `tag_col_indices` the column name is hashed, followed by
/// the string value stored at that index of the row (if there is one). The resulting
/// 64-bit hash is appended to the row as the tsid, right after the table id.
///
/// # Panics
///
/// Panics if an index in `tag_col_indices` is out of bounds for `row.values`.
fn fill_internal_columns(
    random_state: &mut RandomState,
    table_id: TableId,
    tag_col_indices: &[(usize, String)],
    row: &mut Row,
) {
    let mut hasher = random_state.build_hasher();
    for (idx, name) in tag_col_indices {
        name.hash(&mut hasher);
        // Borrow the value instead of cloning it: it is only read for hashing.
        // Only string values contribute to the hash; a null (or any non-string value)
        // is skipped. NOTE(review): relies on tag column types being validated before
        // this point - confirm against the caller.
        if let Some(ValueData::StringValue(string)) = &row.values[*idx].value_data {
            string.hash(&mut hasher);
        }
    }
    let hash = hasher.finish();

    // fill table id and tsid
    row.values.push(ValueData::U32Value(table_id).into());
    row.values.push(ValueData::U64Value(hash).into());
}
}
#[cfg(test)]
mod tests {
use std::hash::Hash;
use api::v1::region::alter_request;
use store_api::region_request::AddColumn;
use super::*;
use crate::test_util::TestEnv;
use crate::test_util::{self, TestEnv};
#[test]
fn test_verify_region_create_request() {
@@ -714,8 +929,8 @@ mod tests {
column_id: 1,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
ConcreteDataType::string_datatype(),
DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
ConcreteDataType::uint32_datatype(),
false,
),
},
@@ -729,7 +944,7 @@ mod tests {
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Internal column __metric is reserved".to_string()
"Internal column __table_id is reserved".to_string()
);
// valid request
@@ -835,7 +1050,7 @@ mod tests {
assert_eq!(data_region_request.column_metadatas.len(), 4);
assert_eq!(
data_region_request.primary_key,
vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()]
vec![ReservedColumnId::table_id(), ReservedColumnId::tsid()]
);
}
@@ -896,5 +1111,82 @@ mod tests {
.unwrap()
.unwrap();
assert_eq!(semantic_type, SemanticType::Tag);
let timestamp_index = metadata_region
.column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
.await
.unwrap()
.unwrap();
assert_eq!(timestamp_index, SemanticType::Timestamp);
}
/// Writing rows to a logical region whose tag columns were added beforehand reports
/// one affected row per written row.
#[tokio::test]
async fn test_write_logical_region() {
    let env = TestEnv::new().await;
    env.init_metric_region().await;
    let engine = env.metric();
    let logical_region_id = env.default_logical_region_id();

    // add columns
    let tag_names = &["odd", "even", "Ev_En"];
    let add_tags =
        RegionRequest::Alter(test_util::alter_logical_region_add_tag_columns(tag_names));
    engine
        .handle_request(logical_region_id, add_tags)
        .await
        .unwrap();

    // prepare data
    let schema = test_util::row_schema_with_tags(tag_names);
    let rows = test_util::build_rows(3, 100);
    let put = RegionRequest::Put(RegionPutRequest {
        rows: Rows { schema, rows },
    });

    // write data
    let output = engine
        .handle_request(logical_region_id, put)
        .await
        .unwrap();
    let Output::AffectedRows(count) = output else {
        panic!()
    };
    assert_eq!(100, count);
}
/// A put request addressed directly to the physical region must be rejected.
#[tokio::test]
async fn test_write_physical_region() {
    let env = TestEnv::new().await;
    env.init_metric_region().await;
    let engine = env.metric();
    let physical_region_id = env.default_physical_region_id();

    let put = RegionRequest::Put(RegionPutRequest {
        rows: Rows {
            schema: test_util::row_schema_with_tags(&["abc"]),
            rows: test_util::build_rows(1, 100),
        },
    });

    let outcome = engine.handle_request(physical_region_id, put).await;
    outcome.unwrap_err();
}
/// A put request addressed to a region id that was never created must be rejected.
#[tokio::test]
async fn test_write_nonexist_logical_region() {
    let env = TestEnv::new().await;
    env.init_metric_region().await;
    let engine = env.metric();
    let logical_region_id = RegionId::new(175, 8345);

    let put = RegionRequest::Put(RegionPutRequest {
        rows: Rows {
            schema: test_util::row_schema_with_tags(&["def"]),
            rows: test_util::build_rows(1, 100),
        },
    });

    let outcome = engine.handle_request(logical_region_id, put).await;
    outcome.unwrap_err();
}
}

View File

@@ -109,6 +109,13 @@ pub enum Error {
location: Location,
},
#[snafu(display("Column {} not found in logical region {}", name, region_id))]
ColumnNotFound {
name: String,
region_id: RegionId,
location: Location,
},
#[snafu(display("Alter request to physical region is forbidden"))]
ForbiddenPhysicalAlter { location: Location },
}
@@ -136,6 +143,8 @@ impl ErrorExt for Error {
StatusCode::RegionNotFound
}
ColumnNotFound { .. } => StatusCode::TableColumnNotFound,
CreateMitoRegion { source, .. }
| MitoReadOperation { source, .. }
| MitoWriteOperation { source, .. } => source.status_code(),

View File

@@ -14,7 +14,9 @@
//! Utilities for testing.
use api::v1::SemanticType;
use api::helper::to_column_data_type;
use api::v1::value::ValueData;
use api::v1::{ColumnSchema as PbColumnSchema, Row, SemanticType, Value};
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::config::MitoConfig;
@@ -23,7 +25,9 @@ use mito2::test_util::TestEnv as MitoTestEnv;
use object_store::util::join_dir;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::region_request::{
AddColumn, AlterKind, RegionAlterRequest, RegionCreateRequest, RegionRequest,
};
use store_api::storage::RegionId;
use crate::data_region::DataRegion;
@@ -181,6 +185,87 @@ impl TestEnv {
}
}
/// Generate a [RegionAlterRequest] for adding tag columns.
///
/// Each name in `new_tags` becomes a non-nullable string tag column whose column id is
/// its position in the slice. The request uses schema version 0.
pub fn alter_logical_region_add_tag_columns(new_tags: &[&str]) -> RegionAlterRequest {
    let columns = new_tags
        .iter()
        .enumerate()
        .map(|(position, tag_name)| AddColumn {
            column_metadata: ColumnMetadata {
                column_id: position as u32,
                semantic_type: SemanticType::Tag,
                column_schema: ColumnSchema::new(
                    tag_name.to_string(),
                    ConcreteDataType::string_datatype(),
                    false,
                ),
            },
            location: None,
        })
        .collect();

    RegionAlterRequest {
        schema_version: 0,
        kind: AlterKind::AddColumns { columns },
    }
}
/// Generate a row schema with the given tag columns.
///
/// The result also contains the default timestamp (`greptime_timestamp`) and value
/// (`greptime_value`) columns at the beginning, followed by one string tag column per
/// entry of `tags`.
pub fn row_schema_with_tags(tags: &[&str]) -> Vec<PbColumnSchema> {
    let timestamp_column = PbColumnSchema {
        column_name: "greptime_timestamp".to_string(),
        datatype: to_column_data_type(&ConcreteDataType::timestamp_millisecond_datatype())
            .unwrap()
            .into(),
        semantic_type: SemanticType::Timestamp as _,
    };
    let value_column = PbColumnSchema {
        column_name: "greptime_value".to_string(),
        datatype: to_column_data_type(&ConcreteDataType::float64_datatype())
            .unwrap()
            .into(),
        semantic_type: SemanticType::Field as _,
    };

    let mut schema = Vec::with_capacity(tags.len() + 2);
    schema.push(timestamp_column);
    schema.push(value_column);
    schema.extend(tags.iter().map(|tag_name| PbColumnSchema {
        column_name: tag_name.to_string(),
        datatype: to_column_data_type(&ConcreteDataType::string_datatype())
            .unwrap()
            .into(),
        semantic_type: SemanticType::Tag as _,
    }));
    schema
}
/// Build [Rows] for assembling [RegionPutRequest](store_api::region_request::RegionPutRequest).
///
/// The schema is generated by [row_schema_with_tags].
///
/// Row `i` (0-based) holds the timestamp `i`, the value `i as f64`, and then `num_tags`
/// string values `tag_0`, `tag_1`, ...
pub fn build_rows(num_tags: usize, num_rows: usize) -> Vec<Row> {
    (0..num_rows)
        .map(|row_idx| {
            let leading_values = [
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(row_idx as _)),
                },
                Value {
                    value_data: Some(ValueData::F64Value(row_idx as f64)),
                },
            ];
            let tag_values = (0..num_tags).map(|tag_idx| Value {
                value_data: Some(ValueData::StringValue(format!("tag_{}", tag_idx))),
            });
            Row {
                values: leading_values.into_iter().chain(tag_values).collect(),
            }
        })
        .collect()
}
#[cfg(test)]
mod test {

View File

@@ -25,8 +25,7 @@ use api::helper::{
use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value};
use common_query::Output;
use common_query::Output::AffectedRows;
use common_telemetry::tracing::log::info;
use common_telemetry::warn;
use common_telemetry::{info, warn};
use datatypes::prelude::DataType;
use prometheus::HistogramTimer;
use prost::Message;

View File

@@ -5,7 +5,7 @@ edition.workspace = true
license.workspace = true
[dependencies]
ahash = { version = "0.8", features = ["compile-time-rng"] }
ahash.workspace = true
api.workspace = true
arc-swap = "1.0"
arrow-schema.workspace = true

View File

@@ -38,7 +38,7 @@ enum ReservedColumnType {
Sequence,
OpType,
Tsid,
MetricName,
TableId,
}
/// Column id reserved by the engine.
@@ -76,11 +76,11 @@ impl ReservedColumnId {
Self::BASE | ReservedColumnType::Tsid as ColumnId
}
/// Id for storing metric name column.
/// Id for storing logical table id column.
///
/// Used by: metric engine
pub const fn metric_name() -> ColumnId {
Self::BASE | ReservedColumnType::MetricName as ColumnId
pub const fn table_id() -> ColumnId {
Self::BASE | ReservedColumnType::TableId as ColumnId
}
}