From 077785cf1e56d6325eb406b4d91620f7c484485a Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Wed, 12 Jul 2023 11:42:55 +0800 Subject: [PATCH] refactor(mito): define manifest related API (#1942) * refactor: port some manifest struct to mito2 Signed-off-by: Ruihang Xia * fix clippy and nextest Signed-off-by: Ruihang Xia * revert lock file and resolve clippy warnings Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- Cargo.lock | 30 +++ src/mito2/Cargo.toml | 33 ++++ src/mito2/src/error.rs | 19 +- src/mito2/src/lib.rs | 3 + src/mito2/src/manifest.rs | 21 +++ src/mito2/src/manifest/action.rs | 295 ++++++++++++++++++++++++++++++ src/mito2/src/manifest/gc_task.rs | 30 +++ src/mito2/src/manifest/helper.rs | 39 ++++ src/mito2/src/manifest/impl_.rs | 121 ++++++++++++ 9 files changed, 589 insertions(+), 2 deletions(-) create mode 100644 src/mito2/src/manifest.rs create mode 100644 src/mito2/src/manifest/action.rs create mode 100644 src/mito2/src/manifest/gc_task.rs create mode 100644 src/mito2/src/manifest/helper.rs create mode 100644 src/mito2/src/manifest/impl_.rs diff --git a/Cargo.lock b/Cargo.lock index 24e79419fc..a953e389ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5433,11 +5433,41 @@ dependencies = [ name = "mito2" version = "0.3.2" dependencies = [ + "anymap", "aquamarine", + "arc-swap", + "async-stream", + "async-trait", + "chrono", + "common-catalog", + "common-datasource", "common-error", + "common-procedure", + "common-procedure-test", + "common-query", + "common-recordbatch", + "common-runtime", + "common-telemetry", + "common-test-util", + "common-time", + "dashmap", + "datafusion", + "datafusion-common", + "datatypes", + "futures", + "key-lock", + "lazy_static", + "log-store", + "metrics", "object-store", + "regex", + "serde", + "serde_json", "snafu", + "storage", "store-api", + "table", + "tokio", ] [[package]] diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index b1b0e55e37..640549dd5f 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -6,7 +6,40 @@ license.workspace = true [dependencies] aquamarine = "0.3" +anymap = "1.0.0-beta.2" +arc-swap = "1.0" +async-stream.workspace = true +async-trait = "0.1" +chrono.workspace = true +common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } +common-procedure = { path = "../common/procedure" } +common-query = { path = "../common/query" } +common-recordbatch = { path = "../common/recordbatch" } +common-runtime = { path = "../common/runtime" } +common-datasource = { path = "../common/datasource" } +common-telemetry = { path = "../common/telemetry" } +common-test-util = { path = "../common/test-util", optional = true } +common-time = { path = "../common/time" } +dashmap = "5.4" +datafusion.workspace = true +datafusion-common.workspace = true +datatypes = { path = "../datatypes" } +futures.workspace = true +key-lock = "0.1" +lazy_static = "1.4" +log-store = { path = "../log-store" } +metrics.workspace = true object-store = { path = "../object-store" } +regex = "1.5" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" snafu.workspace = true +storage = { path = "../storage" } store-api = { path = "../store-api" } +table = { path = "../table" } +tokio.workspace = true + +[dev-dependencies] +common-test-util = { path = "../common/test-util" } +common-procedure-test = { path = "../common/procedure-test" } diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index f751c4ccaf..2722afa956 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -12,10 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! Error of mito engine. +use std::any::Any; -use snafu::Snafu; +use common_error::prelude::*; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum Error {} + +pub type Result = std::result::Result; + +impl ErrorExt for Error { + #[allow(clippy::match_single_binding)] + fn status_code(&self) -> StatusCode { + match self { + _ => todo!(), + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs index 4636c99627..b84222990c 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -22,6 +22,9 @@ pub mod config; pub mod engine; pub mod error; #[allow(dead_code)] +#[allow(unused_variables)] +pub mod manifest; +#[allow(dead_code)] mod region; #[allow(dead_code)] mod worker; diff --git a/src/mito2/src/manifest.rs b/src/mito2/src/manifest.rs new file mode 100644 index 0000000000..47bd9a3cea --- /dev/null +++ b/src/mito2/src/manifest.rs @@ -0,0 +1,21 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! manifest storage + +mod action; +mod gc_task; +mod helper; +#[allow(unused_variables)] +mod impl_; diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs new file mode 100644 index 0000000000..812804d4dc --- /dev/null +++ b/src/mito2/src/manifest/action.rs @@ -0,0 +1,295 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; +use storage::metadata::{ColumnFamilyMetadata, ColumnMetadata, VersionNumber}; +use storage::sst::{FileId, FileMeta}; +use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; +use store_api::manifest::ManifestVersion; +use store_api::storage::{RegionId, SequenceNumber}; + +use crate::manifest::helper; + +/// Minimal data that could be used to persist and recover [RegionMetadata](crate::metadata::RegionMetadata). +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)] +pub struct RawRegionMetadata { + pub id: RegionId, + pub name: String, + pub columns: RawColumnsMetadata, + pub column_families: RawColumnFamiliesMetadata, + pub version: VersionNumber, +} + +/// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct RawColumnsMetadata { + pub columns: Vec, + pub row_key_end: usize, + pub timestamp_key_index: usize, + pub user_column_end: usize, +} + +/// Minimal data that could be used to persist and recover [ColumnFamiliesMetadata](crate::metadata::ColumnFamiliesMetadata). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct RawColumnFamiliesMetadata { + pub column_families: Vec, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct RegionChange { + /// The committed sequence of the region when this change happens. So the + /// data with sequence **greater than** this sequence would use the new + /// metadata. + pub committed_sequence: SequenceNumber, + /// The metadata after changed. + pub metadata: RawRegionMetadata, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct RegionRemove { + pub region_id: RegionId, +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct RegionEdit { + pub region_version: VersionNumber, + pub flushed_sequence: Option, + pub files_to_add: Vec, + pub files_to_remove: Vec, + pub compaction_time_window: Option, +} + +/// The region version checkpoint +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct RegionVersion { + pub manifest_version: ManifestVersion, + pub flushed_sequence: Option, + pub files: HashMap, +} + +/// The region manifest data checkpoint +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)] +pub struct RegionManifestData { + pub committed_sequence: SequenceNumber, + pub metadata: RawRegionMetadata, + pub version: Option, +} + +#[derive(Debug, Default)] +pub struct RegionManifestDataBuilder { + committed_sequence: SequenceNumber, + metadata: RawRegionMetadata, + version: Option, +} + +impl RegionManifestDataBuilder { + pub fn with_checkpoint(checkpoint: Option) -> Self { + if let Some(s) = checkpoint { + Self { + metadata: s.metadata, + version: s.version, + committed_sequence: s.committed_sequence, + } + } else { + Default::default() + } + } + + pub fn apply_change(&mut self, change: RegionChange) { + self.metadata = change.metadata; + self.committed_sequence = change.committed_sequence; + } + + pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) { + if let Some(version) = &mut self.version { + version.manifest_version = manifest_version; + version.flushed_sequence = edit.flushed_sequence; + for file in edit.files_to_add { + let _ = version.files.insert(file.file_id, file); + } + for file in edit.files_to_remove { + let _ = version.files.remove(&file.file_id); + } + } else { + self.version = Some(RegionVersion { + manifest_version, + flushed_sequence: edit.flushed_sequence, + files: edit + .files_to_add + .into_iter() + .map(|f| (f.file_id, f)) + .collect(), + }); + } + } + pub fn build(self) -> RegionManifestData { + RegionManifestData { + metadata: self.metadata, + version: self.version, + committed_sequence: self.committed_sequence, + } + } +} + +// The checkpoint of region manifest, generated by checkpoint. +#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] +pub struct RegionCheckpoint { + /// The snasphot protocol + pub protocol: ProtocolAction, + /// The last manifest version that this checkpoint compacts(inclusive). + pub last_version: ManifestVersion, + // The number of manifest actions that this checkpoint compacts. + pub compacted_actions: usize, + // The checkpoint data + pub checkpoint: Option, +} + +impl RegionCheckpoint { + fn set_protocol(&mut self, action: ProtocolAction) { + self.protocol = action; + } + + fn last_version(&self) -> ManifestVersion { + self.last_version + } + + fn encode(&self) -> Result, ()> { + todo!() + } + + fn decode(bs: &[u8], reader_version: ProtocolVersion) -> Result { + helper::decode_checkpoint(bs, reader_version) + } +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub enum RegionMetaAction { + Protocol(ProtocolAction), + Change(RegionChange), + Remove(RegionRemove), + Edit(RegionEdit), +} + +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct RegionMetaActionList { + pub actions: Vec, + pub prev_version: ManifestVersion, +} + +impl RegionMetaActionList { + pub fn with_action(action: RegionMetaAction) -> Self { + Self { + actions: vec![action], + prev_version: 0, + } + } + + pub fn new(actions: Vec) -> Self { + Self { + actions, + prev_version: 0, + } + } +} + +impl RegionMetaActionList { + fn set_protocol(&mut self, action: ProtocolAction) { + // The protocol action should be the first action in action list by convention. + self.actions.insert(0, RegionMetaAction::Protocol(action)); + } + + fn set_prev_version(&mut self, version: ManifestVersion) { + self.prev_version = version; + } + + /// Encode self into json in the form of string lines, starts with prev_version and then action json list. + fn encode(&self) -> Result, ()> { + helper::encode_actions(self.prev_version, &self.actions) + } + + fn decode( + _bs: &[u8], + _reader_version: ProtocolVersion, + ) -> Result<(Self, Option), ()> { + todo!() + } +} + +pub struct MetaActionIteratorImpl { + // log_iter: ObjectStoreLogIterator, + reader_version: ProtocolVersion, + last_protocol: Option, +} + +impl MetaActionIteratorImpl { + pub fn last_protocol(&self) -> Option { + self.last_protocol.clone() + } + + async fn next_action(&mut self) -> Result, ()> { + todo!() + } +} + +#[cfg(test)] +mod tests { + use storage::sst::FileId; + + use super::*; + + #[test] + fn test_encode_decode_action_list() { + // TODO(ruihang): port this test case + } + + // These tests are used to ensure backward compatibility of manifest files. + // DO NOT modify the serialized string when they fail, check if your + // modification to manifest-related structs is compatible with older manifests. + #[test] + fn test_region_manifest_compatibility() { + let region_edit = r#"{"region_version":0,"flushed_sequence":null,"files_to_add":[{"region_id":4402341478400,"file_name":"4b220a70-2b03-4641-9687-b65d94641208.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":1}],"files_to_remove":[{"region_id":4402341478400,"file_name":"34b6ebb9-b8a5-4a4b-b744-56f67defad02.parquet","time_range":[{"value":1451609210000,"unit":"Millisecond"},{"value":1451609520000,"unit":"Millisecond"}],"level":0}]}"#; + let _ = serde_json::from_str::(region_edit).unwrap(); + + let region_change = r#" {"committed_sequence":42,"metadata":{"id":0,"name":"region-0","columns":{"columns":[{"cf_id":0,"desc":{"id":2,"name":"k1","data_type":{"Int32":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"comment":""}},{"cf_id":0,"desc":{"id":1,"name":"timestamp","data_type":{"Timestamp":{"Millisecond":null}},"is_nullable":false,"is_time_index":true,"default_constraint":null,"comment":""}},{"cf_id":1,"desc":{"id":3,"name":"v1","data_type":{"Float32":{}},"is_nullable":true,"is_time_index":false,"default_constraint":null,"comment":""}},{"cf_id":1,"desc":{"id":2147483649,"name":"__sequence","data_type":{"UInt64":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"comment":""}},{"cf_id":1,"desc":{"id":2147483650,"name":"__op_type","data_type":{"UInt8":{}},"is_nullable":false,"is_time_index":false,"default_constraint":null,"comment":""}}],"row_key_end":2,"timestamp_key_index":1,"enable_version_column":false,"user_column_end":3},"column_families":{"column_families":[{"name":"default","cf_id":1,"column_index_start":2,"column_index_end":3}]},"version":0}}"#; + let _ = serde_json::from_str::(region_change).unwrap(); + + let region_remove = r#"{"region_id":42}"#; + let _ = serde_json::from_str::(region_remove).unwrap(); + + let protocol_action = r#"{"min_reader_version":1,"min_writer_version":2}"#; + let _ = serde_json::from_str::(protocol_action).unwrap(); + } + + fn mock_file_meta() -> FileMeta { + FileMeta { + region_id: 0.into(), + file_id: FileId::random(), + time_range: None, + level: 0, + file_size: 1024, + } + } + + #[test] + fn test_region_manifest_builder() { + // TODO(ruihang): port this test case + } + + #[test] + fn test_encode_decode_region_checkpoint() { + // TODO(ruihang): port this test case + } +} diff --git a/src/mito2/src/manifest/gc_task.rs b/src/mito2/src/manifest/gc_task.rs new file mode 100644 index 0000000000..47e459eff6 --- /dev/null +++ b/src/mito2/src/manifest/gc_task.rs @@ -0,0 +1,30 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_runtime::TaskFunction; + +struct ManifestGcTask {} + +#[async_trait::async_trait] +impl TaskFunction<()> for ManifestGcTask { + /// Invoke the task. + async fn call(&mut self) -> std::result::Result<(), ()> { + todo!() + } + + /// Name of the task. + fn name(&self) -> &str { + todo!() + } +} diff --git a/src/mito2/src/manifest/helper.rs b/src/mito2/src/manifest/helper.rs new file mode 100644 index 0000000000..d24643b444 --- /dev/null +++ b/src/mito2/src/manifest/helper.rs @@ -0,0 +1,39 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::Serialize; +use store_api::manifest::action::ProtocolVersion; +use store_api::manifest::ManifestVersion; + +use crate::manifest::action::RegionCheckpoint; + +pub const NEWLINE: &[u8] = b"\n"; + +pub fn encode_actions( + prev_version: ManifestVersion, + actions: &[T], +) -> Result, ()> { + todo!() +} + +pub fn encode_checkpoint(snasphot: &RegionCheckpoint) -> Result, ()> { + todo!() +} + +pub fn decode_checkpoint( + bs: &[u8], + reader_version: ProtocolVersion, +) -> Result { + todo!() +} diff --git a/src/mito2/src/manifest/impl_.rs b/src/mito2/src/manifest/impl_.rs new file mode 100644 index 0000000000..02b5f855ff --- /dev/null +++ b/src/mito2/src/manifest/impl_.rs @@ -0,0 +1,121 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_datasource::compression::CompressionType; +use object_store::ObjectStore; +use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; +use store_api::manifest::ManifestVersion; + +use crate::manifest::action::{MetaActionIteratorImpl, RegionCheckpoint, RegionMetaActionList}; + +type Result = std::result::Result; + +// rewrite note: +// trait Checkpoint -> struct RegionCheckpoint +// trait MetaAction -> struct RegionMetaActionList +// trait MetaActionIterator -> struct MetaActionIteratorImpl +#[derive(Clone, Debug)] +pub struct Regionmanifest {} + +impl Regionmanifest { + // from impl ManifestImpl + + pub fn new() -> Self { + todo!() + } + + pub fn create( + _manifest_dir: &str, + _object_store: ObjectStore, + _compress_type: CompressionType, + ) -> Self { + todo!() + } + + // pub (crate) fn checkpointer(&self) -> Checkpointer { + // todo!() + // } + + pub(crate) fn set_last_checkpoint_version(&self, _version: ManifestVersion) { + todo!() + } + + /// Update inner state. + pub fn update_state(&self, _version: ManifestVersion, _protocol: Option) { + todo!() + } + + pub(crate) async fn save_checkpoint(&self, checkpoint: &RegionCheckpoint) -> Result<()> { + todo!() + } + + pub(crate) async fn may_do_checkpoint(&self, version: ManifestVersion) -> Result<()> { + todo!() + } + + // pub(crate) fn manifest_store(&self) -> &Arc { + // todo!() + // } + + // from Manifest + + async fn update(&self, action_list: RegionMetaActionList) -> Result { + todo!() + } + + async fn scan( + &self, + start: ManifestVersion, + end: ManifestVersion, + ) -> Result { + todo!() + } + + async fn do_checkpoint(&self) -> Result> { + todo!() + } + + async fn last_checkpoint(&self) -> Result> { + todo!() + } + + async fn start(&self) -> Result<()> { + todo!() + } + + async fn stop(&self) -> Result<()> { + todo!() + } + + // from Checkpoint + + /// Set a protocol action into checkpoint + pub fn set_protocol(&mut self, _action: ProtocolAction) { + todo!() + } + + /// The last compacted action's version of checkpoint + pub fn last_version(&self) -> ManifestVersion { + todo!() + } + + /// Encode this checkpoint into a byte vector + pub fn encode(&self) -> Result> { + todo!() + } + + pub fn decode(_bytes: &[u8], _reader_version: ProtocolVersion) -> Result { + todo!() + } +}