refactor(mito2): implement RegionManifestManager (#1984)

* finilise manager and related API

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* impl manifest initialize and update

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* more test and utils

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* resolve CR comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2023-07-18 22:03:35 +08:00
committed by GitHub
parent 37dad206f4
commit 8bea853954
10 changed files with 544 additions and 233 deletions

View File

@@ -95,6 +95,12 @@ pub enum Error {
source: tokio::sync::oneshot::error::RecvError,
location: Location,
},
#[snafu(display(
"Expect initial region metadata on creating/opening a new region, location: {}",
location
))]
InitialMetadata { location: Location },
}
pub type Result<T> = std::result::Result<T, Error>;
@@ -108,7 +114,7 @@ impl ErrorExt for Error {
CompressObject { .. } | DecompressObject { .. } | SerdeJson { .. } | Utf8 { .. } => {
StatusCode::Unexpected
}
InvalidScanIndex { .. } => StatusCode::InvalidArguments,
InvalidScanIndex { .. } | InitialMetadata { .. } => StatusCode::InvalidArguments,
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}

View File

@@ -14,9 +14,10 @@
//! manifest storage
mod action;
mod gc_task;
mod helper;
pub mod action;
pub mod gc_task;
pub mod helper;
#[allow(unused_variables)]
mod impl_;
mod storage;
pub mod manager;
pub mod options;
pub mod storage;

View File

@@ -14,72 +14,71 @@
use std::collections::HashMap;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::OptionExt;
use snafu::{OptionExt, ResultExt};
use storage::metadata::VersionNumber;
use storage::sst::{FileId, FileMeta};
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use store_api::manifest::ManifestVersion;
use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{RegionMetadataNotFoundSnafu, Result};
use crate::manifest::helper;
use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
use crate::metadata::RegionMetadata;
/// Actions that can be applied to region manifest.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum RegionMetaAction {
/// Set the min/max supported protocol version
Protocol(ProtocolAction),
/// Change region's metadata for request like ALTER
Change(RegionChange),
/// Edit region's state for changing options or file list.
Edit(RegionEdit),
/// Remove the region.
Remove(RegionRemove),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
/// The committed sequence of the region when this change happens. So the
/// data with sequence **greater than** this sequence would use the new
/// metadata.
pub committed_sequence: SequenceNumber,
/// The metadata after changed.
pub metadata: RegionMetadata,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEdit {
pub region_version: VersionNumber,
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
pub compaction_time_window: Option<i64>,
pub flushed_sequence: Option<SequenceNumber>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionRemove {
pub region_id: RegionId,
}
/// The region manifest data
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionEdit {
pub region_version: VersionNumber,
pub flushed_sequence: Option<SequenceNumber>,
pub files_to_add: Vec<FileMeta>,
pub files_to_remove: Vec<FileMeta>,
pub compaction_time_window: Option<i64>,
}
/// The region version checkpoint
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionVersion {
pub manifest_version: ManifestVersion,
pub flushed_sequence: Option<SequenceNumber>,
pub files: HashMap<FileId, FileMeta>,
}
/// The region manifest data checkpoint
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionManifestData {
pub committed_sequence: SequenceNumber,
pub struct RegionManifest {
pub metadata: RegionMetadata,
pub version: Option<RegionVersion>,
pub version: RegionVersion,
}
#[derive(Debug, Default)]
pub struct RegionManifestDataBuilder {
committed_sequence: SequenceNumber,
pub struct RegionManifestBuilder {
metadata: Option<RegionMetadata>,
version: Option<RegionVersion>,
}
impl RegionManifestDataBuilder {
pub fn with_checkpoint(checkpoint: Option<RegionManifestData>) -> Self {
impl RegionManifestBuilder {
/// Start with a checkpoint
pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
if let Some(s) = checkpoint {
Self {
metadata: Some(s.metadata),
version: s.version,
committed_sequence: s.committed_sequence,
version: Some(s.version),
}
} else {
Default::default()
@@ -88,13 +87,11 @@ impl RegionManifestDataBuilder {
pub fn apply_change(&mut self, change: RegionChange) {
self.metadata = Some(change.metadata);
self.committed_sequence = change.committed_sequence;
}
pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
if let Some(version) = &mut self.version {
version.manifest_version = manifest_version;
version.flushed_sequence = edit.flushed_sequence;
for file in edit.files_to_add {
let _ = version.files.insert(file.file_id, file);
}
@@ -104,7 +101,6 @@ impl RegionManifestDataBuilder {
} else {
self.version = Some(RegionVersion {
manifest_version,
flushed_sequence: edit.flushed_sequence,
files: edit
.files_to_add
.into_iter()
@@ -114,16 +110,32 @@ impl RegionManifestDataBuilder {
}
}
pub fn try_build(self) -> Result<RegionManifestData> {
Ok(RegionManifestData {
metadata: self.metadata.context(RegionMetadataNotFoundSnafu)?,
version: self.version,
committed_sequence: self.committed_sequence,
})
/// Check if the builder keeps a [RegionMetadata]
pub fn contains_metadata(&self) -> bool {
self.metadata.is_some()
}
pub fn try_build(self) -> Result<RegionManifest> {
let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
let version = self.version.unwrap_or_else(|| {
info!(
"Create new default region version for region {:?}",
metadata.region_id
);
RegionVersion::default()
});
Ok(RegionManifest { metadata, version })
}
}
// The checkpoint of region manifest, generated by checkpoint.
/// The region version checkpoint
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct RegionVersion {
pub manifest_version: ManifestVersion,
pub files: HashMap<FileId, FileMeta>,
}
// The checkpoint of region manifest, generated by checkpointer.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct RegionCheckpoint {
/// The snasphot protocol
@@ -133,35 +145,28 @@ pub struct RegionCheckpoint {
// The number of manifest actions that this checkpoint compacts.
pub compacted_actions: usize,
// The checkpoint data
pub checkpoint: Option<RegionManifestData>,
pub checkpoint: Option<RegionManifest>,
}
impl RegionCheckpoint {
fn set_protocol(&mut self, action: ProtocolAction) {
pub fn set_protocol(&mut self, action: ProtocolAction) {
self.protocol = action;
}
fn last_version(&self) -> ManifestVersion {
pub fn last_version(&self) -> ManifestVersion {
self.last_version
}
fn encode(&self) -> Result<Vec<u8>> {
pub fn encode(&self) -> Result<Vec<u8>> {
todo!()
}
fn decode(bs: &[u8], reader_version: ProtocolVersion) -> Result<Self> {
helper::decode_checkpoint(bs, reader_version)
pub fn decode(bs: &[u8]) -> Result<Self> {
// helper::decode_checkpoint(bs, reader_version)
todo!()
}
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub enum RegionMetaAction {
Protocol(ProtocolAction),
Change(RegionChange),
Remove(RegionRemove),
Edit(RegionEdit),
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionMetaActionList {
pub actions: Vec<RegionMetaAction>,
@@ -190,30 +195,31 @@ impl RegionMetaActionList {
self.actions.insert(0, RegionMetaAction::Protocol(action));
}
fn set_prev_version(&mut self, version: ManifestVersion) {
pub fn set_prev_version(&mut self, version: ManifestVersion) {
self.prev_version = version;
}
/// Encode self into json in the form of string lines, starts with prev_version and then action json list.
fn encode(&self) -> Result<Vec<u8>> {
helper::encode_actions(self.prev_version, &self.actions)
pub fn encode(&self) -> Result<Vec<u8>> {
let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;
Ok(json.into_bytes())
}
fn decode(
_bs: &[u8],
_reader_version: ProtocolVersion,
) -> Result<(Self, Option<ProtocolAction>)> {
todo!()
pub fn decode(bytes: &[u8]) -> Result<Self> {
let data = std::str::from_utf8(bytes).context(Utf8Snafu)?;
serde_json::from_str(data).context(SerdeJsonSnafu)
}
}
pub struct MetaActionIteratorImpl {
pub struct RegionMetaActionIter {
// log_iter: ObjectStoreLogIterator,
reader_version: ProtocolVersion,
last_protocol: Option<ProtocolAction>,
}
impl MetaActionIteratorImpl {
impl RegionMetaActionIter {
pub fn last_protocol(&self) -> Option<ProtocolAction> {
self.last_protocol.clone()
}

View File

@@ -1,121 +0,0 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_datasource::compression::CompressionType;
use object_store::ObjectStore;
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use store_api::manifest::ManifestVersion;
use crate::manifest::action::{MetaActionIteratorImpl, RegionCheckpoint, RegionMetaActionList};
type Result<T> = std::result::Result<T, ()>;
// rewrite note:
// trait Checkpoint -> struct RegionCheckpoint
// trait MetaAction -> struct RegionMetaActionList
// trait MetaActionIterator -> struct MetaActionIteratorImpl
#[derive(Clone, Debug)]
pub struct Regionmanifest {}
impl Regionmanifest {
// from impl ManifestImpl
pub fn new() -> Self {
todo!()
}
pub fn create(
_manifest_dir: &str,
_object_store: ObjectStore,
_compress_type: CompressionType,
) -> Self {
todo!()
}
// pub (crate) fn checkpointer(&self) -> Checkpointer {
// todo!()
// }
pub(crate) fn set_last_checkpoint_version(&self, _version: ManifestVersion) {
todo!()
}
/// Update inner state.
pub fn update_state(&self, _version: ManifestVersion, _protocol: Option<ProtocolAction>) {
todo!()
}
pub(crate) async fn save_checkpoint(&self, checkpoint: &RegionCheckpoint) -> Result<()> {
todo!()
}
pub(crate) async fn may_do_checkpoint(&self, version: ManifestVersion) -> Result<()> {
todo!()
}
// pub(crate) fn manifest_store(&self) -> &Arc<ManifestObjectStore> {
// todo!()
// }
// from Manifest
async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
todo!()
}
async fn scan(
&self,
start: ManifestVersion,
end: ManifestVersion,
) -> Result<MetaActionIteratorImpl> {
todo!()
}
async fn do_checkpoint(&self) -> Result<Option<RegionCheckpoint>> {
todo!()
}
async fn last_checkpoint(&self) -> Result<Option<RegionCheckpoint>> {
todo!()
}
async fn start(&self) -> Result<()> {
todo!()
}
async fn stop(&self) -> Result<()> {
todo!()
}
// from Checkpoint
/// Set a protocol action into checkpoint
pub fn set_protocol(&mut self, _action: ProtocolAction) {
todo!()
}
/// The last compacted action's version of checkpoint
pub fn last_version(&self) -> ManifestVersion {
todo!()
}
/// Encode this checkpoint into a byte vector
pub fn encode(&self) -> Result<Vec<u8>> {
todo!()
}
pub fn decode(_bytes: &[u8], _reader_version: ProtocolVersion) -> Result<Self> {
todo!()
}
}

View File

@@ -0,0 +1,340 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::Ordering;
use std::sync::Arc;
use arc_swap::ArcSwap;
use common_telemetry::{debug, info};
use snafu::OptionExt;
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use store_api::manifest::{AtomicManifestVersion, ManifestVersion, MAX_VERSION, MIN_VERSION};
use crate::error::{InitialMetadataSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
RegionMetaActionIter, RegionMetaActionList,
};
use crate::manifest::options::RegionManifestOptions;
use crate::manifest::storage::ManifestObjectStore;
// rewrite note:
// trait Checkpoint -> struct RegionCheckpoint
// trait MetaAction -> struct RegionMetaActionList
// trait MetaActionIterator -> struct MetaActionIteratorImpl
/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
/// metadata.
#[derive(Clone, Debug)]
pub struct RegionManifestManager {
inner: Arc<RegionManifestManagerInner>,
}
impl RegionManifestManager {
/// Construct and recover a region's manifest from storage.
pub async fn new(options: RegionManifestOptions) -> Result<Self> {
let inner = RegionManifestManagerInner::new(options).await?;
Ok(Self {
inner: Arc::new(inner),
})
}
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
}
/// Update the manifest. Return the current manifest version number.
pub async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
self.inner.update(action_list).await
}
/// Retrieve the current [RegionManifest].
pub fn manifest(&self) -> Arc<RegionManifest> {
self.inner.manifest.load().clone()
}
}
#[derive(Debug)]
struct RegionManifestManagerInner {
store: ManifestObjectStore,
options: RegionManifestOptions,
version: AtomicManifestVersion,
manifest: ArcSwap<RegionManifest>,
}
impl RegionManifestManagerInner {
pub async fn new(mut options: RegionManifestOptions) -> Result<Self> {
// construct storage
let store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
);
// recover from storage
// construct manifest builder
let mut version = MIN_VERSION;
let last_checkpoint = store.load_last_checkpoint().await?;
let checkpoint = last_checkpoint
.map(|(_, raw_checkpoint)| RegionCheckpoint::decode(&raw_checkpoint))
.transpose()?;
let mut manifest_builder = if let Some(checkpoint) = checkpoint {
info!(
"Recover region manifest from checkpoint version {}",
checkpoint.last_version
);
version = version.max(checkpoint.last_version + 1);
RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
} else {
info!("Checkpoint not found, build manifest from scratch");
RegionManifestBuilder::default()
};
// apply actions from storage
let mut action_iter = store.scan(version, MAX_VERSION).await?;
while let Some((manifest_version, raw_action_list)) = action_iter.next_log().await? {
let action_list = RegionMetaActionList::decode(&raw_action_list)?;
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
manifest_builder.apply_change(action);
}
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(manifest_version, action);
}
RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => {
debug!("Unhandled action: {:?}", action);
}
}
}
}
// set the initial metadata if necessary
if !manifest_builder.contains_metadata() {
let metadata = options
.initial_metadata
.take()
.context(InitialMetadataSnafu)?;
info!("Creating region manifest with metadata {:?}", metadata);
manifest_builder.apply_change(RegionChange { metadata });
}
let manifest = manifest_builder.try_build()?;
debug!("Recovered region manifest: {:?}", manifest);
let version = manifest.version.manifest_version;
// todo: start gc task
Ok(Self {
store,
options,
version: AtomicManifestVersion::new(version),
manifest: ArcSwap::new(Arc::new(manifest)),
})
}
pub async fn stop(&self) -> Result<()> {
// todo: stop gc task
Ok(())
}
pub async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let version = self.inc_version();
self.store.save(version, &action_list.encode()?).await?;
let mut manifest_builder =
RegionManifestBuilder::with_checkpoint(Some(self.manifest.load().as_ref().clone()));
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
manifest_builder.apply_change(action);
}
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(version, action);
}
RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => {
debug!("Unhandled action: {:?}", action);
}
}
}
let new_manifest = manifest_builder.try_build()?;
self.manifest.store(Arc::new(new_manifest));
Ok(version)
}
}
impl RegionManifestManagerInner {
fn inc_version(&self) -> ManifestVersion {
self.version.fetch_add(1, Ordering::Relaxed)
}
// pub (crate) fn checkpointer(&self) -> Checkpointer {
// todo!()
// }
pub(crate) fn set_last_checkpoint_version(&self, _version: ManifestVersion) {
todo!()
}
/// Update inner state.
pub fn update_state(&self, _version: ManifestVersion, _protocol: Option<ProtocolAction>) {
todo!()
}
pub(crate) async fn save_checkpoint(&self, checkpoint: &RegionCheckpoint) -> Result<()> {
todo!()
}
pub(crate) async fn may_do_checkpoint(&self, version: ManifestVersion) -> Result<()> {
todo!()
}
// pub(crate) fn manifest_store(&self) -> &Arc<ManifestObjectStore> {
// todo!()
// }
// from Manifest
async fn scan(
&self,
start: ManifestVersion,
end: ManifestVersion,
) -> Result<RegionMetaActionIter> {
todo!()
}
async fn do_checkpoint(&self) -> Result<Option<RegionCheckpoint>> {
todo!()
}
async fn last_checkpoint(&self) -> Result<Option<RegionCheckpoint>> {
todo!()
}
// from Checkpoint
/// Set a protocol action into checkpoint
pub fn set_protocol(&mut self, _action: ProtocolAction) {
todo!()
}
/// The last compacted action's version of checkpoint
pub fn last_version(&self) -> ManifestVersion {
todo!()
}
/// Encode this checkpoint into a byte vector
pub fn encode(&self) -> Result<Vec<u8>> {
todo!()
}
pub fn decode(_bytes: &[u8], _reader_version: ProtocolVersion) -> Result<Self> {
todo!()
}
}
#[cfg(test)]
mod test {
use common_datasource::compression::CompressionType;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::manifest::action::RegionChange;
use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType};
use crate::test_util::TestEnv;
fn basic_region_metadata() -> RegionMetadata {
let builder = RegionMetadataBuilder::new(RegionId::new(23, 33), 0);
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"ts",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 45,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("pk", ConcreteDataType::string_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 36,
});
let builder = builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 251,
});
builder.build()
}
#[tokio::test]
async fn create_region_without_initial_metadata() {
let env = TestEnv::new("");
let result = env
.create_manifest_manager(CompressionType::Uncompressed, None, None)
.await;
assert!(matches!(
result.err().unwrap(),
Error::InitialMetadata { .. }
))
}
#[tokio::test]
async fn create_manifest_manager() {
let metadata = basic_region_metadata();
let env = TestEnv::new("");
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, None, Some(metadata.clone()))
.await
.unwrap();
let manifest = manager.manifest();
assert_eq!(manifest.metadata, metadata);
}
#[tokio::test]
async fn region_change_add_column() {
let metadata = basic_region_metadata();
let env = TestEnv::new("");
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, None, Some(metadata.clone()))
.await
.unwrap();
let new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1);
let new_metadata_builder = new_metadata_builder.add_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 252,
});
let new_metadata = new_metadata_builder.build();
let mut action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
}));
action_list.set_prev_version(0);
let prev_version = manager.update(action_list).await.unwrap();
assert_eq!(prev_version, 0);
let manifest = manager.manifest();
assert_eq!(manifest.metadata, new_metadata);
}
}

View File

@@ -0,0 +1,33 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Options for [RegionManifestManager](crate::manifest::manager::RegionManifestManager).
use common_datasource::compression::CompressionType;
use object_store::ObjectStore;
use crate::metadata::RegionMetadata;
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
pub manifest_dir: String,
pub object_store: ObjectStore,
pub compress_type: CompressionType,
/// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints
/// `None` means disable checkpoint.
pub checkpoint_interval: Option<u64>,
/// Initial [RegionMetadata](crate::metadata::RegionMetadata) of this region.
/// Only need to set when create a new region, otherwise it will be ignored.
pub initial_metadata: Option<RegionMetadata>,
}

View File

@@ -106,7 +106,7 @@ pub struct ObjectStoreLogIterator {
}
impl ObjectStoreLogIterator {
async fn next_log(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
pub async fn next_log(&mut self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
match self.iter.next() {
Some((v, entry)) => {
let compress_type = file_compress_type(entry.name());
@@ -182,31 +182,8 @@ impl ManifestObjectStore {
pub(crate) fn path(&self) -> &str {
&self.path
}
}
#[derive(Serialize, Deserialize, Debug)]
struct CheckpointMetadata {
pub size: usize,
/// The latest version this checkpoint contains.
pub version: ManifestVersion,
pub checksum: Option<String>,
pub extend_metadata: Option<HashMap<String, String>>,
}
impl CheckpointMetadata {
fn encode(&self) -> Result<impl AsRef<[u8]>> {
serde_json::to_string(self).context(SerdeJsonSnafu)
}
fn decode(bs: &[u8]) -> Result<Self> {
let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
serde_json::from_str(data).context(SerdeJsonSnafu)
}
}
impl ManifestObjectStore {
async fn scan(
pub async fn scan(
&self,
start: ManifestVersion,
end: ManifestVersion,
@@ -346,7 +323,7 @@ impl ManifestObjectStore {
Ok(())
}
async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
pub async fn save(&self, version: ManifestVersion, bytes: &[u8]) -> Result<()> {
let path = self.delta_file_path(version);
logging::debug!("Save log to manifest storage, version: {}", version);
let data = self
@@ -416,7 +393,7 @@ impl ManifestObjectStore {
size: bytes.len(),
version,
checksum: None,
extend_metadata: None,
extend_metadata: HashMap::new(),
};
logging::debug!(
@@ -425,9 +402,9 @@ impl ManifestObjectStore {
checkpoint_metadata
);
let bs = checkpoint_metadata.encode()?;
let bytes = checkpoint_metadata.encode()?;
self.object_store
.write(&last_checkpoint_path, bs.as_ref().to_vec())
.write(&last_checkpoint_path, bytes)
.await
.context(OpenDalSnafu)?;
@@ -513,7 +490,9 @@ impl ManifestObjectStore {
Ok(())
}
async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
/// Load the latest checkpoint.
/// Return manifest version and the raw [RegionCheckpoint](crate::manifest::action::RegionCheckpoint) content if any
pub async fn load_last_checkpoint(&self) -> Result<Option<(ManifestVersion, Vec<u8>)>> {
let last_checkpoint_path = self.last_checkpoint_path();
let last_checkpoint_data = match self.object_store.read(&last_checkpoint_path).await {
Ok(data) => data,
@@ -537,6 +516,29 @@ impl ManifestObjectStore {
}
}
#[derive(Serialize, Deserialize, Debug)]
struct CheckpointMetadata {
pub size: usize,
/// The latest version this checkpoint contains.
pub version: ManifestVersion,
pub checksum: Option<String>,
pub extend_metadata: HashMap<String, String>,
}
impl CheckpointMetadata {
fn encode(&self) -> Result<Vec<u8>> {
Ok(serde_json::to_string(self)
.context(SerdeJsonSnafu)?
.into_bytes())
}
fn decode(bs: &[u8]) -> Result<Self> {
let data = std::str::from_utf8(bs).context(Utf8Snafu)?;
serde_json::from_str(data).context(SerdeJsonSnafu)
}
}
#[cfg(test)]
mod tests {
use common_test_util::temp_dir::create_temp_dir;

View File

@@ -26,6 +26,7 @@ use crate::region::VersionNumber;
/// Static metadata of a region.
///
/// This struct implements [Serialize] and [Deserialize] traits.
/// To build a [RegionMetadata] object, use [RegionMetadataBuilder].
///
/// ```mermaid
/// class RegionMetadata {
@@ -50,17 +51,17 @@ use crate::region::VersionNumber;
pub struct RegionMetadata {
/// Latest schema of this region
#[serde(skip)]
schema: SchemaRef,
pub schema: SchemaRef,
/// Columns in the region. Has the same order as columns
/// in [schema](RegionMetadata::schema).
column_metadatas: Vec<ColumnMetadata>,
pub column_metadatas: Vec<ColumnMetadata>,
/// Version of metadata.
version: VersionNumber,
pub version: VersionNumber,
/// Maintains an ordered list of primary keys
primary_key: Vec<ColumnId>,
pub primary_key: Vec<ColumnId>,
/// Immutable and unique id of a region.
region_id: RegionId,
pub region_id: RegionId,
}
pub type RegionMetadataRef = Arc<RegionMetadata>;
@@ -118,6 +119,17 @@ impl RegionMetadataBuilder {
}
}
/// Create a builder from existing [RegionMetadata].
pub fn from_existing(existing: RegionMetadata, new_version: VersionNumber) -> Self {
Self {
schema: existing.schema,
column_metadatas: existing.column_metadatas,
version: new_version,
primary_key: existing.primary_key,
region_id: existing.region_id,
}
}
/// Add a column metadata to this region metadata.
/// This method will check the semantic type and add it to primary keys automatically.
pub fn add_column_metadata(mut self, column_metadata: ColumnMetadata) -> Self {
@@ -149,11 +161,11 @@ impl RegionMetadataBuilder {
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct ColumnMetadata {
/// Schema of this column. Is the same as `column_schema` in [SchemaRef].
column_schema: ColumnSchema,
pub column_schema: ColumnSchema,
/// Semantic type of this column (e.g. tag or timestamp).
semantic_type: SemanticType,
pub semantic_type: SemanticType,
/// Immutable and unique id of a region.
column_id: ColumnId,
pub column_id: ColumnId,
}
/// The semantic type of one column

View File

@@ -16,6 +16,7 @@
use std::sync::Arc;
use common_datasource::compression::CompressionType;
use common_test_util::temp_dir::{create_temp_dir, TempDir};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
@@ -25,6 +26,10 @@ use object_store::ObjectStore;
use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::options::RegionManifestOptions;
use crate::metadata::RegionMetadata;
use crate::worker::WorkerGroup;
/// Env to test mito engine.
@@ -67,4 +72,28 @@ impl TestEnv {
(log_store, object_store)
}
pub async fn create_manifest_manager(
&self,
compress_type: CompressionType,
checkpoint_interval: Option<u64>,
initial_metadata: Option<RegionMetadata>,
) -> Result<RegionManifestManager> {
let data_home = self.data_home.path().to_str().unwrap();
let manifest_dir = join_dir(data_home, "manifest");
let mut builder = Fs::default();
let _ = builder.root(&manifest_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let manifest_opts = RegionManifestOptions {
manifest_dir,
object_store,
compress_type,
checkpoint_interval,
initial_metadata,
};
RegionManifestManager::new(manifest_opts).await
}
}

View File

@@ -16,6 +16,8 @@
pub mod action;
mod storage;
use std::sync::atomic::AtomicU64;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use serde::de::DeserializeOwned;
@@ -25,6 +27,7 @@ use crate::manifest::action::{ProtocolAction, ProtocolVersion};
pub use crate::manifest::storage::*;
pub type ManifestVersion = u64;
pub type AtomicManifestVersion = AtomicU64;
pub const MIN_VERSION: u64 = 0;
pub const MAX_VERSION: u64 = u64::MAX;