refactor(mito2): implement serialize/deserialize for RegionMetadata (#1964)

* feat: implement serialize/deserialize for RegionMetadata

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove Raw*

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* render mermaid

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* derive Serialize

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* clean up

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* rename symbols

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-07-14 14:44:12 +08:00
committed by GitHub
parent c9cce0225d
commit ce43896a0b
4 changed files with 163 additions and 60 deletions

View File

@@ -75,6 +75,9 @@ pub enum Error {
location: Location,
source: std::str::Utf8Error,
},
#[snafu(display("Cannot find RegionMetadata. Location: {}", location))]
RegionMetadataNotFound { location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;

View File

@@ -15,38 +15,16 @@
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use storage::metadata::{ColumnFamilyMetadata, ColumnMetadata, VersionNumber};
use snafu::OptionExt;
use storage::metadata::VersionNumber;
use storage::sst::{FileId, FileMeta};
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use store_api::manifest::ManifestVersion;
use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{RegionMetadataNotFoundSnafu, Result};
use crate::manifest::helper;
/// Minimal data that could be used to persist and recover [RegionMetadata](crate::metadata::RegionMetadata).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RawRegionMetadata {
pub id: RegionId,
pub name: String,
pub columns: RawColumnsMetadata,
pub column_families: RawColumnFamiliesMetadata,
pub version: VersionNumber,
}
/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct RawColumnsMetadata {
pub columns: Vec<ColumnMetadata>,
pub row_key_end: usize,
pub timestamp_key_index: usize,
pub user_column_end: usize,
}
/// Minimal data that could be used to persist and recover [ColumnFamiliesMetadata](crate::metadata::ColumnFamiliesMetadata).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct RawColumnFamiliesMetadata {
pub column_families: Vec<ColumnFamilyMetadata>,
}
use crate::metadata::RegionMetadata;
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
@@ -55,7 +33,7 @@ pub struct RegionChange {
/// metadata.
pub committed_sequence: SequenceNumber,
/// The metadata after changed.
pub metadata: RawRegionMetadata,
pub metadata: RegionMetadata,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -81,17 +59,17 @@ pub struct RegionVersion {
}
/// The region manifest data checkpoint
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionManifestData {
pub committed_sequence: SequenceNumber,
pub metadata: RawRegionMetadata,
pub metadata: RegionMetadata,
pub version: Option<RegionVersion>,
}
#[derive(Debug, Default)]
pub struct RegionManifestDataBuilder {
committed_sequence: SequenceNumber,
metadata: RawRegionMetadata,
metadata: Option<RegionMetadata>,
version: Option<RegionVersion>,
}
@@ -99,7 +77,7 @@ impl RegionManifestDataBuilder {
pub fn with_checkpoint(checkpoint: Option<RegionManifestData>) -> Self {
if let Some(s) = checkpoint {
Self {
metadata: s.metadata,
metadata: Some(s.metadata),
version: s.version,
committed_sequence: s.committed_sequence,
}
@@ -109,7 +87,7 @@ impl RegionManifestDataBuilder {
}
pub fn apply_change(&mut self, change: RegionChange) {
self.metadata = change.metadata;
self.metadata = Some(change.metadata);
self.committed_sequence = change.committed_sequence;
}
@@ -135,12 +113,13 @@ impl RegionManifestDataBuilder {
});
}
}
pub fn build(self) -> RegionManifestData {
RegionManifestData {
metadata: self.metadata,
pub fn try_build(self) -> Result<RegionManifestData> {
Ok(RegionManifestData {
metadata: self.metadata.context(RegionMetadataNotFoundSnafu)?,
version: self.version,
committed_sequence: self.committed_sequence,
}
})
}
}
@@ -166,11 +145,11 @@ impl RegionCheckpoint {
self.last_version
}
fn encode(&self) -> Result<Vec<u8>, ()> {
fn encode(&self) -> Result<Vec<u8>> {
todo!()
}
fn decode(bs: &[u8], reader_version: ProtocolVersion) -> Result<Self, ()> {
fn decode(bs: &[u8], reader_version: ProtocolVersion) -> Result<Self> {
helper::decode_checkpoint(bs, reader_version)
}
}
@@ -216,14 +195,14 @@ impl RegionMetaActionList {
}
/// Encode self into json in the form of string lines, starts with prev_version and then action json list.
fn encode(&self) -> Result<Vec<u8>, ()> {
fn encode(&self) -> Result<Vec<u8>> {
helper::encode_actions(self.prev_version, &self.actions)
}
fn decode(
_bs: &[u8],
_reader_version: ProtocolVersion,
) -> Result<(Self, Option<ProtocolAction>), ()> {
) -> Result<(Self, Option<ProtocolAction>)> {
todo!()
}
}
@@ -239,7 +218,7 @@ impl MetaActionIteratorImpl {
self.last_protocol.clone()
}
async fn next_action(&mut self) -> Result<Option<(ManifestVersion, RegionMetaActionList)>, ()> {
async fn next_action(&mut self) -> Result<Option<(ManifestVersion, RegionMetaActionList)>> {
todo!()
}
}
@@ -263,7 +242,7 @@ mod tests {
let region_edit = r#"{"region_version":0,"flushed_sequence":null,"files_to_add":[{"region_id":4402341478400,"file_name":"4b220a70-2b03-4641-9687-b65d94641208.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1}],"files_to_remove":[{"region_id":4402341478400,"file_name":"34b6ebb9-b8a5-4a4b-b744-56f67defad02.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0}]}"#;
let _ = serde_json::from_str::<RegionEdit>(region_edit).unwrap();
let region_change = r#" {"committed_sequence":42,"metadata":{"id":0,"name":"region-0","columns":{"columns":[{"cf_id":0,"desc":{"id":2,"name":"k1","data_type":{"Int32":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"comment":""}},{"cf_id":0,"desc":{"id":1,"name":"timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"comment":""}},{"cf_id":1,"desc":{"id":3,"name":"v1","data_type":{"Float32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"comment":""}},{"cf_id":1,"desc":{"id":2147483649,"name":"__sequence","data_type":{"UInt64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"comment":""}},{"cf_id":1,"desc":{"id":2147483650,"name":"__op_type","data_type":{"UInt8":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"comment":""}}],"row_key_end":2,"timestamp_key_index":1,"enable_version_column":false,"user_column_end":3},"column_families":{"column_families":[{"name":"default","cf_id":1,"column_index_start":2,"column_index_end":3}]},"version":0}}"#;
let region_change = r#" {"committed_sequence":42,"metadata":{"column_metadatas":[{"column_schema":{"name":"a","data_type":{"Int64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":1},{"column_schema":{"name":"b","data_type":{"Float64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Field","column_id":2},{"column_schema":{"name":"c","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Timestamp","column_id":3}],"version":9,"primary_key":[1],"region_id":5299989648942}}"#;
let _ = serde_json::from_str::<RegionChange>(region_change).unwrap();
let region_remove = r#"{"region_id":42}"#;

View File

@@ -16,24 +16,21 @@ use serde::Serialize;
use store_api::manifest::action::ProtocolVersion;
use store_api::manifest::ManifestVersion;
use crate::error::Result;
use crate::manifest::action::RegionCheckpoint;
pub const NEWLINE: &[u8] = b"\n";
pub fn encode_actions<T: Serialize>(
prev_version: ManifestVersion,
actions: &[T],
) -> Result<Vec<u8>, ()> {
) -> Result<Vec<u8>> {
todo!()
}
pub fn encode_checkpoint(snasphot: &RegionCheckpoint) -> Result<Vec<u8>, ()> {
pub fn encode_checkpoint(snasphot: &RegionCheckpoint) -> Result<Vec<u8>> {
todo!()
}
pub fn decode_checkpoint(
bs: &[u8],
reader_version: ProtocolVersion,
) -> Result<RegionCheckpoint, ()> {
pub fn decode_checkpoint(bs: &[u8], reader_version: ProtocolVersion) -> Result<RegionCheckpoint> {
todo!()
}

View File

@@ -16,20 +16,24 @@
use std::sync::Arc;
use datatypes::schema::{ColumnSchema, SchemaRef};
use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
use serde::{Deserialize, Deserializer, Serialize};
use store_api::storage::{ColumnId, RegionId};
use crate::region::VersionNumber;
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
///
/// ```mermaid
/// class RegionMetadata {
/// +RegionId region_id
/// +VersionNumber version
/// +SchemaRef schema
/// +Vec&lt;ColumnMetadata&gt; column_metadatas
/// +Vec&lt;ColumnId&gt; primary_keys
/// +Vec&lt;ColumnId&gt; primary_key
/// }
/// class Schema
/// class ColumnMetadata {
@@ -42,25 +46,106 @@ use crate::region::VersionNumber;
/// RegionMetadata o-- ColumnMetadata
/// ColumnMetadata o-- SemanticType
/// ```
#[derive(Debug)]
pub(crate) struct RegionMetadata {
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct RegionMetadata {
/// Latest schema of this region
#[serde(skip)]
schema: SchemaRef,
column_metadatas: Vec<ColumnMetadata>,
/// Version of metadata.
version: VersionNumber,
/// Maintains an ordered list of primary keys
primary_keys: Vec<ColumnId>,
primary_key: Vec<ColumnId>,
/// Immutable and unique id
id: RegionId,
region_id: RegionId,
}
pub(crate) type RegionMetadataRef = Arc<RegionMetadata>;
pub type RegionMetadataRef = Arc<RegionMetadata>;
impl<'de> Deserialize<'de> for RegionMetadata {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
// helper internal struct for deserialization
#[derive(Deserialize)]
struct RegionMetadataWithoutSchema {
column_metadatas: Vec<ColumnMetadata>,
version: VersionNumber,
primary_key: Vec<ColumnId>,
region_id: RegionId,
}
let region_metadata_without_schema =
RegionMetadataWithoutSchema::deserialize(deserializer)?;
let column_schemas = region_metadata_without_schema
.column_metadatas
.iter()
.map(|column_metadata| column_metadata.column_schema.clone())
.collect();
let schema = Arc::new(Schema::new(column_schemas));
Ok(Self {
schema,
column_metadatas: region_metadata_without_schema.column_metadatas,
version: region_metadata_without_schema.version,
primary_key: region_metadata_without_schema.primary_key,
region_id: region_metadata_without_schema.region_id,
})
}
}
pub struct RegionMetadataBuilder {
schema: SchemaRef,
column_metadatas: Vec<ColumnMetadata>,
version: VersionNumber,
primary_key: Vec<ColumnId>,
region_id: RegionId,
}
impl RegionMetadataBuilder {
pub fn new(id: RegionId, version: VersionNumber) -> Self {
Self {
schema: Arc::new(Schema::new(vec![])),
column_metadatas: vec![],
version,
primary_key: vec![],
region_id: id,
}
}
/// Add a column metadata to this region metadata.
/// This method will check the semantic type and add it to primary keys automatically.
pub fn add_column_metadata(mut self, column_metadata: ColumnMetadata) -> Self {
if column_metadata.semantic_type == SemanticType::Tag {
self.primary_key.push(column_metadata.column_id);
}
self.column_metadatas.push(column_metadata);
self
}
pub fn build(self) -> RegionMetadata {
let schema = Arc::new(Schema::new(
self.column_metadatas
.iter()
.map(|column_metadata| column_metadata.column_schema.clone())
.collect(),
));
RegionMetadata {
schema,
column_metadatas: self.column_metadatas,
version: self.version,
primary_key: self.primary_key,
region_id: self.region_id,
}
}
}
/// Metadata of a column.
#[derive(Debug)]
pub(crate) struct ColumnMetadata {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
/// Schema of this column. Is the same as `column_schema` in [SchemaRef].
column_schema: ColumnSchema,
semantic_type: SemanticType,
@@ -68,9 +153,48 @@ pub(crate) struct ColumnMetadata {
}
/// The semantic type of one column
#[derive(Debug)]
pub(crate) enum SemanticType {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SemanticType {
Tag,
Field,
Timestamp,
}
#[cfg(test)]
mod test {
use datatypes::prelude::ConcreteDataType;
use super::*;
fn build_test_region_metadata() -> RegionMetadata {
let builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678), 9);
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 2,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"c",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
});
builder.build()
}
#[test]
fn test_region_metadata_serde() {
let region_metadata = build_test_region_metadata();
let serialized = serde_json::to_string(&region_metadata).unwrap();
let deserialized: RegionMetadata = serde_json::from_str(&serialized).unwrap();
assert_eq!(region_metadata, deserialized);
}
}