diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 58a5d023a1..51f2eae93a 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -95,6 +95,12 @@ pub enum Error { source: tokio::sync::oneshot::error::RecvError, location: Location, }, + + #[snafu(display( + "Expect initial region metadata on creating/opening a new region, location: {}", + location + ))] + InitialMetadata { location: Location }, } pub type Result = std::result::Result; @@ -108,7 +114,7 @@ impl ErrorExt for Error { CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. } => { StatusCode::Unexpected } - InvalidScanIndex { .. } => StatusCode::InvalidArguments, + InvalidScanIndex { .. } | InitialMetadata { .. } => StatusCode::InvalidArguments, RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => { StatusCode::Internal } diff --git a/src/mito2/src/manifest.rs b/src/mito2/src/manifest.rs index 3b51310f1b..1b2ec86e5d 100644 --- a/src/mito2/src/manifest.rs +++ b/src/mito2/src/manifest.rs @@ -14,9 +14,10 @@ //! manifest storage -mod action; -mod gc_task; -mod helper; +pub mod action; +pub mod gc_task; +pub mod helper; #[allow(unused_variables)] -mod impl_; -mod storage; +pub mod manager; +pub mod options; +pub mod storage; diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 9c063f80c4..0cf5ac4727 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -14,72 +14,71 @@ use std::collections::HashMap; +use common_telemetry::info; use serde::{Deserialize, Serialize}; -use snafu::OptionExt; +use snafu::{OptionExt, ResultExt}; use storage::metadata::VersionNumber; use storage::sst::{FileId, FileMeta}; use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; use store_api::manifest::ManifestVersion; use store_api::storage::{RegionId, SequenceNumber}; -use crate::error::{RegionMetadataNotFoundSnafu, Result}; -use crate::manifest::helper; +use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu}; use crate::metadata::RegionMetadata; +/// Actions that can be applied to region manifest. +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub enum RegionMetaAction { + /// Set the min/max supported protocol version + Protocol(ProtocolAction), + /// Change region's metadata for request like ALTER + Change(RegionChange), + /// Edit region's state for changing options or file list. + Edit(RegionEdit), + /// Remove the region. + Remove(RegionRemove), +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionChange { - /// The committed sequence of the region when this change happens. So the - /// data with sequence **greater than** this sequence would use the new - /// metadata. - pub committed_sequence: SequenceNumber, /// The metadata after changed. pub metadata: RegionMetadata, } +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct RegionEdit { + pub region_version: VersionNumber, + pub files_to_add: Vec, + pub files_to_remove: Vec, + pub compaction_time_window: Option, + pub flushed_sequence: Option, +} + #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionRemove { pub region_id: RegionId, } +/// The region manifest data #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct RegionEdit { - pub region_version: VersionNumber, - pub flushed_sequence: Option, - pub files_to_add: Vec, - pub files_to_remove: Vec, - pub compaction_time_window: Option, -} - -/// The region version checkpoint -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct RegionVersion { - pub manifest_version: ManifestVersion, - pub flushed_sequence: Option, - pub files: HashMap, -} - -/// The region manifest data checkpoint -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub struct RegionManifestData { - pub committed_sequence: SequenceNumber, +pub struct RegionManifest { pub metadata: RegionMetadata, - pub version: Option, + pub version: RegionVersion, } #[derive(Debug, Default)] -pub struct RegionManifestDataBuilder { - committed_sequence: SequenceNumber, +pub struct RegionManifestBuilder { metadata: Option, version: Option, } -impl RegionManifestDataBuilder { - pub fn with_checkpoint(checkpoint: Option) -> Self { +impl RegionManifestBuilder { + /// Start with a checkpoint + pub fn with_checkpoint(checkpoint: Option) -> Self { if let Some(s) = checkpoint { Self { metadata: Some(s.metadata), - version: s.version, - committed_sequence: s.committed_sequence, + version: Some(s.version), } } else { Default::default() @@ -88,13 +87,11 @@ impl RegionManifestDataBuilder { pub fn apply_change(&mut self, change: RegionChange) { self.metadata = Some(change.metadata); - self.committed_sequence = change.committed_sequence; } pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) { if let Some(version) = &mut self.version { version.manifest_version = manifest_version; - version.flushed_sequence = edit.flushed_sequence; for file in edit.files_to_add { let _ = version.files.insert(file.file_id, file); } @@ -104,7 +101,6 @@ impl RegionManifestDataBuilder { } else { self.version = Some(RegionVersion { manifest_version, - flushed_sequence: edit.flushed_sequence, files: edit .files_to_add .into_iter() @@ -114,16 +110,32 @@ impl RegionManifestDataBuilder { } } - pub fn try_build(self) -> Result { - Ok(RegionManifestData { - metadata: self.metadata.context(RegionMetadataNotFoundSnafu)?, - version: self.version, - committed_sequence: self.committed_sequence, - }) + /// Check if the builder keeps a [RegionMetadata] + pub fn contains_metadata(&self) -> bool { + self.metadata.is_some() + } + + pub fn try_build(self) -> Result { + let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?; + let version = self.version.unwrap_or_else(|| { + info!( + "Create new default region version for region {:?}", + metadata.region_id + ); + RegionVersion::default() + }); + Ok(RegionManifest { metadata, version }) } } -// The checkpoint of region manifest, generated by checkpoint. +/// The region version checkpoint +#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)] +pub struct RegionVersion { + pub manifest_version: ManifestVersion, + pub files: HashMap, +} + +// The checkpoint of region manifest, generated by checkpointer. #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] pub struct RegionCheckpoint { /// The snasphot protocol @@ -133,35 +145,28 @@ pub struct RegionCheckpoint { // The number of manifest actions that this checkpoint compacts. pub compacted_actions: usize, // The checkpoint data - pub checkpoint: Option, + pub checkpoint: Option, } impl RegionCheckpoint { - fn set_protocol(&mut self, action: ProtocolAction) { + pub fn set_protocol(&mut self, action: ProtocolAction) { self.protocol = action; } - fn last_version(&self) -> ManifestVersion { + pub fn last_version(&self) -> ManifestVersion { self.last_version } - fn encode(&self) -> Result> { + pub fn encode(&self) -> Result> { todo!() } - fn decode(bs: &[u8], reader_version: ProtocolVersion) -> Result { - helper::decode_checkpoint(bs, reader_version) + pub fn decode(bs: &[u8]) -> Result { + // helper::decode_checkpoint(bs, reader_version) + todo!() } } -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] -pub enum RegionMetaAction { - Protocol(ProtocolAction), - Change(RegionChange), - Remove(RegionRemove), - Edit(RegionEdit), -} - #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionMetaActionList { pub actions: Vec, @@ -190,30 +195,31 @@ impl RegionMetaActionList { self.actions.insert(0, RegionMetaAction::Protocol(action)); } - fn set_prev_version(&mut self, version: ManifestVersion) { + pub fn set_prev_version(&mut self, version: ManifestVersion) { self.prev_version = version; } /// Encode self into json in the form of string lines, starts with prev_version and then action json list. - fn encode(&self) -> Result> { - helper::encode_actions(self.prev_version, &self.actions) + pub fn encode(&self) -> Result> { + let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?; + + Ok(json.into_bytes()) } - fn decode( - _bs: &[u8], - _reader_version: ProtocolVersion, - ) -> Result<(Self, Option)> { - todo!() + pub fn decode(bytes: &[u8]) -> Result { + let data = std::str::from_utf8(bytes).context(Utf8Snafu)?; + + serde_json::from_str(data).context(SerdeJsonSnafu) } } -pub struct MetaActionIteratorImpl { +pub struct RegionMetaActionIter { // log_iter: ObjectStoreLogIterator, reader_version: ProtocolVersion, last_protocol: Option, } -impl MetaActionIteratorImpl { +impl RegionMetaActionIter { pub fn last_protocol(&self) -> Option { self.last_protocol.clone() } diff --git a/src/mito2/src/manifest/impl_.rs b/src/mito2/src/manifest/impl_.rs deleted file mode 100644 index 02b5f855ff..0000000000 --- a/src/mito2/src/manifest/impl_.rs +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2023 Greptime Team -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use common_datasource::compression::CompressionType; -use object_store::ObjectStore; -use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; -use store_api::manifest::ManifestVersion; - -use crate::manifest::action::{MetaActionIteratorImpl, RegionCheckpoint, RegionMetaActionList}; - -type Result = std::result::Result; - -// rewrite note: -// trait Checkpoint -> struct RegionCheckpoint -// trait MetaAction -> struct RegionMetaActionList -// trait MetaActionIterator -> struct MetaActionIteratorImpl -#[derive(Clone, Debug)] -pub struct Regionmanifest {} - -impl Regionmanifest { - // from impl ManifestImpl - - pub fn new() -> Self { - todo!() - } - - pub fn create( - _manifest_dir: &str, - _object_store: ObjectStore, - _compress_type: CompressionType, - ) -> Self { - todo!() - } - - // pub (crate) fn checkpointer(&self) -> Checkpointer { - // todo!() - // } - - pub(crate) fn set_last_checkpoint_version(&self, _version: ManifestVersion) { - todo!() - } - - /// Update inner state. - pub fn update_state(&self, _version: ManifestVersion, _protocol: Option) { - todo!() - } - - pub(crate) async fn save_checkpoint(&self, checkpoint: &RegionCheckpoint) -> Result<()> { - todo!() - } - - pub(crate) async fn may_do_checkpoint(&self, version: ManifestVersion) -> Result<()> { - todo!() - } - - // pub(crate) fn manifest_store(&self) -> &Arc { - // todo!() - // } - - // from Manifest - - async fn update(&self, action_list: RegionMetaActionList) -> Result { - todo!() - } - - async fn scan( - &self, - start: ManifestVersion, - end: ManifestVersion, - ) -> Result { - todo!() - } - - async fn do_checkpoint(&self) -> Result> { - todo!() - } - - async fn last_checkpoint(&self) -> Result> { - todo!() - } - - async fn start(&self) -> Result<()> { - todo!() - } - - async fn stop(&self) -> Result<()> { - todo!() - } - - // from Checkpoint - - /// Set a protocol action into checkpoint - pub fn set_protocol(&mut self, _action: ProtocolAction) { - todo!() - } - - /// The last compacted action's version of checkpoint - pub fn last_version(&self) -> ManifestVersion { - todo!() - } - - /// Encode this checkpoint into a byte vector - pub fn encode(&self) -> Result> { - todo!() - } - - pub fn decode(_bytes: &[u8], _reader_version: ProtocolVersion) -> Result { - todo!() - } -} diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs new file mode 100644 index 0000000000..61c0df8664 --- /dev/null +++ b/src/mito2/src/manifest/manager.rs @@ -0,0 +1,340 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use arc_swap::ArcSwap; +use common_telemetry::{debug, info}; +use snafu::OptionExt; +use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; +use store_api::manifest::{AtomicManifestVersion, ManifestVersion, MAX_VERSION, MIN_VERSION}; + +use crate::error::{InitialMetadataSnafu, Result}; +use crate::manifest::action::{ + RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, + RegionMetaActionIter, RegionMetaActionList, +}; +use crate::manifest::options::RegionManifestOptions; +use crate::manifest::storage::ManifestObjectStore; + +// rewrite note: +// trait Checkpoint -> struct RegionCheckpoint +// trait MetaAction -> struct RegionMetaActionList +// trait MetaActionIterator -> struct MetaActionIteratorImpl + +/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted +/// metadata. +#[derive(Clone, Debug)] +pub struct RegionManifestManager { + inner: Arc, +} + +impl RegionManifestManager { + /// Construct and recover a region's manifest from storage. + pub async fn new(options: RegionManifestOptions) -> Result { + let inner = RegionManifestManagerInner::new(options).await?; + Ok(Self { + inner: Arc::new(inner), + }) + } + + pub async fn stop(&self) -> Result<()> { + self.inner.stop().await + } + + /// Update the manifest. Return the current manifest version number. + pub async fn update(&self, action_list: RegionMetaActionList) -> Result { + self.inner.update(action_list).await + } + + /// Retrieve the current [RegionManifest]. + pub fn manifest(&self) -> Arc { + self.inner.manifest.load().clone() + } +} + +#[derive(Debug)] +struct RegionManifestManagerInner { + store: ManifestObjectStore, + options: RegionManifestOptions, + version: AtomicManifestVersion, + manifest: ArcSwap, +} + +impl RegionManifestManagerInner { + pub async fn new(mut options: RegionManifestOptions) -> Result { + // construct storage + let store = ManifestObjectStore::new( + &options.manifest_dir, + options.object_store.clone(), + options.compress_type, + ); + + // recover from storage + // construct manifest builder + let mut version = MIN_VERSION; + let last_checkpoint = store.load_last_checkpoint().await?; + let checkpoint = last_checkpoint + .map(|(_, raw_checkpoint)| RegionCheckpoint::decode(&raw_checkpoint)) + .transpose()?; + let mut manifest_builder = if let Some(checkpoint) = checkpoint { + info!( + "Recover region manifest from checkpoint version {}", + checkpoint.last_version + ); + version = version.max(checkpoint.last_version + 1); + RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint) + } else { + info!("Checkpoint not found, build manifest from scratch"); + RegionManifestBuilder::default() + }; + + // apply actions from storage + let mut action_iter = store.scan(version, MAX_VERSION).await?; + while let Some((manifest_version, raw_action_list)) = action_iter.next_log().await? { + let action_list = RegionMetaActionList::decode(&raw_action_list)?; + for action in action_list.actions { + match action { + RegionMetaAction::Change(action) => { + manifest_builder.apply_change(action); + } + RegionMetaAction::Edit(action) => { + manifest_builder.apply_edit(manifest_version, action); + } + RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => { + debug!("Unhandled action: {:?}", action); + } + } + } + } + + // set the initial metadata if necessary + if !manifest_builder.contains_metadata() { + let metadata = options + .initial_metadata + .take() + .context(InitialMetadataSnafu)?; + info!("Creating region manifest with metadata {:?}", metadata); + manifest_builder.apply_change(RegionChange { metadata }); + } + + let manifest = manifest_builder.try_build()?; + debug!("Recovered region manifest: {:?}", manifest); + let version = manifest.version.manifest_version; + + // todo: start gc task + + Ok(Self { + store, + options, + version: AtomicManifestVersion::new(version), + manifest: ArcSwap::new(Arc::new(manifest)), + }) + } + + pub async fn stop(&self) -> Result<()> { + // todo: stop gc task + Ok(()) + } + + pub async fn update(&self, action_list: RegionMetaActionList) -> Result { + let version = self.inc_version(); + + self.store.save(version, &action_list.encode()?).await?; + + let mut manifest_builder = + RegionManifestBuilder::with_checkpoint(Some(self.manifest.load().as_ref().clone())); + for action in action_list.actions { + match action { + RegionMetaAction::Change(action) => { + manifest_builder.apply_change(action); + } + RegionMetaAction::Edit(action) => { + manifest_builder.apply_edit(version, action); + } + RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => { + debug!("Unhandled action: {:?}", action); + } + } + } + let new_manifest = manifest_builder.try_build()?; + self.manifest.store(Arc::new(new_manifest)); + + Ok(version) + } +} + +impl RegionManifestManagerInner { + fn inc_version(&self) -> ManifestVersion { + self.version.fetch_add(1, Ordering::Relaxed) + } + + // pub (crate) fn checkpointer(&self) -> Checkpointer { + // todo!() + // } + + pub(crate) fn set_last_checkpoint_version(&self, _version: ManifestVersion) { + todo!() + } + + /// Update inner state. + pub fn update_state(&self, _version: ManifestVersion, _protocol: Option) { + todo!() + } + + pub(crate) async fn save_checkpoint(&self, checkpoint: &RegionCheckpoint) -> Result<()> { + todo!() + } + + pub(crate) async fn may_do_checkpoint(&self, version: ManifestVersion) -> Result<()> { + todo!() + } + + // pub(crate) fn manifest_store(&self) -> &Arc { + // todo!() + // } + + // from Manifest + + async fn scan( + &self, + start: ManifestVersion, + end: ManifestVersion, + ) -> Result { + todo!() + } + + async fn do_checkpoint(&self) -> Result> { + todo!() + } + + async fn last_checkpoint(&self) -> Result> { + todo!() + } + + // from Checkpoint + + /// Set a protocol action into checkpoint + pub fn set_protocol(&mut self, _action: ProtocolAction) { + todo!() + } + + /// The last compacted action's version of checkpoint + pub fn last_version(&self) -> ManifestVersion { + todo!() + } + + /// Encode this checkpoint into a byte vector + pub fn encode(&self) -> Result> { + todo!() + } + + pub fn decode(_bytes: &[u8], _reader_version: ProtocolVersion) -> Result { + todo!() + } +} + +#[cfg(test)] +mod test { + use common_datasource::compression::CompressionType; + use datatypes::prelude::ConcreteDataType; + use datatypes::schema::ColumnSchema; + use store_api::storage::RegionId; + + use super::*; + use crate::error::Error; + use crate::manifest::action::RegionChange; + use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType}; + use crate::test_util::TestEnv; + + fn basic_region_metadata() -> RegionMetadata { + let builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0); + let builder = builder.add_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + semantic_type: SemanticType::Timestamp, + column_id: 45, + }); + let builder = builder.add_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false), + semantic_type: SemanticType::Tag, + column_id: 36, + }); + let builder = builder.add_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("val", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 251, + }); + builder.build() + } + + #[tokio::test] + async fn create_region_without_initial_metadata() { + let env = TestEnv::new(""); + let result = env + .create_manifest_manager(CompressionType::Uncompressed, None, None) + .await; + assert!(matches!( + result.err().unwrap(), + Error::InitialMetadata { .. } + )) + } + + #[tokio::test] + async fn create_manifest_manager() { + let metadata = basic_region_metadata(); + let env = TestEnv::new(""); + let manager = env + .create_manifest_manager(CompressionType::Uncompressed, None, Some(metadata.clone())) + .await + .unwrap(); + + let manifest = manager.manifest(); + assert_eq!(manifest.metadata, metadata); + } + + #[tokio::test] + async fn region_change_add_column() { + let metadata = basic_region_metadata(); + let env = TestEnv::new(""); + let manager = env + .create_manifest_manager(CompressionType::Uncompressed, None, Some(metadata.clone())) + .await + .unwrap(); + + let new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1); + let new_metadata_builder = new_metadata_builder.add_column_metadata(ColumnMetadata { + column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false), + semantic_type: SemanticType::Field, + column_id: 252, + }); + let new_metadata = new_metadata_builder.build(); + + let mut action_list = + RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { + metadata: new_metadata.clone(), + })); + action_list.set_prev_version(0); + + let prev_version = manager.update(action_list).await.unwrap(); + assert_eq!(prev_version, 0); + + let manifest = manager.manifest(); + assert_eq!(manifest.metadata, new_metadata); + } +} diff --git a/src/mito2/src/manifest/options.rs b/src/mito2/src/manifest/options.rs new file mode 100644 index 0000000000..357d3ab526 --- /dev/null +++ b/src/mito2/src/manifest/options.rs @@ -0,0 +1,33 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Options for [RegionManifestManager](crate::manifest::manager::RegionManifestManager). + +use common_datasource::compression::CompressionType; +use object_store::ObjectStore; + +use crate::metadata::RegionMetadata; + +#[derive(Debug, Clone)] +pub struct RegionManifestOptions { + pub manifest_dir: String, + pub object_store: ObjectStore, + pub compress_type: CompressionType, + /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints + /// `None` means disable checkpoint. + pub checkpoint_interval: Option, + /// Initial [RegionMetadata](crate::metadata::RegionMetadata) of this region. + /// Only need to set when create a new region, otherwise it will be ignored. + pub initial_metadata: Option, +} diff --git a/src/mito2/src/manifest/storage.rs b/src/mito2/src/manifest/storage.rs index 6a4cadfb4c..7d9f56abf8 100644 --- a/src/mito2/src/manifest/storage.rs +++ b/src/mito2/src/manifest/storage.rs @@ -106,7 +106,7 @@ pub struct ObjectStoreLogIterator { } impl ObjectStoreLogIterator { - async fn next_log(&mut self) -> Result)>> { + pub async fn next_log(&mut self) -> Result)>> { match self.iter.next() { Some((v, entry)) => { let compress_type = file_compress_type(entry.name()); @@ -182,31 +182,8 @@ impl ManifestObjectStore { pub(crate) fn path(&self) -> &str { &self.path } -} -#[derive(Serialize, Deserialize, Debug)] -struct CheckpointMetadata { - pub size: usize, - /// The latest version this checkpoint contains. - pub version: ManifestVersion, - pub checksum: Option, - pub extend_metadata: Option>, -} - -impl CheckpointMetadata { - fn encode(&self) -> Result> { - serde_json::to_string(self).context(SerdeJsonSnafu) - } - - fn decode(bs: &[u8]) -> Result { - let data = std::str::from_utf8(bs).context(Utf8Snafu)?; - - serde_json::from_str(data).context(SerdeJsonSnafu) - } -} - -impl ManifestObjectStore { - async fn scan( + pub async fn scan( &self, start: ManifestVersion, end: ManifestVersion, @@ -346,7 +323,7 @@ impl ManifestObjectStore { Ok(()) } - async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { + pub async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> { let path = self.delta_file_path(version); logging::debug!("Save log to manifest storage, version: {}", version); let data = self @@ -416,7 +393,7 @@ impl ManifestObjectStore { size: bytes.len(), version, checksum: None, - extend_metadata: None, + extend_metadata: HashMap::new(), }; logging::debug!( @@ -425,9 +402,9 @@ impl ManifestObjectStore { checkpoint_metadata ); - let bs = checkpoint_metadata.encode()?; + let bytes = checkpoint_metadata.encode()?; self.object_store - .write(&last_checkpoint_path, bs.as_ref().to_vec()) + .write(&last_checkpoint_path, bytes) .await .context(OpenDalSnafu)?; @@ -513,7 +490,9 @@ impl ManifestObjectStore { Ok(()) } - async fn load_last_checkpoint(&self) -> Result)>> { + /// Load the latest checkpoint. + /// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any + pub async fn load_last_checkpoint(&self) -> Result)>> { let last_checkpoint_path = self.last_checkpoint_path(); let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await { Ok(data) => data, @@ -537,6 +516,29 @@ impl ManifestObjectStore { } } +#[derive(Serialize, Deserialize, Debug)] +struct CheckpointMetadata { + pub size: usize, + /// The latest version this checkpoint contains. + pub version: ManifestVersion, + pub checksum: Option, + pub extend_metadata: HashMap, +} + +impl CheckpointMetadata { + fn encode(&self) -> Result> { + Ok(serde_json::to_string(self) + .context(SerdeJsonSnafu)? + .into_bytes()) + } + + fn decode(bs: &[u8]) -> Result { + let data = std::str::from_utf8(bs).context(Utf8Snafu)?; + + serde_json::from_str(data).context(SerdeJsonSnafu) + } +} + #[cfg(test)] mod tests { use common_test_util::temp_dir::create_temp_dir; diff --git a/src/mito2/src/metadata.rs b/src/mito2/src/metadata.rs index 02f793d046..627659b79d 100644 --- a/src/mito2/src/metadata.rs +++ b/src/mito2/src/metadata.rs @@ -26,6 +26,7 @@ use crate::region::VersionNumber; /// Static metadata of a region. /// /// This struct implements [Serialize] and [Deserialize] traits. +/// To build a [RegionMetadata] object, use [RegionMetadataBuilder]. /// /// ```mermaid /// class RegionMetadata { @@ -50,17 +51,17 @@ use crate::region::VersionNumber; pub struct RegionMetadata { /// Latest schema of this region #[serde(skip)] - schema: SchemaRef, + pub schema: SchemaRef, /// Columns in the region. Has the same order as columns /// in [schema](RegionMetadata::schema). - column_metadatas: Vec, + pub column_metadatas: Vec, /// Version of metadata. - version: VersionNumber, + pub version: VersionNumber, /// Maintains an ordered list of primary keys - primary_key: Vec, + pub primary_key: Vec, /// Immutable and unique id of a region. - region_id: RegionId, + pub region_id: RegionId, } pub type RegionMetadataRef = Arc; @@ -118,6 +119,17 @@ impl RegionMetadataBuilder { } } + /// Create a builder from existing [RegionMetadata]. + pub fn from_existing(existing: RegionMetadata, new_version: VersionNumber) -> Self { + Self { + schema: existing.schema, + column_metadatas: existing.column_metadatas, + version: new_version, + primary_key: existing.primary_key, + region_id: existing.region_id, + } + } + /// Add a column metadata to this region metadata. /// This method will check the semantic type and add it to primary keys automatically. pub fn add_column_metadata(mut self, column_metadata: ColumnMetadata) -> Self { @@ -149,11 +161,11 @@ impl RegionMetadataBuilder { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct ColumnMetadata { /// Schema of this column. Is the same as `column_schema` in [SchemaRef]. - column_schema: ColumnSchema, + pub column_schema: ColumnSchema, /// Semantic type of this column (e.g. tag or timestamp). - semantic_type: SemanticType, + pub semantic_type: SemanticType, /// Immutable and unique id of a region. - column_id: ColumnId, + pub column_id: ColumnId, } /// The semantic type of one column diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 28eaf5786d..4c7b62efaa 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -16,6 +16,7 @@ use std::sync::Arc; +use common_datasource::compression::CompressionType; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use log_store::raft_engine::log_store::RaftEngineLogStore; use log_store::test_util::log_store_util; @@ -25,6 +26,10 @@ use object_store::ObjectStore; use crate::config::MitoConfig; use crate::engine::MitoEngine; +use crate::error::Result; +use crate::manifest::manager::RegionManifestManager; +use crate::manifest::options::RegionManifestOptions; +use crate::metadata::RegionMetadata; use crate::worker::WorkerGroup; /// Env to test mito engine. @@ -67,4 +72,28 @@ impl TestEnv { (log_store, object_store) } + + pub async fn create_manifest_manager( + &self, + compress_type: CompressionType, + checkpoint_interval: Option, + initial_metadata: Option, + ) -> Result { + let data_home = self.data_home.path().to_str().unwrap(); + let manifest_dir = join_dir(data_home, "manifest"); + + let mut builder = Fs::default(); + let _ = builder.root(&manifest_dir); + let object_store = ObjectStore::new(builder).unwrap().finish(); + + let manifest_opts = RegionManifestOptions { + manifest_dir, + object_store, + compress_type, + checkpoint_interval, + initial_metadata, + }; + + RegionManifestManager::new(manifest_opts).await + } } diff --git a/src/store-api/src/manifest.rs b/src/store-api/src/manifest.rs index 985339c694..b43185ba4b 100644 --- a/src/store-api/src/manifest.rs +++ b/src/store-api/src/manifest.rs @@ -16,6 +16,8 @@ pub mod action; mod storage; +use std::sync::atomic::AtomicU64; + use async_trait::async_trait; use common_error::ext::ErrorExt; use serde::de::DeserializeOwned; @@ -25,6 +27,7 @@ use crate::manifest::action::{ProtocolAction, ProtocolVersion}; pub use crate::manifest::storage::*; pub type ManifestVersion = u64; +pub type AtomicManifestVersion = AtomicU64; pub const MIN_VERSION: u64 = 0; pub const MAX_VERSION: u64 = u64::MAX;