feat: implement sst writer

This commit is contained in:
evenyag
2025-03-08 17:22:03 +08:00
parent cb3fad0c2d
commit 9f14edbb28
9 changed files with 222 additions and 39 deletions

1
Cargo.lock generated
View File

@@ -11209,6 +11209,7 @@ version = "0.13.0"
dependencies = [
"mito2",
"object-store",
"snafu 0.8.5",
"store-api",
]

View File

@@ -121,7 +121,7 @@ impl AccessLayer {
/// Writes a SST with specific `file_id` and `metadata` to the layer.
///
/// Returns the info of the SST. If no data written, returns None.
pub(crate) async fn write_sst(
pub async fn write_sst(
&self,
request: SstWriteRequest,
write_opts: &WriteOptions,
@@ -191,26 +191,26 @@ impl AccessLayer {
/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum OperationType {
pub enum OperationType {
Flush,
Compact,
}
/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
pub struct SstWriteRequest {
pub op_type: OperationType,
pub metadata: RegionMetadataRef,
pub source: Source,
pub cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
pub(crate) max_sequence: Option<SequenceNumber>,
pub storage: Option<String>,
pub max_sequence: Option<SequenceNumber>,
/// Configs for index
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
pub(crate) bloom_filter_index_config: BloomFilterConfig,
pub index_options: IndexOptions,
pub inverted_index_config: InvertedIndexConfig,
pub fulltext_index_config: FulltextIndexConfig,
pub bloom_filter_index_config: BloomFilterConfig,
}
pub(crate) async fn new_fs_cache_store(root: &str) -> Result<ObjectStore> {

View File

@@ -23,8 +23,8 @@
#[cfg_attr(feature = "test", allow(unused))]
pub mod test_util;
mod access_layer;
mod cache;
pub mod access_layer;
pub mod cache;
pub mod compaction;
pub mod config;
pub mod engine;

View File

@@ -14,7 +14,7 @@
//! Mito region.
pub(crate) mod opener;
pub mod opener;
pub mod options;
pub(crate) mod version;

View File

@@ -15,7 +15,7 @@
//! Region opener.
use std::collections::HashMap;
use std::sync::atomic::AtomicI64;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::sync::Arc;
use common_telemetry::{debug, error, info, warn};
@@ -27,7 +27,9 @@ use object_store::util::{join_dir, normalize_dir};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::metadata::{
ColumnMetadata, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef,
};
use store_api::region_engine::RegionRole;
use store_api::storage::{ColumnId, RegionId};
@@ -203,11 +205,16 @@ impl RegionOpener {
}
// Safety: must be set before calling this method.
let options = self.options.take().unwrap();
let object_store = self.object_store(&options.storage)?.clone();
let object_store = get_object_store(&options.storage, &self.object_store_manager)?.clone();
let provider = self.provider(&options.wal_options);
let metadata = Arc::new(metadata);
// Create a manifest manager for this region and writes regions to the manifest file.
let region_manifest_options = self.manifest_options(config, &options)?;
let region_manifest_options = Self::manifest_options(
config,
&options,
&self.region_dir,
&self.object_store_manager,
)?;
let manifest_manager = RegionManifestManager::new(
metadata.clone(),
region_manifest_options,
@@ -312,7 +319,12 @@ impl RegionOpener {
) -> Result<Option<MitoRegion>> {
let region_options = self.options.as_ref().unwrap().clone();
let region_manifest_options = self.manifest_options(config, &region_options)?;
let region_manifest_options = Self::manifest_options(
config,
&region_options,
&self.region_dir,
&self.object_store_manager,
)?;
let Some(manifest_manager) = RegionManifestManager::open(
region_manifest_options,
self.stats.total_manifest_size.clone(),
@@ -332,7 +344,7 @@ impl RegionOpener {
.take()
.unwrap_or_else(|| wal.wal_entry_reader(&provider, region_id, None));
let on_region_opened = wal.on_region_opened();
let object_store = self.object_store(&region_options.storage)?.clone();
let object_store = get_object_store(&region_options.storage, &self.object_store_manager)?;
debug!("Open region {} with options: {:?}", region_id, self.options);
@@ -422,13 +434,14 @@ impl RegionOpener {
/// Returns a new manifest options.
fn manifest_options(
&self,
config: &MitoConfig,
options: &RegionOptions,
region_dir: &str,
object_store_manager: &ObjectStoreManagerRef,
) -> Result<RegionManifestOptions> {
let object_store = self.object_store(&options.storage)?.clone();
let object_store = get_object_store(&options.storage, object_store_manager)?;
Ok(RegionManifestOptions {
manifest_dir: new_manifest_dir(&self.region_dir),
manifest_dir: new_manifest_dir(region_dir),
object_store,
// We don't allow users to set the compression algorithm as we use it as a file suffix.
// Currently, the manifest storage doesn't have good support for changing compression algorithms.
@@ -436,20 +449,62 @@ impl RegionOpener {
checkpoint_distance: config.manifest_checkpoint_distance,
})
}
}
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
fn object_store(&self, name: &Option<String>) -> Result<&object_store::ObjectStore> {
if let Some(name) = name {
Ok(self
.object_store_manager
.find(name)
.context(ObjectStoreNotFoundSnafu {
object_store: name.to_string(),
})?)
} else {
Ok(self.object_store_manager.default_object_store())
/// Resolves the object store to use for a given storage name.
///
/// When `name` is `None`, the manager's default object store is returned. Otherwise the
/// store registered under `name` is looked up in `object_store_manager`.
///
/// The returned store is a clone of the one held by the manager.
///
/// # Errors
///
/// Returns an `ObjectStoreNotFound` error (carrying the requested name) when `name` is
/// `Some` but the manager has no store under that name.
pub fn get_object_store(
    name: &Option<String>,
    object_store_manager: &ObjectStoreManagerRef,
) -> Result<object_store::ObjectStore> {
    // No explicit storage requested: fall back to the default store.
    let Some(store_name) = name else {
        return Ok(object_store_manager.default_object_store().clone());
    };

    let store = object_store_manager
        .find(store_name)
        .context(ObjectStoreNotFoundSnafu {
            object_store: store_name.to_string(),
        })?;
    Ok(store.clone())
}
/// Loads region metadata from the manifest stored under a region directory.
pub struct RegionMetadataLoader {
    /// Engine config, used to build the manifest options.
    config: Arc<MitoConfig>,
    /// Object stores in which the region's manifest is looked up.
    object_store_manager: ObjectStoreManagerRef,
}

impl RegionMetadataLoader {
    /// Creates a new [`RegionMetadataLoader`].
    pub fn new(config: Arc<MitoConfig>, object_store_manager: ObjectStoreManagerRef) -> Self {
        Self {
            config,
            object_store_manager,
        }
    }

    /// Loads the metadata of the region whose files live under `region_dir`.
    ///
    /// The manifest options are derived from the loader's config, `region_options` and
    /// `region_dir`; the manifest is then opened and its metadata returned.
    ///
    /// Returns `Ok(None)` when `RegionManifestManager::open` yields no manager.
    ///
    /// # Errors
    ///
    /// Propagates any error from building the manifest options (e.g. an unknown storage
    /// name) or from opening the manifest.
    pub async fn load(
        &self,
        region_dir: &str,
        region_options: &RegionOptions,
    ) -> Result<Option<RegionMetadataRef>> {
        let manifest_opts = RegionOpener::manifest_options(
            &self.config,
            region_options,
            region_dir,
            &self.object_store_manager,
        )?;

        // `open` needs a size counter (the opener passes its `total_manifest_size` here);
        // this loader keeps no stats, so a fresh zeroed counter is handed over and dropped.
        let manifest_size = Arc::new(AtomicU64::new(0));

        match RegionManifestManager::open(manifest_opts, manifest_size).await? {
            Some(manager) => {
                let region_manifest = manager.manifest();
                Ok(Some(region_manifest.metadata.clone()))
            }
            None => Ok(None),
        }
    }
}
/// Checks whether the recovered region has the same schema as region to create.

View File

@@ -16,9 +16,9 @@ pub(crate) mod bloom_filter;
mod codec;
pub(crate) mod fulltext_index;
mod indexer;
pub(crate) mod intermediate;
pub mod intermediate;
pub(crate) mod inverted_index;
pub(crate) mod puffin_manager;
pub mod puffin_manager;
mod statistics;
pub(crate) mod store;

View File

@@ -7,6 +7,7 @@ license.workspace = true
[dependencies]
mito2.workspace = true
object-store.workspace = true
snafu.workspace = true
store-api.workspace = true
[lints]

View File

@@ -14,4 +14,4 @@
mod reader;
mod table;
mod writer;
pub mod writer;

View File

@@ -13,3 +13,129 @@
// limitations under the License.
//! Utilities for writing SST files.
use std::sync::Arc;
use mito2::access_layer::{
AccessLayer, AccessLayerRef, OperationType, SstInfoArray, SstWriteRequest,
};
use mito2::cache::CacheManager;
use mito2::config::MitoConfig;
use mito2::error::{RegionMetadataNotFoundSnafu, Result};
use mito2::read::Source;
use mito2::region::opener::RegionMetadataLoader;
use mito2::region::options::RegionOptions;
use mito2::sst::index::intermediate::IntermediateManager;
use mito2::sst::index::puffin_manager::PuffinManagerFactory;
use mito2::sst::parquet::WriteOptions;
use object_store::manager::ObjectStoreManagerRef;
use snafu::OptionExt;
use store_api::metadata::RegionMetadataRef;
/// Writes SST files for a single region through its access layer.
pub struct RegionWriter {
    /// Mito engine config; supplies the index configs for each write request.
    config: Arc<MitoConfig>,
    /// Metadata of the region being written.
    metadata: RegionMetadataRef,
    /// Options of the region; supplies the index options for each write request.
    region_options: RegionOptions,
    /// SST access layer that performs the actual write.
    access_layer: AccessLayerRef,
}

impl RegionWriter {
    /// Writes all data from `source` into SST files using `write_opts`.
    ///
    /// The request is issued as a flush operation with a fresh default cache manager,
    /// no storage name and no max sequence. Index settings are taken from the region
    /// options and the engine config.
    ///
    /// Returns whatever [`AccessLayer::write_sst`] returns for the request.
    pub async fn write_sst(
        &self,
        source: Source,
        write_opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let engine_config = &self.config;

        // Each call gets its own cache manager; nothing is shared between writes.
        let cache_manager = Arc::new(CacheManager::default());
        let index_options = self.region_options.index_options.clone();
        let inverted_index_config = engine_config.inverted_index.clone();
        let fulltext_index_config = engine_config.fulltext_index.clone();
        let bloom_filter_index_config = engine_config.bloom_filter_index.clone();

        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: self.metadata.clone(),
            source,
            cache_manager,
            storage: None,
            max_sequence: None,
            index_options,
            inverted_index_config,
            fulltext_index_config,
            bloom_filter_index_config,
        };

        self.access_layer.write_sst(write_request, write_opts).await
    }
}
/// Factory that builds a [`RegionWriter`] per region, sharing engine-level resources.
pub struct RegionWriterCreator {
    /// Mito engine config.
    config: Arc<MitoConfig>,
    /// Object stores.
    object_store_manager: ObjectStoreManagerRef,
    /// Loader to load region metadata.
    metadata_loader: RegionMetadataLoader,
    /// Puffin manager factory, cloned into every access layer this creator builds.
    puffin_manager_factory: PuffinManagerFactory,
    /// Intermediate manager, cloned into every access layer this creator builds.
    intermediate_manager: IntermediateManager,
}

impl RegionWriterCreator {
    /// Creates a new [`RegionWriterCreator`].
    ///
    /// Initializes the puffin manager factory and the intermediate manager from the
    /// `index` section of `config`, then wraps `config` in an [`Arc`] shared with the
    /// metadata loader.
    ///
    /// # Errors
    ///
    /// Propagates failures from `PuffinManagerFactory::new` and
    /// `IntermediateManager::init_fs`.
    pub async fn new(
        config: MitoConfig,
        object_store_manager: ObjectStoreManagerRef,
    ) -> Result<Self> {
        let index = &config.index;

        let puffin_manager_factory = PuffinManagerFactory::new(
            &index.aux_path,
            index.staging_size.as_bytes(),
            Some(index.write_buffer_size.as_bytes() as _),
            index.staging_ttl,
        )
        .await?;

        let intermediate_manager = IntermediateManager::init_fs(&index.aux_path)
            .await?
            .with_buffer_size(Some(index.write_buffer_size.as_bytes() as _));

        // From here on the config is shared between the creator and the loader.
        let config = Arc::new(config);
        let metadata_loader =
            RegionMetadataLoader::new(Arc::clone(&config), object_store_manager.clone());

        Ok(Self {
            config,
            object_store_manager,
            metadata_loader,
            puffin_manager_factory,
            intermediate_manager,
        })
    }

    /// Builds a [`RegionWriter`] for the region stored under `region_dir`.
    ///
    /// Loads the region metadata first, then resolves the object store named by
    /// `region_options.storage` and creates an access layer on top of it.
    ///
    /// # Errors
    ///
    /// Returns a `RegionMetadataNotFound` error when the loader finds no metadata for
    /// `region_dir`, and propagates errors from loading the metadata or resolving the
    /// object store.
    pub async fn build(
        &self,
        region_dir: &str,
        region_options: RegionOptions,
    ) -> Result<RegionWriter> {
        let loaded = self
            .metadata_loader
            .load(region_dir, &region_options)
            .await?;
        let metadata = loaded.context(RegionMetadataNotFoundSnafu)?;

        let store = mito2::region::opener::get_object_store(
            &region_options.storage,
            &self.object_store_manager,
        )?;

        let layer = AccessLayer::new(
            region_dir,
            store,
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
        );

        Ok(RegionWriter {
            config: self.config.clone(),
            metadata,
            region_options,
            access_layer: Arc::new(layer),
        })
    }
}