feat(mito): Define Version and metadata builders for mito2 (#1989)

* feat: define structs for version

* feat: Build region from metadata and memtable builder

* feat: impl validate for metadata

* feat: add more fields to RegionMetadata

* test: more tests

* test: more check and test

* feat: allow overwriting version

* style: fix clippy
This commit is contained in:
Yingwen
2023-07-19 16:50:20 +09:00
committed by GitHub
parent 3241de0b85
commit bb8468437e
16 changed files with 962 additions and 98 deletions

1
Cargo.lock generated
View File

@@ -5541,6 +5541,7 @@ dependencies = [
"store-api",
"table",
"tokio",
"uuid",
]
[[package]]

View File

@@ -112,6 +112,9 @@ pub enum Error {
#[snafu(display("Invalid timestamp precision: {}", precision))]
InvalidTimestampPrecision { precision: u64, location: Location },
#[snafu(display("Column {} already exists", column))]
DuplicateColumn { column: String, location: Location },
}
impl ErrorExt for Error {

View File

@@ -24,7 +24,7 @@ use datafusion_common::DFSchemaRef;
use snafu::{ensure, ResultExt};
use crate::data_type::DataType;
use crate::error::{self, Error, ProjectArrowSchemaSnafu, Result};
use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
pub use crate::schema::column_schema::{ColumnSchema, Metadata, COMMENT_KEY, TIME_INDEX_KEY};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;
@@ -211,8 +211,7 @@ impl SchemaBuilder {
validate_timestamp_index(&self.column_schemas, timestamp_index)?;
}
let _ = self
.metadata
self.metadata
.insert(VERSION_KEY.to_string(), self.version.to_string());
let arrow_schema = ArrowSchema::new(self.fields).with_metadata(self.metadata);
@@ -243,7 +242,14 @@ fn collect_fields(column_schemas: &[ColumnSchema]) -> Result<FieldsAndIndices> {
}
let field = Field::try_from(column_schema)?;
fields.push(field);
let _ = name_to_index.insert(column_schema.name.clone(), index);
ensure!(
name_to_index
.insert(column_schema.name.clone(), index)
.is_none(),
DuplicateColumnSnafu {
column: &column_schema.name,
}
);
}
Ok(FieldsAndIndices {
@@ -382,6 +388,21 @@ mod tests {
assert_eq!(column_schemas, schema.column_schemas());
}
#[test]
fn test_schema_duplicate_column() {
let column_schemas = vec![
ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), false),
ColumnSchema::new("col1", ConcreteDataType::float64_datatype(), true),
];
let err = Schema::try_new(column_schemas).unwrap_err();
assert!(
matches!(err, Error::DuplicateColumn { .. }),
"expect DuplicateColumn, found {}",
err
);
}
#[test]
fn test_metadata() {
let column_schemas = vec![ColumnSchema::new(

View File

@@ -44,6 +44,7 @@ storage = { path = "../storage" }
store-api = { path = "../store-api" }
table = { path = "../table" }
tokio.workspace = true
uuid.workspace = true
[dev-dependencies]
common-test-util = { path = "../common/test-util" }

View File

@@ -101,6 +101,15 @@ pub enum Error {
location
))]
InitialMetadata { location: Location },
#[snafu(display("Invalid metadata, {}, location: {}", reason, location))]
InvalidMeta { reason: String, location: Location },
#[snafu(display("Invalid schema, source: {}, location: {}", source, location))]
InvalidSchema {
source: datatypes::error::Error,
location: Location,
},
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -114,7 +123,10 @@ impl ErrorExt for Error {
CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. } => {
StatusCode::Unexpected
}
InvalidScanIndex { .. } | InitialMetadata { .. } => StatusCode::InvalidArguments,
InvalidScanIndex { .. }
| InitialMetadata { .. }
| InvalidMeta { .. }
| InvalidSchema { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}

View File

@@ -28,10 +28,14 @@ pub mod error;
#[allow(unused_variables)]
pub mod manifest;
#[allow(dead_code)]
pub mod memtable;
#[allow(dead_code)]
pub mod metadata;
#[allow(dead_code)]
mod region;
#[allow(dead_code)]
pub(crate) mod sst;
#[allow(dead_code)]
mod worker;
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -108,7 +112,7 @@ mod worker;
/// class Version {
/// -RegionMetadataRef metadata
/// -MemtableVersionRef memtables
/// -LevelMetasRef ssts
/// -SstVersionRef ssts
/// -SequenceNumber flushed_sequence
/// -ManifestVersion manifest_version
/// }
@@ -119,7 +123,7 @@ mod worker;
/// +immutable_memtables() &[MemtableRef]
/// +freeze_mutable(MemtableRef new_mutable) MemtableVersion
/// }
/// class LevelMetas {
/// class SstVersion {
/// -LevelMetaVec levels
/// -AccessLayerRef sst_layer
/// -FilePurgerRef file_purger
@@ -146,8 +150,8 @@ mod worker;
/// VersionControl o-- Version
/// Version o-- RegionMetadata
/// Version o-- MemtableVersion
/// Version o-- LevelMetas
/// LevelMetas o-- LevelMeta
/// Version o-- SstVersion
/// SstVersion o-- LevelMeta
/// LevelMeta o-- FileHandle
/// FileHandle o-- FileMeta
/// class RegionMetadata

View File

@@ -260,27 +260,32 @@ mod test {
use crate::test_util::TestEnv;
fn basic_region_metadata() -> RegionMetadata {
let builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0);
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 45,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 36,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 251,
});
builder.build()
let mut builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0);
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 45,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 36,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"val",
ConcreteDataType::float64_datatype(),
false,
),
semantic_type: SemanticType::Field,
column_id: 251,
});
builder.build().unwrap()
}
#[tokio::test]
@@ -317,13 +322,13 @@ mod test {
.await
.unwrap();
let new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1);
let new_metadata_builder = new_metadata_builder.add_column_metadata(ColumnMetadata {
let mut new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1);
new_metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 252,
});
let new_metadata = new_metadata_builder.build();
let new_metadata = new_metadata_builder.build().unwrap();
let mut action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {

43
src/mito2/src/memtable.rs Normal file
View File

@@ -0,0 +1,43 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Memtables are write buffers for regions.
pub(crate) mod version;
use std::fmt;
use std::sync::Arc;
use crate::metadata::RegionMetadataRef;
/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;
/// In memory write buffer.
///
/// Bound by `Send + Sync` so a memtable can be shared between threads.
pub trait Memtable: Send + Sync + fmt::Debug {
    /// Returns the id of this memtable.
    ///
    /// The id should be unique under the same region (see [MemtableId]).
    fn id(&self) -> MemtableId;
}
/// Reference-counted pointer to a [Memtable] trait object.
pub type MemtableRef = Arc<dyn Memtable>;
/// Builder to build a new [Memtable].
pub trait MemtableBuilder: Send + Sync + fmt::Debug {
    /// Builds a new memtable instance for a region with the given `metadata`.
    fn build(&self, metadata: &RegionMetadataRef) -> MemtableRef;
}
/// Reference-counted pointer to a [MemtableBuilder] trait object.
pub type MemtableBuilderRef = Arc<dyn MemtableBuilder>;

View File

@@ -0,0 +1,40 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Memtable version.
use std::sync::Arc;
use crate::memtable::MemtableRef;
/// A version of current memtables in a region.
///
/// Holds the single active (mutable) memtable together with the memtables
/// that have been frozen but not yet removed from the version.
#[derive(Debug)]
pub(crate) struct MemtableVersion {
    /// Mutable memtable.
    mutable: MemtableRef,
    /// Immutable memtables.
    immutables: Vec<MemtableRef>,
}
/// Reference-counted pointer to a [MemtableVersion].
pub(crate) type MemtableVersionRef = Arc<MemtableVersion>;
impl MemtableVersion {
/// Returns a new [MemtableVersion] with specific mutable memtable.
pub(crate) fn new(mutable: MemtableRef) -> MemtableVersion {
MemtableVersion {
mutable,
immutables: vec![],
}
}
}

View File

@@ -14,12 +14,17 @@
//! Metadata of mito regions.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use datatypes::prelude::DataType;
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use serde::de::Error;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::{ColumnId, RegionId};
use crate::error::{InvalidMetaSnafu, InvalidSchemaSnafu, Result};
use crate::region::VersionNumber;
#[cfg_attr(doc, aquamarine::aquamarine)]
@@ -49,9 +54,16 @@ use crate::region::VersionNumber;
/// ```
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
/// Latest schema of this region
/// Latest schema constructed from [column_metadatas](RegionMetadata::column_metadatas).
#[serde(skip)]
pub schema: SchemaRef,
/// Id of the time index column.
#[serde(skip)]
time_index: ColumnId,
/// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
#[serde(skip)]
id_to_index: HashMap<ColumnId, usize>,
/// Columns in the region. Has the same order as columns
/// in [schema](RegionMetadata::schema).
pub column_metadatas: Vec<ColumnMetadata>,
@@ -67,7 +79,7 @@ pub struct RegionMetadata {
pub type RegionMetadataRef = Arc<RegionMetadata>;
impl<'de> Deserialize<'de> for RegionMetadata {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
@@ -80,49 +92,163 @@ impl<'de> Deserialize<'de> for RegionMetadata {
region_id: RegionId,
}
let region_metadata_without_schema =
RegionMetadataWithoutSchema::deserialize(deserializer)?;
let column_schemas = region_metadata_without_schema
.column_metadatas
.iter()
.map(|column_metadata| column_metadata.column_schema.clone())
.collect();
let schema = Arc::new(Schema::new(column_schemas));
let without_schema = RegionMetadataWithoutSchema::deserialize(deserializer)?;
let skipped =
SkippedFields::new(&without_schema.column_metadatas).map_err(D::Error::custom)?;
Ok(Self {
schema,
column_metadatas: region_metadata_without_schema.column_metadatas,
version: region_metadata_without_schema.version,
primary_key: region_metadata_without_schema.primary_key,
region_id: region_metadata_without_schema.region_id,
schema: skipped.schema,
time_index: skipped.time_index,
id_to_index: skipped.id_to_index,
column_metadatas: without_schema.column_metadatas,
version: without_schema.version,
primary_key: without_schema.primary_key,
region_id: without_schema.region_id,
})
}
}
pub struct RegionMetadataBuilder {
/// Fields skipped in serialization.
struct SkippedFields {
/// Last schema.
schema: SchemaRef,
column_metadatas: Vec<ColumnMetadata>,
version: VersionNumber,
primary_key: Vec<ColumnId>,
/// Id of the time index column.
time_index: ColumnId,
/// Map column id to column's index in [column_metadatas](RegionMetadata::column_metadatas).
id_to_index: HashMap<ColumnId, usize>,
}
impl SkippedFields {
    /// Constructs skipped fields from `column_metadatas`.
    ///
    /// Returns an error if the column schemas cannot form a valid [Schema]
    /// (e.g. a duplicate column name) or if no column has the timestamp
    /// semantic type.
    fn new(column_metadatas: &[ColumnMetadata]) -> Result<SkippedFields> {
        // `Schema::try_new` performs its own validation, such as rejecting
        // duplicate column names.
        let column_schemas = column_metadatas
            .iter()
            .map(|column_metadata| column_metadata.column_schema.clone())
            .collect();
        let schema = Arc::new(Schema::try_new(column_schemas).context(InvalidSchemaSnafu)?);
        // Pick the first column with the timestamp semantic type as the time
        // index; `RegionMetadata::validate` separately ensures there is
        // exactly one such column.
        let time_index = column_metadatas
            .iter()
            .find_map(|col| {
                if col.semantic_type == SemanticType::Timestamp {
                    Some(col.column_id)
                } else {
                    None
                }
            })
            .context(InvalidMetaSnafu {
                reason: "time index not found",
            })?;
        // Map each column id to its position in `column_metadatas`.
        let id_to_index = column_metadatas
            .iter()
            .enumerate()
            .map(|(idx, col)| (col.column_id, idx))
            .collect();
        Ok(SkippedFields {
            schema,
            time_index,
            id_to_index,
        })
    }
}
impl RegionMetadata {
    /// Checks whether the metadata is valid.
    ///
    /// Validates that:
    /// - every column is itself valid (see [ColumnMetadata::validate]),
    /// - no two columns share the same column id,
    /// - exactly one column has the timestamp semantic type,
    /// - every primary key entry refers to an existing column, appears at
    ///   most once, and is not the time index column.
    fn validate(&self) -> Result<()> {
        // Id to name.
        let mut id_names = HashMap::with_capacity(self.column_metadatas.len());
        for col in &self.column_metadatas {
            // Validate each column.
            col.validate()?;
            // Check whether column id is duplicated. We already check column name
            // is unique in `Schema` so we only check column id here.
            // `id_names[&col.column_id]` is only evaluated when the condition
            // fails (snafu builds the error context lazily), i.e. when the key
            // is present, so the indexing cannot panic.
            ensure!(
                !id_names.contains_key(&col.column_id),
                InvalidMetaSnafu {
                    reason: format!(
                        "column {} and {} have the same column id",
                        id_names[&col.column_id], col.column_schema.name
                    ),
                }
            );
            id_names.insert(col.column_id, &col.column_schema.name);
        }
        // Checks there is only one time index.
        let num_time_index = self
            .column_metadatas
            .iter()
            .filter(|col| col.semantic_type == SemanticType::Timestamp)
            .count();
        ensure!(
            num_time_index == 1,
            InvalidMetaSnafu {
                reason: format!("expect only one time index, found {}", num_time_index),
            }
        );
        if !self.primary_key.is_empty() {
            let mut pk_ids = HashSet::with_capacity(self.primary_key.len());
            // Checks column ids in the primary key is valid.
            for column_id in &self.primary_key {
                // Checks whether the column id exists.
                ensure!(
                    id_names.contains_key(column_id),
                    InvalidMetaSnafu {
                        reason: format!("unknown column id {}", column_id),
                    }
                );
                // Checks duplicate.
                ensure!(
                    !pk_ids.contains(&column_id),
                    InvalidMetaSnafu {
                        reason: format!("duplicate column {} in primary key", id_names[column_id]),
                    }
                );
                // Checks this is not a time index column.
                ensure!(
                    *column_id != self.time_index,
                    InvalidMetaSnafu {
                        reason: format!(
                            "column {} is already a time index column",
                            id_names[column_id]
                        ),
                    }
                );
                pk_ids.insert(column_id);
            }
        }
        Ok(())
    }
}
/// Builder to build [RegionMetadata].
pub struct RegionMetadataBuilder {
    /// Id of the region this metadata belongs to.
    region_id: RegionId,
    /// Version of the metadata to build.
    version: VersionNumber,
    /// Columns pushed so far, kept in push order (the schema order).
    column_metadatas: Vec<ColumnMetadata>,
    /// Column ids of the primary key.
    primary_key: Vec<ColumnId>,
}
impl RegionMetadataBuilder {
/// Returns a new builder.
pub fn new(id: RegionId, version: VersionNumber) -> Self {
Self {
schema: Arc::new(Schema::new(vec![])),
column_metadatas: vec![],
version,
primary_key: vec![],
region_id: id,
version,
column_metadatas: vec![],
primary_key: vec![],
}
}
/// Create a builder from existing [RegionMetadata].
pub fn from_existing(existing: RegionMetadata, new_version: VersionNumber) -> Self {
Self {
schema: existing.schema,
column_metadatas: existing.column_metadatas,
version: new_version,
primary_key: existing.primary_key,
@@ -130,30 +256,35 @@ impl RegionMetadataBuilder {
}
}
/// Add a column metadata to this region metadata.
/// This method will check the semantic type and add it to primary keys automatically.
pub fn add_column_metadata(mut self, column_metadata: ColumnMetadata) -> Self {
if column_metadata.semantic_type == SemanticType::Tag {
self.primary_key.push(column_metadata.column_id);
}
/// Push a new column metadata to this region's metadata.
///
/// Columns are kept in push order, which becomes the column order of the
/// built metadata's schema. Returns `&mut Self` so calls can be chained.
pub fn push_column_metadata(&mut self, column_metadata: ColumnMetadata) -> &mut Self {
    self.column_metadatas.push(column_metadata);
    self
}
pub fn build(self) -> RegionMetadata {
let schema = Arc::new(Schema::new(
self.column_metadatas
.iter()
.map(|column_metadata| column_metadata.column_schema.clone())
.collect(),
));
RegionMetadata {
schema,
/// Set the primary key of the region.
///
/// `key` lists the column ids of the primary key; it replaces any key set
/// before. The ids are only validated when the metadata is built.
pub fn primary_key(&mut self, key: Vec<ColumnId>) -> &mut Self {
    self.primary_key = key;
    self
}
/// Consume the builder and build a [RegionMetadata].
pub fn build(self) -> Result<RegionMetadata> {
let skipped = SkippedFields::new(&self.column_metadatas)?;
let meta = RegionMetadata {
schema: skipped.schema,
time_index: skipped.time_index,
id_to_index: skipped.id_to_index,
column_metadatas: self.column_metadatas,
version: self.version,
primary_key: self.primary_key,
region_id: self.region_id,
}
};
meta.validate()?;
Ok(meta)
}
}
@@ -168,6 +299,22 @@ pub struct ColumnMetadata {
pub column_id: ColumnId,
}
impl ColumnMetadata {
    /// Checks whether it is a valid column.
    ///
    /// Currently the only check is that a column with the timestamp semantic
    /// type has a timestamp compatible data type.
    pub fn validate(&self) -> Result<()> {
        if self.semantic_type == SemanticType::Timestamp {
            ensure!(
                self.column_schema.data_type.is_timestamp_compatible(),
                InvalidMetaSnafu {
                    reason: format!("{} is not timestamp compatible", self.column_schema.name),
                }
            );
        }
        Ok(())
    }
}
/// The semantic type of one column
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SemanticType {
@@ -185,28 +332,34 @@ mod test {
use super::*;
fn create_builder() -> RegionMetadataBuilder {
RegionMetadataBuilder::new(RegionId::new(1234, 5678), 9)
}
fn build_test_region_metadata() -> RegionMetadata {
let builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678), 9);
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 2,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"c",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
});
builder.build()
let mut builder = create_builder();
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"c",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
})
.primary_key(vec![1]);
builder.build().unwrap()
}
#[test]
@@ -216,4 +369,170 @@ mod test {
let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
assert_eq!(region_metadata, deserialized);
}
#[test]
fn test_column_metadata_validate() {
let mut builder = create_builder();
let col = ColumnMetadata {
column_schema: ColumnSchema::new("ts", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Timestamp,
column_id: 1,
};
col.validate().unwrap_err();
builder.push_column_metadata(col);
let err = builder.build().unwrap_err();
assert!(
err.to_string().contains("ts is not timestamp compatible"),
"unexpected err: {}",
err
);
}
#[test]
fn test_empty_region_metadata() {
let builder = create_builder();
let err = builder.build().unwrap_err();
// A region must have a time index.
assert!(
err.to_string().contains("time index not found"),
"unexpected err: {}",
err
);
}
#[test]
fn test_same_column_id() {
let mut builder = create_builder();
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"b",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
});
let err = builder.build().unwrap_err();
assert!(
err.to_string()
.contains("column a and b have the same column id"),
"unexpected err: {}",
err
);
}
#[test]
fn test_duplicate_time_index() {
let mut builder = create_builder();
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"a",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"b",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
});
let err = builder.build().unwrap_err();
assert!(
err.to_string().contains("expect only one time index"),
"unexpected err: {}",
err
);
}
#[test]
fn test_unknown_primary_key() {
let mut builder = create_builder();
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"b",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![3]);
let err = builder.build().unwrap_err();
assert!(
err.to_string().contains("unknown column id 3"),
"unexpected err: {}",
err
);
}
#[test]
fn test_same_primary_key() {
let mut builder = create_builder();
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"b",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 2,
})
.primary_key(vec![1, 1]);
let err = builder.build().unwrap_err();
assert!(
err.to_string()
.contains("duplicate column a in primary key"),
"unexpected err: {}",
err
);
}
#[test]
fn test_in_time_index() {
let mut builder = create_builder();
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 1,
})
.primary_key(vec![1]);
let err = builder.build().unwrap_err();
assert!(
err.to_string()
.contains("column ts is already a time index column"),
"unexpected err: {}",
err
);
}
}

View File

@@ -21,7 +21,11 @@ use std::sync::{Arc, RwLock};
use store_api::storage::RegionId;
use crate::region::version::VersionControlRef;
use crate::memtable::MemtableBuilderRef;
use crate::metadata::RegionMetadataRef;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
/// Type to store region version.
pub type VersionNumber = u32;
/// Metadata and runtime status of a region.
@@ -39,3 +43,32 @@ pub(crate) struct RegionMap {
}
pub(crate) type RegionMapRef = Arc<RegionMap>;
/// [MitoRegion] builder.
pub(crate) struct RegionBuilder {
    /// Metadata of the region to build.
    metadata: RegionMetadataRef,
    /// Builder used to create the region's initial mutable memtable.
    memtable_builder: MemtableBuilderRef,
}
impl RegionBuilder {
    /// Creates a builder from the region metadata and a memtable builder.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        memtable_builder: MemtableBuilderRef,
    ) -> RegionBuilder {
        RegionBuilder {
            metadata,
            memtable_builder,
        }
    }

    /// Builds a new region.
    ///
    /// Creates the initial mutable memtable from the metadata and wraps the
    /// initial version into a version control.
    pub(crate) fn build(self) -> MitoRegion {
        let RegionBuilder {
            metadata,
            memtable_builder,
        } = self;
        let mutable = memtable_builder.build(&metadata);
        let version_control = Arc::new(VersionControl::new(
            VersionBuilder::new(metadata, mutable).build(),
        ));
        MitoRegion { version_control }
    }
}

View File

@@ -25,8 +25,74 @@
use std::sync::Arc;
use arc_swap::ArcSwap;
use store_api::manifest::ManifestVersion;
use store_api::storage::SequenceNumber;
use crate::memtable::version::{MemtableVersion, MemtableVersionRef};
use crate::memtable::MemtableRef;
use crate::metadata::RegionMetadataRef;
use crate::sst::version::{SstVersion, SstVersionRef};
/// Controls version of in memory metadata for a region.
#[derive(Debug)]
pub(crate) struct VersionControl {}
pub(crate) struct VersionControl {
    /// Latest version.
    ///
    /// Kept in an [ArcSwap] so readers can load a consistent snapshot of the
    /// version without locking while a writer swaps in a new one.
    version: ArcSwap<Version>,
}
impl VersionControl {
    /// Returns a new [VersionControl] with specific `version`.
    pub(crate) fn new(version: Version) -> VersionControl {
        VersionControl {
            version: ArcSwap::new(Arc::new(version)),
        }
    }
}
/// Reference-counted pointer to a [VersionControl].
pub(crate) type VersionControlRef = Arc<VersionControl>;
/// A version of a region's in-memory state.
///
/// Cloning a `Version` is cheap because the large members are behind `Arc`s.
#[derive(Clone, Debug)]
pub(crate) struct Version {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
    /// metadata and reuse metadata when creating a new `Version`.
    metadata: RegionMetadataRef,
    /// Mutable and immutable memtables.
    ///
    /// Wrapped in Arc to make clone of `Version` much cheaper.
    memtables: MemtableVersionRef,
    /// SSTs of the region.
    ssts: SstVersionRef,
    /// Inclusive max sequence of flushed data.
    flushed_sequence: SequenceNumber,
    /// Current version of region manifest.
    manifest_version: ManifestVersion,
}
/// Version builder.
pub(crate) struct VersionBuilder {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Mutable memtable.
    mutable: MemtableRef,
}
impl VersionBuilder {
    /// Returns a new builder.
    pub(crate) fn new(metadata: RegionMetadataRef, mutable: MemtableRef) -> VersionBuilder {
        VersionBuilder { metadata, mutable }
    }
    /// Builds a new [Version] from the builder.
    ///
    /// The built version starts with no immutable memtables, an empty SST
    /// list, flushed sequence `0` and manifest version `0`.
    pub(crate) fn build(self) -> Version {
        Version {
            metadata: self.metadata,
            memtables: Arc::new(MemtableVersion::new(self.mutable)),
            ssts: Arc::new(SstVersion::new()),
            flushed_sequence: 0,
            manifest_version: 0,
        }
    }
}

18
src/mito2/src/sst.rs Normal file
View File

@@ -0,0 +1,18 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Sorted strings tables.
pub mod file;
pub(crate) mod version;

185
src/mito2/src/sst/file.rs Normal file
View File

@@ -0,0 +1,185 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Structures to describe metadata of files.
use std::fmt;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
use uuid::Uuid;
/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;
/// Error returned when a [FileId] fails to parse from a string.
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
    /// Underlying uuid parse error.
    source: uuid::Error,
}
/// Unique id for an SST file.
// NOTE: the original doc had a broken intra-doc link `[SST File]` which
// resolves to nothing; plain text is used instead.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct FileId(Uuid);

impl FileId {
    /// Returns a new unique [FileId] randomly.
    pub fn random() -> FileId {
        FileId(Uuid::new_v4())
    }

    /// Parses id from string.
    ///
    /// # Errors
    /// Returns a [ParseIdError] if `input` is not a valid UUID.
    pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
        Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
    }

    /// Appends `.parquet` to the file id to make a complete file name.
    pub fn as_parquet(&self) -> String {
        // `hyphenated()` yields the canonical
        // "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" form.
        format!("{}.parquet", self.0.hyphenated())
    }
}
impl fmt::Display for FileId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
// Allows `"...".parse::<FileId>()` by delegating to [FileId::parse_str].
impl FromStr for FileId {
    type Err = ParseIdError;
    fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
        FileId::parse_str(s)
    }
}
/// Time range of a SST file.
///
/// The tuple is `(start, end)`. NOTE(review): assumed to be the min/max
/// timestamps contained in the file — confirm with the SST writer.
pub type FileTimeRange = (Timestamp, Timestamp);
/// Metadata of a SST file.
///
/// `#[serde(default)]` lets fields absent from serialized metadata fall back
/// to their `Default` values (see `test_deserialize_from_string`).
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region of file.
    pub region_id: RegionId,
    /// Id of the file. Compared to normal file names, a [FileId] does not
    /// include the file extension.
    pub file_id: FileId,
    /// Timestamp range of file.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file.
    pub file_size: u64,
}
/// Handle to a SST file.
///
/// Cloning a handle is cheap: it only clones the inner `Arc`.
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}
// Manual `Debug` impl: flattens the inner meta's fields (and the mutable
// flags) into one compact struct view instead of nesting `FileHandleInner`.
impl fmt::Debug for FileHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FileHandle")
            .field("file_id", &self.inner.meta.file_id)
            .field("region_id", &self.inner.meta.region_id)
            .field("time_range", &self.inner.meta.time_range)
            .field("size", &self.inner.meta.file_size)
            .field("level", &self.inner.meta.level)
            .field("compacting", &self.inner.compacting)
            .field("deleted", &self.inner.deleted)
            .finish()
    }
}
/// Inner data of [FileHandle].
///
/// Contains meta of the file, and other mutable info like whether the file is compacting.
struct FileHandleInner {
    /// Immutable metadata of the file.
    meta: FileMeta,
    /// Whether the file is being compacted.
    compacting: AtomicBool,
    /// Whether the file has been marked deleted.
    deleted: AtomicBool,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_file_id() {
let id = FileId::random();
let uuid_str = id.to_string();
assert_eq!(id.0.to_string(), uuid_str);
let parsed = FileId::parse_str(&uuid_str).unwrap();
assert_eq!(id, parsed);
let parsed = uuid_str.parse().unwrap();
assert_eq!(id, parsed);
}
#[test]
fn test_file_id_serialization() {
let id = FileId::random();
let json = serde_json::to_string(&id).unwrap();
assert_eq!(format!("\"{id}\""), json);
let parsed = serde_json::from_str(&json).unwrap();
assert_eq!(id, parsed);
}
#[test]
fn test_file_id_as_parquet() {
let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
assert_eq!(
"67e55044-10b1-426f-9247-bb680e5fe0c8.parquet",
id.as_parquet()
);
}
fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
FileMeta {
region_id: 0.into(),
file_id,
time_range: FileTimeRange::default(),
level,
file_size: 0,
}
}
#[test]
fn test_deserialize_file_meta() {
let file_meta = create_file_meta(FileId::random(), 0);
let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
assert_eq!(file_meta, deserialized_file_meta.unwrap());
}
#[test]
fn test_deserialize_from_string() {
let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
\"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\"level\":0}";
let file_meta = create_file_meta(
FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
0,
);
let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
assert_eq!(file_meta, deserialized_file_meta);
}
}

View File

@@ -0,0 +1,77 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! SST version.
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use crate::sst::file::{FileHandle, FileId, Level, MAX_LEVEL};
/// A version of all SSTs in a region.
#[derive(Debug)]
pub(crate) struct SstVersion {
    /// SST metadata organized by levels.
    ///
    /// `levels[n]` holds the metadata of level `n`, for `n` in `0..MAX_LEVEL`.
    levels: LevelMetaVec,
}
/// Reference-counted pointer to an [SstVersion].
pub(crate) type SstVersionRef = Arc<SstVersion>;
impl SstVersion {
/// Returns a new [SstVersion].
pub(crate) fn new() -> SstVersion {
SstVersion {
levels: new_level_meta_vec(),
}
}
}
// We only have a fixed number of levels, so we use an array to hold the elements. This
// implementation detail of LevelMetaVec should not be exposed to the user of [SstVersion].
type LevelMetaVec = [LevelMeta; MAX_LEVEL as usize];
/// Metadata of files in the same SST level.
pub struct LevelMeta {
    /// Level number.
    level: Level,
    /// Handles of SSTs in this level, keyed by file id.
    files: HashMap<FileId, FileHandle>,
}
impl LevelMeta {
    /// Returns an empty meta of specific `level`.
    pub(crate) fn new(level: Level) -> LevelMeta {
        LevelMeta {
            level,
            files: HashMap::new(),
        }
    }
}
// Manual `Debug` impl: prints only the file ids (map keys) instead of the
// full [FileHandle]s to keep the output compact.
impl fmt::Debug for LevelMeta {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LevelMeta")
            .field("level", &self.level)
            .field("files", &self.files.keys())
            .finish()
    }
}
fn new_level_meta_vec() -> LevelMetaVec {
(0u8..MAX_LEVEL)
.map(LevelMeta::new)
.collect::<Vec<_>>()
.try_into()
.unwrap() // safety: LevelMetaVec is a fixed length array with length MAX_LEVEL
}

View File

@@ -14,6 +14,7 @@
//! Utilities for testing.
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use common_datasource::compression::CompressionType;
@@ -29,7 +30,8 @@ use crate::engine::MitoEngine;
use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::options::RegionManifestOptions;
use crate::metadata::RegionMetadata;
use crate::memtable::{Memtable, MemtableBuilder, MemtableId, MemtableRef};
use crate::metadata::{RegionMetadata, RegionMetadataRef};
use crate::worker::WorkerGroup;
/// Env to test mito engine.
@@ -97,3 +99,37 @@ impl TestEnv {
RegionManifestManager::new(manifest_opts).await
}
}
/// Memtable used only for testing metadata; it stores no data, only an id.
#[derive(Debug, Default)]
pub struct MetaOnlyMemtable {
    /// Id of this memtable.
    id: MemtableId,
}
impl MetaOnlyMemtable {
    /// Returns a new memtable with specific `id`.
    pub fn new(id: MemtableId) -> MetaOnlyMemtable {
        MetaOnlyMemtable { id }
    }
}
// Minimal trait impl: only the id is meaningful for this test memtable.
impl Memtable for MetaOnlyMemtable {
    fn id(&self) -> MemtableId {
        self.id
    }
}
/// [MemtableBuilder] for tests that builds [MetaOnlyMemtable]s with increasing ids.
#[derive(Debug, Default)]
pub struct MetaOnlyBuilder {
    /// Next memtable id.
    next_id: AtomicU32,
}
impl MemtableBuilder for MetaOnlyBuilder {
    fn build(&self, _metadata: &RegionMetadataRef) -> MemtableRef {
        // `Relaxed` suffices: we only need each `fetch_add` to hand out a
        // unique id, not any ordering with other memory operations.
        Arc::new(MetaOnlyMemtable::new(
            self.next_id.fetch_add(1, Ordering::Relaxed),
        ))
    }
}