feat(mito): Implement open for RegionManifestManager (#2036)

* feat: file purger trait

* feat: Implement open for RegionManifestManager

* feat: remove RegionVersion

* feat: Use RwLock

* chore: remove AtomicManifestVersion

* feat: Remove unused error

* feat: store meta action

* chore: update comment
This commit is contained in:
Yingwen
2023-07-31 19:04:22 +09:00
committed by GitHub
parent 922d826347
commit 4d5ecb54c5
10 changed files with 294 additions and 155 deletions

View File

@@ -98,12 +98,6 @@ pub enum Error {
location: Location,
},
#[snafu(display(
"Expect initial region metadata on creating/opening a new region, location: {}",
location
))]
InitialMetadata { location: Location },
#[snafu(display("Invalid metadata, {}, location: {}", reason, location))]
InvalidMeta { reason: String, location: Location },
@@ -180,10 +174,9 @@ impl ErrorExt for Error {
| Utf8 { .. }
| RegionExists { .. }
| NewRecordBatch { .. } => StatusCode::Unexpected,
InvalidScanIndex { .. }
| InitialMetadata { .. }
| InvalidMeta { .. }
| InvalidSchema { .. } => StatusCode::InvalidArguments,
InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => {
StatusCode::InvalidArguments
}
RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => {
StatusCode::Internal
}

View File

@@ -14,7 +14,6 @@
use std::collections::HashMap;
use common_telemetry::info;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use storage::metadata::VersionNumber;
@@ -24,7 +23,7 @@ use store_api::manifest::ManifestVersion;
use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu};
use crate::metadata::RegionMetadata;
use crate::metadata::RegionMetadataRef;
/// Actions that can be applied to region manifest.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -42,7 +41,7 @@ pub enum RegionMetaAction {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionChange {
/// The metadata after changed.
pub metadata: RegionMetadata,
pub metadata: RegionMetadataRef,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -59,54 +58,50 @@ pub struct RegionRemove {
pub region_id: RegionId,
}
/// The region manifest data
/// The region manifest data.
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionManifest {
pub metadata: RegionMetadata,
pub version: RegionVersion,
/// Metadata of the region.
pub metadata: RegionMetadataRef,
/// SST files.
pub files: HashMap<FileId, FileMeta>,
/// Current manifest version.
pub manifest_version: ManifestVersion,
}
#[derive(Debug, Default)]
pub struct RegionManifestBuilder {
metadata: Option<RegionMetadata>,
version: Option<RegionVersion>,
metadata: Option<RegionMetadataRef>,
files: HashMap<FileId, FileMeta>,
manifest_version: ManifestVersion,
}
impl RegionManifestBuilder {
/// Start with a checkpoint
/// Start with a checkpoint.
pub fn with_checkpoint(checkpoint: Option<RegionManifest>) -> Self {
if let Some(s) = checkpoint {
Self {
metadata: Some(s.metadata),
version: Some(s.version),
files: s.files,
manifest_version: s.manifest_version,
}
} else {
Default::default()
}
}
pub fn apply_change(&mut self, change: RegionChange) {
pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) {
self.metadata = Some(change.metadata);
self.manifest_version = manifest_version;
}
pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) {
if let Some(version) = &mut self.version {
version.manifest_version = manifest_version;
for file in edit.files_to_add {
let _ = version.files.insert(file.file_id, file);
}
for file in edit.files_to_remove {
let _ = version.files.remove(&file.file_id);
}
} else {
self.version = Some(RegionVersion {
manifest_version,
files: edit
.files_to_add
.into_iter()
.map(|f| (f.file_id, f))
.collect(),
});
self.manifest_version = manifest_version;
for file in edit.files_to_add {
self.files.insert(file.file_id, file);
}
for file in edit.files_to_remove {
self.files.remove(&file.file_id);
}
}
@@ -117,24 +112,14 @@ impl RegionManifestBuilder {
pub fn try_build(self) -> Result<RegionManifest> {
let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?;
let version = self.version.unwrap_or_else(|| {
info!(
"Create new default region version for region {:?}",
metadata.region_id
);
RegionVersion::default()
});
Ok(RegionManifest { metadata, version })
Ok(RegionManifest {
metadata,
files: self.files,
manifest_version: self.manifest_version,
})
}
}
/// The region version checkpoint
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)]
pub struct RegionVersion {
pub manifest_version: ManifestVersion,
pub files: HashMap<FileId, FileMeta>,
}
// The checkpoint of region manifest, generated by checkpointer.
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
pub struct RegionCheckpoint {
@@ -170,22 +155,17 @@ impl RegionCheckpoint {
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct RegionMetaActionList {
pub actions: Vec<RegionMetaAction>,
pub prev_version: ManifestVersion,
}
impl RegionMetaActionList {
pub fn with_action(action: RegionMetaAction) -> Self {
Self {
actions: vec![action],
prev_version: 0,
}
}
pub fn new(actions: Vec<RegionMetaAction>) -> Self {
Self {
actions,
prev_version: 0,
}
Self { actions }
}
}
@@ -195,11 +175,7 @@ impl RegionMetaActionList {
self.actions.insert(0, RegionMetaAction::Protocol(action));
}
pub fn set_prev_version(&mut self, version: ManifestVersion) {
self.prev_version = version;
}
/// Encode self into json in the form of string lines, starts with prev_version and then action json list.
/// Encode self into json in the form of string lines.
pub fn encode(&self) -> Result<Vec<u8>> {
let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?;

View File

@@ -12,22 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::atomic::Ordering;
use std::sync::Arc;
use arc_swap::ArcSwap;
use common_telemetry::{debug, info};
use snafu::OptionExt;
use store_api::manifest::action::{ProtocolAction, ProtocolVersion};
use store_api::manifest::{AtomicManifestVersion, ManifestVersion, MAX_VERSION, MIN_VERSION};
use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION};
use tokio::sync::RwLock;
use crate::error::{InitialMetadataSnafu, Result};
use crate::error::Result;
use crate::manifest::action::{
RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction,
RegionMetaActionIter, RegionMetaActionList,
};
use crate::manifest::options::RegionManifestOptions;
use crate::manifest::storage::ManifestObjectStore;
use crate::metadata::RegionMetadataRef;
// rewrite note:
// trait Checkpoint -> struct RegionCheckpoint
@@ -36,32 +35,63 @@ use crate::manifest::storage::ManifestObjectStore;
/// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted
/// metadata.
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct RegionManifestManager {
inner: Arc<RegionManifestManagerInner>,
inner: RwLock<RegionManifestManagerInner>,
}
impl RegionManifestManager {
/// Construct and recover a region's manifest from storage.
pub async fn new(options: RegionManifestOptions) -> Result<Self> {
let inner = RegionManifestManagerInner::new(options).await?;
/// Construct a region's manifest and persist it.
pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
let inner = RegionManifestManagerInner::new(metadata, options).await?;
Ok(Self {
inner: Arc::new(inner),
inner: RwLock::new(inner),
})
}
/// Open an existing manifest.
pub async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
if let Some(inner) = RegionManifestManagerInner::open(options).await? {
Ok(Some(Self {
inner: RwLock::new(inner),
}))
} else {
Ok(None)
}
}
/// Stop background tasks gracefully.
pub async fn stop(&self) -> Result<()> {
self.inner.stop().await
let mut inner = self.inner.write().await;
inner.stop().await
}
/// Update the manifest. Return the current manifest version number.
pub async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
self.inner.update(action_list).await
let mut inner = self.inner.write().await;
inner.update(action_list).await
}
/// Retrieve the current [RegionManifest].
pub fn manifest(&self) -> Arc<RegionManifest> {
self.inner.manifest.load().clone()
pub async fn manifest(&self) -> Arc<RegionManifest> {
let inner = self.inner.read().await;
inner.manifest.clone()
}
}
#[cfg(test)]
impl RegionManifestManager {
pub(crate) async fn validate_manifest(
&self,
expect: &RegionMetadataRef,
last_version: ManifestVersion,
) {
let manifest = self.manifest().await;
assert_eq!(manifest.metadata, *expect);
let inner = self.inner.read().await;
assert_eq!(inner.manifest.manifest_version, inner.last_version);
assert_eq!(last_version, inner.last_version);
}
}
@@ -69,12 +99,60 @@ impl RegionManifestManager {
struct RegionManifestManagerInner {
store: ManifestObjectStore,
options: RegionManifestOptions,
version: AtomicManifestVersion,
manifest: ArcSwap<RegionManifest>,
last_version: ManifestVersion,
manifest: Arc<RegionManifest>,
}
impl RegionManifestManagerInner {
pub async fn new(mut options: RegionManifestOptions) -> Result<Self> {
/// Creates a new manifest.
async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result<Self> {
// construct storage
let store = ManifestObjectStore::new(
&options.manifest_dir,
options.object_store.clone(),
options.compress_type,
);
info!(
"Creating region manifest in {} with metadata {:?}",
options.manifest_dir, metadata
);
let version = MIN_VERSION;
let mut manifest_builder = RegionManifestBuilder::default();
// set the initial metadata.
manifest_builder.apply_change(
version,
RegionChange {
metadata: metadata.clone(),
},
);
let manifest = manifest_builder.try_build()?;
debug!(
"Build region manifest in {}, manifest: {:?}",
options.manifest_dir, manifest
);
// Persist region change.
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata }));
store.save(version, &action_list.encode()?).await?;
// todo: start gc task
Ok(Self {
store,
options,
last_version: version,
manifest: Arc::new(manifest),
})
}
/// Open an existing manifest.
///
/// Returns `Ok(None)` if no such manifest.
async fn open(options: RegionManifestOptions) -> Result<Option<Self>> {
// construct storage
let store = ManifestObjectStore::new(
&options.manifest_dir,
@@ -91,13 +169,16 @@ impl RegionManifestManagerInner {
.transpose()?;
let mut manifest_builder = if let Some(checkpoint) = checkpoint {
info!(
"Recover region manifest from checkpoint version {}",
checkpoint.last_version
"Recover region manifest {} from checkpoint version {}",
options.manifest_dir, checkpoint.last_version
);
version = version.max(checkpoint.last_version + 1);
RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint)
} else {
info!("Checkpoint not found, build manifest from scratch");
info!(
"Checkpoint not found in {}, build manifest from scratch",
options.manifest_dir
);
RegionManifestBuilder::default()
};
@@ -108,13 +189,16 @@ impl RegionManifestManagerInner {
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
manifest_builder.apply_change(action);
manifest_builder.apply_change(manifest_version, action);
}
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(manifest_version, action);
}
RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => {
debug!("Unhandled action: {:?}", action);
debug!(
"Unhandled action in {}, action: {:?}",
options.manifest_dir, action
);
}
}
}
@@ -122,63 +206,66 @@ impl RegionManifestManagerInner {
// set the initial metadata if necessary
if !manifest_builder.contains_metadata() {
let metadata = options
.initial_metadata
.take()
.context(InitialMetadataSnafu)?;
info!("Creating region manifest with metadata {:?}", metadata);
manifest_builder.apply_change(RegionChange { metadata });
debug!("No region manifest in {}", options.manifest_dir);
return Ok(None);
}
let manifest = manifest_builder.try_build()?;
debug!("Recovered region manifest: {:?}", manifest);
let version = manifest.version.manifest_version;
debug!(
"Recovered region manifest from {}, manifest: {:?}",
options.manifest_dir, manifest
);
let version = manifest.manifest_version;
// todo: start gc task
Ok(Self {
Ok(Some(Self {
store,
options,
version: AtomicManifestVersion::new(version),
manifest: ArcSwap::new(Arc::new(manifest)),
})
last_version: version,
manifest: Arc::new(manifest),
}))
}
pub async fn stop(&self) -> Result<()> {
async fn stop(&mut self) -> Result<()> {
// todo: stop gc task
Ok(())
}
pub async fn update(&self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let version = self.inc_version();
async fn update(&mut self, action_list: RegionMetaActionList) -> Result<ManifestVersion> {
let version = self.increase_version();
self.store.save(version, &action_list.encode()?).await?;
let mut manifest_builder =
RegionManifestBuilder::with_checkpoint(Some(self.manifest.load().as_ref().clone()));
RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone()));
for action in action_list.actions {
match action {
RegionMetaAction::Change(action) => {
manifest_builder.apply_change(action);
manifest_builder.apply_change(version, action);
}
RegionMetaAction::Edit(action) => {
manifest_builder.apply_edit(version, action);
}
RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => {
debug!("Unhandled action: {:?}", action);
debug!(
"Unhandled action for region {}, action: {:?}",
self.manifest.metadata.region_id, action
);
}
}
}
let new_manifest = manifest_builder.try_build()?;
self.manifest.store(Arc::new(new_manifest));
self.manifest = Arc::new(new_manifest);
Ok(version)
}
}
impl RegionManifestManagerInner {
fn inc_version(&self) -> ManifestVersion {
self.version.fetch_add(1, Ordering::Relaxed)
/// Increases last version and returns the increased version.
fn increase_version(&mut self) -> ManifestVersion {
self.last_version += 1;
self.last_version
}
// pub (crate) fn checkpointer(&self) -> Checkpointer {
@@ -254,7 +341,6 @@ mod test {
use store_api::storage::RegionId;
use super::*;
use crate::error::Error;
use crate::manifest::action::RegionChange;
use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType};
use crate::test_util::TestEnv;
@@ -288,58 +374,83 @@ mod test {
builder.build().unwrap()
}
#[tokio::test]
async fn create_region_without_initial_metadata() {
let env = TestEnv::new("");
let result = env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
.await;
assert!(matches!(
result.err().unwrap(),
Error::InitialMetadata { .. }
))
}
#[tokio::test]
async fn create_manifest_manager() {
let metadata = basic_region_metadata();
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new("");
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
.unwrap()
.unwrap();
let manifest = manager.manifest();
assert_eq!(manifest.metadata, metadata);
manager.validate_manifest(&metadata, 0).await;
}
#[tokio::test]
async fn open_manifest_manager() {
let env = TestEnv::new("");
// Try to opens an empty manifest.
assert!(env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
.await
.unwrap()
.is_none());
// Creates a manifest.
let metadata = Arc::new(basic_region_metadata());
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
.unwrap()
.unwrap();
// Stops it.
manager.stop().await.unwrap();
// Open it.
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
.await
.unwrap()
.unwrap();
manager.validate_manifest(&metadata, 0).await;
}
#[tokio::test]
async fn region_change_add_column() {
let metadata = basic_region_metadata();
let metadata = Arc::new(basic_region_metadata());
let env = TestEnv::new("");
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone()))
.await
.unwrap()
.unwrap();
let mut new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1);
let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone(), 1);
new_metadata_builder.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 252,
});
let new_metadata = new_metadata_builder.build().unwrap();
let new_metadata = Arc::new(new_metadata_builder.build().unwrap());
let mut action_list =
let action_list =
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
}));
action_list.set_prev_version(0);
let prev_version = manager.update(action_list).await.unwrap();
assert_eq!(prev_version, 0);
let current_version = manager.update(action_list).await.unwrap();
assert_eq!(current_version, 1);
manager.validate_manifest(&new_metadata, 1).await;
let manifest = manager.manifest();
assert_eq!(manifest.metadata, new_metadata);
// Reopen the manager.
manager.stop().await.unwrap();
let manager = env
.create_manifest_manager(CompressionType::Uncompressed, 10, None)
.await
.unwrap()
.unwrap();
manager.validate_manifest(&new_metadata, 1).await;
}
}

View File

@@ -17,17 +17,13 @@
use common_datasource::compression::CompressionType;
use object_store::ObjectStore;
use crate::metadata::RegionMetadata;
/// Options for manifest.
#[derive(Debug, Clone)]
pub struct RegionManifestOptions {
/// Directory to store manifest.
pub manifest_dir: String,
pub object_store: ObjectStore,
pub compress_type: CompressionType,
/// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints.
pub checkpoint_interval: u64,
/// Initial [RegionMetadata](crate::metadata::RegionMetadata) of this region.
/// Only need to set when create a new region, otherwise it will be ignored.
// TODO(yingwen): Could we pass RegionMetadataRef?
pub initial_metadata: Option<RegionMetadata>,
}

View File

@@ -60,19 +60,18 @@ impl RegionOpener {
/// Writes region manifest and creates a new region.
pub(crate) async fn create(self, config: &MitoConfig) -> Result<MitoRegion> {
let region_id = self.metadata.region_id;
let metadata = Arc::new(self.metadata);
// Create a manifest manager for this region.
let options = RegionManifestOptions {
manifest_dir: new_manifest_dir(&self.region_dir),
object_store: self.object_store,
compress_type: config.manifest_compress_type,
checkpoint_interval: config.manifest_checkpoint_interval,
// We are creating a new region, so we need to set this field.
initial_metadata: Some(self.metadata.clone()),
};
// Writes regions to the manifest file.
let manifest_manager = RegionManifestManager::new(options).await?;
let manifest_manager = RegionManifestManager::new(metadata.clone(), options).await?;
let metadata = Arc::new(self.metadata);
let mutable = self.memtable_builder.build(&metadata);
let version = VersionBuilder::new(metadata, mutable).build();

View File

@@ -15,6 +15,7 @@
//! Sorted strings tables.
pub mod file;
pub mod file_purger;
pub mod parquet;
mod stream_writer;
pub(crate) mod version;

View File

@@ -16,7 +16,7 @@
use std::fmt;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_time::Timestamp;
@@ -26,6 +26,8 @@ use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
@@ -99,8 +101,8 @@ pub struct FileHandle {
impl fmt::Debug for FileHandle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("FileHandle")
.field("file_id", &self.inner.meta.file_id)
.field("region_id", &self.inner.meta.region_id)
.field("file_id", &self.inner.meta.file_id)
.field("time_range", &self.inner.meta.time_range)
.field("size", &self.inner.meta.file_size)
.field("level", &self.inner.meta.level)
@@ -129,6 +131,18 @@ struct FileHandleInner {
meta: FileMeta,
compacting: AtomicBool,
deleted: AtomicBool,
file_purger: FilePurgerRef,
}
impl Drop for FileHandleInner {
fn drop(&mut self) {
if self.deleted.load(Ordering::Relaxed) {
self.file_purger.send_request(PurgeRequest {
region_id: self.meta.region_id,
file_id: self.meta.file_id,
});
}
}
}
#[cfg(test)]

View File

@@ -0,0 +1,45 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use store_api::storage::RegionId;
use crate::sst::file::FileId;
/// Request to remove a file.
#[derive(Debug)]
pub struct PurgeRequest {
/// Region id of the file.
pub region_id: RegionId,
/// Id of the file.
pub file_id: FileId,
}
/// A worker to delete files in background.
pub trait FilePurger: Send + Sync {
/// Send a purge request to the background worker.
fn send_request(&self, request: PurgeRequest);
}
pub type FilePurgerRef = Arc<dyn FilePurger>;
// TODO(yingwen): Remove this once we implement the real purger.
/// A purger that does nothing.
#[derive(Debug)]
struct NoopPurger {}
impl FilePurger for NoopPurger {
fn send_request(&self, _request: PurgeRequest) {}
}

View File

@@ -32,7 +32,7 @@ use crate::engine::MitoEngine;
use crate::error::Result;
use crate::manifest::manager::RegionManifestManager;
use crate::manifest::options::RegionManifestOptions;
use crate::metadata::{ColumnMetadata, RegionMetadata, SemanticType};
use crate::metadata::{ColumnMetadata, RegionMetadataRef, SemanticType};
use crate::worker::request::{CreateRequest, RegionOptions};
use crate::worker::WorkerGroup;
@@ -77,17 +77,19 @@ impl TestEnv {
(log_store, object_store)
}
/// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata`
/// is `None`, opens an existing manifest and returns `None` if no such manifest.
pub async fn create_manifest_manager(
&self,
compress_type: CompressionType,
checkpoint_interval: u64,
initial_metadata: Option<RegionMetadata>,
) -> Result<RegionManifestManager> {
initial_metadata: Option<RegionMetadataRef>,
) -> Result<Option<RegionManifestManager>> {
let data_home = self.data_home.path().to_str().unwrap();
let manifest_dir = join_dir(data_home, "manifest");
let mut builder = Fs::default();
let _ = builder.root(&manifest_dir);
builder.root(&manifest_dir);
let object_store = ObjectStore::new(builder).unwrap().finish();
let manifest_opts = RegionManifestOptions {
@@ -95,10 +97,15 @@ impl TestEnv {
object_store,
compress_type,
checkpoint_interval,
initial_metadata,
};
RegionManifestManager::new(manifest_opts).await
if let Some(metadata) = initial_metadata {
RegionManifestManager::new(metadata, manifest_opts)
.await
.map(Some)
} else {
RegionManifestManager::open(manifest_opts).await
}
}
}

View File

@@ -16,8 +16,6 @@
pub mod action;
mod storage;
use std::sync::atomic::AtomicU64;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
use serde::de::DeserializeOwned;
@@ -27,7 +25,6 @@ use crate::manifest::action::{ProtocolAction, ProtocolVersion};
pub use crate::manifest::storage::*;
pub type ManifestVersion = u64;
pub type AtomicManifestVersion = AtomicU64;
pub const MIN_VERSION: u64 = 0;
pub const MAX_VERSION: u64 = u64::MAX;