feat: implement create region request for metric engine (#2694)

* implement basic put/get/exist interfaces

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* test add_column and add_table

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* move engine test to test_util

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* verify incoming create region request

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* create data region

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy lints

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply review sugg

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-11-06 11:48:26 +08:00
committed by GitHub
parent bbaae9223a
commit 6fd04e38a3
9 changed files with 660 additions and 79 deletions

1
Cargo.lock generated
View File

@@ -4765,6 +4765,7 @@ dependencies = [
"common-telemetry",
"common-test-util",
"common-time",
"datafusion",
"datatypes",
"mito2",
"object-store",

View File

@@ -43,7 +43,7 @@ pub trait RecordBatchStream: Stream<Item = Result<RecordBatch>> {
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send>>;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OrderOption {
pub name: String,
pub options: SortOptions,

View File

@@ -14,6 +14,7 @@ common-query.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datafusion.workspace = true
datatypes.workspace = true
mito2.workspace = true
object-store.workspace = true

View File

@@ -26,13 +26,14 @@ use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use mito2::engine::{MitoEngine, MITO_ENGINE_NAME};
use object_store::util::join_dir;
use snafu::ResultExt;
use snafu::{ensure, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionGroup, RegionId, ScanRequest};
use crate::error::{CreateMitoRegionSnafu, Result};
use crate::error::{CreateMitoRegionSnafu, InternalColumnOccupiedSnafu, Result};
use crate::utils;
/// region group value for data region inside a metric region
@@ -41,12 +42,20 @@ pub const METRIC_DATA_REGION_GROUP: RegionGroup = 0;
/// region group value for metadata region inside a metric region
pub const METRIC_METADATA_REGION_GROUP: RegionGroup = 1;
const METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME: &str = "ts";
const METADATA_SCHEMA_KEY_COLUMN_NAME: &str = "k";
const METADATA_SCHEMA_VALUE_COLUMN_NAME: &str = "val";
pub const METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME: &str = "ts";
pub const METADATA_SCHEMA_KEY_COLUMN_NAME: &str = "k";
pub const METADATA_SCHEMA_VALUE_COLUMN_NAME: &str = "v";
const METADATA_REGION_SUBDIR: &str = "metadata";
const DATA_REGION_SUBDIR: &str = "data";
pub const METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX: usize = 0;
pub const METADATA_SCHEMA_KEY_COLUMN_INDEX: usize = 1;
pub const METADATA_SCHEMA_VALUE_COLUMN_INDEX: usize = 2;
/// Column name of internal column `__metric` that stores the original metric name
pub const DATA_SCHEMA_METRIC_NAME_COLUMN_NAME: &str = "__metric";
pub const DATA_SCHEMA_TSID_COLUMN_NAME: &str = "__tsid";
pub const METADATA_REGION_SUBDIR: &str = "metadata";
pub const DATA_REGION_SUBDIR: &str = "data";
pub const METRIC_ENGINE_NAME: &str = "metric";
@@ -129,32 +138,32 @@ impl RegionEngine for MetricEngine {
}
}
impl MetricEngine {
pub fn new(mito: MitoEngine) -> Self {
Self {
inner: Arc::new(MetricEngineInner { mito }),
}
}
}
struct MetricEngineInner {
mito: MitoEngine,
}
impl MetricEngineInner {
/// Initialize a metric region at given region id.
pub async fn create_region(
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> Result<()> {
self.verify_region_create_request(&request)?;
Self::verify_region_create_request(&request)?;
let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
let create_data_region_request = self.create_request_for_data_region(&request);
// create metadata region
let create_metadata_region_request =
self.create_request_for_metadata_region(&request.region_dir);
// self.mito
// .handle_request(
// data_region_id,
// RegionRequest::Create(create_data_region_request),
// )
// .await
// .with_context(|_| CreateMitoRegionSnafu {
// region_type: DATA_REGION_SUBDIR,
// })?;
self.mito
.handle_request(
metadata_region_id,
@@ -165,12 +174,24 @@ impl MetricEngineInner {
region_type: METADATA_REGION_SUBDIR,
})?;
// create data region
let create_data_region_request = self.create_request_for_data_region(&request);
self.mito
.handle_request(
data_region_id,
RegionRequest::Create(create_data_region_request),
)
.await
.with_context(|_| CreateMitoRegionSnafu {
region_type: DATA_REGION_SUBDIR,
})?;
Ok(())
}
/// Check if
/// - internal columns are present
fn verify_region_create_request(&self, request: &RegionCreateRequest) -> Result<()> {
/// - internal columns are not occupied
fn verify_region_create_request(request: &RegionCreateRequest) -> Result<()> {
let name_to_index = request
.column_metadatas
.iter()
@@ -178,6 +199,20 @@ impl MetricEngineInner {
.map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
.collect::<HashMap<String, usize>>();
// check if internal columns are not occupied
ensure!(
!name_to_index.contains_key(DATA_SCHEMA_METRIC_NAME_COLUMN_NAME),
InternalColumnOccupiedSnafu {
column: DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
}
);
ensure!(
!name_to_index.contains_key(DATA_SCHEMA_TSID_COLUMN_NAME),
InternalColumnOccupiedSnafu {
column: DATA_SCHEMA_TSID_COLUMN_NAME,
}
);
Ok(())
}
@@ -197,7 +232,7 @@ impl MetricEngineInner {
pub fn create_request_for_metadata_region(&self, region_dir: &str) -> RegionCreateRequest {
// ts TIME INDEX DEFAULT 0
let timestamp_column_metadata = ColumnMetadata {
column_id: 0,
column_id: METADATA_SCHEMA_TIMESTAMP_COLUMN_INDEX as _,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
@@ -211,7 +246,7 @@ impl MetricEngineInner {
};
// key STRING PRIMARY KEY
let key_column_metadata = ColumnMetadata {
column_id: 1,
column_id: METADATA_SCHEMA_KEY_COLUMN_INDEX as _,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
METADATA_SCHEMA_KEY_COLUMN_NAME,
@@ -221,7 +256,7 @@ impl MetricEngineInner {
};
// val STRING
let value_column_metadata = ColumnMetadata {
column_id: 2,
column_id: METADATA_SCHEMA_VALUE_COLUMN_INDEX as _,
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new(
METADATA_SCHEMA_VALUE_COLUMN_NAME,
@@ -230,6 +265,7 @@ impl MetricEngineInner {
),
};
// concat region dir
let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);
RegionCreateRequest {
@@ -239,13 +275,18 @@ impl MetricEngineInner {
key_column_metadata,
value_column_metadata,
],
primary_key: vec![1],
primary_key: vec![METADATA_SCHEMA_KEY_COLUMN_INDEX as _],
options: HashMap::new(),
region_dir: metadata_region_dir,
}
}
// todo: register "tag columns" to metadata
/// Convert [RegionCreateRequest] for data region.
///
/// All tag columns in the original request will be converted to value columns.
/// Those columns real semantic type is stored in metadata region.
///
/// This will also add internal columns to the request.
pub fn create_request_for_data_region(
&self,
request: &RegionCreateRequest,
@@ -255,57 +296,157 @@ impl MetricEngineInner {
// concat region dir
data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
// todo: change semantic type and primary key
// convert semantic type
data_region_request
.column_metadatas
.iter_mut()
.for_each(|metadata| {
if metadata.semantic_type == SemanticType::Tag {
metadata.semantic_type = SemanticType::Field;
}
});
// todo: add internal column
// add internal columns
let metric_name_col = ColumnMetadata {
column_id: ReservedColumnId::metric_name(),
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
ConcreteDataType::string_datatype(),
false,
),
};
let tsid_col = ColumnMetadata {
column_id: ReservedColumnId::tsid(),
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_TSID_COLUMN_NAME,
ConcreteDataType::int64_datatype(),
false,
),
};
data_region_request.column_metadatas.push(metric_name_col);
data_region_request.column_metadatas.push(tsid_col);
data_region_request.primary_key =
vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()];
data_region_request
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use common_telemetry::info;
mod tests {
use super::*;
use crate::test_util::TestEnv;
#[tokio::test]
async fn create_metadata_region() {
common_telemetry::init_default_ut_logging();
let env = TestEnv::new().await;
let mito = env.mito();
let engine = MetricEngine {
inner: Arc::new(MetricEngineInner { mito }),
};
let engine_dir = env.data_home();
let region_dir = join_dir(&engine_dir, "test_metric_region");
let region_id = RegionId::new(1, 2);
let region_create_request = RegionCreateRequest {
#[test]
fn test_verify_region_create_request() {
// internal column is occupied
let request = RegionCreateRequest {
column_metadatas: vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
},
ColumnMetadata {
column_id: 1,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
DATA_SCHEMA_METRIC_NAME_COLUMN_NAME,
ConcreteDataType::string_datatype(),
false,
),
},
],
region_dir: "test_dir".to_string(),
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![],
primary_key: vec![],
options: HashMap::new(),
region_dir: "test_metric_region".to_string(),
};
let result = MetricEngineInner::verify_region_create_request(&request);
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Internal column __metric is reserved".to_string()
);
// valid request
let request = RegionCreateRequest {
column_metadatas: vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
},
ColumnMetadata {
column_id: 1,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"column1".to_string(),
ConcreteDataType::string_datatype(),
false,
),
},
],
region_dir: "test_dir".to_string(),
engine: METRIC_ENGINE_NAME.to_string(),
primary_key: vec![],
options: HashMap::new(),
};
let result = MetricEngineInner::verify_region_create_request(&request);
assert!(result.is_ok());
}
#[tokio::test]
async fn test_create_request_for_data_region() {
let request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![
ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
},
ColumnMetadata {
column_id: 1,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
"tag",
ConcreteDataType::string_datatype(),
false,
),
},
],
primary_key: vec![0],
options: HashMap::new(),
region_dir: "test_dir".to_string(),
};
// create the region
engine
.handle_request(region_id, RegionRequest::Create(region_create_request))
.await
.unwrap();
let env = TestEnv::new().await;
let engine = MetricEngineInner { mito: env.mito() };
let data_region_request = engine.create_request_for_data_region(&request);
// assert metadata region's dir
let metadata_region_dir = join_dir(&region_dir, METADATA_REGION_SUBDIR);
let exist = tokio::fs::try_exists(region_dir).await.unwrap();
assert!(exist);
// check mito engine
let metadata_region_id = utils::to_metadata_region_id(region_id);
let result = env.mito().get_metadata(metadata_region_id).await.unwrap();
assert_eq!(
data_region_request.region_dir,
"/test_dir/data/".to_string()
);
assert_eq!(data_region_request.column_metadatas.len(), 4);
assert_eq!(
data_region_request.primary_key,
vec![ReservedColumnId::metric_name(), ReservedColumnId::tsid()]
);
}
}

View File

@@ -53,6 +53,27 @@ pub enum Error {
error: base64::DecodeError,
location: Location,
},
#[snafu(display("Mito read operation fails"))]
MitoReadOperation {
source: BoxedError,
location: Location,
},
#[snafu(display("Mito write operation fails"))]
MitoWriteOperation {
source: BoxedError,
location: Location,
},
#[snafu(display("Failed to collect record batch stream"))]
CollectRecordBatchStream {
source: common_recordbatch::error::Error,
location: Location,
},
#[snafu(display("Internal column {} is reserved", column))]
InternalColumnOccupied { column: String, location: Location },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -62,11 +83,17 @@ impl ErrorExt for Error {
use Error::*;
match self {
InternalColumnOccupied { .. } => StatusCode::InvalidArguments,
MissingInternalColumn { .. }
| DeserializeSemanticType { .. }
| DecodeColumnValue { .. } => StatusCode::Unexpected,
CreateMitoRegion { source, .. } => source.status_code(),
CreateMitoRegion { source, .. }
| MitoReadOperation { source, .. }
| MitoWriteOperation { source, .. } => source.status_code(),
CollectRecordBatchStream { source, .. } => source.status_code(),
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
}

View File

@@ -12,15 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::SemanticType;
use api::v1::value::ValueData;
use api::v1::{self, ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
use base64::engine::general_purpose::STANDARD_NO_PAD;
use base64::Engine;
use common_recordbatch::util::collect;
use datafusion::prelude::{col, lit, Expr};
use datatypes::vectors::StringVector;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::storage::RegionId;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionPutRequest, RegionReadRequest};
use store_api::storage::{RegionId, ScanRequest};
use crate::engine::{
METADATA_SCHEMA_KEY_COLUMN_NAME, METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
METADATA_SCHEMA_VALUE_COLUMN_INDEX, METADATA_SCHEMA_VALUE_COLUMN_NAME,
};
use crate::error::{
DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, Result, TableAlreadyExistsSnafu,
CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu,
MitoReadOperationSnafu, MitoWriteOperationSnafu, Result, TableAlreadyExistsSnafu,
};
use crate::utils;
@@ -41,15 +52,21 @@ pub struct MetadataRegion {
}
impl MetadataRegion {
pub fn new(mito: MitoEngine) -> Self {
Self { mito }
}
/// Add a new table key to metadata.
///
/// This method will check if the table key already exists, if so, it will return
/// a [TableAlreadyExistsSnafu] error.
pub fn add_table(&self, region_id: RegionId, table_name: &str) -> Result<()> {
pub async fn add_table(&self, region_id: RegionId, table_name: &str) -> Result<()> {
let region_id = utils::to_metadata_region_id(region_id);
let table_key = Self::concat_table_key(table_name);
let put_success = self.put_conditionally(region_id, table_key, String::new())?;
let put_success = self
.put_conditionally(region_id, table_key, String::new())
.await?;
if !put_success {
TableAlreadyExistsSnafu { table_name }.fail()
@@ -60,14 +77,15 @@ impl MetadataRegion {
/// Add a new column key to metadata.
///
/// This method won't check if the column already exists.
pub fn add_column(
/// This method won't check if the column already exists. But
/// will return if the column is successfully added.
pub async fn add_column(
&self,
region_id: RegionId,
table_name: &str,
column_name: &str,
semantic_type: SemanticType,
) -> Result<()> {
) -> Result<bool> {
let region_id = utils::to_metadata_region_id(region_id);
let column_key = Self::concat_column_key(table_name, column_name);
@@ -75,8 +93,30 @@ impl MetadataRegion {
region_id,
column_key,
Self::serialize_semantic_type(semantic_type),
)?;
Ok(())
)
.await
}
/// Check if the given table exists.
pub async fn is_table_exist(&self, region_id: RegionId, table_name: &str) -> Result<bool> {
let region_id = utils::to_metadata_region_id(region_id);
let table_key = Self::concat_table_key(table_name);
self.exist(region_id, &table_key).await
}
/// Check if the given column exists. Return the semantic type if exists.
pub async fn column_semantic_type(
&self,
region_id: RegionId,
table_name: &str,
column_name: &str,
) -> Result<Option<SemanticType>> {
let region_id = utils::to_metadata_region_id(region_id);
let column_key = Self::concat_column_key(table_name, column_name);
let semantic_type = self.get(region_id, &column_key).await?;
semantic_type
.map(|s| Self::deserialize_semantic_type(&s))
.transpose()
}
}
@@ -136,24 +176,134 @@ impl MetadataRegion {
impl MetadataRegion {
/// Put if not exist, return if this put operation is successful (error other
/// than "key already exist" will be wrapped in [Err]).
pub fn put_conditionally(
pub async fn put_conditionally(
&self,
region_id: RegionId,
key: String,
value: String,
) -> Result<bool> {
todo!()
if self.exist(region_id, &key).await? {
return Ok(false);
}
let put_request = Self::build_put_request(&key, &value);
self.mito
.handle_request(
region_id,
store_api::region_request::RegionRequest::Put(put_request),
)
.await
.context(MitoWriteOperationSnafu)?;
Ok(true)
}
/// Check if the given key exists.
pub fn exist(&self, region_id: RegionId, key: &str) -> Result<bool> {
todo!()
///
/// Notice that due to mito doesn't support transaction, TOCTTOU is possible.
pub async fn exist(&self, region_id: RegionId, key: &str) -> Result<bool> {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
.mito
.handle_query(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let scan_result = collect(record_batch_stream)
.await
.context(CollectRecordBatchStreamSnafu)?;
let exist = !scan_result.is_empty() && scan_result.first().unwrap().num_rows() != 0;
Ok(exist)
}
/// Retrieves the value associated with the given key in the specified region.
/// Returns `Ok(None)` if the key is not found.
pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
let scan_req = Self::build_read_request(key);
let record_batch_stream = self
.mito
.handle_query(region_id, scan_req)
.await
.context(MitoReadOperationSnafu)?;
let mut scan_result = collect(record_batch_stream)
.await
.context(CollectRecordBatchStreamSnafu)?;
let Some(first_batch) = scan_result.first() else {
return Ok(None);
};
let val = first_batch
.column(0)
.get_ref(0)
.as_string()
.unwrap()
.map(|s| s.to_string());
Ok(val)
}
/// Builds a [ScanRequest] to read metadata for a given key.
/// The request will contains a EQ filter on the key column.
///
/// Only the value column is projected.
fn build_read_request(key: &str) -> ScanRequest {
let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key));
ScanRequest {
sequence: None,
projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
filters: vec![filter_expr.into()],
output_ordering: None,
limit: None,
}
}
fn build_put_request(key: &str, value: &str) -> RegionPutRequest {
let cols = vec![
ColumnSchema {
column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
datatype: ColumnDataType::TimestampMillisecond as _,
semantic_type: SemanticType::Timestamp as _,
},
ColumnSchema {
column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Tag as _,
},
ColumnSchema {
column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
datatype: ColumnDataType::String as _,
semantic_type: SemanticType::Field as _,
},
];
let rows = Rows {
schema: cols,
rows: vec![Row {
values: vec![
Value {
value_data: Some(ValueData::TimestampMillisecondValue(0)),
},
Value {
value_data: Some(ValueData::StringValue(key.to_string())),
},
Value {
value_data: Some(ValueData::StringValue(value.to_string())),
},
],
}],
};
RegionPutRequest { rows }
}
}
#[cfg(test)]
mod test {
use store_api::region_request::RegionRequest;
use super::*;
use crate::test_util::TestEnv;
use crate::utils::to_metadata_region_id;
#[test]
fn test_concat_table_key() {
@@ -226,4 +376,163 @@ mod test {
let semantic_type = "\"InvalidType\"";
assert!(MetadataRegion::deserialize_semantic_type(semantic_type).is_err());
}
#[test]
fn test_build_read_request() {
let key = "test_key";
let expected_filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key));
let expected_scan_request = ScanRequest {
sequence: None,
projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
filters: vec![expected_filter_expr.into()],
output_ordering: None,
limit: None,
};
let actual_scan_request = MetadataRegion::build_read_request(key);
assert_eq!(actual_scan_request, expected_scan_request);
}
#[tokio::test]
async fn test_put_conditionally() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
// Test inserting a new key-value pair
let key = "test_key".to_string();
let value = "test_value".to_string();
let result = metadata_region
.put_conditionally(region_id, key.clone(), value.clone())
.await;
assert!(result.is_ok());
assert!(result.unwrap());
// Verify that the key-value pair was actually inserted
let scan_req = MetadataRegion::build_read_request("test_key");
let record_batch_stream = metadata_region
.mito
.handle_query(region_id, scan_req)
.await
.unwrap();
let scan_result = collect(record_batch_stream).await.unwrap();
assert_eq!(scan_result.len(), 1);
// Test inserting the same key-value pair again
let result = metadata_region
.put_conditionally(region_id, key.clone(), value.clone())
.await;
assert!(result.is_ok());
assert!(!result.unwrap(),);
}
#[tokio::test]
async fn test_exist() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
// Test checking for a non-existent key
let key = "test_key".to_string();
let result = metadata_region.exist(region_id, &key).await;
assert!(result.is_ok());
assert!(!result.unwrap());
// Test inserting a key and then checking for its existence
let value = "test_value".to_string();
let put_request = MetadataRegion::build_put_request(&key, &value);
metadata_region
.mito
.handle_request(region_id, RegionRequest::Put(put_request))
.await
.unwrap();
let result = metadata_region.exist(region_id, &key).await;
assert!(result.is_ok());
assert!(result.unwrap(),);
}
#[tokio::test]
async fn test_get() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
// Test getting a non-existent key
let key = "test_key".to_string();
let result = metadata_region.get(region_id, &key).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), None);
// Test inserting a key and then getting its value
let value = "test_value".to_string();
let put_request = MetadataRegion::build_put_request(&key, &value);
metadata_region
.mito
.handle_request(region_id, RegionRequest::Put(put_request))
.await
.unwrap();
let result = metadata_region.get(region_id, &key).await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), Some(value));
}
#[tokio::test]
async fn test_add_table() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
// add one table
let table_name = "table1";
metadata_region
.add_table(region_id, table_name)
.await
.unwrap();
assert!(metadata_region
.is_table_exist(region_id, table_name)
.await
.unwrap());
// add it again
assert!(metadata_region
.add_table(region_id, table_name)
.await
.is_err());
}
#[tokio::test]
async fn test_add_column() {
let env = TestEnv::new().await;
env.init_metric_region().await;
let metadata_region = env.metadata_region();
let region_id = to_metadata_region_id(env.default_region_id());
let table_name = "table1";
let column_name = "column1";
let semantic_type = SemanticType::Tag;
metadata_region
.add_column(region_id, table_name, column_name, semantic_type)
.await
.unwrap();
let actual_semantic_type = metadata_region
.column_semantic_type(region_id, table_name, column_name)
.await
.unwrap();
assert_eq!(actual_semantic_type, Some(semantic_type));
// duplicate column won't be updated
let is_updated = metadata_region
.add_column(region_id, table_name, column_name, SemanticType::Field)
.await
.unwrap();
assert!(!is_updated);
let actual_semantic_type = metadata_region
.column_semantic_type(region_id, table_name, column_name)
.await
.unwrap();
assert_eq!(actual_semantic_type, Some(semantic_type));
}
}

View File

@@ -14,10 +14,22 @@
//! Utilities for testing.
use std::collections::HashMap;
use api::v1::SemanticType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::test_util::TestEnv as MitoTestEnv;
use object_store::util::join_dir;
use store_api::metadata::ColumnMetadata;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::engine::{MetricEngine, METRIC_ENGINE_NAME};
use crate::metadata_region::MetadataRegion;
/// Env to test metric engine.
pub struct TestEnv {
@@ -47,4 +59,78 @@ impl TestEnv {
pub fn mito(&self) -> MitoEngine {
self.mito.clone()
}
pub fn metric(&self) -> MetricEngine {
MetricEngine::new(self.mito())
}
/// Create regions in [MetricEngine] under [`default_region_id`](TestEnv::default_region_id)
/// and region dir `"test_metric_region"`.
pub async fn init_metric_region(&self) {
let region_id = self.default_region_id();
let region_create_request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
"greptime_timestamp",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
}],
primary_key: vec![],
options: HashMap::new(),
region_dir: "test_metric_region".to_string(),
};
// create regions
self.metric()
.handle_request(region_id, RegionRequest::Create(region_create_request))
.await
.unwrap();
}
pub fn metadata_region(&self) -> MetadataRegion {
MetadataRegion::new(self.mito())
}
/// `RegionId::new(1, 2)`
pub fn default_region_id(&self) -> RegionId {
RegionId::new(1, 2)
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::engine::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
use crate::utils::{self, to_metadata_region_id};
#[tokio::test]
async fn create_metadata_region() {
common_telemetry::init_default_ut_logging();
let env = TestEnv::new().await;
env.init_metric_region().await;
let region_id = to_metadata_region_id(env.default_region_id());
let region_dir = join_dir(&env.data_home(), "test_metric_region");
// assert metadata region's dir
let metadata_region_dir = join_dir(&region_dir, METADATA_REGION_SUBDIR);
let exist = tokio::fs::try_exists(metadata_region_dir).await.unwrap();
assert!(exist);
// assert data region's dir
let data_region_dir = join_dir(&region_dir, DATA_REGION_SUBDIR);
let exist = tokio::fs::try_exists(data_region_dir).await.unwrap();
assert!(exist);
// check mito engine
let metadata_region_id = utils::to_metadata_region_id(region_id);
let _ = env.mito().get_metadata(metadata_region_id).await.unwrap();
let data_region_id = utils::to_data_region_id(region_id);
let _ = env.mito().get_metadata(data_region_id).await.unwrap();
}
}

View File

@@ -37,6 +37,8 @@ enum ReservedColumnType {
Version = 0,
Sequence,
OpType,
Tsid,
MetricName,
}
/// Column id reserved by the engine.
@@ -66,6 +68,20 @@ impl ReservedColumnId {
pub const fn op_type() -> ColumnId {
Self::BASE | ReservedColumnType::OpType as ColumnId
}
/// Id for storing TSID column.
///
/// Used by: metric engine
pub const fn tsid() -> ColumnId {
Self::BASE | ReservedColumnType::Tsid as ColumnId
}
/// Id for storing metric name column.
///
/// Used by: metric engine
pub const fn metric_name() -> ColumnId {
Self::BASE | ReservedColumnType::MetricName as ColumnId
}
}
// -----------------------------------------------------------------------------

View File

@@ -46,7 +46,7 @@ pub trait WriteRequest: Send {
fn delete(&mut self, keys: HashMap<String, VectorRef>) -> Result<(), Self::Error>;
}
#[derive(Default, Clone, Debug)]
#[derive(Default, Clone, Debug, PartialEq, Eq)]
pub struct ScanRequest {
/// Max sequence number to read, None for latest sequence.
///