feat: Metric engine skeleton (#2687)

* metadata region logic

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix mito2 test feature gate

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* override unused warnings

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* add basic test

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* apply review sugg

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy lints

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-11-03 16:34:08 +08:00
committed by GitHub
parent 7323d727c9
commit 0dca63bc7b
18 changed files with 829 additions and 39 deletions

23
Cargo.lock generated
View File

@@ -4749,6 +4749,29 @@ dependencies = [
"meter-core",
]
[[package]]
name = "metric-engine"
version = "0.4.2"
dependencies = [
"api",
"async-trait",
"base64 0.21.5",
"common-error",
"common-macro",
"common-query",
"common-recordbatch",
"common-telemetry",
"common-test-util",
"common-time",
"datatypes",
"mito2",
"object-store",
"serde_json",
"snafu",
"store-api",
"tokio",
]
[[package]]
name = "mime"
version = "0.3.17"

View File

@@ -35,6 +35,7 @@ members = [
"src/log-store",
"src/meta-client",
"src/meta-srv",
"src/metric-engine",
"src/mito2",
"src/object-store",
"src/operator",

View File

@@ -0,0 +1,27 @@
[package]
name = "metric-engine"
version.workspace = true
edition.workspace = true
license.workspace = true
[dependencies]
api.workspace = true
async-trait.workspace = true
base64 = "0.21"
common-error.workspace = true
common-macro.workspace = true
common-query.workspace = true
common-recordbatch.workspace = true
common-telemetry.workspace = true
common-time.workspace = true
datatypes.workspace = true
mito2.workspace = true
object-store.workspace = true
serde_json.workspace = true
snafu.workspace = true
store-api.workspace = true
tokio.workspace = true
[dev-dependencies]
common-test-util.workspace = true
mito2 = { workspace = true, features = ["test"] }

View File

@@ -0,0 +1,311 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use async_trait::async_trait;
use common_error::ext::BoxedError;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use common_time::Timestamp;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::value::Value;
use mito2::engine::{MitoEngine, MITO_ENGINE_NAME};
use object_store::util::join_dir;
use snafu::ResultExt;
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::region_engine::{RegionEngine, RegionRole};
use store_api::region_request::{RegionCreateRequest, RegionRequest};
use store_api::storage::{RegionGroup, RegionId, ScanRequest};
use crate::error::{CreateMitoRegionSnafu, Result};
use crate::utils;
/// region group value for data region inside a metric region
pub const METRIC_DATA_REGION_GROUP: RegionGroup = 0;
/// region group value for metadata region inside a metric region
pub const METRIC_METADATA_REGION_GROUP: RegionGroup = 1;
const METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME: &str = "ts";
const METADATA_SCHEMA_KEY_COLUMN_NAME: &str = "k";
const METADATA_SCHEMA_VALUE_COLUMN_NAME: &str = "val";
const METADATA_REGION_SUBDIR: &str = "metadata";
const DATA_REGION_SUBDIR: &str = "data";
pub const METRIC_ENGINE_NAME: &str = "metric";
pub struct MetricEngine {
inner: Arc<MetricEngineInner>,
}
#[async_trait]
impl RegionEngine for MetricEngine {
/// Name of this engine
fn name(&self) -> &str {
METRIC_ENGINE_NAME
}
/// Handles request to the region.
///
/// Only query is not included, which is handled in `handle_query`
async fn handle_request(
&self,
region_id: RegionId,
request: RegionRequest,
) -> std::result::Result<Output, BoxedError> {
let result = match request {
RegionRequest::Put(_) => todo!(),
RegionRequest::Delete(_) => todo!(),
RegionRequest::Create(create) => self
.inner
.create_region(region_id, create)
.await
.map(|_| Output::AffectedRows(0)),
RegionRequest::Drop(_) => todo!(),
RegionRequest::Open(_) => todo!(),
RegionRequest::Close(_) => todo!(),
RegionRequest::Alter(_) => todo!(),
RegionRequest::Flush(_) => todo!(),
RegionRequest::Compact(_) => todo!(),
RegionRequest::Truncate(_) => todo!(),
};
result.map_err(BoxedError::new)
}
/// Handles substrait query and return a stream of record batches
async fn handle_query(
&self,
region_id: RegionId,
request: ScanRequest,
) -> std::result::Result<SendableRecordBatchStream, BoxedError> {
todo!()
}
/// Retrieves region's metadata.
async fn get_metadata(
&self,
region_id: RegionId,
) -> std::result::Result<RegionMetadataRef, BoxedError> {
todo!()
}
/// Retrieves region's disk usage.
async fn region_disk_usage(&self, region_id: RegionId) -> Option<i64> {
todo!()
}
/// Stops the engine
async fn stop(&self) -> std::result::Result<(), BoxedError> {
todo!()
}
fn set_writable(
&self,
region_id: RegionId,
writable: bool,
) -> std::result::Result<(), BoxedError> {
todo!()
}
fn role(&self, region_id: RegionId) -> Option<RegionRole> {
todo!()
}
}
struct MetricEngineInner {
mito: MitoEngine,
}
impl MetricEngineInner {
pub async fn create_region(
&self,
region_id: RegionId,
request: RegionCreateRequest,
) -> Result<()> {
self.verify_region_create_request(&request)?;
let (data_region_id, metadata_region_id) = Self::transform_region_id(region_id);
let create_data_region_request = self.create_request_for_data_region(&request);
let create_metadata_region_request =
self.create_request_for_metadata_region(&request.region_dir);
// self.mito
// .handle_request(
// data_region_id,
// RegionRequest::Create(create_data_region_request),
// )
// .await
// .with_context(|_| CreateMitoRegionSnafu {
// region_type: DATA_REGION_SUBDIR,
// })?;
self.mito
.handle_request(
metadata_region_id,
RegionRequest::Create(create_metadata_region_request),
)
.await
.with_context(|_| CreateMitoRegionSnafu {
region_type: METADATA_REGION_SUBDIR,
})?;
Ok(())
}
/// Check if
/// - internal columns are present
fn verify_region_create_request(&self, request: &RegionCreateRequest) -> Result<()> {
let name_to_index = request
.column_metadatas
.iter()
.enumerate()
.map(|(idx, metadata)| (metadata.column_schema.name.clone(), idx))
.collect::<HashMap<String, usize>>();
Ok(())
}
/// Build data region id and metadata region id from the given region id.
///
/// Return value: (data_region_id, metadata_region_id)
fn transform_region_id(region_id: RegionId) -> (RegionId, RegionId) {
(
utils::to_data_region_id(region_id),
utils::to_metadata_region_id(region_id),
)
}
/// Build [RegionCreateRequest] for metadata region
///
/// This method will append [METADATA_REGION_SUBDIR] to the given `region_dir`.
pub fn create_request_for_metadata_region(&self, region_dir: &str) -> RegionCreateRequest {
// ts TIME INDEX DEFAULT 0
let timestamp_column_metadata = ColumnMetadata {
column_id: 0,
semantic_type: SemanticType::Timestamp,
column_schema: ColumnSchema::new(
METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME,
ConcreteDataType::timestamp_millisecond_datatype(),
false,
)
.with_default_constraint(Some(datatypes::schema::ColumnDefaultConstraint::Value(
Value::Timestamp(Timestamp::new_millisecond(0)),
)))
.unwrap(),
};
// key STRING PRIMARY KEY
let key_column_metadata = ColumnMetadata {
column_id: 1,
semantic_type: SemanticType::Tag,
column_schema: ColumnSchema::new(
METADATA_SCHEMA_KEY_COLUMN_NAME,
ConcreteDataType::string_datatype(),
false,
),
};
// val STRING
let value_column_metadata = ColumnMetadata {
column_id: 2,
semantic_type: SemanticType::Field,
column_schema: ColumnSchema::new(
METADATA_SCHEMA_VALUE_COLUMN_NAME,
ConcreteDataType::string_datatype(),
true,
),
};
let metadata_region_dir = join_dir(region_dir, METADATA_REGION_SUBDIR);
RegionCreateRequest {
engine: MITO_ENGINE_NAME.to_string(),
column_metadatas: vec![
timestamp_column_metadata,
key_column_metadata,
value_column_metadata,
],
primary_key: vec![1],
options: HashMap::new(),
region_dir: metadata_region_dir,
}
}
// todo: register "tag columns" to metadata
pub fn create_request_for_data_region(
&self,
request: &RegionCreateRequest,
) -> RegionCreateRequest {
let mut data_region_request = request.clone();
// concat region dir
data_region_request.region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
// todo: change semantic type and primary key
// todo: add internal column
data_region_request
}
}
#[cfg(test)]
mod test {
use std::time::Duration;
use common_telemetry::info;
use super::*;
use crate::test_util::TestEnv;
#[tokio::test]
async fn create_metadata_region() {
common_telemetry::init_default_ut_logging();
let env = TestEnv::new().await;
let mito = env.mito();
let engine = MetricEngine {
inner: Arc::new(MetricEngineInner { mito }),
};
let engine_dir = env.data_home();
let region_dir = join_dir(&engine_dir, "test_metric_region");
let region_id = RegionId::new(1, 2);
let region_create_request = RegionCreateRequest {
engine: METRIC_ENGINE_NAME.to_string(),
column_metadatas: vec![],
primary_key: vec![],
options: HashMap::new(),
region_dir: "test_metric_region".to_string(),
};
// create the region
engine
.handle_request(region_id, RegionRequest::Create(region_create_request))
.await
.unwrap();
// assert metadata region's dir
let metadata_region_dir = join_dir(&region_dir, METADATA_REGION_SUBDIR);
let exist = tokio::fs::try_exists(region_dir).await.unwrap();
assert!(exist);
// check mito engine
let metadata_region_id = utils::to_metadata_region_id(region_id);
let result = env.mito().get_metadata(metadata_region_id).await.unwrap();
}
}

View File

@@ -0,0 +1,78 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::any::Any;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_macro::stack_trace_debug;
use snafu::{Location, Snafu};
#[derive(Snafu)]
#[snafu(visibility(pub))]
#[stack_trace_debug]
pub enum Error {
#[snafu(display("Missing internal column {} in physical metric table", column))]
MissingInternalColumn { column: String, location: Location },
#[snafu(display("Failed to create mito region, region type: {}", region_type))]
CreateMitoRegion {
region_type: String,
source: BoxedError,
location: Location,
},
#[snafu(display("Table `{}` already exists", table_name))]
TableAlreadyExists {
table_name: String,
location: Location,
},
#[snafu(display("Failed to deserialize semantic type from {}", raw))]
DeserializeSemanticType {
raw: String,
#[snafu(source)]
error: serde_json::Error,
location: Location,
},
#[snafu(display("Failed to decode base64 column value"))]
DecodeColumnValue {
#[snafu(source)]
error: base64::DecodeError,
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
impl ErrorExt for Error {
fn status_code(&self) -> StatusCode {
use Error::*;
match self {
MissingInternalColumn { .. }
| DeserializeSemanticType { .. }
| DecodeColumnValue { .. } => StatusCode::Unexpected,
CreateMitoRegion { source, .. } => source.status_code(),
TableAlreadyExists { .. } => StatusCode::TableAlreadyExists,
}
}
fn as_any(&self) -> &dyn Any {
self
}
}

View File

@@ -0,0 +1,22 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#[allow(unused)]
pub mod engine;
pub mod error;
#[allow(unused)]
mod metadata_region;
#[cfg(test)]
mod test_util;
mod utils;

View File

@@ -0,0 +1,229 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::SemanticType;
use base64::engine::general_purpose::STANDARD_NO_PAD;
use base64::Engine;
use mito2::engine::MitoEngine;
use snafu::ResultExt;
use store_api::storage::RegionId;
use crate::error::{
DecodeColumnValueSnafu, DeserializeSemanticTypeSnafu, Result, TableAlreadyExistsSnafu,
};
use crate::utils;
/// The other two fields key and value will be used as a k-v storage.
/// It contains two group of key:
/// - `__table_<TABLE_NAME>` is used for marking table existence. It doesn't have value.
/// - `__column_<TABLE_NAME>_<COLUMN_NAME>` is used for marking column existence,
/// the value is column's semantic type. To avoid the key conflict, this column key
/// will be encoded by base64([STANDARD_NO_PAD]).
///
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the metadata related operations across physical tables. Thus
/// every operation should be associated to a [RegionId], which is the physical
/// table id + region sequence. This handler will transform the region group by
/// itself.
pub struct MetadataRegion {
mito: MitoEngine,
}
impl MetadataRegion {
/// Add a new table key to metadata.
///
/// This method will check if the table key already exists, if so, it will return
/// a [TableAlreadyExistsSnafu] error.
pub fn add_table(&self, region_id: RegionId, table_name: &str) -> Result<()> {
let region_id = utils::to_metadata_region_id(region_id);
let table_key = Self::concat_table_key(table_name);
let put_success = self.put_conditionally(region_id, table_key, String::new())?;
if !put_success {
TableAlreadyExistsSnafu { table_name }.fail()
} else {
Ok(())
}
}
/// Add a new column key to metadata.
///
/// This method won't check if the column already exists.
pub fn add_column(
&self,
region_id: RegionId,
table_name: &str,
column_name: &str,
semantic_type: SemanticType,
) -> Result<()> {
let region_id = utils::to_metadata_region_id(region_id);
let column_key = Self::concat_column_key(table_name, column_name);
self.put_conditionally(
region_id,
column_key,
Self::serialize_semantic_type(semantic_type),
)?;
Ok(())
}
}
// utils to concat and parse key/value
impl MetadataRegion {
pub fn concat_table_key(table_name: &str) -> String {
format!("__table_{}", table_name)
}
pub fn concat_column_key(table_name: &str, column_name: &str) -> String {
let encoded_table_name = STANDARD_NO_PAD.encode(table_name);
let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
format!("__column_{}_{}", encoded_table_name, encoded_column_name)
}
pub fn parse_table_key(key: &str) -> Option<&str> {
key.strip_prefix("__table_")
}
/// Parse column key to (table_name, column_name)
pub fn parse_column_key(key: &str) -> Result<Option<(String, String)>> {
if let Some(stripped) = key.strip_prefix("__column_") {
let mut iter = stripped.split('_');
let encoded_table_name = iter.next().unwrap();
let encoded_column_name = iter.next().unwrap();
let table_name = STANDARD_NO_PAD
.decode(encoded_table_name)
.context(DecodeColumnValueSnafu)?;
let column_name = STANDARD_NO_PAD
.decode(encoded_column_name)
.context(DecodeColumnValueSnafu)?;
Ok(Some((
String::from_utf8(table_name).unwrap(),
String::from_utf8(column_name).unwrap(),
)))
} else {
Ok(None)
}
}
pub fn serialize_semantic_type(semantic_type: SemanticType) -> String {
serde_json::to_string(&semantic_type).unwrap()
}
pub fn deserialize_semantic_type(semantic_type: &str) -> Result<SemanticType> {
serde_json::from_str(semantic_type)
.with_context(|_| DeserializeSemanticTypeSnafu { raw: semantic_type })
}
}
// simulate to `KvBackend`
//
// methods in this block assume the given region id is transformed.
#[allow(unused_variables)]
impl MetadataRegion {
/// Put if not exist, return if this put operation is successful (error other
/// than "key already exist" will be wrapped in [Err]).
pub fn put_conditionally(
&self,
region_id: RegionId,
key: String,
value: String,
) -> Result<bool> {
todo!()
}
/// Check if the given key exists.
pub fn exist(&self, region_id: RegionId, key: &str) -> Result<bool> {
todo!()
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_concat_table_key() {
let table_name = "my_table";
let expected = "__table_my_table".to_string();
assert_eq!(MetadataRegion::concat_table_key(table_name), expected);
}
#[test]
fn test_concat_column_key() {
let table_name = "my_table";
let column_name = "my_column";
let expected = "__column_bXlfdGFibGU_bXlfY29sdW1u".to_string();
assert_eq!(
MetadataRegion::concat_column_key(table_name, column_name),
expected
);
}
#[test]
fn test_parse_table_key() {
let encoded = MetadataRegion::concat_column_key("my_table", "my_column");
assert_eq!(encoded, "__column_bXlfdGFibGU_bXlfY29sdW1u");
let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
assert_eq!(
decoded,
Some(("my_table".to_string(), "my_column".to_string()))
);
}
#[test]
fn test_parse_valid_column_key() {
let encoded = MetadataRegion::concat_column_key("my_table", "my_column");
assert_eq!(encoded, "__column_bXlfdGFibGU_bXlfY29sdW1u");
let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
assert_eq!(
decoded,
Some(("my_table".to_string(), "my_column".to_string()))
);
}
#[test]
fn test_parse_invalid_column_key() {
let key = "__column_asdfasd_????";
let result = MetadataRegion::parse_column_key(key);
assert!(result.is_err());
}
#[test]
fn test_serialize_semantic_type() {
let semantic_type = SemanticType::Tag;
let expected = "\"Tag\"".to_string();
assert_eq!(
MetadataRegion::serialize_semantic_type(semantic_type),
expected
);
}
#[test]
fn test_deserialize_semantic_type() {
let semantic_type = "\"Tag\"";
let expected = SemanticType::Tag;
assert_eq!(
MetadataRegion::deserialize_semantic_type(semantic_type).unwrap(),
expected
);
let semantic_type = "\"InvalidType\"";
assert!(MetadataRegion::deserialize_semantic_type(semantic_type).is_err());
}
}

View File

@@ -0,0 +1,50 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Utilities for testing.
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use mito2::test_util::TestEnv as MitoTestEnv;
use object_store::util::join_dir;
/// Env to test metric engine.
pub struct TestEnv {
mito_env: MitoTestEnv,
mito: MitoEngine,
}
impl TestEnv {
/// Returns a new env with empty prefix for test.
pub async fn new() -> Self {
Self::with_prefix("").await
}
/// Returns a new env with specific `prefix` for test.
pub async fn with_prefix(prefix: &str) -> Self {
let mut mito_env = MitoTestEnv::with_prefix(prefix);
let mito = mito_env.create_engine(MitoConfig::default()).await;
Self { mito_env, mito }
}
pub fn data_home(&self) -> String {
let env_root = self.mito_env.data_home().to_string_lossy().to_string();
join_dir(&env_root, "data")
}
/// Returns a reference to the engine.
pub fn mito(&self) -> MitoEngine {
self.mito.clone()
}
}

View File

@@ -0,0 +1,59 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use store_api::storage::RegionId;
use crate::engine::{METRIC_DATA_REGION_GROUP, METRIC_METADATA_REGION_GROUP};
/// Change the given [RegionId]'s region group to [METRIC_METADATA_REGION_GROUP].
pub fn to_metadata_region_id(region_id: RegionId) -> RegionId {
let table_id = region_id.table_id();
let region_sequence = region_id.region_sequence();
RegionId::with_group_and_seq(table_id, METRIC_METADATA_REGION_GROUP, region_sequence)
}
/// Change the given [RegionId]'s region group to [METRIC_DATA_REGION_GROUP].
pub fn to_data_region_id(region_id: RegionId) -> RegionId {
let table_id = region_id.table_id();
let region_sequence = region_id.region_sequence();
RegionId::with_group_and_seq(table_id, METRIC_DATA_REGION_GROUP, region_sequence)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_to_metadata_region_id() {
let region_id = RegionId::new(1, 2);
let expected_region_id = RegionId::with_group_and_seq(1, METRIC_METADATA_REGION_GROUP, 2);
assert_eq!(to_metadata_region_id(region_id), expected_region_id);
let region_id = RegionId::with_group_and_seq(1, 243, 2);
let expected_region_id = RegionId::with_group_and_seq(1, METRIC_METADATA_REGION_GROUP, 2);
assert_eq!(to_metadata_region_id(region_id), expected_region_id);
}
#[test]
fn test_to_data_region_id() {
let region_id = RegionId::new(1, 2);
let expected_region_id = RegionId::with_group_and_seq(1, METRIC_DATA_REGION_GROUP, 2);
assert_eq!(to_data_region_id(region_id), expected_region_id);
let region_id = RegionId::with_group_and_seq(1, 243, 2);
let expected_region_id = RegionId::with_group_and_seq(1, METRIC_DATA_REGION_GROUP, 2);
assert_eq!(to_data_region_id(region_id), expected_region_id);
}
}

View File

@@ -6,7 +6,7 @@ license.workspace = true
[features]
default = []
test = ["common-test-util"]
test = ["common-test-util", "log-store"]
[dependencies]
anymap = "1.0.0-beta.2"
@@ -39,6 +39,7 @@ datatypes = { workspace = true }
futures.workspace = true
humantime-serde = { workspace = true }
lazy_static = "1.4"
log-store = { workspace = true, optional = true }
memcomparable = "0.2"
moka = { workspace = true, features = ["sync"] }
object-store = { workspace = true }

View File

@@ -28,8 +28,8 @@ mod create_test;
mod drop_test;
#[cfg(test)]
mod flush_test;
#[cfg(test)]
pub(crate) mod listener;
#[cfg(any(test, feature = "test"))]
pub mod listener;
#[cfg(test)]
mod open_test;
#[cfg(test)]
@@ -61,6 +61,8 @@ use crate::region::RegionUsage;
use crate::request::WorkerRequest;
use crate::worker::WorkerGroup;
pub const MITO_ENGINE_NAME: &str = "mito";
/// Region engine implementation for timeseries data.
#[derive(Clone)]
pub struct MitoEngine {
@@ -200,7 +202,7 @@ impl EngineInner {
#[async_trait]
impl RegionEngine for MitoEngine {
fn name(&self) -> &str {
"mito"
MITO_ENGINE_NAME
}
async fn handle_request(
@@ -265,7 +267,7 @@ impl RegionEngine for MitoEngine {
}
// Tests methods.
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
impl MitoEngine {
/// Returns a new [MitoEngine] for tests.
pub fn new_for_test<S: LogStore>(

View File

@@ -72,7 +72,7 @@ async fn test_manual_flush() {
async fn test_flush_engine() {
let mut env = TestEnv::new();
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushListener::new());
let listener = Arc::new(FlushListener::default());
let engine = env
.create_engine_with(
MitoConfig::default(),
@@ -131,7 +131,7 @@ async fn test_flush_engine() {
async fn test_write_stall() {
let mut env = TestEnv::new();
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(StallListener::new());
let listener = Arc::new(StallListener::default());
let engine = env
.create_engine_with(
MitoConfig::default(),

View File

@@ -51,18 +51,12 @@ pub trait EventListener: Send + Sync {
pub type EventListenerRef = Arc<dyn EventListener>;
/// Listener to watch flush events.
#[derive(Default)]
pub struct FlushListener {
notify: Notify,
}
impl FlushListener {
/// Creates a new listener.
pub fn new() -> FlushListener {
FlushListener {
notify: Notify::new(),
}
}
/// Wait until one flush job is done.
pub async fn wait(&self) {
self.notify.notified().await;
@@ -83,18 +77,12 @@ impl EventListener for FlushListener {
}
/// Listener to watch stall events.
#[derive(Default)]
pub struct StallListener {
notify: Notify,
}
impl StallListener {
/// Creates a new listener.
pub fn new() -> StallListener {
StallListener {
notify: Notify::new(),
}
}
/// Wait for a stall event.
pub async fn wait(&self) {
self.notify.notified().await;
@@ -120,6 +108,7 @@ impl EventListener for StallListener {
/// to block and wait for `on_flush_region()`.
/// When the background thread calls `on_flush_begin()`, the main thread is notified to truncate
/// region, and background thread thread blocks and waits for `notify_flush()` to continue flushing.
#[derive(Default)]
pub struct FlushTruncateListener {
/// Notify flush operation.
notify_flush: Notify,
@@ -128,14 +117,6 @@ pub struct FlushTruncateListener {
}
impl FlushTruncateListener {
/// Creates a new listener.
pub fn new() -> FlushTruncateListener {
FlushTruncateListener {
notify_flush: Notify::new(),
notify_truncate: Notify::new(),
}
}
/// Notify flush region to proceed.
pub fn notify_flush(&self) {
self.notify_flush.notify_one();

View File

@@ -271,7 +271,7 @@ async fn test_engine_truncate_during_flush() {
init_default_ut_logging();
let mut env = TestEnv::with_prefix("truncate-during-flush");
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
let listener = Arc::new(FlushTruncateListener::new());
let listener = Arc::new(FlushTruncateListener::default());
let engine = env
.create_engine_with(
MitoConfig::default(),

View File

@@ -19,6 +19,7 @@
#![feature(let_chains)]
#[cfg(any(test, feature = "test"))]
#[cfg_attr(feature = "test", allow(unused))]
pub mod test_util;
mod access_layer;

View File

@@ -20,6 +20,7 @@ pub mod scheduler_util;
pub mod version_util;
use std::collections::HashMap;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
@@ -119,6 +120,10 @@ impl TestEnv {
.map(|manager| manager.default_object_store().clone())
}
pub fn data_home(&self) -> &Path {
self.data_home.path()
}
/// Creates a new engine with specific config under this env.
pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine {
let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;

View File

@@ -198,7 +198,7 @@ impl WorkerGroup {
}
// Tests methods.
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
impl WorkerGroup {
/// Starts a worker group with `write_buffer_manager` and `listener` for tests.
///
@@ -583,12 +583,12 @@ impl<S> RegionWorkerLoop<S> {
/// Wrapper that only calls event listener in tests.
#[derive(Default, Clone)]
pub(crate) struct WorkerListener {
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
listener: Option<crate::engine::listener::EventListenerRef>,
}
impl WorkerListener {
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
pub(crate) fn new(
listener: Option<crate::engine::listener::EventListenerRef>,
) -> WorkerListener {
@@ -597,7 +597,7 @@ impl WorkerListener {
/// Flush is finished successfully.
pub(crate) fn on_flush_success(&self, region_id: RegionId) {
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_flush_success(region_id);
}
@@ -607,14 +607,14 @@ impl WorkerListener {
/// Engine is stalled.
pub(crate) fn on_write_stall(&self) {
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_write_stall();
}
}
pub(crate) async fn on_flush_begin(&self, region_id: RegionId) {
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_flush_begin(region_id).await;
}
@@ -623,7 +623,7 @@ impl WorkerListener {
}
pub(crate) fn on_later_drop_begin(&self, region_id: RegionId) -> Option<Duration> {
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
return listener.on_later_drop_begin(region_id);
}
@@ -634,7 +634,7 @@ impl WorkerListener {
/// On later drop task is finished.
pub(crate) fn on_later_drop_end(&self, region_id: RegionId, removed: bool) {
#[cfg(test)]
#[cfg(any(test, feature = "test"))]
if let Some(listener) = &self.listener {
listener.on_later_drop_end(region_id, removed);
}

View File

@@ -88,7 +88,7 @@ impl RegionId {
RegionId(id)
}
#[cfg(test)]
/// Construct a new [RegionId] from table id, region group and region sequence.
pub const fn with_group_and_seq(
table_id: TableId,
group: RegionGroup,