From 4d5ecb54c5ac7e6a4f2e3c1245e648a009d3ec34 Mon Sep 17 00:00:00 2001 From: Yingwen Date: Mon, 31 Jul 2023 19:04:22 +0900 Subject: [PATCH] feat(mito): Implement open for RegionManifestManager (#2036) * feat: file purger trait * feat: Implement open for RegionManifestManager * feat: remove RegionVersion * feat: Use RwLock * chore: remove AtomicManifestVersion * feat: Remove unused error * feat: store meta action * chore: update comment --- src/mito2/src/error.rs | 13 +- src/mito2/src/manifest/action.rs | 84 ++++------ src/mito2/src/manifest/manager.rs | 251 +++++++++++++++++++++--------- src/mito2/src/manifest/options.rs | 8 +- src/mito2/src/region/opener.rs | 7 +- src/mito2/src/sst.rs | 1 + src/mito2/src/sst/file.rs | 18 ++- src/mito2/src/sst/file_purger.rs | 45 ++++++ src/mito2/src/test_util.rs | 19 ++- src/store-api/src/manifest.rs | 3 - 10 files changed, 294 insertions(+), 155 deletions(-) create mode 100644 src/mito2/src/sst/file_purger.rs diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs index 6c52e3d1e1..b44c5c3020 100644 --- a/src/mito2/src/error.rs +++ b/src/mito2/src/error.rs @@ -98,12 +98,6 @@ pub enum Error { location: Location, }, - #[snafu(display( - "Expect initial region metadata on creating/opening a new region, location: {}", - location - ))] - InitialMetadata { location: Location }, - #[snafu(display("Invalid metadata, {}, location: {}", reason, location))] InvalidMeta { reason: String, location: Location }, @@ -180,10 +174,9 @@ impl ErrorExt for Error { | Utf8 { .. } | RegionExists { .. } | NewRecordBatch { .. } => StatusCode::Unexpected, - InvalidScanIndex { .. } - | InitialMetadata { .. } - | InvalidMeta { .. } - | InvalidSchema { .. } => StatusCode::InvalidArguments, + InvalidScanIndex { .. } | InvalidMeta { .. } | InvalidSchema { .. } => { + StatusCode::InvalidArguments + } RegionMetadataNotFound { .. } | Join { .. } | WorkerStopped { .. } | Recv { .. } => { StatusCode::Internal } diff --git a/src/mito2/src/manifest/action.rs b/src/mito2/src/manifest/action.rs index 0cf5ac4727..a996ec0c2d 100644 --- a/src/mito2/src/manifest/action.rs +++ b/src/mito2/src/manifest/action.rs @@ -14,7 +14,6 @@ use std::collections::HashMap; -use common_telemetry::info; use serde::{Deserialize, Serialize}; use snafu::{OptionExt, ResultExt}; use storage::metadata::VersionNumber; @@ -24,7 +23,7 @@ use store_api::manifest::ManifestVersion; use store_api::storage::{RegionId, SequenceNumber}; use crate::error::{RegionMetadataNotFoundSnafu, Result, SerdeJsonSnafu, Utf8Snafu}; -use crate::metadata::RegionMetadata; +use crate::metadata::RegionMetadataRef; /// Actions that can be applied to region manifest. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -42,7 +41,7 @@ pub enum RegionMetaAction { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionChange { /// The metadata after changed. - pub metadata: RegionMetadata, + pub metadata: RegionMetadataRef, } #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] @@ -59,54 +58,50 @@ pub struct RegionRemove { pub region_id: RegionId, } -/// The region manifest data +/// The region manifest data. #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionManifest { - pub metadata: RegionMetadata, - pub version: RegionVersion, + /// Metadata of the region. + pub metadata: RegionMetadataRef, + /// SST files. + pub files: HashMap, + /// Current manifest version. + pub manifest_version: ManifestVersion, } #[derive(Debug, Default)] pub struct RegionManifestBuilder { - metadata: Option, - version: Option, + metadata: Option, + files: HashMap, + manifest_version: ManifestVersion, } impl RegionManifestBuilder { - /// Start with a checkpoint + /// Start with a checkpoint. pub fn with_checkpoint(checkpoint: Option) -> Self { if let Some(s) = checkpoint { Self { metadata: Some(s.metadata), - version: Some(s.version), + files: s.files, + manifest_version: s.manifest_version, } } else { Default::default() } } - pub fn apply_change(&mut self, change: RegionChange) { + pub fn apply_change(&mut self, manifest_version: ManifestVersion, change: RegionChange) { self.metadata = Some(change.metadata); + self.manifest_version = manifest_version; } pub fn apply_edit(&mut self, manifest_version: ManifestVersion, edit: RegionEdit) { - if let Some(version) = &mut self.version { - version.manifest_version = manifest_version; - for file in edit.files_to_add { - let _ = version.files.insert(file.file_id, file); - } - for file in edit.files_to_remove { - let _ = version.files.remove(&file.file_id); - } - } else { - self.version = Some(RegionVersion { - manifest_version, - files: edit - .files_to_add - .into_iter() - .map(|f| (f.file_id, f)) - .collect(), - }); + self.manifest_version = manifest_version; + for file in edit.files_to_add { + self.files.insert(file.file_id, file); + } + for file in edit.files_to_remove { + self.files.remove(&file.file_id); } } @@ -117,24 +112,14 @@ impl RegionManifestBuilder { pub fn try_build(self) -> Result { let metadata = self.metadata.context(RegionMetadataNotFoundSnafu)?; - let version = self.version.unwrap_or_else(|| { - info!( - "Create new default region version for region {:?}", - metadata.region_id - ); - RegionVersion::default() - }); - Ok(RegionManifest { metadata, version }) + Ok(RegionManifest { + metadata, + files: self.files, + manifest_version: self.manifest_version, + }) } } -/// The region version checkpoint -#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq, Default)] -pub struct RegionVersion { - pub manifest_version: ManifestVersion, - pub files: HashMap, -} - // The checkpoint of region manifest, generated by checkpointer. #[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)] pub struct RegionCheckpoint { @@ -170,22 +155,17 @@ impl RegionCheckpoint { #[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)] pub struct RegionMetaActionList { pub actions: Vec, - pub prev_version: ManifestVersion, } impl RegionMetaActionList { pub fn with_action(action: RegionMetaAction) -> Self { Self { actions: vec![action], - prev_version: 0, } } pub fn new(actions: Vec) -> Self { - Self { - actions, - prev_version: 0, - } + Self { actions } } } @@ -195,11 +175,7 @@ impl RegionMetaActionList { self.actions.insert(0, RegionMetaAction::Protocol(action)); } - pub fn set_prev_version(&mut self, version: ManifestVersion) { - self.prev_version = version; - } - - /// Encode self into json in the form of string lines, starts with prev_version and then action json list. + /// Encode self into json in the form of string lines. pub fn encode(&self) -> Result> { let json = serde_json::to_string(&self).context(SerdeJsonSnafu)?; diff --git a/src/mito2/src/manifest/manager.rs b/src/mito2/src/manifest/manager.rs index 7a82312e98..40d90ed0f4 100644 --- a/src/mito2/src/manifest/manager.rs +++ b/src/mito2/src/manifest/manager.rs @@ -12,22 +12,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::atomic::Ordering; use std::sync::Arc; -use arc_swap::ArcSwap; use common_telemetry::{debug, info}; -use snafu::OptionExt; use store_api::manifest::action::{ProtocolAction, ProtocolVersion}; -use store_api::manifest::{AtomicManifestVersion, ManifestVersion, MAX_VERSION, MIN_VERSION}; +use store_api::manifest::{ManifestVersion, MAX_VERSION, MIN_VERSION}; +use tokio::sync::RwLock; -use crate::error::{InitialMetadataSnafu, Result}; +use crate::error::Result; use crate::manifest::action::{ RegionChange, RegionCheckpoint, RegionManifest, RegionManifestBuilder, RegionMetaAction, RegionMetaActionIter, RegionMetaActionList, }; use crate::manifest::options::RegionManifestOptions; use crate::manifest::storage::ManifestObjectStore; +use crate::metadata::RegionMetadataRef; // rewrite note: // trait Checkpoint -> struct RegionCheckpoint @@ -36,32 +35,63 @@ use crate::manifest::storage::ManifestObjectStore; /// Manage region's manifest. Provide APIs to access (create/modify/recover) region's persisted /// metadata. -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct RegionManifestManager { - inner: Arc, + inner: RwLock, } impl RegionManifestManager { - /// Construct and recover a region's manifest from storage. - pub async fn new(options: RegionManifestOptions) -> Result { - let inner = RegionManifestManagerInner::new(options).await?; + /// Construct a region's manifest and persist it. + pub async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { + let inner = RegionManifestManagerInner::new(metadata, options).await?; Ok(Self { - inner: Arc::new(inner), + inner: RwLock::new(inner), }) } + /// Open an existing manifest. + pub async fn open(options: RegionManifestOptions) -> Result> { + if let Some(inner) = RegionManifestManagerInner::open(options).await? { + Ok(Some(Self { + inner: RwLock::new(inner), + })) + } else { + Ok(None) + } + } + + /// Stop background tasks gracefully. pub async fn stop(&self) -> Result<()> { - self.inner.stop().await + let mut inner = self.inner.write().await; + inner.stop().await } /// Update the manifest. Return the current manifest version number. pub async fn update(&self, action_list: RegionMetaActionList) -> Result { - self.inner.update(action_list).await + let mut inner = self.inner.write().await; + inner.update(action_list).await } /// Retrieve the current [RegionManifest]. - pub fn manifest(&self) -> Arc { - self.inner.manifest.load().clone() + pub async fn manifest(&self) -> Arc { + let inner = self.inner.read().await; + inner.manifest.clone() + } +} + +#[cfg(test)] +impl RegionManifestManager { + pub(crate) async fn validate_manifest( + &self, + expect: &RegionMetadataRef, + last_version: ManifestVersion, + ) { + let manifest = self.manifest().await; + assert_eq!(manifest.metadata, *expect); + + let inner = self.inner.read().await; + assert_eq!(inner.manifest.manifest_version, inner.last_version); + assert_eq!(last_version, inner.last_version); } } @@ -69,12 +99,60 @@ impl RegionManifestManager { struct RegionManifestManagerInner { store: ManifestObjectStore, options: RegionManifestOptions, - version: AtomicManifestVersion, - manifest: ArcSwap, + last_version: ManifestVersion, + manifest: Arc, } impl RegionManifestManagerInner { - pub async fn new(mut options: RegionManifestOptions) -> Result { + /// Creates a new manifest. + async fn new(metadata: RegionMetadataRef, options: RegionManifestOptions) -> Result { + // construct storage + let store = ManifestObjectStore::new( + &options.manifest_dir, + options.object_store.clone(), + options.compress_type, + ); + + info!( + "Creating region manifest in {} with metadata {:?}", + options.manifest_dir, metadata + ); + + let version = MIN_VERSION; + let mut manifest_builder = RegionManifestBuilder::default(); + // set the initial metadata. + manifest_builder.apply_change( + version, + RegionChange { + metadata: metadata.clone(), + }, + ); + let manifest = manifest_builder.try_build()?; + + debug!( + "Build region manifest in {}, manifest: {:?}", + options.manifest_dir, manifest + ); + + // Persist region change. + let action_list = + RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata })); + store.save(version, &action_list.encode()?).await?; + + // todo: start gc task + + Ok(Self { + store, + options, + last_version: version, + manifest: Arc::new(manifest), + }) + } + + /// Open an existing manifest. + /// + /// Returns `Ok(None)` if no such manifest. + async fn open(options: RegionManifestOptions) -> Result> { // construct storage let store = ManifestObjectStore::new( &options.manifest_dir, @@ -91,13 +169,16 @@ impl RegionManifestManagerInner { .transpose()?; let mut manifest_builder = if let Some(checkpoint) = checkpoint { info!( - "Recover region manifest from checkpoint version {}", - checkpoint.last_version + "Recover region manifest {} from checkpoint version {}", + options.manifest_dir, checkpoint.last_version ); version = version.max(checkpoint.last_version + 1); RegionManifestBuilder::with_checkpoint(checkpoint.checkpoint) } else { - info!("Checkpoint not found, build manifest from scratch"); + info!( + "Checkpoint not found in {}, build manifest from scratch", + options.manifest_dir + ); RegionManifestBuilder::default() }; @@ -108,13 +189,16 @@ impl RegionManifestManagerInner { for action in action_list.actions { match action { RegionMetaAction::Change(action) => { - manifest_builder.apply_change(action); + manifest_builder.apply_change(manifest_version, action); } RegionMetaAction::Edit(action) => { manifest_builder.apply_edit(manifest_version, action); } RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => { - debug!("Unhandled action: {:?}", action); + debug!( + "Unhandled action in {}, action: {:?}", + options.manifest_dir, action + ); } } } @@ -122,63 +206,66 @@ impl RegionManifestManagerInner { // set the initial metadata if necessary if !manifest_builder.contains_metadata() { - let metadata = options - .initial_metadata - .take() - .context(InitialMetadataSnafu)?; - info!("Creating region manifest with metadata {:?}", metadata); - manifest_builder.apply_change(RegionChange { metadata }); + debug!("No region manifest in {}", options.manifest_dir); + return Ok(None); } let manifest = manifest_builder.try_build()?; - debug!("Recovered region manifest: {:?}", manifest); - let version = manifest.version.manifest_version; + debug!( + "Recovered region manifest from {}, manifest: {:?}", + options.manifest_dir, manifest + ); + let version = manifest.manifest_version; // todo: start gc task - Ok(Self { + Ok(Some(Self { store, options, - version: AtomicManifestVersion::new(version), - manifest: ArcSwap::new(Arc::new(manifest)), - }) + last_version: version, + manifest: Arc::new(manifest), + })) } - pub async fn stop(&self) -> Result<()> { + async fn stop(&mut self) -> Result<()> { // todo: stop gc task Ok(()) } - pub async fn update(&self, action_list: RegionMetaActionList) -> Result { - let version = self.inc_version(); - + async fn update(&mut self, action_list: RegionMetaActionList) -> Result { + let version = self.increase_version(); self.store.save(version, &action_list.encode()?).await?; let mut manifest_builder = - RegionManifestBuilder::with_checkpoint(Some(self.manifest.load().as_ref().clone())); + RegionManifestBuilder::with_checkpoint(Some(self.manifest.as_ref().clone())); for action in action_list.actions { match action { RegionMetaAction::Change(action) => { - manifest_builder.apply_change(action); + manifest_builder.apply_change(version, action); } RegionMetaAction::Edit(action) => { manifest_builder.apply_edit(version, action); } RegionMetaAction::Remove(_) | RegionMetaAction::Protocol(_) => { - debug!("Unhandled action: {:?}", action); + debug!( + "Unhandled action for region {}, action: {:?}", + self.manifest.metadata.region_id, action + ); } } } let new_manifest = manifest_builder.try_build()?; - self.manifest.store(Arc::new(new_manifest)); + self.manifest = Arc::new(new_manifest); Ok(version) } } impl RegionManifestManagerInner { - fn inc_version(&self) -> ManifestVersion { - self.version.fetch_add(1, Ordering::Relaxed) + /// Increases last version and returns the increased version. + fn increase_version(&mut self) -> ManifestVersion { + self.last_version += 1; + self.last_version } // pub (crate) fn checkpointer(&self) -> Checkpointer { @@ -254,7 +341,6 @@ mod test { use store_api::storage::RegionId; use super::*; - use crate::error::Error; use crate::manifest::action::RegionChange; use crate::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder, SemanticType}; use crate::test_util::TestEnv; @@ -288,58 +374,83 @@ mod test { builder.build().unwrap() } - #[tokio::test] - async fn create_region_without_initial_metadata() { - let env = TestEnv::new(""); - let result = env - .create_manifest_manager(CompressionType::Uncompressed, 10, None) - .await; - assert!(matches!( - result.err().unwrap(), - Error::InitialMetadata { .. } - )) - } - #[tokio::test] async fn create_manifest_manager() { - let metadata = basic_region_metadata(); + let metadata = Arc::new(basic_region_metadata()); let env = TestEnv::new(""); let manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await + .unwrap() .unwrap(); - let manifest = manager.manifest(); - assert_eq!(manifest.metadata, metadata); + manager.validate_manifest(&metadata, 0).await; + } + + #[tokio::test] + async fn open_manifest_manager() { + let env = TestEnv::new(""); + // Try to opens an empty manifest. + assert!(env + .create_manifest_manager(CompressionType::Uncompressed, 10, None) + .await + .unwrap() + .is_none()); + + // Creates a manifest. + let metadata = Arc::new(basic_region_metadata()); + let manager = env + .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) + .await + .unwrap() + .unwrap(); + // Stops it. + manager.stop().await.unwrap(); + + // Open it. + let manager = env + .create_manifest_manager(CompressionType::Uncompressed, 10, None) + .await + .unwrap() + .unwrap(); + + manager.validate_manifest(&metadata, 0).await; } #[tokio::test] async fn region_change_add_column() { - let metadata = basic_region_metadata(); + let metadata = Arc::new(basic_region_metadata()); let env = TestEnv::new(""); let manager = env .create_manifest_manager(CompressionType::Uncompressed, 10, Some(metadata.clone())) .await + .unwrap() .unwrap(); - let mut new_metadata_builder = RegionMetadataBuilder::from_existing(metadata, 1); + let mut new_metadata_builder = RegionMetadataBuilder::from_existing((*metadata).clone(), 1); new_metadata_builder.push_column_metadata(ColumnMetadata { column_schema: ColumnSchema::new("val2", ConcreteDataType::float64_datatype(), false), semantic_type: SemanticType::Field, column_id: 252, }); - let new_metadata = new_metadata_builder.build().unwrap(); + let new_metadata = Arc::new(new_metadata_builder.build().unwrap()); - let mut action_list = + let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange { metadata: new_metadata.clone(), })); - action_list.set_prev_version(0); - let prev_version = manager.update(action_list).await.unwrap(); - assert_eq!(prev_version, 0); + let current_version = manager.update(action_list).await.unwrap(); + assert_eq!(current_version, 1); + manager.validate_manifest(&new_metadata, 1).await; - let manifest = manager.manifest(); - assert_eq!(manifest.metadata, new_metadata); + // Reopen the manager. + manager.stop().await.unwrap(); + let manager = env + .create_manifest_manager(CompressionType::Uncompressed, 10, None) + .await + .unwrap() + .unwrap(); + manager.validate_manifest(&new_metadata, 1).await; } } diff --git a/src/mito2/src/manifest/options.rs b/src/mito2/src/manifest/options.rs index 6f6d64cfd1..d72ea5ff23 100644 --- a/src/mito2/src/manifest/options.rs +++ b/src/mito2/src/manifest/options.rs @@ -17,17 +17,13 @@ use common_datasource::compression::CompressionType; use object_store::ObjectStore; -use crate::metadata::RegionMetadata; - +/// Options for manifest. #[derive(Debug, Clone)] pub struct RegionManifestOptions { + /// Directory to store manifest. pub manifest_dir: String, pub object_store: ObjectStore, pub compress_type: CompressionType, /// Interval of version ([ManifestVersion](store_api::manifest::ManifestVersion)) between two checkpoints. pub checkpoint_interval: u64, - /// Initial [RegionMetadata](crate::metadata::RegionMetadata) of this region. - /// Only need to set when create a new region, otherwise it will be ignored. - // TODO(yingwen): Could we pass RegionMetadataRef? - pub initial_metadata: Option, } diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 2ff78b696b..2b2f8e1f4f 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -60,19 +60,18 @@ impl RegionOpener { /// Writes region manifest and creates a new region. pub(crate) async fn create(self, config: &MitoConfig) -> Result { let region_id = self.metadata.region_id; + let metadata = Arc::new(self.metadata); + // Create a manifest manager for this region. let options = RegionManifestOptions { manifest_dir: new_manifest_dir(&self.region_dir), object_store: self.object_store, compress_type: config.manifest_compress_type, checkpoint_interval: config.manifest_checkpoint_interval, - // We are creating a new region, so we need to set this field. - initial_metadata: Some(self.metadata.clone()), }; // Writes regions to the manifest file. - let manifest_manager = RegionManifestManager::new(options).await?; + let manifest_manager = RegionManifestManager::new(metadata.clone(), options).await?; - let metadata = Arc::new(self.metadata); let mutable = self.memtable_builder.build(&metadata); let version = VersionBuilder::new(metadata, mutable).build(); diff --git a/src/mito2/src/sst.rs b/src/mito2/src/sst.rs index bb31fdc3dd..b12fa962da 100644 --- a/src/mito2/src/sst.rs +++ b/src/mito2/src/sst.rs @@ -15,6 +15,7 @@ //! Sorted strings tables. pub mod file; +pub mod file_purger; pub mod parquet; mod stream_writer; pub(crate) mod version; diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 8ebba3a2f9..8affb3a0d3 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -16,7 +16,7 @@ use std::fmt; use std::str::FromStr; -use std::sync::atomic::AtomicBool; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use common_time::Timestamp; @@ -26,6 +26,8 @@ use snafu::{ResultExt, Snafu}; use store_api::storage::RegionId; use uuid::Uuid; +use crate::sst::file_purger::{FilePurgerRef, PurgeRequest}; + /// Type to store SST level. pub type Level = u8; /// Maximum level of SSTs. @@ -99,8 +101,8 @@ pub struct FileHandle { impl fmt::Debug for FileHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("FileHandle") - .field("file_id", &self.inner.meta.file_id) .field("region_id", &self.inner.meta.region_id) + .field("file_id", &self.inner.meta.file_id) .field("time_range", &self.inner.meta.time_range) .field("size", &self.inner.meta.file_size) .field("level", &self.inner.meta.level) @@ -129,6 +131,18 @@ struct FileHandleInner { meta: FileMeta, compacting: AtomicBool, deleted: AtomicBool, + file_purger: FilePurgerRef, +} + +impl Drop for FileHandleInner { + fn drop(&mut self) { + if self.deleted.load(Ordering::Relaxed) { + self.file_purger.send_request(PurgeRequest { + region_id: self.meta.region_id, + file_id: self.meta.file_id, + }); + } + } } #[cfg(test)] diff --git a/src/mito2/src/sst/file_purger.rs b/src/mito2/src/sst/file_purger.rs new file mode 100644 index 0000000000..010fc3ffc2 --- /dev/null +++ b/src/mito2/src/sst/file_purger.rs @@ -0,0 +1,45 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use store_api::storage::RegionId; + +use crate::sst::file::FileId; + +/// Request to remove a file. +#[derive(Debug)] +pub struct PurgeRequest { + /// Region id of the file. + pub region_id: RegionId, + /// Id of the file. + pub file_id: FileId, +} + +/// A worker to delete files in background. +pub trait FilePurger: Send + Sync { + /// Send a purge request to the background worker. + fn send_request(&self, request: PurgeRequest); +} + +pub type FilePurgerRef = Arc; + +// TODO(yingwen): Remove this once we implement the real purger. +/// A purger that does nothing. +#[derive(Debug)] +struct NoopPurger {} + +impl FilePurger for NoopPurger { + fn send_request(&self, _request: PurgeRequest) {} +} diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index ba663bca4d..71244f6a19 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -32,7 +32,7 @@ use crate::engine::MitoEngine; use crate::error::Result; use crate::manifest::manager::RegionManifestManager; use crate::manifest::options::RegionManifestOptions; -use crate::metadata::{ColumnMetadata, RegionMetadata, SemanticType}; +use crate::metadata::{ColumnMetadata, RegionMetadataRef, SemanticType}; use crate::worker::request::{CreateRequest, RegionOptions}; use crate::worker::WorkerGroup; @@ -77,17 +77,19 @@ impl TestEnv { (log_store, object_store) } + /// If `initial_metadata` is `Some`, creates a new manifest. If `initial_metadata` + /// is `None`, opens an existing manifest and returns `None` if no such manifest. pub async fn create_manifest_manager( &self, compress_type: CompressionType, checkpoint_interval: u64, - initial_metadata: Option, - ) -> Result { + initial_metadata: Option, + ) -> Result> { let data_home = self.data_home.path().to_str().unwrap(); let manifest_dir = join_dir(data_home, "manifest"); let mut builder = Fs::default(); - let _ = builder.root(&manifest_dir); + builder.root(&manifest_dir); let object_store = ObjectStore::new(builder).unwrap().finish(); let manifest_opts = RegionManifestOptions { @@ -95,10 +97,15 @@ impl TestEnv { object_store, compress_type, checkpoint_interval, - initial_metadata, }; - RegionManifestManager::new(manifest_opts).await + if let Some(metadata) = initial_metadata { + RegionManifestManager::new(metadata, manifest_opts) + .await + .map(Some) + } else { + RegionManifestManager::open(manifest_opts).await + } } } diff --git a/src/store-api/src/manifest.rs b/src/store-api/src/manifest.rs index b43185ba4b..985339c694 100644 --- a/src/store-api/src/manifest.rs +++ b/src/store-api/src/manifest.rs @@ -16,8 +16,6 @@ pub mod action; mod storage; -use std::sync::atomic::AtomicU64; - use async_trait::async_trait; use common_error::ext::ErrorExt; use serde::de::DeserializeOwned; @@ -27,7 +25,6 @@ use crate::manifest::action::{ProtocolAction, ProtocolVersion}; pub use crate::manifest::storage::*; pub type ManifestVersion = u64; -pub type AtomicManifestVersion = AtomicU64; pub const MIN_VERSION: u64 = 0; pub const MAX_VERSION: u64 = u64::MAX;