feat: introduce IndexBuildTask for async index build (#6927)

* feat: add framework for asynchronous index building

Signed-off-by: SNC123 <sinhco@outlook.com>

* test: add unit tests for IndexBuildTask

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: clippy, format, fix-udeps

Signed-off-by: SNC123 <sinhco@outlook.com>

* fix: correct write cache logic in IndexBuildTask

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: clippy, resolve conflicts

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: resolve conflicts

Signed-off-by: SNC123 <sinhco@outlook.com>

* fix: apply review suggestions

Signed-off-by: SNC123 <sinhco@outlook.com>

* chore: resolve conflicts

Signed-off-by: SNC123 <sinhco@outlook.com>

* fix: clean up index files in aborted case

Signed-off-by: SNC123 <sinhco@outlook.com>

* refactor: move manifest update logic into IndexBuildTask

Signed-off-by: SNC123 <sinhco@outlook.com>

* fix: enhance check file logic and error handling

Signed-off-by: SNC123 <sinhco@outlook.com>

---------

Signed-off-by: SNC123 <sinhco@outlook.com>
Sicong Hu
2025-10-10 11:29:32 +08:00
committed by GitHub
parent 47c1ef672a
commit 779865d389
15 changed files with 1049 additions and 55 deletions

View File

@@ -32,7 +32,7 @@ use store_api::storage::{FileId, RegionId, SequenceNumber};
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::write_cache::SstUploadRequest;
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, IndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::metrics::{COMPACTION_STAGE_ELAPSED, FLUSH_ELAPSED};
use crate::read::{FlatSource, Source};
@@ -40,7 +40,7 @@ use crate::region::options::IndexOptions;
use crate::sst::file::{FileHandle, RegionFileId};
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
use crate::sst::location::{self, region_dir_from_table_dir};
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
@@ -204,6 +204,14 @@ impl AccessLayer {
&self.intermediate_manager
}
/// Builds the puffin manager.
pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
let store = self.object_store.clone();
let path_provider =
RegionFilePathFactory::new(self.table_dir().to_string(), self.path_type());
self.puffin_manager_factory.build(store, path_provider)
}
/// Deletes a SST file (and its index file if it has one) with given file id.
pub(crate) async fn delete_sst(&self, region_file_id: &RegionFileId) -> Result<()> {
let path = location::sst_file_path(&self.table_dir, *region_file_id, self.path_type);
@@ -273,7 +281,7 @@ impl AccessLayer {
let store = self.object_store.clone();
let path_provider = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
let indexer_builder = IndexerBuilderImpl {
op_type: request.op_type,
build_type: request.op_type.into(),
metadata: request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self
@@ -292,6 +300,7 @@ impl AccessLayer {
let mut writer = ParquetWriter::new_with_object_store(
self.object_store.clone(),
request.metadata,
request.index_config,
indexer_builder,
path_provider,
Metrics::new(write_type),
@@ -435,6 +444,7 @@ pub struct SstWriteRequest {
/// Configs for index
pub index_options: IndexOptions,
pub index_config: IndexConfig,
pub inverted_index_config: InvertedIndexConfig,
pub fulltext_index_config: FulltextIndexConfig,
pub bloom_filter_index_config: BloomFilterConfig,

View File

@@ -36,7 +36,7 @@ use crate::metrics::{
use crate::sst::file::RegionFileId;
use crate::sst::index::IndexerBuilderImpl;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::puffin_manager::{PuffinManagerFactory, SstPuffinManager};
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
@@ -101,6 +101,13 @@ impl WriteCache {
self.file_cache.clone()
}
/// Builds the puffin manager.
pub(crate) fn build_puffin_manager(&self) -> SstPuffinManager {
let store = self.file_cache.local_store();
let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
self.puffin_manager_factory.build(store, path_provider)
}
/// Put encoded SST data to the cache and upload to the remote object store.
pub(crate) async fn put_and_upload_sst(
&self,
@@ -151,6 +158,11 @@ impl WriteCache {
Ok(metrics)
}
/// Returns the intermediate manager of the write cache.
pub(crate) fn intermediate_manager(&self) -> &IntermediateManager {
&self.intermediate_manager
}
/// Writes SST to the cache and then uploads it to the remote object store.
pub(crate) async fn write_and_upload_sst(
&self,
@@ -164,7 +176,7 @@ impl WriteCache {
let store = self.file_cache.local_store();
let path_provider = WriteCachePathProvider::new(self.file_cache.clone());
let indexer = IndexerBuilderImpl {
op_type: write_request.op_type,
build_type: write_request.op_type.into(),
metadata: write_request.metadata.clone(),
row_group_size: write_opts.row_group_size,
puffin_manager: self
@@ -182,6 +194,7 @@ impl WriteCache {
let mut writer = ParquetWriter::new_with_object_store(
store.clone(),
write_request.metadata,
write_request.index_config,
indexer,
path_provider.clone(),
Metrics::new(write_type),
@@ -342,7 +355,7 @@ impl WriteCache {
}
/// Uploads a Parquet file or a Puffin file to the remote object store.
async fn upload(
pub(crate) async fn upload(
&self,
index_key: IndexKey,
upload_path: &str,
@@ -423,7 +436,7 @@ pub struct SstUploadRequest {
}
/// A struct to track files to upload and clean them up if the upload fails.
struct UploadTracker {
pub(crate) struct UploadTracker {
/// Id of the region to track.
region_id: RegionId,
/// Paths of files uploaded successfully.
@@ -432,7 +445,7 @@ struct UploadTracker {
impl UploadTracker {
/// Creates a new instance of `UploadTracker` for a given region.
fn new(region_id: RegionId) -> Self {
pub(crate) fn new(region_id: RegionId) -> Self {
Self {
region_id,
files_uploaded: Vec::new(),
@@ -440,12 +453,12 @@ impl UploadTracker {
}
/// Add a file path to the list of uploaded files.
fn push_uploaded_file(&mut self, path: String) {
pub(crate) fn push_uploaded_file(&mut self, path: String) {
self.files_uploaded.push(path);
}
/// Cleans up uploaded files and files in the file cache on a best-effort basis.
async fn clean(
pub(crate) async fn clean(
&self,
sst_info: &SstInfoArray,
file_cache: &FileCacheRef,
@@ -529,6 +542,7 @@ mod tests {
max_sequence: None,
cache_manager: Default::default(),
index_options: IndexOptions::default(),
index_config: Default::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
@@ -627,6 +641,7 @@ mod tests {
max_sequence: None,
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
index_config: Default::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
@@ -706,6 +721,7 @@ mod tests {
max_sequence: None,
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
index_config: Default::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
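Aside: `upload` and `UploadTracker` are widened to `pub(crate)` above so the new index build task can reuse the write cache's cleanup-on-failure pattern. A minimal, self-contained sketch of that pattern with stand-in types (not the real signatures):

struct UploadTracker {
    /// Paths of files uploaded successfully so far.
    files_uploaded: Vec<String>,
}

impl UploadTracker {
    fn new() -> Self {
        Self { files_uploaded: Vec::new() }
    }

    fn push_uploaded_file(&mut self, path: String) {
        self.files_uploaded.push(path);
    }

    /// Best-effort cleanup: in the real code this deletes each path from the
    /// remote object store and the local file cache, ignoring per-file errors.
    fn clean(&self) {
        for path in &self.files_uploaded {
            println!("cleaning up {path}");
        }
    }
}

fn main() {
    let mut tracker = UploadTracker::new();
    tracker.push_uploaded_file("<region_dir>/index/<file_id>.puffin".to_string());
    // Pretend a later upload failed: roll back everything uploaded so far.
    tracker.clean();
}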

View File

@@ -345,6 +345,7 @@ impl Compactor for DefaultCompactor {
let flat_format = compaction_region
.engine_config
.enable_experimental_flat_format;
let index_config = compaction_region.engine_config.index.clone();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config =
@@ -389,6 +390,7 @@ impl Compactor for DefaultCompactor {
storage,
max_sequence: max_sequence.map(NonZero::get),
index_options,
index_config,
inverted_index_config,
fulltext_index_config,
bloom_filter_index_config,

View File

@@ -80,6 +80,8 @@ pub struct MitoConfig {
pub compress_manifest: bool,
// Background job configs:
/// Max number of running background index build jobs (default: 1/8 of cpu cores).
pub max_background_index_builds: usize,
/// Max number of running background flush jobs (default: 1/2 of cpu cores).
pub max_background_flushes: usize,
/// Max number of running background compaction jobs (default: 1/4 of cpu cores).
@@ -157,6 +159,7 @@ impl Default for MitoConfig {
experimental_manifest_keep_removed_file_count: 256,
experimental_manifest_keep_removed_file_ttl: Duration::from_secs(60 * 60),
compress_manifest: false,
max_background_index_builds: divide_num_cpus(8),
max_background_flushes: divide_num_cpus(2),
max_background_compactions: divide_num_cpus(4),
max_background_purges: common_config::utils::get_cpus(),
@@ -308,6 +311,17 @@ impl MitoConfig {
}
}
/// Index build mode.
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum IndexBuildMode {
/// Build index synchronously.
#[default]
Sync,
/// Build index asynchronously.
Async,
}
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
@@ -331,6 +345,9 @@ pub struct IndexConfig {
#[serde(with = "humantime_serde")]
pub staging_ttl: Option<Duration>,
/// Index build mode.
pub build_mode: IndexBuildMode,
/// Write buffer size for creating the index.
pub write_buffer_size: ReadableSize,
@@ -350,6 +367,7 @@ impl Default for IndexConfig {
aux_path: String::new(),
staging_size: ReadableSize::gb(2),
staging_ttl: Some(Duration::from_secs(7 * 24 * 60 * 60)),
build_mode: IndexBuildMode::default(),
write_buffer_size: ReadableSize::mb(8),
metadata_cache_size: ReadableSize::mb(64),
content_cache_size: ReadableSize::mb(128),
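Aside: the new `build_mode` option deserializes through serde's snake_case renaming, so config files use "sync" / "async". A self-contained sketch mirroring the enum above (serde_json stands in here for the TOML config layer) to show the mapping and the backward-compatible default:

use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
enum IndexBuildMode {
    /// Build the index while writing the SST (current behavior).
    #[default]
    Sync,
    /// Defer index building to a background task.
    Async,
}

fn main() {
    // "sync" / "async" in the config map onto the enum variants.
    let mode: IndexBuildMode = serde_json::from_str("\"async\"").unwrap();
    assert_eq!(mode, IndexBuildMode::Async);
    // The default keeps existing deployments on the synchronous path.
    assert_eq!(IndexBuildMode::default(), IndexBuildMode::Sync);
}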

View File

@@ -658,6 +658,7 @@ impl RegionFlushTask {
storage: version.options.storage.clone(),
max_sequence: Some(max_sequence),
index_options: self.index_options.clone(),
index_config: self.engine_config.index.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
fulltext_index_config: self.engine_config.fulltext_index.clone(),
bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),

View File

@@ -50,6 +50,8 @@ use crate::manifest::action::{RegionEdit, TruncateKind};
use crate::memtable::MemtableId;
use crate::memtable::bulk::part::BulkPart;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
use crate::sst::file::FileMeta;
use crate::sst::index::IndexBuildType;
use crate::wal::EntryId;
use crate::wal::entry_distributor::WalEntryReceiver;
@@ -597,6 +599,10 @@ pub(crate) enum WorkerRequest {
/// Keep the manifest of a region up to date.
SyncRegion(RegionSyncRequest),
/// Build indexes of a region.
#[allow(dead_code)]
BuildIndexRegion(RegionBuildIndexRequest),
/// Bulk inserts request and region metadata.
BulkInserts {
metadata: Option<RegionMetadataRef>,
@@ -776,6 +782,11 @@ pub(crate) enum BackgroundNotify {
FlushFinished(FlushFinished),
/// Flush has failed.
FlushFailed(FlushFailed),
/// Index build has finished.
IndexBuildFinished(IndexBuildFinished),
/// Index build has failed.
#[allow(dead_code)]
IndexBuildFailed(IndexBuildFailed),
/// Compaction has finished.
CompactionFinished(CompactionFinished),
/// Compaction has failed.
@@ -832,6 +843,20 @@ pub(crate) struct FlushFailed {
pub(crate) err: Arc<Error>,
}
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
#[allow(dead_code)]
pub(crate) region_id: RegionId,
pub(crate) edit: RegionEdit,
}
/// Notifies an index build job has failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
#[allow(dead_code)]
pub(crate) err: Arc<Error>,
}
/// Notifies a compaction job has finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
@@ -924,6 +949,14 @@ pub(crate) struct RegionEditResult {
pub(crate) result: Result<()>,
}
#[derive(Debug)]
pub(crate) struct RegionBuildIndexRequest {
pub(crate) region_id: RegionId,
pub(crate) build_type: IndexBuildType,
/// Files that need index building; an empty list means all files in the region.
pub(crate) file_metas: Vec<FileMeta>,
}
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
pub(crate) region_id: RegionId,

View File

@@ -317,6 +317,10 @@ impl FileHandle {
&self.inner.meta
}
pub fn file_purger(&self) -> FilePurgerRef {
self.inner.file_purger.clone()
}
pub fn size(&self) -> u64 {
self.inner.meta.file_size
}
@@ -332,6 +336,10 @@ impl FileHandle {
pub fn level(&self) -> Level {
self.inner.meta.level
}
pub fn is_deleted(&self) -> bool {
self.inner.deleted.load(Ordering::Relaxed)
}
}
/// Inner data of [FileHandle].

View File

@@ -22,25 +22,40 @@ mod statistics;
pub(crate) mod store;
use std::num::NonZeroUsize;
use std::sync::Arc;
use bloom_filter::creator::BloomFilterIndexer;
use common_telemetry::{debug, warn};
use common_telemetry::{debug, info, warn};
use datatypes::arrow::record_batch::RecordBatch;
use puffin_manager::SstPuffinManager;
use smallvec::SmallVec;
use smallvec::{SmallVec, smallvec};
use statistics::{ByteCount, RowCount};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId, RegionId};
use strum::IntoStaticStr;
use tokio::sync::{mpsc, oneshot};
use crate::access_layer::OperationType;
use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
use crate::error::Result;
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
use crate::read::Batch;
use crate::read::{Batch, BatchReader};
use crate::region::options::IndexOptions;
use crate::sst::file::IndexType;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::region::{ManifestContextRef, RegionLeaderState};
use crate::request::{
BackgroundNotify, IndexBuildFailed, IndexBuildFinished, WorkerRequest, WorkerRequestWithTime,
};
use crate::schedule::scheduler::{Job, SchedulerRef};
use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::inverted_index::creator::InvertedIndexer;
use crate::sst::parquet::SstInfo;
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
@@ -179,9 +194,9 @@ pub trait IndexerBuilder {
/// Builds indexer of given file id to [index_file_path].
async fn build(&self, file_id: FileId) -> Indexer;
}
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
pub(crate) op_type: OperationType,
pub(crate) build_type: IndexBuildType,
pub(crate) metadata: RegionMetadataRef,
pub(crate) row_group_size: usize,
pub(crate) puffin_manager: SstPuffinManager,
@@ -221,9 +236,10 @@ impl IndexerBuilder for IndexerBuilderImpl {
impl IndexerBuilderImpl {
fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
let create = match self.op_type {
OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
let create = match self.build_type {
IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
_ => true,
};
if !create {
@@ -281,9 +297,10 @@ impl IndexerBuilderImpl {
}
async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
let create = match self.op_type {
OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
let create = match self.build_type {
IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
_ => true,
};
if !create {
@@ -334,9 +351,10 @@ impl IndexerBuilderImpl {
}
fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
let create = match self.op_type {
OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
let create = match self.build_type {
IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
_ => true,
};
if !create {
@@ -384,11 +402,279 @@ impl IndexerBuilderImpl {
}
}
/// Type of an index build task.
#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
pub enum IndexBuildType {
/// Build index when the schema changes.
SchemaChange,
/// Create or update index after flush.
Flush,
/// Create or update index after compaction.
Compact,
/// Manually build index.
Manual,
}
impl IndexBuildType {
fn as_str(&self) -> &'static str {
self.into()
}
}
impl From<OperationType> for IndexBuildType {
fn from(op_type: OperationType) -> Self {
match op_type {
OperationType::Flush => IndexBuildType::Flush,
OperationType::Compact => IndexBuildType::Compact,
}
}
}
/// Outcome of an index build task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
Finished,
Aborted(String),
}
pub struct IndexBuildTask {
/// The file meta to build index for.
pub file_meta: FileMeta,
pub reason: IndexBuildType,
pub access_layer: AccessLayerRef,
pub(crate) manifest_ctx: ManifestContextRef,
pub write_cache: Option<WriteCacheRef>,
pub file_purger: FilePurgerRef,
/// When write cache is enabled, the indexer builder should be built from the write cache.
/// Otherwise, it should be built from the access layer.
pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
/// Request sender to notify the region worker.
pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
/// Optional sender to send the result back to the caller.
pub result_sender: Option<oneshot::Sender<IndexBuildOutcome>>,
}
impl IndexBuildTask {
fn into_index_build_job(mut self, version_control: &VersionControlRef) -> Job {
let version_data = version_control.current();
Box::pin(async move {
self.do_index_build(version_data).await;
})
}
async fn do_index_build(&mut self, version_data: VersionControlData) {
let outcome = match self.index_build(&version_data).await {
Ok(outcome) => outcome,
Err(e) => {
warn!(
e; "Index build task failed, region: {}, file_id: {}",
self.file_meta.region_id, self.file_meta.file_id,
);
IndexBuildOutcome::Aborted(format!("Index build failed: {}", e))
}
};
if let Some(sender) = self.result_sender.take() {
let _ = sender.send(outcome);
}
}
// Checks that the SST file still exists in the region version (and thus in the
// object store) to avoid conflicts with compaction.
async fn check_sst_file_exists(&self, version: &VersionRef) -> bool {
let region_id = self.file_meta.region_id;
let file_id = self.file_meta.file_id;
let found_in_version = version
.ssts
.levels()
.iter()
.flat_map(|level| level.files.iter())
.any(|(id, handle)| {
*id == self.file_meta.file_id && !handle.is_deleted() && !handle.compacting()
});
if !found_in_version {
warn!(
"File id {} not found in region version for index build, region: {}",
file_id, region_id
);
false
} else {
// If the file's metadata is present in the current version, the physical SST file
// is guaranteed to exist on object store. The file purger removes the physical
// file only after its metadata is removed from the version.
true
}
}
async fn index_build(
&mut self,
version_data: &VersionControlData,
) -> Result<IndexBuildOutcome> {
let version = &version_data.version;
let mut indexer = self.indexer_builder.build(self.file_meta.file_id).await;
let mut parquet_reader = self
.access_layer
.read_sst(FileHandle::new(
self.file_meta.clone(),
self.file_purger.clone(),
))
.build()
.await?;
// TODO(SNC123): optimize index batch
loop {
match parquet_reader.next_batch().await {
Ok(Some(batch)) => {
indexer.update(&mut batch.clone()).await;
}
Ok(None) => break,
Err(e) => {
indexer.abort().await;
return Err(e);
}
}
}
let index_output = indexer.finish().await;
if index_output.file_size > 0 {
// Check SST file existence again after building index.
if !self.check_sst_file_exists(version).await {
// Calls abort to clean up index files.
indexer.abort().await;
return Ok(IndexBuildOutcome::Aborted(format!(
"SST file not found during index build, region: {}, file_id: {}",
self.file_meta.region_id, self.file_meta.file_id
)));
}
// Upload index file if write cache is enabled.
self.maybe_upload_index_file(index_output.clone()).await?;
let worker_request = match self.update_manifest(index_output).await {
Ok(edit) => {
let index_build_finished = IndexBuildFinished {
region_id: self.file_meta.region_id,
edit,
};
WorkerRequest::Background {
region_id: self.file_meta.region_id,
notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
}
}
Err(e) => {
let err = Arc::new(e);
WorkerRequest::Background {
region_id: self.file_meta.region_id,
notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
}
}
};
let _ = self
.request_sender
.send(WorkerRequestWithTime::new(worker_request))
.await;
}
Ok(IndexBuildOutcome::Finished)
}
async fn maybe_upload_index_file(&self, output: IndexOutput) -> Result<()> {
if let Some(write_cache) = &self.write_cache {
let file_id = self.file_meta.file_id;
let region_id = self.file_meta.region_id;
let remote_store = self.access_layer.object_store();
let mut upload_tracker = UploadTracker::new(region_id);
let mut err = None;
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let puffin_path = RegionFilePathFactory::new(
self.access_layer.table_dir().to_string(),
self.access_layer.path_type(),
)
.build_index_file_path(RegionFileId::new(region_id, file_id));
if let Err(e) = write_cache
.upload(puffin_key, &puffin_path, remote_store)
.await
{
err = Some(e);
}
upload_tracker.push_uploaded_file(puffin_path);
if let Some(err) = err {
// Cleans index files on failure.
upload_tracker
.clean(
&smallvec![SstInfo {
file_id,
index_metadata: output,
..Default::default()
}],
&write_cache.file_cache(),
remote_store,
)
.await;
return Err(err);
}
} else {
debug!("write cache is not available, skip uploading index file");
}
Ok(())
}
async fn update_manifest(&mut self, output: IndexOutput) -> Result<RegionEdit> {
self.file_meta.available_indexes = output.build_available_indexes();
self.file_meta.index_file_size = output.file_size;
let edit = RegionEdit {
files_to_add: vec![self.file_meta.clone()],
files_to_remove: vec![],
timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
flushed_sequence: None,
flushed_entry_id: None,
committed_sequence: None,
compaction_time_window: None,
};
let version = self
.manifest_ctx
.update_manifest(
RegionLeaderState::Writable,
RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
)
.await?;
info!(
"Successfully update manifest version to {version}, region: {}, reason: {}",
self.file_meta.region_id,
self.reason.as_str()
);
Ok(edit)
}
}
#[derive(Clone)]
pub struct IndexBuildScheduler {
scheduler: SchedulerRef,
}
impl IndexBuildScheduler {
pub fn new(scheduler: SchedulerRef) -> Self {
IndexBuildScheduler { scheduler }
}
pub(crate) fn schedule_build(
&mut self,
version_control: &VersionControlRef,
task: IndexBuildTask,
) -> Result<()> {
let job = task.into_index_build_job(version_control);
self.scheduler.schedule(job)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use api::v1::SemanticType;
use common_base::readable_size::ReadableSize;
use datafusion_common::HashMap;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{
ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
@@ -397,11 +683,21 @@ mod tests {
use object_store::services::Memory;
use puffin_manager::PuffinManagerFactory;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use tokio::sync::{mpsc, oneshot};
use super::*;
use crate::access_layer::FilePathProvider;
use crate::config::{FulltextIndexConfig, Mode};
use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType};
use crate::cache::write_cache::WriteCache;
use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
use crate::memtable::time_partition::TimePartitions;
use crate::region::version::{VersionBuilder, VersionControl};
use crate::sst::file::RegionFileId;
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::location;
use crate::sst::parquet::WriteOptions;
use crate::test_util::memtable_util::EmptyMemtableBuilder;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
struct MetaConfig {
with_inverted: bool,
@@ -489,7 +785,6 @@ mod tests {
async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
IntermediateManager::init_fs(path).await.unwrap()
}
struct NoopPathProvider;
impl FilePathProvider for NoopPathProvider {
@@ -502,6 +797,82 @@ mod tests {
}
}
async fn mock_sst_file(
metadata: RegionMetadataRef,
env: &SchedulerEnv,
build_mode: IndexBuildMode,
) -> SstInfo {
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);
let mut index_config = MitoConfig::default().index;
index_config.build_mode = build_mode;
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
metadata: metadata.clone(),
source: either::Left(source),
storage: None,
max_sequence: None,
cache_manager: Default::default(),
index_options: IndexOptions::default(),
index_config,
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
bloom_filter_index_config: Default::default(),
};
env.access_layer
.write_sst(write_request, &WriteOptions::default(), WriteType::Flush)
.await
.unwrap()
.0
.remove(0)
}
async fn mock_version_control(
metadata: RegionMetadataRef,
file_purger: FilePurgerRef,
files: HashMap<FileId, FileMeta>,
) -> VersionControlRef {
let mutable = Arc::new(TimePartitions::new(
metadata.clone(),
Arc::new(EmptyMemtableBuilder::default()),
0,
None,
));
let version_builder = VersionBuilder::new(metadata, mutable)
.add_files(file_purger, files.values().cloned())
.build();
Arc::new(VersionControl::new(version_builder))
}
async fn mock_indexer_builder(
metadata: RegionMetadataRef,
env: &SchedulerEnv,
) -> Arc<dyn IndexerBuilder + Send + Sync> {
let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
let puffin_manager = factory.build(
env.access_layer.object_store().clone(),
RegionFilePathFactory::new(
env.access_layer.table_dir().to_string(),
env.access_layer.path_type(),
),
);
Arc::new(IndexerBuilderImpl {
build_type: IndexBuildType::Flush,
metadata,
row_group_size: 1024,
puffin_manager,
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
})
}
#[tokio::test]
async fn test_build_indexer_basic() {
let (dir, factory) =
@@ -514,7 +885,7 @@ mod tests {
with_skipping_bloom: true,
});
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
build_type: IndexBuildType::Flush,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -544,7 +915,7 @@ mod tests {
with_skipping_bloom: true,
});
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
build_type: IndexBuildType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -565,7 +936,7 @@ mod tests {
assert!(indexer.bloom_filter_indexer.is_some());
let indexer = IndexerBuilderImpl {
op_type: OperationType::Compact,
build_type: IndexBuildType::Compact,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -586,7 +957,7 @@ mod tests {
assert!(indexer.bloom_filter_indexer.is_some());
let indexer = IndexerBuilderImpl {
op_type: OperationType::Compact,
build_type: IndexBuildType::Compact,
metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -619,7 +990,7 @@ mod tests {
with_skipping_bloom: true,
});
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
build_type: IndexBuildType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -642,7 +1013,7 @@ mod tests {
with_skipping_bloom: true,
});
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
build_type: IndexBuildType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -665,7 +1036,7 @@ mod tests {
with_skipping_bloom: false,
});
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
build_type: IndexBuildType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -695,7 +1066,7 @@ mod tests {
with_skipping_bloom: true,
});
let indexer = IndexerBuilderImpl {
op_type: OperationType::Flush,
build_type: IndexBuildType::Flush,
metadata,
row_group_size: 0,
puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
@@ -710,4 +1081,321 @@ mod tests {
assert!(indexer.inverted_indexer.is_none());
}
#[tokio::test]
async fn test_index_build_task_sst_not_exist() {
let env = SchedulerEnv::new().await;
let (tx, _rx) = mpsc::channel(4);
let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
let mut scheduler = env.mock_index_build_scheduler();
let metadata = Arc::new(sst_region_metadata());
let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
let file_purger = Arc::new(NoopFilePurger {});
let files = HashMap::new();
let version_control =
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
let region_id = metadata.region_id;
let indexer_builder = mock_indexer_builder(metadata, &env).await;
// Create mock task.
let task = IndexBuildTask {
file_meta: FileMeta {
region_id,
file_id: FileId::random(),
file_size: 100,
..Default::default()
},
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
manifest_ctx,
write_cache: None,
file_purger,
indexer_builder,
request_sender: tx,
result_sender: Some(result_tx),
};
// Schedule the build task and check result.
scheduler.schedule_build(&version_control, task).unwrap();
match result_rx.await.unwrap() {
IndexBuildOutcome::Aborted(_) => {}
_ => panic!("Expect aborted result due to missing SST file"),
}
}
#[tokio::test]
async fn test_index_build_task_sst_exist() {
let env = SchedulerEnv::new().await;
let mut scheduler = env.mock_index_build_scheduler();
let metadata = Arc::new(sst_region_metadata());
let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
let region_id = metadata.region_id;
let file_purger = Arc::new(NoopFilePurger {});
let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
let file_meta = FileMeta {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
..Default::default()
};
let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
let version_control =
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
// Create mock task.
let (tx, mut rx) = mpsc::channel(4);
let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
let task = IndexBuildTask {
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
manifest_ctx,
write_cache: None,
file_purger,
indexer_builder,
request_sender: tx,
result_sender: Some(result_tx),
};
scheduler.schedule_build(&version_control, task).unwrap();
// The task should finish successfully.
assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
// A notification should be sent to the worker to update the manifest.
let worker_req = rx.recv().await.unwrap().request;
match worker_req {
WorkerRequest::Background {
region_id: req_region_id,
notify: BackgroundNotify::IndexBuildFinished(finished),
} => {
assert_eq!(req_region_id, region_id);
assert_eq!(finished.edit.files_to_add.len(), 1);
let updated_meta = &finished.edit.files_to_add[0];
// The mock indexer builder creates all index types.
assert!(!updated_meta.available_indexes.is_empty());
assert!(updated_meta.index_file_size > 0);
assert_eq!(updated_meta.file_id, file_meta.file_id);
}
_ => panic!("Unexpected worker request: {:?}", worker_req),
}
}
async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
let env = SchedulerEnv::new().await;
let mut scheduler = env.mock_index_build_scheduler();
let metadata = Arc::new(sst_region_metadata());
let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
let file_purger = Arc::new(NoopFilePurger {});
let region_id = metadata.region_id;
let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
let file_meta = FileMeta {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
..Default::default()
};
let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
let version_control =
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
// Create mock task.
let (tx, _rx) = mpsc::channel(4);
let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
let task = IndexBuildTask {
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
manifest_ctx,
write_cache: None,
file_purger,
indexer_builder,
request_sender: tx,
result_sender: Some(result_tx),
};
scheduler.schedule_build(&version_control, task).unwrap();
let puffin_path = location::index_file_path(
env.access_layer.table_dir(),
RegionFileId::new(region_id, file_meta.file_id),
env.access_layer.path_type(),
);
if build_mode == IndexBuildMode::Async {
// The index file should not exist before the task finishes.
assert!(
!env.access_layer
.object_store()
.exists(&puffin_path)
.await
.unwrap()
);
} else {
// The index file should exist before the task finishes.
assert!(
env.access_layer
.object_store()
.exists(&puffin_path)
.await
.unwrap()
);
}
// The task should finish successfully.
assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
// The index file should exist after the task finishes.
assert!(
env.access_layer
.object_store()
.exists(&puffin_path)
.await
.unwrap()
);
}
#[tokio::test]
async fn test_index_build_task_build_mode() {
schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
}
#[tokio::test]
async fn test_index_build_task_no_index() {
let env = SchedulerEnv::new().await;
let mut scheduler = env.mock_index_build_scheduler();
let mut metadata = sst_region_metadata();
// Unset indexes in the metadata to simulate a no-index scenario.
metadata.column_metadatas.iter_mut().for_each(|col| {
col.column_schema.set_inverted_index(false);
let _ = col.column_schema.unset_skipping_options();
});
let region_id = metadata.region_id;
let metadata = Arc::new(metadata);
let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
let file_purger = Arc::new(NoopFilePurger {});
let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
let file_meta = FileMeta {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
..Default::default()
};
let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
let version_control =
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
// Create mock task.
let (tx, mut rx) = mpsc::channel(4);
let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
let task = IndexBuildTask {
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
manifest_ctx,
write_cache: None,
file_purger,
indexer_builder,
request_sender: tx,
result_sender: Some(result_tx),
};
scheduler.schedule_build(&version_control, task).unwrap();
// The task should finish successfully.
assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
// No index is built, so no notification should be sent to the worker.
assert!(rx.recv().await.is_none());
}
#[tokio::test]
async fn test_index_build_task_with_write_cache() {
let env = SchedulerEnv::new().await;
let mut scheduler = env.mock_index_build_scheduler();
let metadata = Arc::new(sst_region_metadata());
let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
let file_purger = Arc::new(NoopFilePurger {});
let region_id = metadata.region_id;
let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
// Create mock write cache
let write_cache = Arc::new(
WriteCache::new_fs(
dir.path().to_str().unwrap(),
ReadableSize::mb(10),
None,
factory,
intm_manager,
)
.await
.unwrap(),
);
// Indexer builder built from write cache.
let indexer_builder = Arc::new(IndexerBuilderImpl {
build_type: IndexBuildType::Flush,
metadata: metadata.clone(),
row_group_size: 1024,
puffin_manager: write_cache.build_puffin_manager().clone(),
intermediate_manager: write_cache.intermediate_manager().clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
bloom_filter_index_config: BloomFilterConfig::default(),
});
let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
let file_meta = FileMeta {
region_id,
file_id: sst_info.file_id,
file_size: sst_info.file_size,
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
..Default::default()
};
let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
let version_control =
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
// Create mock task.
let (tx, mut _rx) = mpsc::channel(4);
let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
let task = IndexBuildTask {
file_meta: file_meta.clone(),
reason: IndexBuildType::Flush,
access_layer: env.access_layer.clone(),
manifest_ctx,
write_cache: Some(write_cache.clone()),
file_purger,
indexer_builder,
request_sender: tx,
result_sender: Some(result_tx),
};
scheduler.schedule_build(&version_control, task).unwrap();
// The task should finish successfully.
assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
// The write cache should contain the uploaded index file.
let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
assert!(write_cache.file_cache().contains_key(&index_key));
}
}
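Aside: the conflict check in `check_sst_file_exists` trusts only the in-memory region version: a file qualifies for an index build while it is still listed in some level, not deleted, and not being compacted; the purger deletes the physical file only after its metadata leaves the version. A runnable sketch of that predicate with stand-in types:

type FileId = u64;

/// Stand-in for `FileHandle` state; the real scan walks `version.ssts.levels()`.
struct Handle {
    deleted: bool,
    compacting: bool,
}

/// Mirrors the `any` scan in `check_sst_file_exists`.
fn found_in_version(levels: &[Vec<(FileId, Handle)>], file_id: FileId) -> bool {
    levels
        .iter()
        .flat_map(|level| level.iter())
        .any(|(id, handle)| *id == file_id && !handle.deleted && !handle.compacting)
}

fn main() {
    let levels = vec![vec![
        (1, Handle { deleted: false, compacting: false }),
        (2, Handle { deleted: false, compacting: true }),
    ]];
    assert!(found_in_version(&levels, 1));
    // A file under compaction is skipped to avoid racing with the compactor.
    assert!(!found_in_version(&levels, 2));
    // A file missing from the version aborts the build.
    assert!(!found_in_version(&levels, 3));
}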

View File

@@ -67,7 +67,7 @@ impl Default for WriteOptions {
}
/// Parquet SST info returned by the writer.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct SstInfo {
/// SST file id.
pub file_id: FileId,
@@ -110,17 +110,16 @@ mod tests {
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use super::*;
use crate::access_layer::{
FilePathProvider, Metrics, OperationType, RegionFilePathFactory, WriteType,
};
use crate::access_layer::{FilePathProvider, Metrics, RegionFilePathFactory, WriteType};
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::config::IndexConfig;
use crate::read::{BatchBuilder, BatchReader, FlatSource};
use crate::region::options::{IndexOptions, InvertedIndexOptions};
use crate::sst::file::{FileHandle, FileMeta, RegionFileId};
use crate::sst::file_purger::NoopFilePurger;
use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::{Indexer, IndexerBuilder, IndexerBuilderImpl};
use crate::sst::index::{IndexBuildType, Indexer, IndexerBuilder, IndexerBuilderImpl};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::reader::{ParquetReader, ParquetReaderBuilder, ReaderMetrics};
use crate::sst::parquet::writer::ParquetWriter;
@@ -183,6 +182,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
file_path,
Metrics::new(WriteType::Flush),
@@ -244,6 +244,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
FixedPathProvider {
region_file_id: handle.file_id(),
@@ -329,6 +330,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
FixedPathProvider {
region_file_id: handle.file_id(),
@@ -377,6 +379,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
FixedPathProvider {
region_file_id: handle.file_id(),
@@ -435,6 +438,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
FixedPathProvider {
region_file_id: handle.file_id(),
@@ -478,6 +482,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
FixedPathProvider {
region_file_id: handle.file_id(),
@@ -635,6 +640,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
path_provider,
Metrics::new(WriteType::Flush),
@@ -692,7 +698,7 @@ mod tests {
let intermediate_manager = env.get_intermediate_manager();
let indexer_builder = IndexerBuilderImpl {
op_type: OperationType::Flush,
build_type: IndexBuildType::Flush,
metadata: metadata.clone(),
row_group_size,
puffin_manager,
@@ -711,6 +717,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
indexer_builder,
file_path.clone(),
Metrics::new(WriteType::Flush),
@@ -1066,7 +1073,7 @@ mod tests {
let intermediate_manager = env.get_intermediate_manager();
let indexer_builder = IndexerBuilderImpl {
op_type: OperationType::Flush,
build_type: IndexBuildType::Flush,
metadata: metadata.clone(),
row_group_size,
puffin_manager,
@@ -1085,6 +1092,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
indexer_builder,
file_path.clone(),
Metrics::new(WriteType::Flush),
@@ -1140,6 +1148,7 @@ mod tests {
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
IndexConfig::default(),
NoopIndexBuilder,
file_path,
Metrics::new(WriteType::Flush),

View File

@@ -46,12 +46,13 @@ use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::RegionFileId;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
@@ -68,6 +69,8 @@ pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvide
writer_factory: F,
/// Region metadata of the source and the target SST.
metadata: RegionMetadataRef,
/// Global index config.
index_config: IndexConfig,
/// Indexer build that can create indexer for multiple files.
indexer_builder: I,
/// Current active indexer.
@@ -110,6 +113,7 @@ where
pub async fn new_with_object_store(
object_store: ObjectStore,
metadata: RegionMetadataRef,
index_config: IndexConfig,
indexer_builder: I,
path_provider: P,
metrics: Metrics,
@@ -117,6 +121,7 @@ where
ParquetWriter::new(
ObjectStoreWriterFactory { object_store },
metadata,
index_config,
indexer_builder,
path_provider,
metrics,
@@ -140,6 +145,7 @@ where
pub async fn new(
factory: F,
metadata: RegionMetadataRef,
index_config: IndexConfig,
indexer_builder: I,
path_provider: P,
metrics: Metrics,
@@ -153,6 +159,7 @@ where
current_file: init_file,
writer_factory: factory,
metadata,
index_config,
indexer_builder,
current_indexer: Some(indexer),
bytes_written: Arc::new(AtomicUsize::new(0)),
@@ -182,7 +189,18 @@ where
// Finish indexer and writer.
// safety: writer and index can only be both present or not.
let index_output = self.current_indexer.as_mut().unwrap().finish().await;
let mut index_output = IndexOutput::default();
match self.index_config.build_mode {
IndexBuildMode::Sync => {
index_output = self.current_indexer.as_mut().unwrap().finish().await;
}
IndexBuildMode::Async => {
debug!(
"Index for file {} will be built asynchronously later",
self.current_file
);
}
}
current_writer.flush().await.context(WriteParquetSnafu)?;
let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
@@ -252,11 +270,16 @@ where
stats.update(&batch);
let start = Instant::now();
// safety: self.current_indexer must be set when first batch has been written.
self.current_indexer
.as_mut()
.unwrap()
.update(&mut batch)
.await;
match self.index_config.build_mode {
IndexBuildMode::Sync => {
self.current_indexer
.as_mut()
.unwrap()
.update(&mut batch)
.await;
}
IndexBuildMode::Async => {}
}
self.metrics.update_index += start.elapsed();
if let Some(max_file_size) = opts.max_file_size
&& self.bytes_written.load(Ordering::Relaxed) > max_file_size

View File

@@ -45,7 +45,8 @@ impl SstVersion {
&self.levels
}
/// Add files to the version.
/// Add files to the version. If a file with the same `file_id` already exists,
/// it will be overwritten with the new file.
///
/// # Panics
/// Panics if level of [FileMeta] is greater than [MAX_LEVEL].
@@ -58,8 +59,7 @@ impl SstVersion {
let level = file.level;
self.levels[level as usize]
.files
.entry(file.file_id)
.or_insert_with(|| FileHandle::new(file, file_purger.clone()));
.insert(file.file_id, FileHandle::new(file, file_purger.clone()));
}
}

View File

@@ -35,6 +35,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::region::{ManifestContext, ManifestContextRef, RegionLeaderState, RegionRoleState};
use crate::request::WorkerRequestWithTime;
use crate::schedule::scheduler::{Job, LocalScheduler, Scheduler, SchedulerRef};
use crate::sst::index::IndexBuildScheduler;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::worker::WorkerListener;
@@ -42,7 +43,7 @@ use crate::worker::WorkerListener;
/// Scheduler mocker.
pub(crate) struct SchedulerEnv {
#[allow(unused)]
path: TempDir,
pub(crate) path: TempDir,
/// Mock access layer for test.
pub(crate) access_layer: AccessLayerRef,
scheduler: Option<SchedulerRef>,
@@ -108,6 +109,13 @@ impl SchedulerEnv {
FlushScheduler::new(scheduler)
}
/// Creates a new index build scheduler.
pub(crate) fn mock_index_build_scheduler(&self) -> IndexBuildScheduler {
let scheduler = self.get_scheduler();
IndexBuildScheduler::new(scheduler)
}
/// Creates a new manifest context.
pub(crate) async fn mock_manifest_context(
&self,

View File

@@ -24,6 +24,7 @@ mod handle_drop;
mod handle_flush;
mod handle_manifest;
mod handle_open;
mod handle_rebuild_index;
mod handle_truncate;
mod handle_write;
@@ -67,6 +68,7 @@ use crate::request::{
};
use crate::schedule::scheduler::{LocalScheduler, SchedulerRef};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::IndexBuildScheduler;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
@@ -127,6 +129,8 @@ pub(crate) struct WorkerGroup {
flush_job_pool: SchedulerRef,
/// Compaction background job pool.
compact_job_pool: SchedulerRef,
/// Scheduler for index build jobs.
index_build_job_pool: SchedulerRef,
/// Scheduler for file purgers.
purge_scheduler: SchedulerRef,
/// Cache.
@@ -163,6 +167,8 @@ impl WorkerGroup {
let intermediate_manager = IntermediateManager::init_fs(&config.index.aux_path)
.await?
.with_buffer_size(Some(config.index.write_buffer_size.as_bytes() as _));
let index_build_job_pool =
Arc::new(LocalScheduler::new(config.max_background_index_builds));
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
@@ -198,6 +204,7 @@ impl WorkerGroup {
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
index_build_job_pool: index_build_job_pool.clone(),
flush_job_pool: flush_job_pool.clone(),
compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
@@ -222,6 +229,7 @@ impl WorkerGroup {
workers,
flush_job_pool,
compact_job_pool,
index_build_job_pool,
purge_scheduler,
cache_manager,
file_ref_manager,
@@ -239,6 +247,8 @@ impl WorkerGroup {
self.flush_job_pool.stop(true).await?;
// Stops the purge scheduler gracefully.
self.purge_scheduler.stop(true).await?;
// Stops the index build job pool gracefully.
self.index_build_job_pool.stop(true).await?;
try_join_all(self.workers.iter().map(|worker| worker.stop())).await?;
@@ -319,6 +329,8 @@ impl WorkerGroup {
.with_notifier(flush_sender.clone()),
)
});
let index_build_job_pool =
Arc::new(LocalScheduler::new(config.max_background_index_builds));
let flush_job_pool = Arc::new(LocalScheduler::new(config.max_background_flushes));
let compact_job_pool = Arc::new(LocalScheduler::new(config.max_background_compactions));
let flush_semaphore = Arc::new(Semaphore::new(config.max_background_flushes));
@@ -356,6 +368,7 @@ impl WorkerGroup {
log_store: log_store.clone(),
object_store_manager: object_store_manager.clone(),
write_buffer_manager: write_buffer_manager.clone(),
index_build_job_pool: index_build_job_pool.clone(),
flush_job_pool: flush_job_pool.clone(),
compact_job_pool: compact_job_pool.clone(),
purge_scheduler: purge_scheduler.clone(),
@@ -380,6 +393,7 @@ impl WorkerGroup {
workers,
flush_job_pool,
compact_job_pool,
index_build_job_pool,
purge_scheduler,
cache_manager,
file_ref_manager,
@@ -437,6 +451,7 @@ struct WorkerStarter<S> {
object_store_manager: ObjectStoreManagerRef,
write_buffer_manager: WriteBufferManagerRef,
compact_job_pool: SchedulerRef,
index_build_job_pool: SchedulerRef,
flush_job_pool: SchedulerRef,
purge_scheduler: SchedulerRef,
listener: WorkerListener,
@@ -482,6 +497,7 @@ impl<S: LogStore> WorkerStarter<S> {
),
purge_scheduler: self.purge_scheduler.clone(),
write_buffer_manager: self.write_buffer_manager,
index_build_scheduler: IndexBuildScheduler::new(self.index_build_job_pool),
flush_scheduler: FlushScheduler::new(self.flush_job_pool),
compaction_scheduler: CompactionScheduler::new(
self.compact_job_pool,
@@ -725,6 +741,8 @@ struct RegionWorkerLoop<S> {
purge_scheduler: SchedulerRef,
/// Engine write buffer manager.
write_buffer_manager: WriteBufferManagerRef,
/// Scheduler for index build task.
index_build_scheduler: IndexBuildScheduler,
/// Schedules background flush requests.
flush_scheduler: FlushScheduler,
/// Scheduler for compaction tasks.
@@ -913,6 +931,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
WorkerRequest::EditRegion(request) => {
self.handle_region_edit(request).await;
}
WorkerRequest::BuildIndexRegion(request) => {
self.handle_rebuild_index(request).await;
}
WorkerRequest::Stop => {
debug_assert!(!self.running.load(Ordering::Relaxed));
}
@@ -1021,6 +1042,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.handle_flush_finished(region_id, req).await
}
BackgroundNotify::FlushFailed(req) => self.handle_flush_failed(region_id, req).await,
BackgroundNotify::IndexBuildFinished(req) => {
self.handle_index_build_finished(region_id, req).await
}
BackgroundNotify::IndexBuildFailed(req) => {
self.handle_index_build_failed(region_id, req).await
}
BackgroundNotify::CompactionFinished(req) => {
self.handle_compaction_finished(region_id, req).await
}

View File

@@ -0,0 +1,149 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Handles index build requests.
use std::collections::HashMap;
use std::sync::Arc;
use common_telemetry::{error, warn};
use store_api::storage::{FileId, RegionId};
use tokio::sync::oneshot;
use crate::region::MitoRegionRef;
use crate::request::{IndexBuildFailed, IndexBuildFinished, RegionBuildIndexRequest};
use crate::sst::file::FileHandle;
use crate::sst::index::{IndexBuildOutcome, IndexBuildTask, IndexBuildType, IndexerBuilderImpl};
use crate::sst::parquet::WriteOptions;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
pub(crate) fn new_index_build_task(
&self,
region: &MitoRegionRef,
file: FileHandle,
build_type: IndexBuildType,
result_sender: Option<oneshot::Sender<IndexBuildOutcome>>,
) -> IndexBuildTask {
let version = region.version();
let access_layer = region.access_layer.clone();
let puffin_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
write_cache.build_puffin_manager()
} else {
access_layer.build_puffin_manager()
};
let intermediate_manager = if let Some(write_cache) = self.cache_manager.write_cache() {
write_cache.intermediate_manager().clone()
} else {
access_layer.intermediate_manager().clone()
};
let indexer_builder_ref = Arc::new(IndexerBuilderImpl {
build_type: build_type.clone(),
metadata: version.metadata.clone(),
inverted_index_config: self.config.inverted_index.clone(),
fulltext_index_config: self.config.fulltext_index.clone(),
bloom_filter_index_config: self.config.bloom_filter_index.clone(),
index_options: version.options.index_options.clone(),
row_group_size: WriteOptions::default().row_group_size,
intermediate_manager,
puffin_manager,
});
IndexBuildTask {
file_meta: file.meta_ref().clone(),
reason: build_type,
access_layer: access_layer.clone(),
manifest_ctx: region.manifest_ctx.clone(),
write_cache: self.cache_manager.write_cache().cloned(),
file_purger: file.file_purger(),
request_sender: self.sender.clone(),
indexer_builder: indexer_builder_ref.clone(),
result_sender,
}
}
pub(crate) async fn handle_rebuild_index(&mut self, request: RegionBuildIndexRequest) {
let region_id = request.region_id;
let Some(region) = self.regions.get_region(region_id) else {
return;
};
let version_control = region.version_control.clone();
let version = version_control.current().version;
let all_files: HashMap<FileId, FileHandle> = version
.ssts
.levels()
.iter()
.flat_map(|level| level.files.iter())
.filter(|(_, handle)| !handle.is_deleted() && !handle.compacting())
.map(|(id, handle)| (*id, handle.clone()))
.collect();
let build_tasks = if request.file_metas.is_empty() {
// NOTE: Currently, rebuilding the index will reconstruct the index for all
// files in the region, which is a simplified approach and is not yet available for
// production use; further optimization is required.
all_files.values().cloned().collect::<Vec<_>>()
} else {
request
.file_metas
.iter()
.filter_map(|meta| all_files.get(&meta.file_id).cloned())
.collect::<Vec<_>>()
};
for file_handle in build_tasks {
let task =
self.new_index_build_task(&region, file_handle, request.build_type.clone(), None);
let _ = self
.index_build_scheduler
.schedule_build(&region.version_control, task);
}
}
pub(crate) async fn handle_index_build_finished(
&mut self,
region_id: RegionId,
request: IndexBuildFinished,
) {
let region = match self.regions.get_region(region_id) {
Some(region) => region,
None => {
warn!(
"Region not found for index build finished, region_id: {}",
region_id
);
return;
}
};
region.version_control.apply_edit(
Some(request.edit.clone()),
&[],
region.file_purger.clone(),
);
}
pub(crate) async fn handle_index_build_failed(
&mut self,
region_id: RegionId,
request: IndexBuildFailed,
) {
error!(request.err; "Index build failed for region: {}", region_id);
// TODO(SNC123): Implement error handling logic after IndexBuildScheduler optimization.
}
}
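Aside: as `handle_rebuild_index` above encodes, an empty `file_metas` list selects every live file in the region, while a non-empty list keeps only the requested ids that still exist. A runnable sketch of that selection rule with simplified types:

use std::collections::HashMap;

type FileId = u64;

#[derive(Clone, Debug, PartialEq)]
struct FileMeta {
    file_id: FileId,
}

/// Mirrors the selection in `handle_rebuild_index`: empty means "all live
/// files"; otherwise keep only requested files that are still present.
fn select_files(all: &HashMap<FileId, FileMeta>, requested: &[FileMeta]) -> Vec<FileMeta> {
    if requested.is_empty() {
        all.values().cloned().collect()
    } else {
        requested
            .iter()
            .filter_map(|meta| all.get(&meta.file_id).cloned())
            .collect()
    }
}

fn main() {
    let all: HashMap<FileId, FileMeta> =
        (1..=3).map(|id| (id, FileMeta { file_id: id })).collect();
    assert_eq!(select_files(&all, &[]).len(), 3); // empty means all
    // A requested file that was already removed (id 9) is silently skipped.
    let picked = select_files(&all, &[FileMeta { file_id: 2 }, FileMeta { file_id: 9 }]);
    assert_eq!(picked, vec![FileMeta { file_id: 2 }]);
}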

View File

@@ -1508,6 +1508,7 @@ enable_experimental_flat_format = false
aux_path = ""
staging_size = "2GiB"
staging_ttl = "7days"
build_mode = "sync"
write_buffer_size = "8MiB"
content_cache_page_size = "64KiB"
@@ -1591,6 +1592,7 @@ fn drop_lines_with_inconsistent_results(input: String) -> String {
"result_cache_size =",
"name =",
"recovery_parallelism =",
"max_background_index_builds =",
"max_background_flushes =",
"max_background_compactions =",
"max_background_purges =",