refactor(inverted_index): integrate puffin manager with sst indexer (#4285)

* refactor(puffin): adjust generic parameters

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor(inverted_index): integrate puffin manager for build

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* Revert "refactor(puffin): adjust generic parameters"

This reverts commit 81ea1b6ee4.

* fix: column_ids remove ignore columns

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: remove with_ignore_column_ids

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* docs: add comments for IndexOutput

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* tiny fix

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* config: hide compress

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: index_size > 0 indicates index available

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* perf: reduce to_string calls

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* fix: clippy

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: address comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: address comment

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
This commit is contained in:
Zhenchi
2024-07-05 15:12:50 +08:00
committed by GitHub
parent 8a119aa0b2
commit f71b7b997d
32 changed files with 646 additions and 352 deletions

View File

@@ -22,6 +22,7 @@ use store_api::metadata::RegionMetadataRef;
use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::config::InvertedIndexConfig;
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::region::options::IndexOptions;
@@ -141,19 +142,20 @@ impl AccessLayer {
.await?
} else {
// Write cache is disabled.
let store = self.object_store.clone();
let indexer = IndexerBuilder {
create_inverted_index: request.create_inverted_index,
mem_threshold_index_create: request.mem_threshold_index_create,
write_buffer_size: request.index_write_buffer_size,
op_type: request.op_type,
file_id,
file_path: index_file_path,
metadata: &request.metadata,
row_group_size: write_opts.row_group_size,
object_store: self.object_store.clone(),
puffin_manager: self.puffin_manager_factory.build(store),
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
}
.build();
.build()
.await;
let mut writer = ParquetWriter::new_with_object_store(
self.object_store.clone(),
file_path,
@@ -182,22 +184,26 @@ impl AccessLayer {
}
}
/// `OperationType` represents the origin of the `SstWriteRequest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) enum OperationType {
Flush,
Compact,
}
/// Contents to build a SST.
pub(crate) struct SstWriteRequest {
pub(crate) op_type: OperationType,
pub(crate) file_id: FileId,
pub(crate) metadata: RegionMetadataRef,
pub(crate) source: Source,
pub(crate) cache_manager: CacheManagerRef,
#[allow(dead_code)]
pub(crate) storage: Option<String>,
/// Whether to create inverted index.
pub(crate) create_inverted_index: bool,
/// The threshold of memory size to create inverted index.
pub(crate) mem_threshold_index_create: Option<usize>,
/// The size of write buffer for index.
pub(crate) index_write_buffer_size: Option<usize>,
/// The options of the index for the region.
/// Configs for index
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
}
/// Creates a fs object store with atomic write dir.

View File

@@ -29,6 +29,7 @@ use crate::cache::file_cache::{FileCache, FileCacheRef, FileType, IndexKey, Inde
use crate::error::{self, Result};
use crate::metrics::{FLUSH_ELAPSED, UPLOAD_BYTES_TOTAL};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::index::IndexerBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::parquet::{SstInfo, WriteOptions};
@@ -44,7 +45,9 @@ pub struct WriteCache {
#[allow(unused)]
/// TODO: Remove unused after implementing async write cache
object_store_manager: ObjectStoreManagerRef,
/// Intermediate manager for inverted index.
/// Puffin manager factory for index.
puffin_manager_factory: PuffinManagerFactory,
/// Intermediate manager for index.
intermediate_manager: IntermediateManager,
}
@@ -58,6 +61,7 @@ impl WriteCache {
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
let file_cache = FileCache::new(local_store, cache_capacity, ttl);
@@ -66,6 +70,7 @@ impl WriteCache {
Ok(Self {
file_cache: Arc::new(file_cache),
object_store_manager,
puffin_manager_factory,
intermediate_manager,
})
}
@@ -76,6 +81,7 @@ impl WriteCache {
object_store_manager: ObjectStoreManagerRef,
cache_capacity: ReadableSize,
ttl: Option<Duration>,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Self> {
info!("Init write cache on {cache_dir}, capacity: {cache_capacity}");
@@ -86,6 +92,7 @@ impl WriteCache {
object_store_manager,
cache_capacity,
ttl,
puffin_manager_factory,
intermediate_manager,
)
.await
@@ -112,19 +119,20 @@ impl WriteCache {
let parquet_key = IndexKey::new(region_id, file_id, FileType::Parquet);
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let store = self.file_cache.local_store();
let indexer = IndexerBuilder {
create_inverted_index: write_request.create_inverted_index,
mem_threshold_index_create: write_request.mem_threshold_index_create,
write_buffer_size: write_request.index_write_buffer_size,
op_type: write_request.op_type,
file_id,
file_path: self.file_cache.cache_file_path(puffin_key),
metadata: &write_request.metadata,
row_group_size: write_opts.row_group_size,
object_store: self.file_cache.local_store(),
puffin_manager: self.puffin_manager_factory.build(store),
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
}
.build();
.build()
.await;
// Write to FileCache.
let mut writer = ParquetWriter::new_with_object_store(
@@ -148,7 +156,7 @@ impl WriteCache {
let remote_store = &upload_request.remote_store;
self.upload(parquet_key, parquet_path, remote_store).await?;
if sst_info.inverted_index_available {
if sst_info.index_metadata.file_size > 0 {
let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
let puffin_path = &upload_request.index_upload_path;
self.upload(puffin_key, puffin_path, remote_store).await?;
@@ -251,6 +259,7 @@ mod tests {
use common_test_util::temp_dir::create_temp_dir;
use super::*;
use crate::access_layer::OperationType;
use crate::cache::test_util::new_fs_store;
use crate::cache::CacheManager;
use crate::region::options::IndexOptions;
@@ -290,15 +299,14 @@ mod tests {
]);
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
file_id,
metadata,
source,
storage: None,
create_inverted_index: true,
mem_threshold_index_create: None,
index_write_buffer_size: None,
cache_manager: Default::default(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
};
let upload_request = SstUploadRequest {
@@ -375,15 +383,14 @@ mod tests {
// Write to local cache and upload sst to mock remote store
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
file_id,
metadata,
source,
storage: None,
create_inverted_index: false,
mem_threshold_index_create: None,
index_write_buffer_size: None,
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
};
let write_opts = WriteOptions {
row_group_size: 512,

View File

@@ -24,7 +24,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::access_layer::{AccessLayer, AccessLayerRef, SstWriteRequest};
use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{new_picker, PickerOutput};
use crate::compaction::CompactionSstReaderBuilder;
@@ -260,23 +260,6 @@ impl Compactor for DefaultCompactor {
write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
..Default::default()
};
let create_inverted_index = compaction_region
.engine_config
.inverted_index
.create_on_compaction
.auto();
let mem_threshold_index_create = compaction_region
.engine_config
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let index_write_buffer_size = Some(
compaction_region
.engine_config
.index
.write_buffer_size
.as_bytes() as usize,
);
let region_metadata = compaction_region.region_metadata.clone();
let sst_layer = compaction_region.access_layer.clone();
@@ -291,6 +274,7 @@ impl Compactor for DefaultCompactor {
.clone();
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
futs.push(async move {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
@@ -307,15 +291,14 @@ impl Compactor for DefaultCompactor {
let file_meta_opt = sst_layer
.write_sst(
SstWriteRequest {
op_type: OperationType::Compact,
file_id,
metadata: region_metadata,
source: Source::Reader(reader),
cache_manager,
storage,
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options,
inverted_index_config,
},
&write_opts,
)
@@ -326,11 +309,14 @@ impl Compactor for DefaultCompactor {
time_range: sst_info.time_range,
level: output.output_level,
file_size: sst_info.file_size,
available_indexes: sst_info
.inverted_index_available
.then(|| SmallVec::from_iter([IndexType::InvertedIndex]))
.unwrap_or_default(),
index_file_size: sst_info.index_file_size,
available_indexes: {
let mut indexes = SmallVec::new();
if sst_info.index_metadata.inverted_index.is_available() {
indexes.push(IndexType::InvertedIndex);
}
indexes
},
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
});

View File

@@ -354,6 +354,9 @@ pub struct InvertedIndexConfig {
#[serde_as(as = "NoneAsEmptyString")]
pub mem_threshold_on_create: Option<ReadableSize>,
/// Whether to compress the index data.
pub compress: bool,
#[deprecated = "use [IndexConfig::aux_path] instead"]
#[serde(skip_serializing)]
pub intermediate_path: String,
@@ -370,8 +373,10 @@ impl Default for InvertedIndexConfig {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
write_buffer_size: ReadableSize::mb(8),
compress: true,
mem_threshold_on_create: Some(ReadableSize::mb(64)),
write_buffer_size: ReadableSize::mb(8),
intermediate_path: String::new(),
}
}

View File

@@ -580,7 +580,7 @@ async fn test_region_usage() {
flush_region(&engine, region_id, None).await;
let region_stat = region.region_usage();
assert_eq!(region_stat.sst_usage, 3010);
assert_eq!(region_stat.sst_usage, 3026);
// region total usage
// Some memtables may share items.

View File

@@ -597,13 +597,6 @@ pub enum Error {
location: Location,
},
#[snafu(display("Failed to write puffin completely"))]
PuffinFinish {
source: puffin::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to add blob to puffin file"))]
PuffinAddBlob {
source: puffin::error::Error,
@@ -891,7 +884,6 @@ impl ErrorExt for Error {
| IndexFinish { source, .. } => source.status_code(),
PuffinReadMetadata { source, .. }
| PuffinReadBlob { source, .. }
| PuffinFinish { source, .. }
| PuffinAddBlob { source, .. }
| PuffinInitStager { source, .. }
| PuffinBuildReader { source, .. } => source.status_code(),

View File

@@ -25,7 +25,7 @@ use store_api::storage::RegionId;
use strum::IntoStaticStr;
use tokio::sync::mpsc;
use crate::access_layer::{AccessLayerRef, SstWriteRequest};
use crate::access_layer::{AccessLayerRef, OperationType, SstWriteRequest};
use crate::cache::CacheManagerRef;
use crate::config::MitoConfig;
use crate::error::{
@@ -321,26 +321,17 @@ impl RegionFlushTask {
let file_id = FileId::random();
let iter = mem.iter(None, None)?;
let source = Source::Iter(iter);
let create_inverted_index = self.engine_config.inverted_index.create_on_flush.auto();
let mem_threshold_index_create = self
.engine_config
.inverted_index
.mem_threshold_on_create
.map(|m| m.as_bytes() as _);
let index_write_buffer_size =
Some(self.engine_config.index.write_buffer_size.as_bytes() as usize);
// Flush to level 0.
let write_request = SstWriteRequest {
op_type: OperationType::Flush,
file_id,
metadata: version.metadata.clone(),
source,
cache_manager: self.cache_manager.clone(),
storage: version.options.storage.clone(),
create_inverted_index,
mem_threshold_index_create,
index_write_buffer_size,
index_options: self.index_options.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
};
let Some(sst_info) = self
.access_layer
@@ -358,11 +349,14 @@ impl RegionFlushTask {
time_range: sst_info.time_range,
level: 0,
file_size: sst_info.file_size,
available_indexes: sst_info
.inverted_index_available
.then(|| SmallVec::from_iter([IndexType::InvertedIndex]))
.unwrap_or_default(),
index_file_size: sst_info.index_file_size,
available_indexes: {
let mut indexes = SmallVec::new();
if sst_info.index_metadata.inverted_index.is_available() {
indexes.push(IndexType::InvertedIndex);
}
indexes
},
index_file_size: sst_info.index_metadata.file_size,
num_rows: sst_info.num_rows as u64,
num_row_groups: sst_info.num_row_groups,
};

View File

@@ -45,8 +45,8 @@ use crate::read::{Batch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::{overlaps, FileHandle, FileMeta};
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::SstIndexApplierRef;
use crate::sst::parquet::file_range::FileRange;
/// A scanner scans a region and returns a [SendableRecordBatchStream].

View File

@@ -12,151 +12,151 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod applier;
mod codec;
pub(crate) mod creator;
mod indexer;
pub(crate) mod intermediate;
pub(crate) mod inverted_index;
pub(crate) mod puffin_manager;
mod statistics;
mod store;
use std::num::NonZeroUsize;
use common_telemetry::{debug, warn};
use creator::SstIndexCreator;
use object_store::ObjectStore;
use puffin::puffin_manager::PuffinManager;
use puffin_manager::{SstPuffinManager, SstPuffinWriter};
use statistics::{ByteCount, RowCount};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::OperationType;
use crate::config::InvertedIndexConfig;
use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
use crate::read::Batch;
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer;
const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";
/// Output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
/// Size of the file.
pub file_size: u64,
/// Inverted index output.
pub inverted_index: InvertedIndexOutput,
}
/// Output of the inverted index creation.
#[derive(Debug, Clone, Default)]
pub struct InvertedIndexOutput {
/// Size of the index.
pub index_size: ByteCount,
/// Number of rows in the index.
pub row_count: RowCount,
/// Available columns in the index.
pub columns: Vec<ColumnId>,
}
impl InvertedIndexOutput {
pub fn is_available(&self) -> bool {
self.index_size > 0
}
}
/// The index creator that hides the error handling details.
#[derive(Default)]
pub struct Indexer {
file_id: FileId,
region_id: RegionId,
inner: Option<SstIndexCreator>,
last_memory_usage: usize,
inverted_indexer: Option<InvertedIndexer>,
puffin_writer: Option<SstPuffinWriter>,
}
impl Indexer {
/// Update the index with the given batch.
/// Updates the index with the given batch.
pub async fn update(&mut self, batch: &Batch) {
if let Some(creator) = self.inner.as_mut() {
if let Err(err) = creator.update(batch).await {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to update index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to update index, skip creating index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
self.do_update(batch).await;
// Skip index creation if error occurs.
self.inner = None;
}
}
if let Some(creator) = self.inner.as_ref() {
let memory_usage = creator.memory_usage();
INDEX_CREATE_MEMORY_USAGE.add(memory_usage as i64 - self.last_memory_usage as i64);
self.last_memory_usage = memory_usage;
} else {
INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64);
self.last_memory_usage = 0;
}
let memory_usage = self.memory_usage();
INDEX_CREATE_MEMORY_USAGE.add(memory_usage as i64 - self.last_memory_usage as i64);
self.last_memory_usage = memory_usage;
}
/// Finish the index creation.
/// Returns the number of bytes written if success or None if failed.
pub async fn finish(&mut self) -> Option<u64> {
if let Some(mut creator) = self.inner.take() {
match creator.finish().await {
Ok((row_count, byte_count)) => {
debug!(
"Create index successfully, region_id: {}, file_id: {}, bytes: {}, rows: {}",
self.region_id, self.file_id, byte_count, row_count
);
INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64);
self.last_memory_usage = 0;
return Some(byte_count);
}
Err(err) => {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to create index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to create index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
}
}
}
/// Finalizes the index creation.
pub async fn finish(&mut self) -> IndexOutput {
INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64);
self.last_memory_usage = 0;
None
self.do_finish().await
}
/// Abort the index creation.
/// Aborts the index creation.
pub async fn abort(&mut self) {
if let Some(mut creator) = self.inner.take() {
if let Err(err) = creator.abort().await {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to abort index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to abort index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
}
}
INDEX_CREATE_MEMORY_USAGE.sub(self.last_memory_usage as i64);
self.last_memory_usage = 0;
self.do_abort().await;
}
fn memory_usage(&self) -> usize {
self.inverted_indexer
.as_ref()
.map_or(0, |creator| creator.memory_usage())
}
}
pub(crate) struct IndexerBuilder<'a> {
pub(crate) create_inverted_index: bool,
pub(crate) mem_threshold_index_create: Option<usize>,
pub(crate) write_buffer_size: Option<usize>,
pub(crate) op_type: OperationType,
pub(crate) file_id: FileId,
pub(crate) file_path: String,
pub(crate) metadata: &'a RegionMetadataRef,
pub(crate) row_group_size: usize,
pub(crate) object_store: ObjectStore,
pub(crate) puffin_manager: SstPuffinManager,
pub(crate) intermediate_manager: IntermediateManager,
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
}
impl<'a> IndexerBuilder<'a> {
/// Sanity check for arguments and create a new [Indexer]
/// with inner [SstIndexCreator] if arguments are valid.
pub(crate) fn build(self) -> Indexer {
if !self.create_inverted_index {
/// Sanity check for arguments and create a new [Indexer] if arguments are valid.
pub(crate) async fn build(self) -> Indexer {
let mut indexer = Indexer {
file_id: self.file_id,
region_id: self.metadata.region_id,
last_memory_usage: 0,
..Default::default()
};
indexer.inverted_indexer = self.build_inverted_indexer();
if indexer.inverted_indexer.is_none() {
indexer.abort().await;
return Indexer::default();
}
indexer.puffin_writer = self.build_puffin_writer().await;
if indexer.puffin_writer.is_none() {
indexer.abort().await;
return Indexer::default();
}
indexer
}
fn build_inverted_indexer(&self) -> Option<InvertedIndexer> {
let create = match self.op_type {
OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
};
if !create {
debug!(
"Skip creating index due to request, region_id: {}, file_id: {}",
"Skip creating inverted index due to config, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
return Indexer::default();
return None;
}
if self.metadata.primary_key.is_empty() {
@@ -164,7 +164,7 @@ impl<'a> IndexerBuilder<'a> {
"No tag columns, skip creating index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
return Indexer::default();
return None;
}
let Some(mut segment_row_count) =
@@ -174,7 +174,7 @@ impl<'a> IndexerBuilder<'a> {
"Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
return Indexer::default();
return None;
};
let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
@@ -182,7 +182,7 @@ impl<'a> IndexerBuilder<'a> {
"Row group size is 0, skip creating index, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
return Indexer::default();
return None;
};
// if segment row count not aligned with row group size, adjust it to be aligned.
@@ -190,31 +190,43 @@ impl<'a> IndexerBuilder<'a> {
segment_row_count = row_group_size;
}
let creator = SstIndexCreator::new(
self.file_path,
let mem_threshold = self
.inverted_index_config
.mem_threshold_on_create
.map(|t| t.as_bytes() as usize);
let indexer = InvertedIndexer::new(
self.file_id,
self.metadata,
self.object_store,
self.intermediate_manager,
self.mem_threshold_index_create,
self.intermediate_manager.clone(),
mem_threshold,
segment_row_count,
)
.with_buffer_size(self.write_buffer_size)
.with_ignore_column_ids(
self.index_options
.inverted_index
.ignore_column_ids
.iter()
.map(|i| i.to_string())
.collect(),
self.inverted_index_config.compress,
&self.index_options.inverted_index.ignore_column_ids,
);
Indexer {
file_id: self.file_id,
region_id: self.metadata.region_id,
inner: Some(creator),
last_memory_usage: 0,
Some(indexer)
}
async fn build_puffin_writer(&self) -> Option<SstPuffinWriter> {
let err = match self.puffin_manager.writer(&self.file_path).await {
Ok(writer) => return Some(writer),
Err(err) => err,
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to create puffin writer, region_id: {}, file_id: {}, err: {}",
self.metadata.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to create puffin writer, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
}
None
}
}
@@ -226,9 +238,12 @@ mod tests {
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin_manager::PuffinManagerFactory;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use super::*;
use crate::config::Mode;
fn mock_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
@@ -291,83 +306,102 @@ mod tests {
IntermediateManager::new(mock_object_store())
}
#[test]
fn test_build_indexer_basic() {
#[tokio::test]
async fn test_build_indexer_basic() {
let (_d, factory) =
PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
let store = mock_object_store();
let puffin_manager = factory.build(store);
let metadata = mock_region_metadata();
let indexer = IndexerBuilder {
create_inverted_index: true,
mem_threshold_index_create: Some(1024),
write_buffer_size: None,
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
object_store: mock_object_store(),
puffin_manager,
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
}
.build();
.build()
.await;
assert!(indexer.inner.is_some());
assert!(indexer.inverted_indexer.is_some());
}
#[test]
fn test_build_indexer_disable_create() {
#[tokio::test]
async fn test_build_indexer_disable_create() {
let (_d, factory) =
PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
let store = mock_object_store();
let puffin_manager = factory.build(store);
let metadata = mock_region_metadata();
let indexer = IndexerBuilder {
create_inverted_index: false,
mem_threshold_index_create: Some(1024),
write_buffer_size: None,
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
object_store: mock_object_store(),
puffin_manager,
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig {
create_on_flush: Mode::Disable,
..Default::default()
},
}
.build();
.build()
.await;
assert!(indexer.inner.is_none());
assert!(indexer.inverted_indexer.is_none());
}
#[test]
fn test_build_indexer_no_tag() {
#[tokio::test]
async fn test_build_indexer_no_tag() {
let (_d, factory) =
PuffinManagerFactory::new_for_test_async("test_build_indexer_no_tag_").await;
let store = mock_object_store();
let puffin_manager = factory.build(store);
let metadata = no_tag_region_metadata();
let indexer = IndexerBuilder {
create_inverted_index: true,
mem_threshold_index_create: Some(1024),
write_buffer_size: None,
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
object_store: mock_object_store(),
puffin_manager,
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
}
.build();
.build()
.await;
assert!(indexer.inner.is_none());
assert!(indexer.inverted_indexer.is_none());
}
#[test]
fn test_build_indexer_zero_row_group() {
#[tokio::test]
async fn test_build_indexer_zero_row_group() {
let (_d, factory) =
PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
let store = mock_object_store();
let puffin_manager = factory.build(store);
let metadata = mock_region_metadata();
let indexer = IndexerBuilder {
create_inverted_index: true,
mem_threshold_index_create: Some(1024),
write_buffer_size: None,
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 0,
object_store: mock_object_store(),
puffin_manager,
intermediate_manager: mock_intm_mgr(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
}
.build();
.build()
.await;
assert!(indexer.inner.is_none());
assert!(indexer.inverted_indexer.is_none());
}
}

View File

@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod abort;
mod finish;
mod update;

View File

@@ -0,0 +1,69 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::warn;
use puffin::puffin_manager::PuffinWriter;
use crate::sst::index::Indexer;
impl Indexer {
pub(crate) async fn do_abort(&mut self) {
self.do_abort_inverted_index().await;
self.do_abort_puffin_writer().await;
}
async fn do_abort_inverted_index(&mut self) {
let Some(mut indexer) = self.inverted_indexer.take() else {
return;
};
let Err(err) = indexer.abort().await else {
return;
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to abort inverted index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to abort inverted index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
}
async fn do_abort_puffin_writer(&mut self) {
let Some(puffin_writer) = self.puffin_writer.take() else {
return;
};
let err = match puffin_writer.finish().await {
Ok(_) => return,
Err(err) => err,
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to abort puffin writer, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to abort puffin writer, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
}
}

View File

@@ -0,0 +1,118 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::{debug, warn};
use puffin::puffin_manager::PuffinWriter;
use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount};
use crate::sst::index::{IndexOutput, Indexer, InvertedIndexOutput};
/// Finalization logic for [`Indexer`]: flushes the inverted index into the
/// puffin file and then closes the puffin writer.
impl Indexer {
    /// Finishes every sub-indexer and the puffin writer, returning the
    /// aggregated index metadata.
    ///
    /// Returns a default (empty) [`IndexOutput`] when no puffin writer was
    /// ever created, or when finishing the inverted index fails — in the
    /// latter case the whole indexer is aborted first.
    pub(crate) async fn do_finish(&mut self) -> IndexOutput {
        let mut output = IndexOutput::default();
        match self.puffin_writer.take() {
            // No writer was created, so there is nothing to finish.
            None => output,
            Some(mut writer) => {
                let finished = self
                    .do_finish_inverted_index(&mut writer, &mut output)
                    .await;
                if finished {
                    output.file_size = self.do_finish_puffin_writer(writer).await;
                    output
                } else {
                    self.do_abort().await;
                    IndexOutput::default()
                }
            }
        }
    }

    /// Closes the puffin writer and returns the number of bytes written.
    ///
    /// On failure this panics in test builds (so the error is not silently
    /// swallowed) and logs a warning and returns 0 in release builds.
    async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
        match writer.finish().await {
            Ok(size) => size,
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {}",
                        self.region_id, self.file_id, err
                    );
                } else {
                    warn!(
                        err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
                        self.region_id, self.file_id,
                    );
                }
                0
            }
        }
    }

    /// Flushes the inverted index (if one exists) into the puffin writer and
    /// records its statistics in `index_output`.
    ///
    /// Returns `false` if the finish failed; panics instead in test builds.
    async fn do_finish_inverted_index(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
        index_output: &mut IndexOutput,
    ) -> bool {
        let Some(mut indexer) = self.inverted_indexer.take() else {
            // No inverted index was requested for this SST; trivially done.
            return true;
        };
        match indexer.finish(puffin_writer).await {
            Ok((row_count, byte_count)) => {
                self.fill_inverted_index_output(
                    &mut index_output.inverted_index,
                    row_count,
                    byte_count,
                    &indexer,
                );
                true
            }
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to finish inverted index, region_id: {}, file_id: {}, err: {}",
                        self.region_id, self.file_id, err
                    );
                } else {
                    warn!(
                        err; "Failed to finish inverted index, region_id: {}, file_id: {}",
                        self.region_id, self.file_id,
                    );
                }
                false
            }
        }
    }

    /// Copies the inverted index statistics (size, row count, indexed
    /// columns) into the output metadata and emits a debug log.
    fn fill_inverted_index_output(
        &mut self,
        output: &mut InvertedIndexOutput,
        row_count: RowCount,
        byte_count: ByteCount,
        indexer: &InvertedIndexer,
    ) {
        debug!(
            "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
            self.region_id, self.file_id, byte_count, row_count
        );
        output.index_size = byte_count;
        output.row_count = row_count;
        output.columns = indexer.column_ids().collect();
    }
}

View File

@@ -0,0 +1,55 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use common_telemetry::warn;
use crate::read::Batch;
use crate::sst::index::Indexer;
/// Update logic for [`Indexer`]: feeds row batches into the sub-indexers.
impl Indexer {
    /// Feeds a batch of rows to all sub-indexers, skipping empty batches.
    /// Aborts the whole indexer if any sub-indexer fails to accept the batch.
    pub(crate) async fn do_update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }
        let ok = self.do_update_inverted_index(batch).await;
        if !ok {
            self.do_abort().await;
        }
    }

    /// Pushes the batch into the inverted index creator, if one is present.
    ///
    /// Returns `false` if the update failed; panics instead in test builds so
    /// the failure is not silently swallowed.
    async fn do_update_inverted_index(&mut self, batch: &Batch) -> bool {
        let creator = match self.inverted_indexer.as_mut() {
            Some(creator) => creator,
            // No inverted index requested for this SST; nothing to do.
            None => return true,
        };
        match creator.update(batch).await {
            Ok(_) => true,
            Err(err) => {
                if cfg!(any(test, feature = "test")) {
                    panic!(
                        "Failed to update inverted index, region_id: {}, file_id: {}, err: {}",
                        self.region_id, self.file_id, err
                    );
                } else {
                    warn!(
                        err; "Failed to update inverted index, region_id: {}, file_id: {}",
                        self.region_id, self.file_id,
                    );
                }
                false
            }
        }
    }
}

View File

@@ -0,0 +1,19 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod applier;
mod codec;
pub(crate) mod creator;
/// Identifier of the inverted index blob inside an SST's puffin file.
/// The `-v1` suffix versions the blob format so readers can detect it.
const INDEX_BLOB_TYPE: &str = "greptime-inverted-index-v1";

View File

@@ -30,8 +30,8 @@ use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::error::{ApplyIndexSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu, Result};
use crate::metrics::{INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE};
use crate::sst::file::FileId;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::INDEX_BLOB_TYPE;
use crate::sst::location;
/// The [`SstIndexApplier`] is responsible for applying predicates to the provided SST files

View File

@@ -36,8 +36,8 @@ use store_api::storage::ColumnId;
use crate::cache::file_cache::FileCacheRef;
use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
use crate::row_converter::SortField;
use crate::sst::index::applier::SstIndexApplier;
use crate::sst::index::codec::IndexValueCodec;
use crate::sst::index::inverted_index::applier::SstIndexApplier;
use crate::sst::index::inverted_index::codec::IndexValueCodec;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
/// Constructs an [`SstIndexApplier`] which applies predicates to SST files during scan.

View File

@@ -16,7 +16,7 @@ use datafusion_expr::Between;
use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePredicate};
use crate::error::Result;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
/// Collects a `BETWEEN` expression in the form of `column BETWEEN lit AND lit`.
@@ -62,7 +62,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::sst::index::applier::builder::tests::{
use crate::sst::index::inverted_index::applier::builder::tests::{
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
test_object_store, test_region_metadata,
};

View File

@@ -17,7 +17,7 @@ use index::inverted_index::search::predicate::{Bound, Predicate, Range, RangePre
use index::inverted_index::Bytes;
use crate::error::Result;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
/// Collects a comparison expression in the form of
@@ -134,7 +134,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::sst::index::applier::builder::tests::{
use crate::sst::index::inverted_index::applier::builder::tests::{
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
test_object_store, test_region_metadata,
};

View File

@@ -20,7 +20,7 @@ use index::inverted_index::search::predicate::{InListPredicate, Predicate};
use index::inverted_index::Bytes;
use crate::error::Result;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
/// Collects an eq expression in the form of `column = lit`.
@@ -124,7 +124,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
mod tests {
use super::*;
use crate::error::Error;
use crate::sst::index::applier::builder::tests::{
use crate::sst::index::inverted_index::applier::builder::tests::{
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
tag_column2, test_object_store, test_region_metadata,
};

View File

@@ -18,7 +18,7 @@ use datafusion_expr::expr::InList;
use index::inverted_index::search::predicate::{InListPredicate, Predicate};
use crate::error::Result;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
/// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
@@ -55,7 +55,7 @@ impl<'a> SstIndexApplierBuilder<'a> {
mod tests {
use super::*;
use crate::error::Error;
use crate::sst::index::applier::builder::tests::{
use crate::sst::index::inverted_index::applier::builder::tests::{
encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
test_object_store, test_region_metadata,
};

View File

@@ -17,7 +17,7 @@ use datafusion_expr::Expr as DfExpr;
use index::inverted_index::search::predicate::{Predicate, RegexMatchPredicate};
use crate::error::Result;
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
impl<'a> SstIndexApplierBuilder<'a> {
/// Collects a regex match expression in the form of `column ~ pattern`.
@@ -49,7 +49,7 @@ mod tests {
use super::*;
use crate::error::Error;
use crate::sst::index::applier::builder::tests::{
use crate::sst::index::inverted_index::applier::builder::tests::{
field_column, int64_lit, nonexistent_column, string_lit, tag_column, test_object_store,
test_region_metadata,
};

View File

@@ -17,6 +17,7 @@ use datatypes::value::{Value, ValueRef};
use memcomparable::Serializer;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::storage::ColumnId;
use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -57,12 +58,11 @@ impl IndexValueCodec {
}
}
pub(crate) type ColumnId = String;
/// Decodes primary key values into their corresponding column ids, data types and values.
pub struct IndexValuesCodec {
/// The tag column ids.
column_ids: Vec<ColumnId>,
/// Tuples containing column id and its corresponding index_name (result of `to_string` on ColumnId),
/// to minimize redundant `to_string` calls.
column_ids: Vec<(ColumnId, String)>,
/// The data types of tag columns.
fields: Vec<SortField>,
/// The decoder for the primary key.
@@ -75,7 +75,7 @@ impl IndexValuesCodec {
let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns
.map(|column| {
(
column.column_id.to_string(),
(column.column_id, column.column_id.to_string()),
SortField::new(column.column_schema.data_type.clone()),
)
})
@@ -93,7 +93,7 @@ impl IndexValuesCodec {
pub fn decode(
&self,
primary_key: &[u8],
) -> Result<impl Iterator<Item = (&ColumnId, &SortField, Option<Value>)>> {
) -> Result<impl Iterator<Item = (&(ColumnId, String), &SortField, Option<Value>)>> {
let values = self.decoder.decode(primary_key)?;
let iter = values
@@ -175,13 +175,15 @@ mod tests {
let codec = IndexValuesCodec::from_tag_columns(tag_columns.iter());
let mut iter = codec.decode(&primary_key).unwrap();
let (column_id, field, value) = iter.next().unwrap();
assert_eq!(column_id, "1");
let ((column_id, col_id_str), field, value) = iter.next().unwrap();
assert_eq!(*column_id, 1);
assert_eq!(col_id_str, "1");
assert_eq!(field, &SortField::new(ConcreteDataType::string_datatype()));
assert_eq!(value, None);
let (column_id, field, value) = iter.next().unwrap();
assert_eq!(column_id, "2");
let ((column_id, col_id_str), field, value) = iter.next().unwrap();
assert_eq!(*column_id, 2);
assert_eq!(col_id_str, "2");
assert_eq!(field, &SortField::new(ConcreteDataType::int64_datatype()));
assert_eq!(value, Some(Value::Int64(10)));

View File

@@ -12,10 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.
mod statistics;
mod temp_provider;
pub(crate) mod temp_provider;
use std::collections::{HashMap, HashSet};
use std::collections::HashSet;
use std::num::NonZeroUsize;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
@@ -25,28 +24,26 @@ use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
use index::inverted_index::format::writer::InvertedIndexBlobWriter;
use object_store::ObjectStore;
use puffin::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter};
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use tokio::io::duplex;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
use crate::error::{
BiSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PuffinFinishSnafu,
PushIndexValueSnafu, Result,
};
use crate::metrics::{
INDEX_PUFFIN_FLUSH_OP_TOTAL, INDEX_PUFFIN_WRITE_BYTES_TOTAL, INDEX_PUFFIN_WRITE_OP_TOTAL,
BiSnafu, IndexFinishSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushIndexValueSnafu,
Result,
};
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::codec::{ColumnId, IndexValueCodec, IndexValuesCodec};
use crate::sst::index::creator::statistics::Statistics;
use crate::sst::index::creator::temp_provider::TempFileProvider;
use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager};
use crate::sst::index::store::InstrumentedStore;
use crate::sst::index::INDEX_BLOB_TYPE;
use crate::sst::index::inverted_index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::inverted_index::creator::temp_provider::TempFileProvider;
use crate::sst::index::inverted_index::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
/// The minimum memory usage threshold for one column.
const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB
@@ -54,15 +51,8 @@ const MIN_MEMORY_USAGE_THRESHOLD_PER_COLUMN: usize = 1024 * 1024; // 1MB
/// The buffer size for the pipe used to send index data to the puffin blob.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
type ByteCount = u64;
type RowCount = usize;
/// Creates SST index.
pub struct SstIndexCreator {
/// Path of index file to write.
file_path: String,
/// The store to write index files.
store: InstrumentedStore,
/// The index creator.
index_creator: Box<dyn InvertedIndexCreator>,
/// The provider of intermediate files.
@@ -78,24 +68,27 @@ pub struct SstIndexCreator {
/// Whether the index creation is aborted.
aborted: bool,
/// Ignore column IDs for index creation.
ignore_column_ids: HashSet<ColumnId>,
/// The memory usage of the index creator.
memory_usage: Arc<AtomicUsize>,
/// Whether to compress the index data.
compress: bool,
/// Ids of indexed columns.
column_ids: HashSet<ColumnId>,
}
impl SstIndexCreator {
/// Creates a new `SstIndexCreator`.
/// Should ensure that the number of tag columns is greater than 0.
pub fn new(
file_path: String,
sst_file_id: FileId,
metadata: &RegionMetadataRef,
index_store: ObjectStore,
intermediate_manager: IntermediateManager,
memory_usage_threshold: Option<usize>,
segment_row_count: NonZeroUsize,
compress: bool,
ignore_column_ids: &[ColumnId],
) -> Self {
let temp_file_provider = Arc::new(TempFileProvider::new(
IntermediateLocation::new(&metadata.region_id, &sst_file_id),
@@ -113,35 +106,27 @@ impl SstIndexCreator {
let index_creator = Box::new(SortIndexCreator::new(sorter, segment_row_count));
let codec = IndexValuesCodec::from_tag_columns(metadata.primary_key_columns());
let mut column_ids = metadata
.primary_key_columns()
.map(|c| c.column_id)
.collect::<HashSet<_>>();
for id in ignore_column_ids {
column_ids.remove(id);
}
Self {
file_path,
store: InstrumentedStore::new(index_store),
codec,
index_creator,
temp_file_provider,
value_buf: vec![],
stats: Statistics::default(),
aborted: false,
ignore_column_ids: HashSet::default(),
memory_usage,
compress,
column_ids,
}
}
/// Sets the write buffer size of the store.
pub fn with_buffer_size(mut self, write_buffer_size: Option<usize>) -> Self {
self.store = self.store.with_write_buffer_size(write_buffer_size);
self
}
/// Sets the ignore column IDs for index creation.
pub fn with_ignore_column_ids(mut self, ignore_column_ids: HashSet<ColumnId>) -> Self {
self.ignore_column_ids = ignore_column_ids;
self
}
/// Updates index with a batch of rows.
/// Garbage will be cleaned up if failed to update.
pub async fn update(&mut self, batch: &Batch) -> Result<()> {
@@ -155,12 +140,9 @@ impl SstIndexCreator {
// clean up garbage if failed to update
if let Err(err) = self.do_cleanup().await {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to clean up index creator, file_path: {}, err: {}",
self.file_path, err
);
panic!("Failed to clean up index creator, err: {err}",);
} else {
warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path);
warn!(err; "Failed to clean up index creator");
}
}
return Err(update_err);
@@ -171,7 +153,10 @@ impl SstIndexCreator {
/// Finishes index creation and cleans up garbage.
/// Returns the number of rows and bytes written.
pub async fn finish(&mut self) -> Result<(RowCount, ByteCount)> {
pub async fn finish(
&mut self,
puffin_writer: &mut SstPuffinWriter,
) -> Result<(RowCount, ByteCount)> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if self.stats.row_count() == 0 {
@@ -179,16 +164,13 @@ impl SstIndexCreator {
return Ok((0, 0));
}
let finish_res = self.do_finish().await;
let finish_res = self.do_finish(puffin_writer).await;
// clean up garbage no matter finish successfully or not
if let Err(err) = self.do_cleanup().await {
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to clean up index creator, file_path: {}, err: {}",
self.file_path, err
);
panic!("Failed to clean up index creator, err: {err}",);
} else {
warn!(err; "Failed to clean up index creator, file_path: {}", self.file_path);
warn!(err; "Failed to clean up index creator");
}
}
@@ -211,8 +193,8 @@ impl SstIndexCreator {
let n = batch.num_rows();
guard.inc_row_count(n);
for (column_id, field, value) in self.codec.decode(batch.primary_key())? {
if self.ignore_column_ids.contains(column_id) {
for ((col_id, col_id_str), field, value) in self.codec.decode(batch.primary_key())? {
if !self.column_ids.contains(col_id) {
continue;
}
@@ -228,7 +210,7 @@ impl SstIndexCreator {
// non-null value -> Some(encoded_bytes), null value -> None
let value = value.is_some().then_some(self.value_buf.as_slice());
self.index_creator
.push_with_name_n(column_id, value, n)
.push_with_name_n(col_id_str, value, n)
.await
.context(PushIndexValueSnafu)?;
}
@@ -254,32 +236,18 @@ impl SstIndexCreator {
/// └─────────────┘ └────────────────►│ File │
/// └──────┘
/// ```
async fn do_finish(&mut self) -> Result<()> {
async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
let mut guard = self.stats.record_finish();
let file_writer = self
.store
.writer(
&self.file_path,
&INDEX_PUFFIN_WRITE_BYTES_TOTAL,
&INDEX_PUFFIN_WRITE_OP_TOTAL,
&INDEX_PUFFIN_FLUSH_OP_TOTAL,
)
.await?;
let mut puffin_writer = PuffinFileWriter::new(file_writer);
let (tx, rx) = duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
let blob = Blob {
blob_type: INDEX_BLOB_TYPE.to_string(),
compressed_data: rx.compat(),
properties: HashMap::default(),
compression_codec: None,
};
let mut index_writer = InvertedIndexBlobWriter::new(tx.compat_write());
let put_options = PutOptions {
compression: self.compress.then_some(CompressionCodec::Zstd),
};
let (index_finish, puffin_add_blob) = futures::join!(
self.index_creator.finish(&mut index_writer),
puffin_writer.add_blob(blob)
puffin_writer.put_blob(INDEX_BLOB_TYPE, rx.compat(), put_options)
);
match (
@@ -294,11 +262,11 @@ impl SstIndexCreator {
(Ok(_), e @ Err(_)) => e?,
(e @ Err(_), Ok(_)) => e.map(|_| ())?,
_ => {}
(Ok(written_bytes), Ok(_)) => {
guard.inc_byte_count(written_bytes);
}
}
let byte_count = puffin_writer.finish().await.context(PuffinFinishSnafu)?;
guard.inc_byte_count(byte_count);
Ok(())
}
@@ -308,6 +276,10 @@ impl SstIndexCreator {
self.temp_file_provider.cleanup().await
}
pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
self.column_ids.iter().copied()
}
pub fn memory_usage(&self) -> usize {
self.memory_usage.load(std::sync::atomic::Ordering::Relaxed)
}
@@ -326,12 +298,14 @@ mod tests {
use datatypes::vectors::{UInt64Vector, UInt8Vector};
use futures::future::BoxFuture;
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin::puffin_manager::PuffinManager;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use store_api::storage::RegionId;
use super::*;
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
use crate::sst::index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::builder::SstIndexApplierBuilder;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
@@ -418,13 +392,13 @@ mod tests {
let segment_row_count = 2;
let mut creator = SstIndexCreator::new(
file_path,
sst_file_id,
&region_metadata,
object_store.clone(),
intm_mgr,
memory_threshold,
NonZeroUsize::new(segment_row_count).unwrap(),
false,
&[],
);
for (str_tag, i32_tag) in &tags {
@@ -432,8 +406,11 @@ mod tests {
creator.update(&batch).await.unwrap();
}
let (row_count, _) = creator.finish().await.unwrap();
let puffin_manager = factory.build(object_store.clone());
let mut writer = puffin_manager.writer(&file_path).await.unwrap();
let (row_count, _) = creator.finish(&mut writer).await.unwrap();
assert_eq!(row_count, tags.len() * segment_row_count);
writer.finish().await.unwrap();
move |expr| {
let _d = &d;

View File

@@ -22,7 +22,7 @@ use puffin::error::{self as puffin_error, Result as PuffinResult};
use puffin::puffin_manager::file_accessor::PuffinFileAccessor;
use puffin::puffin_manager::fs_puffin_manager::FsPuffinManager;
use puffin::puffin_manager::stager::{BoundedStager, FsBlobGuard};
use puffin::puffin_manager::BlobGuard;
use puffin::puffin_manager::{BlobGuard, PuffinManager};
use snafu::ResultExt;
use crate::error::{PuffinInitStagerSnafu, Result};
@@ -36,6 +36,7 @@ type InstrumentedAsyncRead = store::InstrumentedAsyncRead<'static, FuturesAsyncR
type InstrumentedAsyncWrite = store::InstrumentedAsyncWrite<'static, FuturesAsyncWriter>;
pub(crate) type BlobReader = <Arc<FsBlobGuard> as BlobGuard>::Reader;
pub(crate) type SstPuffinWriter = <SstPuffinManager as PuffinManager>::Writer;
pub(crate) type SstPuffinManager =
FsPuffinManager<Arc<BoundedStager>, ObjectStorePuffinFileAccessor>;

View File

@@ -20,6 +20,7 @@ use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use crate::sst::file::FileTimeRange;
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
pub(crate) mod file_range;
@@ -71,10 +72,8 @@ pub struct SstInfo {
pub num_row_groups: u64,
/// File Meta Data
pub file_metadata: Option<Arc<ParquetMetaData>>,
/// Whether inverted index is available.
pub inverted_index_available: bool,
/// Index file size in bytes.
pub index_file_size: u64,
/// Index Meta Data
pub index_metadata: IndexOutput,
}
#[cfg(test)]

View File

@@ -52,7 +52,7 @@ use crate::metrics::{
use crate::read::{Batch, BatchReader};
use crate::row_converter::{McmpRowCodec, SortField};
use crate::sst::file::FileHandle;
use crate::sst::index::applier::SstIndexApplierRef;
use crate::sst::index::inverted_index::applier::SstIndexApplierRef;
use crate::sst::parquet::file_range::{FileRangeContext, FileRangeContextRef};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::metadata::MetadataLoader;

View File

@@ -134,9 +134,7 @@ where
}
}
let index_size = self.indexer.finish().await;
let inverted_index_available = index_size.is_some();
let index_file_size = index_size.unwrap_or(0);
let index_output = self.indexer.finish().await;
if stats.num_rows == 0 {
return Ok(None);
@@ -165,8 +163,7 @@ where
num_rows: stats.num_rows,
num_row_groups: parquet_metadata.num_row_groups() as u64,
file_metadata: Some(Arc::new(parquet_metadata)),
inverted_index_available,
index_file_size,
index_metadata: index_output,
}))
}

View File

@@ -69,6 +69,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
use crate::read::{Batch, BatchBuilder, BatchReader};
use crate::sst::file_purger::{FilePurger, FilePurgerRef, PurgeRequest};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::time_provider::{StdTimeProvider, TimeProviderRef};
use crate::worker::WorkerGroup;
@@ -604,15 +605,25 @@ impl TestEnv {
) -> WriteCacheRef {
let data_home = self.data_home().display().to_string();
let intm_mgr = IntermediateManager::init_fs(join_dir(&data_home, "intm"))
let index_aux_path = self.data_home.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let object_store_manager = self.get_object_store_manager().unwrap();
let write_cache =
WriteCache::new(local_store, object_store_manager, capacity, None, intm_mgr)
.await
.unwrap();
let write_cache = WriteCache::new(
local_store,
object_store_manager,
capacity,
None,
puffin_mgr,
intm_mgr,
)
.await
.unwrap();
Arc::new(write_cache)
}

View File

@@ -149,6 +149,7 @@ impl WorkerGroup {
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
puffin_manager_factory.clone(),
intermediate_manager.clone(),
)
.await?;
@@ -280,6 +281,7 @@ impl WorkerGroup {
let write_cache = write_cache_from_config(
&config,
object_store_manager.clone(),
puffin_manager_factory.clone(),
intermediate_manager.clone(),
)
.await?;
@@ -337,6 +339,7 @@ fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
async fn write_cache_from_config(
config: &MitoConfig,
object_store_manager: ObjectStoreManagerRef,
puffin_manager_factory: PuffinManagerFactory,
intermediate_manager: IntermediateManager,
) -> Result<Option<WriteCacheRef>> {
if !config.enable_experimental_write_cache {
@@ -351,6 +354,7 @@ async fn write_cache_from_config(
object_store_manager,
config.experimental_write_cache_size,
config.experimental_write_cache_ttl,
puffin_manager_factory,
intermediate_manager,
)
.await?;

View File

@@ -838,6 +838,7 @@ create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "64.0MiB"
compress = true
[region_engine.mito.memtable]
type = "time_series"