diff --git a/Cargo.lock b/Cargo.lock index 72475affe7..eeee32a6bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11209,6 +11209,7 @@ version = "0.13.0" dependencies = [ "mito2", "object-store", + "snafu 0.8.5", "store-api", ] diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index f1a22cf54d..3f92c3db68 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -121,7 +121,7 @@ impl AccessLayer { /// Writes a SST with specific `file_id` and `metadata` to the layer. /// /// Returns the info of the SST. If no data written, returns None. - pub(crate) async fn write_sst( + pub async fn write_sst( &self, request: SstWriteRequest, write_opts: &WriteOptions, @@ -191,26 +191,26 @@ impl AccessLayer { /// `OperationType` represents the origin of the `SstWriteRequest`. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub(crate) enum OperationType { +pub enum OperationType { Flush, Compact, } /// Contents to build a SST. -pub(crate) struct SstWriteRequest { - pub(crate) op_type: OperationType, - pub(crate) metadata: RegionMetadataRef, - pub(crate) source: Source, - pub(crate) cache_manager: CacheManagerRef, +pub struct SstWriteRequest { + pub op_type: OperationType, + pub metadata: RegionMetadataRef, + pub source: Source, + pub cache_manager: CacheManagerRef, #[allow(dead_code)] - pub(crate) storage: Option, - pub(crate) max_sequence: Option, + pub storage: Option, + pub max_sequence: Option, /// Configs for index - pub(crate) index_options: IndexOptions, - pub(crate) inverted_index_config: InvertedIndexConfig, - pub(crate) fulltext_index_config: FulltextIndexConfig, - pub(crate) bloom_filter_index_config: BloomFilterConfig, + pub index_options: IndexOptions, + pub inverted_index_config: InvertedIndexConfig, + pub fulltext_index_config: FulltextIndexConfig, + pub bloom_filter_index_config: BloomFilterConfig, } pub(crate) async fn new_fs_cache_store(root: &str) -> Result { diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs 
index cd0cb37102..2c64ea38b1 100644 --- a/src/mito2/src/lib.rs +++ b/src/mito2/src/lib.rs @@ -23,8 +23,8 @@ #[cfg_attr(feature = "test", allow(unused))] pub mod test_util; -mod access_layer; -mod cache; +pub mod access_layer; +pub mod cache; pub mod compaction; pub mod config; pub mod engine; diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index 4a5beb0cb5..a2b55bc7b1 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -14,7 +14,7 @@ //! Mito region. -pub(crate) mod opener; +pub mod opener; pub mod options; pub(crate) mod version; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 2992a475ab..49804cc833 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -15,7 +15,7 @@ //! Region opener. use std::collections::HashMap; -use std::sync::atomic::AtomicI64; +use std::sync::atomic::{AtomicI64, AtomicU64}; use std::sync::Arc; use common_telemetry::{debug, error, info, warn}; @@ -27,7 +27,9 @@ use object_store::util::{join_dir, normalize_dir}; use snafu::{ensure, OptionExt, ResultExt}; use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; -use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder}; +use store_api::metadata::{ + ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, +}; use store_api::region_engine::RegionRole; use store_api::storage::{ColumnId, RegionId}; @@ -203,11 +205,16 @@ impl RegionOpener { } // Safety: must be set before calling this method. let options = self.options.take().unwrap(); - let object_store = self.object_store(&options.storage)?.clone(); + let object_store = get_object_store(&options.storage, &self.object_store_manager)?.clone(); let provider = self.provider(&options.wal_options); let metadata = Arc::new(metadata); // Create a manifest manager for this region and writes regions to the manifest file. 
- let region_manifest_options = self.manifest_options(config, &options)?; + let region_manifest_options = Self::manifest_options( + config, + &options, + &self.region_dir, + &self.object_store_manager, + )?; let manifest_manager = RegionManifestManager::new( metadata.clone(), region_manifest_options, @@ -312,7 +319,12 @@ impl RegionOpener { ) -> Result> { let region_options = self.options.as_ref().unwrap().clone(); - let region_manifest_options = self.manifest_options(config, &region_options)?; + let region_manifest_options = Self::manifest_options( + config, + &region_options, + &self.region_dir, + &self.object_store_manager, + )?; let Some(manifest_manager) = RegionManifestManager::open( region_manifest_options, self.stats.total_manifest_size.clone(), @@ -332,7 +344,7 @@ impl RegionOpener { .take() .unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None)); let on_region_opened = wal.on_region_opened(); - let object_store = self.object_store(&region_options.storage)?.clone(); + let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?; debug!("Open region {} with options: {:?}", region_id, self.options); @@ -422,13 +434,14 @@ impl RegionOpener { /// Returns a new manifest options. fn manifest_options( - &self, config: &MitoConfig, options: &RegionOptions, + region_dir: &str, + object_store_manager: &ObjectStoreManagerRef, ) -> Result { - let object_store = self.object_store(&options.storage)?.clone(); + let object_store = get_object_store(&options.storage, object_store_manager)?; Ok(RegionManifestOptions { - manifest_dir: new_manifest_dir(&self.region_dir), + manifest_dir: new_manifest_dir(region_dir), object_store, // We don't allow users to set the compression algorithm as we use it as a file suffix. // Currently, the manifest storage doesn't have good support for changing compression algorithms. 
@@ -436,20 +449,62 @@ impl RegionOpener { checkpoint_distance: config.manifest_checkpoint_distance, }) } +} - /// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store. - fn object_store(&self, name: &Option) -> Result<&object_store::ObjectStore> { - if let Some(name) = name { - Ok(self - .object_store_manager - .find(name) - .context(ObjectStoreNotFoundSnafu { - object_store: name.to_string(), - })?) - } else { - Ok(self.object_store_manager.default_object_store()) +/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store. +pub fn get_object_store( + name: &Option, + object_store_manager: &ObjectStoreManagerRef, +) -> Result { + if let Some(name) = name { + Ok(object_store_manager + .find(name) + .context(ObjectStoreNotFoundSnafu { + object_store: name.to_string(), + })? + .clone()) + } else { + Ok(object_store_manager.default_object_store().clone()) + } +} + +/// A loader for loading metadata from a region dir. +pub struct RegionMetadataLoader { + config: Arc, + object_store_manager: ObjectStoreManagerRef, +} + +impl RegionMetadataLoader { + /// Creates a new `RegionMetadataLoader`. + pub fn new(config: Arc, object_store_manager: ObjectStoreManagerRef) -> Self { + Self { + config, + object_store_manager, } } + + /// Loads the metadata of the region from the region dir. + pub async fn load( + &self, + region_dir: &str, + region_options: &RegionOptions, + ) -> Result> { + let region_manifest_options = RegionOpener::manifest_options( + &self.config, + region_options, + region_dir, + &self.object_store_manager, + )?; + let Some(manifest_manager) = + RegionManifestManager::open(region_manifest_options, Arc::new(AtomicU64::new(0))) + .await? + else { + return Ok(None); + }; + + let manifest = manifest_manager.manifest(); + Ok(Some(manifest.metadata.clone())) + } } /// Checks whether the recovered region has the same schema as region to create. 
diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index ade501fbc4..77a7345317 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -16,9 +16,9 @@ pub(crate) mod bloom_filter; mod codec; pub(crate) mod fulltext_index; mod indexer; -pub(crate) mod intermediate; +pub mod intermediate; pub(crate) mod inverted_index; -pub(crate) mod puffin_manager; +pub mod puffin_manager; mod statistics; pub(crate) mod store; diff --git a/src/sst-convert/Cargo.toml b/src/sst-convert/Cargo.toml index 436b1dfd8b..2e71c2bf21 100644 --- a/src/sst-convert/Cargo.toml +++ b/src/sst-convert/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] mito2.workspace = true object-store.workspace = true +snafu.workspace = true store-api.workspace = true [lints] diff --git a/src/sst-convert/src/lib.rs b/src/sst-convert/src/lib.rs index 85ce8bb821..11cb656ec7 100644 --- a/src/sst-convert/src/lib.rs +++ b/src/sst-convert/src/lib.rs @@ -14,4 +14,4 @@ mod reader; mod table; -mod writer; +pub mod writer; diff --git a/src/sst-convert/src/writer.rs b/src/sst-convert/src/writer.rs index 5aaa6832ee..74edc42928 100644 --- a/src/sst-convert/src/writer.rs +++ b/src/sst-convert/src/writer.rs @@ -13,3 +13,129 @@ // limitations under the License. //! Utilities for writing SST files. 
+ +use std::sync::Arc; + +use mito2::access_layer::{ + AccessLayer, AccessLayerRef, OperationType, SstInfoArray, SstWriteRequest, +}; +use mito2::cache::CacheManager; +use mito2::config::MitoConfig; +use mito2::error::{RegionMetadataNotFoundSnafu, Result}; +use mito2::read::Source; +use mito2::region::opener::RegionMetadataLoader; +use mito2::region::options::RegionOptions; +use mito2::sst::index::intermediate::IntermediateManager; +use mito2::sst::index::puffin_manager::PuffinManagerFactory; +use mito2::sst::parquet::WriteOptions; +use object_store::manager::ObjectStoreManagerRef; +use snafu::OptionExt; +use store_api::metadata::RegionMetadataRef; + +/// A writer that can create multiple SST files for a region. +pub struct RegionWriter { + /// Mito engine config. + config: Arc, + /// Metadata of the region. + metadata: RegionMetadataRef, + /// Options of the region. + region_options: RegionOptions, + /// SST access layer. + access_layer: AccessLayerRef, +} + +impl RegionWriter { + /// Writes data from a source to SST files. + pub async fn write_sst( + &self, + source: Source, + write_opts: &WriteOptions, + ) -> Result { + let request = SstWriteRequest { + op_type: OperationType::Flush, + metadata: self.metadata.clone(), + source, + cache_manager: Arc::new(CacheManager::default()), + storage: None, + max_sequence: None, + index_options: self.region_options.index_options.clone(), + inverted_index_config: self.config.inverted_index.clone(), + fulltext_index_config: self.config.fulltext_index.clone(), + bloom_filter_index_config: self.config.bloom_filter_index.clone(), + }; + + self.access_layer.write_sst(request, write_opts).await + } +} + +/// Creator to create [`RegionWriter`] for different regions. +pub struct RegionWriterCreator { + /// Mito engine config. + config: Arc, + /// Object stores. + object_store_manager: ObjectStoreManagerRef, + /// Loader to load region metadata. 
+ metadata_loader: RegionMetadataLoader, + puffin_manager_factory: PuffinManagerFactory, + intermediate_manager: IntermediateManager, +} + +impl RegionWriterCreator { + /// Creates a new [`RegionWriterCreator`]. + pub async fn new( + config: MitoConfig, + object_store_manager: ObjectStoreManagerRef, + ) -> Result { + let puffin_manager_factory = PuffinManagerFactory::new( + &config.index.aux_path, + config.index.staging_size.as_bytes(), + Some(config.index.write_buffer_size.as_bytes() as _), + config.index.staging_ttl, + ) + .await?; + let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path) + .await? + .with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _)); + let config = Arc::new(config); + let metadata_loader = + RegionMetadataLoader::new(config.clone(), object_store_manager.clone()); + + Ok(Self { + config, + object_store_manager, + metadata_loader, + puffin_manager_factory, + intermediate_manager, + }) + } + + /// Builds a [`RegionWriter`] for the given region directory. + pub async fn build( + &self, + region_dir: &str, + region_options: RegionOptions, + ) -> Result { + let metadata = self + .metadata_loader + .load(region_dir, &region_options) + .await? + .context(RegionMetadataNotFoundSnafu)?; + let object_store = mito2::region::opener::get_object_store( + &region_options.storage, + &self.object_store_manager, + )?; + let access_layer = Arc::new(AccessLayer::new( + region_dir, + object_store, + self.puffin_manager_factory.clone(), + self.intermediate_manager.clone(), + )); + + Ok(RegionWriter { + config: self.config.clone(), + metadata, + region_options, + access_layer, + }) + } +}