diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index baba14622b..075feefab4 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -75,6 +75,9 @@ pub enum Error { location: Location, source: std::str::Utf8Error, }, + + #[snafu(display("Cannot find RegionMetadata. Location: {}", location))] + RegionMetadataNotFound { location: Location }, } pub type Result = std::result::Result; diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 812804d4dc..9c063f80c4 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -15,38 +15,16 @@ use std::collections::HashMap; use serde::{Deserialize, Serialize}; -use storage::metadata::{ColumnFamilyMetadata, ColumnMetadata, VersionNumber}; +use snafu::OptionExt; +use storage::metadata::VersionNumber; use storage::sst::{FileId, FileMeta}; use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; use store_api::manifest::ManifestVersion; use store_api::storage::{RegionId, SequenceNumber}; +use crate::error::{RegionMetadataNotFoundSnafu, Result}; use crate::manifest::helper; - -/// Minimal data that could be used to persist and recover [RegionMetadata](crate::metadata::RegionMetadata). -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] -pub struct RawRegionMetadata { - pub id: RegionId, - pub name: String, - pub columns: RawColumnsMetadata, - pub column_families: RawColumnFamiliesMetadata, - pub version: VersionNumber, -} - -/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata). -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] -pub struct RawColumnsMetadata { - pub columns: Vec, - pub row_key_end: usize, - pub timestamp_key_index: usize, - pub user_column_end: usize, -} - -/// Minimal data that could be used to persist and recover [ColumnFamiliesMetadata](crate::metadata::ColumnFamiliesMetadata). -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] -pub struct RawColumnFamiliesMetadata { - pub column_families: Vec, -} +use crate::metadata::RegionMetadata; #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionChange { @@ -55,7 +33,7 @@ pub struct RegionChange { /// metadata. pub committed_sequence: SequenceNumber, /// The metadata after changed. - pub metadata: RawRegionMetadata, + pub metadata: RegionMetadata, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -81,17 +59,17 @@ pub struct RegionVersion { } /// The region manifest data checkpoint -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)] +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionManifestData { pub committed_sequence: SequenceNumber, - pub metadata: RawRegionMetadata, + pub metadata: RegionMetadata, pub version: Option, } #[derive(Debug, Default)] pub struct RegionManifestDataBuilder { committed_sequence: SequenceNumber, - metadata: RawRegionMetadata, + metadata: Option, version: Option, } @@ -99,7 +77,7 @@ impl RegionManifestDataBuilder { pub fn with_checkpoint(checkpoint: Option) -> Self { if let Some(s) = checkpoint { Self { - metadata: s.metadata, + metadata: Some(s.metadata), version: s.version, committed_sequence: s.committed_sequence, } @@ -109,7 +87,7 @@ impl RegionManifestDataBuilder { } pub fn apply_change(&mut self, change: RegionChange) { - self.metadata = change.metadata; + self.metadata = Some(change.metadata); self.committed_sequence = change.committed_sequence; } @@ -135,12 +113,13 @@ impl RegionManifestDataBuilder { }); } } - pub fn build(self) -> RegionManifestData { - RegionManifestData { - metadata: self.metadata, + + pub fn try_build(self) -> Result { + Ok(RegionManifestData { + metadata: self.metadata.context(RegionMetadataNotFoundSnafu)?, version: self.version, committed_sequence: self.committed_sequence, - } + }) } } @@ -166,11 +145,11 @@ impl RegionCheckpoint { self.last_version } - fn encode(&self) -> Result, ()> { + fn encode(&self) -> Result> { todo!() } - fn decode(bs: &[u8], reader_version: ProtocolVersion) -> Result { + fn decode(bs: &[u8], reader_version: ProtocolVersion) -> Result { helper::decode_checkpoint(bs, reader_version) } } @@ -216,14 +195,14 @@ impl RegionMetaActionList { } /// Encode self into json in the form of string lines, starts with prev_version and then action json list. - fn encode(&self) -> Result, ()> { + fn encode(&self) -> Result> { helper::encode_actions(self.prev_version, &self.actions) } fn decode( _bs: &[u8], _reader_version: ProtocolVersion, - ) -> Result<(Self, Option), ()> { + ) -> Result<(Self, Option)> { todo!() } } @@ -239,7 +218,7 @@ impl MetaActionIteratorImpl { self.last_protocol.clone() } - async fn next_action(&mut self) -> Result, ()> { + async fn next_action(&mut self) -> Result> { todo!() } } @@ -263,7 +242,7 @@ mod tests { let region_edit = r#"{"region_version":0,"flushed_sequence":null,"files_to_add":[{"region_id":4402341478400,"file_name":"4b220a70-2b03-4641-9687-b65d94641208.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1}],"files_to_remove":[{"region_id":4402341478400,"file_name":"34b6ebb9-b8a5-4a4b-b744-56f67defad02.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0}]}"#; let _ = serde_json::from_str::(region_edit).unwrap(); - let region_change = r#" {"committed_sequence":42,"metadata":{"id":0,"name":"region-0","columns":{"columns":[{"cf_id":0,"desc":{"id":2,"name":"k1","data_type":{"Int32":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"comment":""}},{"cf_id":0,"desc":{"id":1,"name":"timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"comment":""}},{"cf_id":1,"desc":{"id":3,"name":"v1","data_type":{"Float32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"comment":""}},{"cf_id":1,"desc":{"id":2147483649,"name":"__sequence","data_type":{"UInt64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"comment":""}},{"cf_id":1,"desc":{"id":2147483650,"name":"__op_type","data_type":{"UInt8":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"comment":""}}],"row_key_end":2,"timestamp_key_index":1,"enable_version_column":false,"user_column_end":3},"column_families":{"column_families":[{"name":"default","cf_id":1,"column_index_start":2,"column_index_end":3}]},"version":0}}"#; + let region_change = r#" {"committed_sequence":42,"metadata":{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"version":9,"primary_key":[1],"region_id":5299989648942}}"#; let _ = serde_json::from_str::(region_change).unwrap(); let region_remove = r#"{"region_id":42}"#; diff --git a/src/mito2/src/manifest/helper.rs b/src/mito2/src/manifest/helper.rs index d24643b444..398eb2bb2f 100644 --- a/src/mito2/src/manifest/helper.rs +++ b/src/mito2/src/manifest/helper.rs @@ -16,24 +16,21 @@ use serde::Serialize; use store_api::manifest::action::ProtocolVersion; use store_api::manifest::ManifestVersion; +use crate::error::Result; use crate::manifest::action::RegionCheckpoint; - pub const NEWLINE: &[u8] = b"\n"; pub fn encode_actions( prev_version: ManifestVersion, actions: &[T], -) -> Result, ()> { +) -> Result> { todo!() } -pub fn encode_checkpoint(snasphot: &RegionCheckpoint) -> Result, ()> { +pub fn encode_checkpoint(snasphot: &RegionCheckpoint) -> Result> { todo!() } -pub fn decode_checkpoint( - bs: &[u8], - reader_version: ProtocolVersion, -) -> Result { +pub fn decode_checkpoint(bs: &[u8], reader_version: ProtocolVersion) -> Result { todo!() } diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs index 5905f2a00b..04533b8525 100644 --- a/src/mito2/src/metadata.rs +++ b/src/mito2/src/metadata.rs @@ -16,20 +16,24 @@ use std::sync::Arc; -use datatypes::schema::{ColumnSchema, SchemaRef}; +use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; +use serde::{Deserialize, Deserializer, Serialize}; use store_api::storage::{ColumnId, RegionId}; use crate::region::VersionNumber; +#[cfg_attr(doc, aquamarine::aquamarine)] /// Static metadata of a region. /// +/// This struct implements [Serialize] and [Deserialize] traits. +/// /// ```mermaid /// class RegionMetadata { /// +RegionId region_id /// +VersionNumber version /// +SchemaRef schema /// +Vec<ColumnMetadata> column_metadatas -/// +Vec<ColumnId> primary_keys +/// +Vec<ColumnId> primary_key /// } /// class Schema /// class ColumnMetadata { @@ -42,25 +46,106 @@ use crate::region::VersionNumber; /// RegionMetadata o-- ColumnMetadata /// ColumnMetadata o-- SemanticType /// ``` -#[derive(Debug)] -pub(crate) struct RegionMetadata { +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct RegionMetadata { /// Latest schema of this region + #[serde(skip)] schema: SchemaRef, column_metadatas: Vec, /// Version of metadata. version: VersionNumber, /// Maintains an ordered list of primary keys - primary_keys: Vec, + primary_key: Vec, /// Immutable and unique id - id: RegionId, + region_id: RegionId, } -pub(crate) type RegionMetadataRef = Arc; +pub type RegionMetadataRef = Arc; + +impl<'de> Deserialize<'de> for RegionMetadata { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + // helper internal struct for deserialization + #[derive(Deserialize)] + struct RegionMetadataWithoutSchema { + column_metadatas: Vec, + version: VersionNumber, + primary_key: Vec, + region_id: RegionId, + } + + let region_metadata_without_schema = + RegionMetadataWithoutSchema::deserialize(deserializer)?; + + let column_schemas = region_metadata_without_schema + .column_metadatas + .iter() + .map(|column_metadata| column_metadata.column_schema.clone()) + .collect(); + let schema = Arc::new(Schema::new(column_schemas)); + + Ok(Self { + schema, + column_metadatas: region_metadata_without_schema.column_metadatas, + version: region_metadata_without_schema.version, + primary_key: region_metadata_without_schema.primary_key, + region_id: region_metadata_without_schema.region_id, + }) + } +} + +pub struct RegionMetadataBuilder { + schema: SchemaRef, + column_metadatas: Vec, + version: VersionNumber, + primary_key: Vec, + region_id: RegionId, +} + +impl RegionMetadataBuilder { + pub fn new(id: RegionId, version: VersionNumber) -> Self { + Self { + schema: Arc::new(Schema::new(vec![])), + column_metadatas: vec![], + version, + primary_key: vec![], + region_id: id, + } + } + + /// Add a column metadata to this region metadata. + /// This method will check the semantic type and add it to primary keys automatically. + pub fn add_column_metadata(mut self, column_metadata: ColumnMetadata) -> Self { + if column_metadata.semantic_type == SemanticType::Tag { + self.primary_key.push(column_metadata.column_id); + } + self.column_metadatas.push(column_metadata); + self + } + + pub fn build(self) -> RegionMetadata { + let schema = Arc::new(Schema::new( + self.column_metadatas + .iter() + .map(|column_metadata| column_metadata.column_schema.clone()) + .collect(), + )); + RegionMetadata { + schema, + column_metadatas: self.column_metadatas, + version: self.version, + primary_key: self.primary_key, + region_id: self.region_id, + } + } +} /// Metadata of a column. -#[derive(Debug)] -pub(crate) struct ColumnMetadata { +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct ColumnMetadata { /// Schema of this column. Is the same as `column_schema` in [SchemaRef]. column_schema: ColumnSchema, semantic_type: SemanticType, @@ -68,9 +153,48 @@ pub(crate) struct ColumnMetadata { } /// The semantic type of one column -#[derive(Debug)] -pub(crate) enum SemanticType { +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub enum SemanticType { Tag, Field, Timestamp, } + +#[cfg(test)] +mod test { + use datatypes::prelude::ConcreteDataType; + + use super::*; + + fn build_test_region_metadata() -> RegionMetadata { + let builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678), 9); + let builder = builder.add_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 1, + }); + let builder = builder.add_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 2, + }); + let builder = builder.add_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "c", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 3, + }); + builder.build() + } + + #[test] + fn test_region_metadata_serde() { + let region_metadata = build_test_region_metadata(); + let serialized = serde_json::to_string(®ion_metadata).unwrap(); + let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap(); + assert_eq!(region_metadata, deserialized); + } +}