feat(fulltext_index): integrate full-text indexer with sst writer (#4302)

* feat(fulltext_index): integrate full-text indexer with sst writer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* refactor: delay building puffin writer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* test: indexer test

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: add abort on empty indexer

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* config: indicates default mode

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* config: introduce "auto" and "unlimited" as mem threshold

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chore: polish

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* doc: comment about push empty string

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Zhenchi
2024-07-07 12:10:19 +08:00
committed by GitHub
parent 3f4928effc
commit a710676d06
22 changed files with 969 additions and 132 deletions

@@ -124,11 +124,16 @@
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.<br/>Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |
@@ -408,11 +413,16 @@
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
| `region_engine.mito.inverted_index` | -- | -- | The options for inverted index in Mito engine. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `64M` | Memory threshold for performing an external sort during index creation.<br/>Setting to empty will disable external sorting, forcing all sorting operations to happen in memory. |
| `region_engine.mito.inverted_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.inverted_index.mem_threshold_on_create` | String | `auto` | Memory threshold for performing an external sort during index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.inverted_index.intermediate_path` | String | `""` | Deprecated, use `region_engine.mito.index.aux_path` instead. |
| `region_engine.mito.fulltext_index` | -- | -- | The options for full-text index in Mito engine. |
| `region_engine.mito.fulltext_index.create_on_flush` | String | `auto` | Whether to create the index on flush.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.apply_on_query` | String | `auto` | Whether to apply the index on query<br/>- `auto`: automatically (default)<br/>- `disable`: never |
| `region_engine.mito.fulltext_index.mem_threshold_on_create` | String | `auto` | Memory threshold for index creation.<br/>- `auto`: automatically determine the threshold based on the system memory size (default)<br/>- `unlimited`: no memory limit<br/>- `[size]` e.g. `64MB`: fixed memory threshold |
| `region_engine.mito.memtable` | -- | -- | -- |
| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.<br/>- `time_series`: time-series memtable<br/>- `partition_tree`: partition tree memtable (experimental) |
| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.<br/>Only available for `partition_tree` memtable. |

@@ -413,27 +413,53 @@ staging_size = "2GB"
[region_engine.mito.inverted_index]
## Whether to create the index on flush.
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"
## Whether to create the index on compaction.
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"
## Whether to apply the index on query
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"
## Memory threshold for performing an external sort during index creation.
## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64M"
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = ""
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]
## Whether to create the index on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"
## Whether to create the index on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"
## Whether to apply the index on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"
## Memory threshold for index creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable

@@ -436,27 +436,53 @@ staging_size = "2GB"
[region_engine.mito.inverted_index]
## Whether to create the index on flush.
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"
## Whether to create the index on compaction.
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"
## Whether to apply the index on query
## - `auto`: automatically
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"
## Memory threshold for performing an external sort during index creation.
## Setting to empty will disable external sorting, forcing all sorting operations to happen in memory.
mem_threshold_on_create = "64M"
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
## Deprecated, use `region_engine.mito.index.aux_path` instead.
intermediate_path = ""
## The options for full-text index in Mito engine.
[region_engine.mito.fulltext_index]
## Whether to create the index on flush.
## - `auto`: automatically (default)
## - `disable`: never
create_on_flush = "auto"
## Whether to create the index on compaction.
## - `auto`: automatically (default)
## - `disable`: never
create_on_compaction = "auto"
## Whether to apply the index on query
## - `auto`: automatically (default)
## - `disable`: never
apply_on_query = "auto"
## Memory threshold for index creation.
## - `auto`: automatically determine the threshold based on the system memory size (default)
## - `unlimited`: no memory limit
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
[region_engine.mito.memtable]
## Memtable type.
## - `time_series`: time-series memtable

@@ -25,7 +25,10 @@ use datafusion_common::DFSchemaRef;
use snafu::{ensure, ResultExt};
use crate::error::{self, DuplicateColumnSnafu, Error, ProjectArrowSchemaSnafu, Result};
pub use crate::schema::column_schema::{ColumnSchema, Metadata, COMMENT_KEY, TIME_INDEX_KEY};
pub use crate::schema::column_schema::{
ColumnSchema, FulltextAnalyzer, FulltextOptions, Metadata, COMMENT_KEY, FULLTEXT_KEY,
TIME_INDEX_KEY,
};
pub use crate::schema::constraint::ColumnDefaultConstraint;
pub use crate::schema::raw::RawSchema;

@@ -32,6 +32,8 @@ pub const TIME_INDEX_KEY: &str = "greptime:time_index";
pub const COMMENT_KEY: &str = "greptime:storage:comment";
/// Key used to store default constraint in arrow field's metadata.
const DEFAULT_CONSTRAINT_KEY: &str = "greptime:default_constraint";
/// Key used to store fulltext options in column metadata.
pub const FULLTEXT_KEY: &str = "greptime:fulltext";
/// Schema of a column, used as an immutable struct.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
@@ -240,6 +242,18 @@ impl ColumnSchema {
}
}
}
/// Retrieves the fulltext options for the column.
pub fn fulltext_options(&self) -> Result<Option<FulltextOptions>> {
match self.metadata.get(FULLTEXT_KEY) {
None => Ok(None),
Some(json) => {
let options =
serde_json::from_str(json).context(error::DeserializeSnafu { json })?;
Ok(Some(options))
}
}
}
}
impl TryFrom<&Field> for ColumnSchema {
@@ -296,6 +310,25 @@ impl TryFrom<&ColumnSchema> for Field {
}
}
/// Fulltext options for a column.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct FulltextOptions {
/// Whether the fulltext index is enabled.
pub enable: bool,
/// The fulltext analyzer to use.
pub analyzer: FulltextAnalyzer,
/// Whether the fulltext index is case-sensitive.
pub case_sensitive: bool,
}
/// Fulltext analyzer.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum FulltextAnalyzer {
#[default]
English,
Chinese,
}
#[cfg(test)]
mod tests {
use std::sync::Arc;

@@ -22,7 +22,7 @@ use store_api::metadata::RegionMetadataRef;
use crate::cache::write_cache::SstUploadRequest;
use crate::cache::CacheManagerRef;
use crate::config::InvertedIndexConfig;
use crate::config::{FulltextIndexConfig, InvertedIndexConfig};
use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result};
use crate::read::Source;
use crate::region::options::IndexOptions;
@@ -153,6 +153,7 @@ impl AccessLayer {
intermediate_manager: self.intermediate_manager.clone(),
index_options: request.index_options,
inverted_index_config: request.inverted_index_config,
fulltext_index_config: request.fulltext_index_config,
}
.build()
.await;
@@ -204,6 +205,7 @@ pub(crate) struct SstWriteRequest {
/// Configs for index
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
}
/// Creates a fs object store with atomic write dir.

@@ -130,6 +130,7 @@ impl WriteCache {
intermediate_manager: self.intermediate_manager.clone(),
index_options: write_request.index_options,
inverted_index_config: write_request.inverted_index_config,
fulltext_index_config: write_request.fulltext_index_config,
}
.build()
.await;
@@ -307,6 +308,7 @@ mod tests {
cache_manager: Default::default(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
};
let upload_request = SstUploadRequest {
@@ -391,6 +393,7 @@ mod tests {
cache_manager: cache_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: Default::default(),
fulltext_index_config: Default::default(),
};
let write_opts = WriteOptions {
row_group_size: 512,

@@ -275,6 +275,7 @@ impl Compactor for DefaultCompactor {
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
futs.push(async move {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
@@ -299,6 +300,7 @@ impl Compactor for DefaultCompactor {
storage,
index_options,
inverted_index_config,
fulltext_index_config,
},
&write_opts,
)
@@ -314,6 +316,9 @@ impl Compactor for DefaultCompactor {
if sst_info.index_metadata.inverted_index.is_available() {
indexes.push(IndexType::InvertedIndex);
}
if sst_info.index_metadata.fulltext_index.is_available() {
indexes.push(IndexType::FulltextIndex);
}
indexes
},
index_file_size: sst_info.index_metadata.file_size,

@@ -22,7 +22,7 @@ use common_base::readable_size::ReadableSize;
use common_telemetry::warn;
use object_store::util::join_dir;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};
use serde_with::serde_as;
use crate::error::Result;
use crate::memtable::MemtableConfig;
@@ -41,6 +41,8 @@ const GLOBAL_WRITE_BUFFER_SIZE_FACTOR: u64 = 8;
const SST_META_CACHE_SIZE_FACTOR: u64 = 32;
/// Use `1/MEM_CACHE_SIZE_FACTOR` of OS memory size as mem cache size in default mode
const MEM_CACHE_SIZE_FACTOR: u64 = 16;
/// Use `1/INDEX_CREATE_MEM_THRESHOLD_FACTOR` of OS memory size as mem threshold for creating index
const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
/// Configuration for [MitoEngine](crate::engine::MitoEngine).
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
@@ -109,6 +111,8 @@ pub struct MitoConfig {
pub index: IndexConfig,
/// Inverted index configs.
pub inverted_index: InvertedIndexConfig,
/// Full-text index configs.
pub fulltext_index: FulltextIndexConfig,
/// Memtable config
pub memtable: MemtableConfig,
@@ -139,6 +143,7 @@ impl Default for MitoConfig {
allow_stale_entries: false,
index: IndexConfig::default(),
inverted_index: InvertedIndexConfig::default(),
fulltext_index: FulltextIndexConfig::default(),
memtable: MemtableConfig::default(),
};
@@ -337,6 +342,20 @@ impl Mode {
}
}
/// Memory threshold for performing certain actions.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum MemoryThreshold {
/// Automatically determine the threshold based on internal criteria.
#[default]
Auto,
/// Unlimited memory.
Unlimited,
/// Fixed memory threshold.
#[serde(untagged)]
Size(ReadableSize),
}
/// Configuration options for the inverted index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
@@ -350,9 +369,7 @@ pub struct InvertedIndexConfig {
pub apply_on_query: Mode,
/// Memory threshold for performing an external sort during index creation.
/// `MemoryThreshold::Unlimited` means all sorting will happen in memory.
#[serde_as(as = "NoneAsEmptyString")]
pub mem_threshold_on_create: Option<ReadableSize>,
pub mem_threshold_on_create: MemoryThreshold,
/// Whether to compress the index data.
pub compress: bool,
@@ -373,8 +390,8 @@ impl Default for InvertedIndexConfig {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
compress: true,
mem_threshold_on_create: Some(ReadableSize::mb(64)),
write_buffer_size: ReadableSize::mb(8),
intermediate_path: String::new(),
@@ -382,6 +399,67 @@ impl Default for InvertedIndexConfig {
}
}
impl InvertedIndexConfig {
pub fn mem_threshold_on_create(&self) -> Option<usize> {
match self.mem_threshold_on_create {
MemoryThreshold::Auto => {
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
Some((sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as usize)
} else {
Some(ReadableSize::mb(64).as_bytes() as usize)
}
}
MemoryThreshold::Unlimited => None,
MemoryThreshold::Size(size) => Some(size.as_bytes() as usize),
}
}
}
/// Configuration options for the full-text index.
#[serde_as]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
pub struct FulltextIndexConfig {
/// Whether to create the index on flush: automatically or never.
pub create_on_flush: Mode,
/// Whether to create the index on compaction: automatically or never.
pub create_on_compaction: Mode,
/// Whether to apply the index on query: automatically or never.
pub apply_on_query: Mode,
/// Memory threshold for creating the index.
pub mem_threshold_on_create: MemoryThreshold,
/// Whether to compress the index data.
pub compress: bool,
}
impl Default for FulltextIndexConfig {
fn default() -> Self {
Self {
create_on_flush: Mode::Auto,
create_on_compaction: Mode::Auto,
apply_on_query: Mode::Auto,
mem_threshold_on_create: MemoryThreshold::Auto,
compress: true,
}
}
}
impl FulltextIndexConfig {
pub fn mem_threshold_on_create(&self) -> usize {
match self.mem_threshold_on_create {
MemoryThreshold::Auto => {
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
(sys_memory / INDEX_CREATE_MEM_THRESHOLD_FACTOR).as_bytes() as _
} else {
ReadableSize::mb(64).as_bytes() as _
}
}
MemoryThreshold::Unlimited => usize::MAX,
MemoryThreshold::Size(size) => size.as_bytes() as _,
}
}
}
/// Divide cpu num by a non-zero `divisor` and returns at least 1.
fn divide_num_cpus(divisor: usize) -> usize {
debug_assert!(divisor > 0);
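
The threshold resolution in the two `mem_threshold_on_create()` methods above can be sketched in isolation. This is a simplified illustration, not the crate's code: sizes are plain `u64` byte counts instead of `ReadableSize`, the system memory is passed in as a parameter instead of being read through `common_config::utils::get_sys_total_memory()`, and the names `inverted_threshold`, `fulltext_threshold`, and `FALLBACK_THRESHOLD` are hypothetical. The 64 MB fallback is assumed to be 64 * 1024 * 1024 bytes.

```rust
/// Simplified stand-in for the `MemoryThreshold` enum in the diff.
/// `Size` carries a raw byte count here (assumption; the diff uses `ReadableSize`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryThreshold {
    Auto,
    Unlimited,
    Size(u64),
}

pub const MIB: u64 = 1024 * 1024;
/// Same value as `INDEX_CREATE_MEM_THRESHOLD_FACTOR` in the diff.
pub const INDEX_CREATE_MEM_THRESHOLD_FACTOR: u64 = 16;
/// Hypothetical name for the fallback used when system memory is unknown.
pub const FALLBACK_THRESHOLD: u64 = 64 * MIB;

/// Inverted index flavour: `None` stands for "no limit".
pub fn inverted_threshold(t: MemoryThreshold, sys_memory: Option<u64>) -> Option<u64> {
    match t {
        // `auto`: 1/16 of system memory, or the fixed fallback if it is unknown.
        MemoryThreshold::Auto => Some(
            sys_memory
                .map(|m| m / INDEX_CREATE_MEM_THRESHOLD_FACTOR)
                .unwrap_or(FALLBACK_THRESHOLD),
        ),
        MemoryThreshold::Unlimited => None,
        MemoryThreshold::Size(bytes) => Some(bytes),
    }
}

/// Full-text index flavour: "no limit" is encoded as the maximum integer
/// (the diff uses `usize::MAX`; this sketch uses `u64::MAX`).
pub fn fulltext_threshold(t: MemoryThreshold, sys_memory: Option<u64>) -> u64 {
    inverted_threshold(t, sys_memory).unwrap_or(u64::MAX)
}
```

Under these assumptions, a host with 16 GiB of memory and `auto` resolves to a 1 GiB threshold, while `unlimited` yields `None` for the inverted index and the maximum integer for the full-text index.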

@@ -783,6 +783,44 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to retrieve fulltext options from column metadata"))]
FulltextOptions {
#[snafu(implicit)]
location: Location,
source: datatypes::error::Error,
column_name: String,
},
#[snafu(display("Failed to create fulltext index creator"))]
CreateFulltextCreator {
source: index::fulltext_index::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to cast vector of {from} to {to}"))]
CastVector {
#[snafu(implicit)]
location: Location,
from: ConcreteDataType,
to: ConcreteDataType,
source: datatypes::error::Error,
},
#[snafu(display("Failed to push text to fulltext index"))]
FulltextPushText {
source: index::fulltext_index::error::Error,
#[snafu(implicit)]
location: Location,
},
#[snafu(display("Failed to finalize fulltext index creator"))]
FulltextFinish {
source: index::fulltext_index::error::Error,
#[snafu(implicit)]
location: Location,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -904,6 +942,11 @@ impl ErrorExt for Error {
BuildTimeRangeFilter { .. } => StatusCode::Unexpected,
UnsupportedOperation { .. } => StatusCode::Unsupported,
RemoteCompaction { .. } => StatusCode::Unexpected,
FulltextOptions { source, .. } => source.status_code(),
CreateFulltextCreator { source, .. } => source.status_code(),
CastVector { source, .. } => source.status_code(),
FulltextPushText { source, .. } | FulltextFinish { source, .. } => source.status_code(),
}
}

@@ -332,6 +332,7 @@ impl RegionFlushTask {
storage: version.options.storage.clone(),
index_options: self.index_options.clone(),
inverted_index_config: self.engine_config.inverted_index.clone(),
fulltext_index_config: self.engine_config.fulltext_index.clone(),
};
let Some(sst_info) = self
.access_layer
@@ -354,6 +355,9 @@ impl RegionFlushTask {
if sst_info.index_metadata.inverted_index.is_available() {
indexes.push(IndexType::InvertedIndex);
}
if sst_info.index_metadata.fulltext_index.is_available() {
indexes.push(IndexType::FulltextIndex);
}
indexes
},
index_file_size: sst_info.index_metadata.file_size,

@@ -128,12 +128,17 @@ pub struct FileMeta {
pub enum IndexType {
/// Inverted index.
InvertedIndex,
/// Full-text index.
FulltextIndex,
}
impl FileMeta {
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
}
pub fn fulltext_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::FulltextIndex)
}
}
/// Handle to a SST file.
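
The flush and compaction hunks both derive the list of available indexes from the index output in the same way: an index counts as available when its size is non-zero. A minimal sketch of that rule follows. It is an illustration only: `IndexSizes` and `available_indexes` are hypothetical names, and the struct flattens the real `IndexOutput`, `InvertedIndexOutput`, and `FulltextIndexOutput` types into two byte counts.

```rust
/// Mirrors the two variants of `IndexType` shown in the diff.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IndexType {
    InvertedIndex,
    FulltextIndex,
}

/// Hypothetical, flattened view of the per-index output sizes in bytes.
#[derive(Debug, Default, Clone, Copy)]
pub struct IndexSizes {
    pub inverted_index_size: u64,
    pub fulltext_index_size: u64,
}

/// Collects the index types whose output is non-empty,
/// following the `is_available()` checks (`index_size > 0`) in the diff.
pub fn available_indexes(sizes: &IndexSizes) -> Vec<IndexType> {
    let mut indexes = Vec::new();
    if sizes.inverted_index_size > 0 {
        indexes.push(IndexType::InvertedIndex);
    }
    if sizes.fulltext_index_size > 0 {
        indexes.push(IndexType::FulltextIndex);
    }
    indexes
}
```

A caller can then answer a question like `fulltext_index_available()` with `available_indexes(&sizes).contains(&IndexType::FulltextIndex)`.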

@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod fulltext_index;
mod indexer;
pub(crate) mod intermediate;
pub(crate) mod inverted_index;
@@ -22,18 +23,18 @@ mod store;
use std::num::NonZeroUsize;
use common_telemetry::{debug, warn};
use puffin::puffin_manager::PuffinManager;
use puffin_manager::{SstPuffinManager, SstPuffinWriter};
use puffin_manager::SstPuffinManager;
use statistics::{ByteCount, RowCount};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, RegionId};
use crate::access_layer::OperationType;
use crate::config::InvertedIndexConfig;
use crate::config::{FulltextIndexConfig, InvertedIndexConfig};
use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
use crate::read::Batch;
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::creator::SstIndexCreator as FulltextIndexer;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer;
@@ -44,6 +45,8 @@ pub struct IndexOutput {
pub file_size: u64,
/// Inverted index output.
pub inverted_index: InvertedIndexOutput,
/// Fulltext index output.
pub fulltext_index: FulltextIndexOutput,
}
/// Output of the inverted index creation.
@@ -57,21 +60,40 @@ pub struct InvertedIndexOutput {
pub columns: Vec<ColumnId>,
}
/// Output of the fulltext index creation.
#[derive(Debug, Clone, Default)]
pub struct FulltextIndexOutput {
/// Size of the index.
pub index_size: ByteCount,
/// Number of rows in the index.
pub row_count: RowCount,
/// Available columns in the index.
pub columns: Vec<ColumnId>,
}
impl InvertedIndexOutput {
pub fn is_available(&self) -> bool {
self.index_size > 0
}
}
impl FulltextIndexOutput {
pub fn is_available(&self) -> bool {
self.index_size > 0
}
}
/// The index creator that hides the error handling details.
#[derive(Default)]
pub struct Indexer {
file_id: FileId,
file_path: String,
region_id: RegionId,
last_memory_usage: usize,
puffin_manager: Option<SstPuffinManager>,
inverted_indexer: Option<InvertedIndexer>,
puffin_writer: Option<SstPuffinWriter>,
fulltext_indexer: Option<FulltextIndexer>,
}
impl Indexer {
@@ -104,6 +126,10 @@ impl Indexer {
self.inverted_indexer
.as_ref()
.map_or(0, |creator| creator.memory_usage())
+ self
.fulltext_indexer
.as_ref()
.map_or(0, |creator| creator.memory_usage())
}
}
@@ -117,6 +143,7 @@ pub(crate) struct IndexerBuilder<'a> {
pub(crate) intermediate_manager: IntermediateManager,
pub(crate) index_options: IndexOptions,
pub(crate) inverted_index_config: InvertedIndexConfig,
pub(crate) fulltext_index_config: FulltextIndexConfig,
}
impl<'a> IndexerBuilder<'a> {
@@ -124,6 +151,7 @@ impl<'a> IndexerBuilder<'a> {
pub(crate) async fn build(self) -> Indexer {
let mut indexer = Indexer {
file_id: self.file_id,
file_path: self.file_path.clone(),
region_id: self.metadata.region_id,
last_memory_usage: 0,
@@ -131,17 +159,13 @@ impl<'a> IndexerBuilder<'a> {
};
indexer.inverted_indexer = self.build_inverted_indexer();
if indexer.inverted_indexer.is_none() {
indexer.abort().await;
return Indexer::default();
}
indexer.puffin_writer = self.build_puffin_writer().await;
if indexer.puffin_writer.is_none() {
indexer.fulltext_indexer = self.build_fulltext_indexer().await;
if indexer.inverted_indexer.is_none() && indexer.fulltext_indexer.is_none() {
indexer.abort().await;
return Indexer::default();
}
indexer.puffin_manager = Some(self.puffin_manager);
indexer
}
@@ -190,16 +214,11 @@ impl<'a> IndexerBuilder<'a> {
segment_row_count = row_group_size;
}
let mem_threshold = self
.inverted_index_config
.mem_threshold_on_create
.map(|t| t.as_bytes() as usize);
let indexer = InvertedIndexer::new(
self.file_id,
self.metadata,
self.intermediate_manager.clone(),
mem_threshold,
self.inverted_index_config.mem_threshold_on_create(),
segment_row_count,
self.inverted_index_config.compress,
&self.index_options.inverted_index.ignore_column_ids,
@@ -208,20 +227,54 @@ impl<'a> IndexerBuilder<'a> {
Some(indexer)
}
async fn build_puffin_writer(&self) -> Option<SstPuffinWriter> {
let err = match self.puffin_manager.writer(&self.file_path).await {
Ok(writer) => return Some(writer),
async fn build_fulltext_indexer(&self) -> Option<FulltextIndexer> {
let create = match self.op_type {
OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
};
if !create {
debug!(
"Skip creating full-text index due to config, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
return None;
}
let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
let creator = FulltextIndexer::new(
&self.metadata.region_id,
&self.file_id,
&self.intermediate_manager,
self.metadata,
self.fulltext_index_config.compress,
mem_limit,
)
.await;
let err = match creator {
Ok(creator) => {
if creator.is_empty() {
debug!(
"Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
return None;
} else {
return Some(creator);
}
}
Err(err) => err,
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to create puffin writer, region_id: {}, file_id: {}, err: {}",
"Failed to create full-text indexer, region_id: {}, file_id: {}, err: {}",
self.metadata.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to create puffin writer, region_id: {}, file_id: {}",
err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
self.metadata.region_id, self.file_id,
);
}
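
The builder changes in this file reduce to two decisions: whether a given index should be attempted for the current operation, and whether the resulting `Indexer` is kept at all. The sketch below restates them with hypothetical helper names (`CreateModes`, `wants_index`, `keeps_indexer`). The `indexable_columns` count stands in for the creator's `is_empty()` check and is an assumption of this example, and error handling is left out.

```rust
/// Mirrors the `auto` / `disable` config values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Auto,
    Disable,
}

impl Mode {
    /// True when the mode is `auto`.
    pub fn auto(self) -> bool {
        self == Mode::Auto
    }
}

#[derive(Debug, Clone, Copy)]
pub enum OperationType {
    Flush,
    Compact,
}

/// Hypothetical subset of an index config: only the creation switches.
#[derive(Debug, Clone, Copy)]
pub struct CreateModes {
    pub create_on_flush: Mode,
    pub create_on_compaction: Mode,
}

/// An index is attempted only if the config allows it for this operation
/// and at least one column requires indexing.
pub fn wants_index(op: OperationType, modes: CreateModes, indexable_columns: usize) -> bool {
    let enabled = match op {
        OperationType::Flush => modes.create_on_flush.auto(),
        OperationType::Compact => modes.create_on_compaction.auto(),
    };
    enabled && indexable_columns > 0
}

/// The indexer is kept when at least one sub-indexer was built;
/// otherwise the builder aborts and returns an empty default.
pub fn keeps_indexer(has_inverted: bool, has_fulltext: bool) -> bool {
    has_inverted || has_fulltext
}
```

With this shape, disabling the full-text index on flush no longer discards the inverted index, which matches the change from aborting on a missing inverted indexer to aborting only when both are missing.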
@@ -236,48 +289,35 @@ mod tests {
use api::v1::SemanticType;
use datatypes::data_type::ConcreteDataType;
use datatypes::schema::ColumnSchema;
use datatypes::schema::{ColumnSchema, FulltextOptions, FULLTEXT_KEY};
use object_store::services::Memory;
use object_store::ObjectStore;
use puffin_manager::PuffinManagerFactory;
use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
use super::*;
use crate::config::Mode;
use crate::config::{FulltextIndexConfig, Mode};
fn mock_region_metadata() -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
semantic_type: SemanticType::Tag,
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
semantic_type: SemanticType::Field,
column_id: 2,
})
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new(
"c",
ConcreteDataType::timestamp_millisecond_datatype(),
false,
),
semantic_type: SemanticType::Timestamp,
column_id: 3,
})
.primary_key(vec![1]);
Arc::new(builder.build().unwrap())
struct MetaConfig {
with_tag: bool,
with_fulltext: bool,
}
fn no_tag_region_metadata() -> RegionMetadataRef {
fn mock_region_metadata(
MetaConfig {
with_tag,
with_fulltext,
}: MetaConfig,
) -> RegionMetadataRef {
let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
builder
.push_column_metadata(ColumnMetadata {
column_schema: ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false),
semantic_type: SemanticType::Field,
semantic_type: if with_tag {
SemanticType::Tag
} else {
SemanticType::Field
},
column_id: 1,
})
.push_column_metadata(ColumnMetadata {
@@ -295,6 +335,32 @@ mod tests {
column_id: 3,
});
if with_tag {
builder.primary_key(vec![1]);
}
if with_fulltext {
let opts = serde_json::to_string(&FulltextOptions {
enable: true,
..Default::default()
})
.unwrap();
let mut column_schema =
ColumnSchema::new("text", ConcreteDataType::string_datatype(), true);
column_schema
.mut_metadata()
.insert(FULLTEXT_KEY.to_string(), opts);
let column = ColumnMetadata {
column_schema,
semantic_type: SemanticType::Field,
column_id: 4,
};
builder.push_column_metadata(column);
}
Arc::new(builder.build().unwrap())
}
@@ -302,102 +368,164 @@ mod tests {
ObjectStore::new(Memory::default()).unwrap().finish()
}
fn mock_intm_mgr() -> IntermediateManager {
IntermediateManager::new(mock_object_store())
async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
IntermediateManager::init_fs(path).await.unwrap()
}
#[tokio::test]
async fn test_build_indexer_basic() {
let (_d, factory) =
let (dir, factory) =
PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
let store = mock_object_store();
let puffin_manager = factory.build(store);
let metadata = mock_region_metadata();
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
let metadata = mock_region_metadata(MetaConfig {
with_tag: true,
with_fulltext: true,
});
let indexer = IndexerBuilder {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
puffin_manager,
intermediate_manager: mock_intm_mgr(),
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
}
.build()
.await;
assert!(indexer.inverted_indexer.is_some());
assert!(indexer.fulltext_indexer.is_some());
}
#[tokio::test]
async fn test_build_indexer_disable_create() {
let (_d, factory) =
let (dir, factory) =
PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
let store = mock_object_store();
let puffin_manager = factory.build(store);
let metadata = mock_region_metadata();
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
let metadata = mock_region_metadata(MetaConfig {
with_tag: true,
with_fulltext: true,
});
let indexer = IndexerBuilder {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
puffin_manager,
intermediate_manager: mock_intm_mgr(),
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig {
create_on_flush: Mode::Disable,
..Default::default()
},
fulltext_index_config: FulltextIndexConfig::default(),
}
.build()
.await;
assert!(indexer.inverted_indexer.is_none());
assert!(indexer.fulltext_indexer.is_some());
let indexer = IndexerBuilder {
op_type: OperationType::Compact,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig {
create_on_compaction: Mode::Disable,
..Default::default()
},
}
.build()
.await;
assert!(indexer.inverted_indexer.is_none());
assert!(indexer.inverted_indexer.is_some());
assert!(indexer.fulltext_indexer.is_none());
}
#[tokio::test]
async fn test_build_indexer_no_tag() {
let (_d, factory) =
PuffinManagerFactory::new_for_test_async("test_build_indexer_no_tag_").await;
let store = mock_object_store();
let puffin_manager = factory.build(store);
let metadata = no_tag_region_metadata();
async fn test_build_indexer_no_required() {
let (dir, factory) =
PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
let metadata = mock_region_metadata(MetaConfig {
with_tag: false,
with_fulltext: true,
});
let indexer = IndexerBuilder {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
puffin_manager,
intermediate_manager: mock_intm_mgr(),
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager.clone(),
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
}
.build()
.await;
assert!(indexer.inverted_indexer.is_none());
assert!(indexer.fulltext_indexer.is_some());
let metadata = mock_region_metadata(MetaConfig {
with_tag: true,
with_fulltext: false,
});
let indexer = IndexerBuilder {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 1024,
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
}
.build()
.await;
assert!(indexer.inverted_indexer.is_some());
assert!(indexer.fulltext_indexer.is_none());
}
#[tokio::test]
async fn test_build_indexer_zero_row_group() {
let (_d, factory) =
let (dir, factory) =
PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
let store = mock_object_store();
let puffin_manager = factory.build(store);
let metadata = mock_region_metadata();
let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
let metadata = mock_region_metadata(MetaConfig {
with_tag: true,
with_fulltext: true,
});
let indexer = IndexerBuilder {
op_type: OperationType::Flush,
file_id: FileId::random(),
file_path: "test".to_string(),
metadata: &metadata,
row_group_size: 0,
puffin_manager,
intermediate_manager: mock_intm_mgr(),
puffin_manager: factory.build(mock_object_store()),
intermediate_manager: intm_manager,
index_options: IndexOptions::default(),
inverted_index_config: InvertedIndexConfig::default(),
fulltext_index_config: FulltextIndexConfig::default(),
}
.build()
.await;


@@ -0,0 +1,17 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub(crate) mod creator;
const INDEX_BLOB_TYPE: &str = "greptime-fulltext-index-v1";


@@ -0,0 +1,296 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::path::PathBuf;
use common_telemetry::warn;
use datatypes::schema::FulltextAnalyzer;
use index::fulltext_index::create::{FulltextIndexCreator, TantivyFulltextIndexCreator};
use index::fulltext_index::{Analyzer, Config};
use puffin::blob_metadata::CompressionCodec;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, ConcreteDataType, RegionId};
use crate::error::{
CastVectorSnafu, CreateFulltextCreatorSnafu, FieldTypeMismatchSnafu, FulltextFinishSnafu,
FulltextOptionsSnafu, FulltextPushTextSnafu, OperateAbortedIndexSnafu, PuffinAddBlobSnafu,
Result,
};
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::fulltext_index::INDEX_BLOB_TYPE;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
/// `SstIndexCreator` is responsible for creating fulltext indexes for SST files.
pub struct SstIndexCreator {
/// Creators for each column.
creators: HashMap<ColumnId, SingleCreator>,
/// Whether the index creation was aborted.
aborted: bool,
/// Statistics of index creation.
stats: Statistics,
}
impl SstIndexCreator {
/// Creates a new `SstIndexCreator`.
pub async fn new(
region_id: &RegionId,
sst_file_id: &FileId,
intermediate_manager: &IntermediateManager,
metadata: &RegionMetadataRef,
compress: bool,
mem_limit: usize,
) -> Result<Self> {
let mut creators = HashMap::new();
for column in &metadata.column_metadatas {
let options =
column
.column_schema
.fulltext_options()
.context(FulltextOptionsSnafu {
column_name: &column.column_schema.name,
})?;
// Relax the type constraint here as many types can be cast to string.
let options = match options {
Some(options) if options.enable => options,
_ => continue,
};
let column_id = column.column_id;
let intm_path = intermediate_manager.fulltext_path(region_id, sst_file_id, column_id);
let config = Config {
analyzer: match options.analyzer {
FulltextAnalyzer::English => Analyzer::English,
FulltextAnalyzer::Chinese => Analyzer::Chinese,
},
case_sensitive: options.case_sensitive,
};
let creator = TantivyFulltextIndexCreator::new(&intm_path, config, mem_limit)
.await
.context(CreateFulltextCreatorSnafu)?;
creators.insert(
column_id,
SingleCreator {
column_id,
inner: Box::new(creator),
intm_path,
compress,
},
);
}
Ok(Self {
creators,
aborted: false,
stats: Statistics::default(),
})
}
/// Updates the index with the given batch.
pub async fn update(&mut self, batch: &Batch) -> Result<()> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
if let Err(update_err) = self.do_update(batch).await {
if let Err(err) = self.do_abort().await {
if cfg!(any(test, feature = "test")) {
panic!("Failed to abort index creator, err: {err}");
} else {
warn!(err; "Failed to abort index creator");
}
}
return Err(update_err);
}
Ok(())
}
/// Finalizes the index creation.
pub async fn finish(
&mut self,
puffin_writer: &mut SstPuffinWriter,
) -> Result<(RowCount, ByteCount)> {
ensure!(!self.aborted, OperateAbortedIndexSnafu);
match self.do_finish(puffin_writer).await {
Ok(()) => Ok((self.stats.row_count(), self.stats.byte_count())),
Err(finish_err) => {
if let Err(err) = self.do_abort().await {
if cfg!(any(test, feature = "test")) {
panic!("Failed to abort index creator, err: {err}");
} else {
warn!(err; "Failed to abort index creator");
}
}
Err(finish_err)
}
}
}
/// Aborts the index creation.
pub async fn abort(&mut self) -> Result<()> {
if self.aborted {
return Ok(());
}
self.do_abort().await
}
/// Returns the memory usage of the index creator.
pub fn memory_usage(&self) -> usize {
self.creators.values().map(|c| c.inner.memory_usage()).sum()
}
/// Returns IDs of columns that the creator is responsible for.
pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
self.creators.keys().copied()
}
pub(crate) fn is_empty(&self) -> bool {
self.creators.is_empty()
}
}
impl SstIndexCreator {
async fn do_update(&mut self, batch: &Batch) -> Result<()> {
let mut guard = self.stats.record_update();
guard.inc_row_count(batch.num_rows());
for creator in self.creators.values_mut() {
creator.update(batch).await?;
}
Ok(())
}
async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
let mut guard = self.stats.record_finish();
let mut written_bytes = 0;
for creator in self.creators.values_mut() {
written_bytes += creator.finish(puffin_writer).await?;
}
guard.inc_byte_count(written_bytes);
Ok(())
}
async fn do_abort(&mut self) -> Result<()> {
let _guard = self.stats.record_cleanup();
self.aborted = true;
for (_, mut creator) in self.creators.drain() {
creator.abort().await?;
}
Ok(())
}
}
/// `SingleCreator` is a creator for a single column.
struct SingleCreator {
/// Column ID.
column_id: ColumnId,
/// Inner creator.
inner: Box<dyn FulltextIndexCreator>,
/// Intermediate path where the index is written to.
intm_path: PathBuf,
/// Whether the index should be compressed.
compress: bool,
}
impl SingleCreator {
async fn update(&mut self, batch: &Batch) -> Result<()> {
let text_column = batch
.fields()
.iter()
.find(|c| c.column_id == self.column_id);
match text_column {
Some(column) => {
let data = column
.data
.cast(&ConcreteDataType::string_datatype())
.context(CastVectorSnafu {
from: column.data.data_type(),
to: ConcreteDataType::string_datatype(),
})?;
for i in 0..batch.num_rows() {
let data = data.get_ref(i);
let text = data
.as_string()
.context(FieldTypeMismatchSnafu)?
.unwrap_or_default();
self.inner
.push_text(text)
.await
.context(FulltextPushTextSnafu)?;
}
}
_ => {
// If the column is not found in the batch, push empty text.
// Ensure that the number of texts pushed is the same as the number of rows in the SST,
// so that the texts are aligned with the row ids.
for _ in 0..batch.num_rows() {
self.inner
.push_text("")
.await
.context(FulltextPushTextSnafu)?;
}
}
}
Ok(())
}
async fn finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<ByteCount> {
self.inner.finish().await.context(FulltextFinishSnafu)?;
let options = PutOptions {
compression: self.compress.then_some(CompressionCodec::Zstd),
};
let key = format!("{INDEX_BLOB_TYPE}-{}", self.column_id);
puffin_writer
.put_dir(&key, self.intm_path.clone(), options)
.await
.context(PuffinAddBlobSnafu)
}
async fn abort(&mut self) -> Result<()> {
if let Err(err) = self.inner.finish().await {
warn!(err; "Failed to finish fulltext index creator, col_id: {:?}, dir_path: {:?}", self.column_id, self.intm_path);
}
if let Err(err) = tokio::fs::remove_dir_all(&self.intm_path).await {
warn!(err; "Failed to remove fulltext index directory, col_id: {:?}, dir_path: {:?}", self.column_id, self.intm_path);
}
Ok(())
}
}
#[cfg(test)]
mod tests {
// TODO(zhongzc): After search is implemented, add tests for full-text indexer.
}
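
The row-alignment rule in `SingleCreator::update` (one pushed text per row, with an empty string for NULL values or for a column that is missing from the batch) can be illustrated in isolation. This is a minimal sketch, not the code from this commit: `RecordingCreator` and `push_aligned` are hypothetical names, and the real creator is async and writes to a Tantivy index rather than a `Vec`.

```rust
/// Hypothetical stand-in for a full-text index writer; it only records texts.
#[derive(Default)]
struct RecordingCreator {
    texts: Vec<String>,
}

impl RecordingCreator {
    fn push_text(&mut self, text: &str) {
        self.texts.push(text.to_string());
    }
}

/// Pushes exactly `num_rows` texts so that text N always belongs to row N.
///
/// `column` is `None` when the batch does not carry the indexed column;
/// an inner `None` models a NULL value in a nullable column.
fn push_aligned(
    creator: &mut RecordingCreator,
    column: Option<&[Option<&str>]>,
    num_rows: usize,
) {
    for i in 0..num_rows {
        // A missing column, a missing row, and a NULL value all become "".
        let text = column
            .and_then(|values| values.get(i).copied().flatten())
            .unwrap_or("");
        creator.push_text(text);
    }
}
```

Calling `push_aligned` with a three-row column and then with `None` for two rows leaves five texts in the creator, so later row ids still line up with positions in the index.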


@@ -13,14 +13,14 @@
// limitations under the License.
use common_telemetry::warn;
use puffin::puffin_manager::PuffinWriter;
use crate::sst::index::Indexer;
impl Indexer {
pub(crate) async fn do_abort(&mut self) {
self.do_abort_inverted_index().await;
self.do_abort_puffin_writer().await;
self.do_abort_fulltext_index().await;
self.puffin_manager = None;
}
async fn do_abort_inverted_index(&mut self) {
@@ -44,24 +44,22 @@ impl Indexer {
}
}
async fn do_abort_puffin_writer(&mut self) {
let Some(puffin_writer) = self.puffin_writer.take() else {
async fn do_abort_fulltext_index(&mut self) {
let Some(mut indexer) = self.fulltext_indexer.take() else {
return;
};
let err = match puffin_writer.finish().await {
Ok(_) => return,
Err(err) => err,
let Err(err) = indexer.abort().await else {
return;
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to abort puffin writer, region_id: {}, file_id: {}, err: {}",
"Failed to abort full-text index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to abort puffin writer, region_id: {}, file_id: {}",
err; "Failed to abort full-text index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}


@@ -13,18 +13,20 @@
// limitations under the License.
use common_telemetry::{debug, warn};
use puffin::puffin_manager::PuffinWriter;
use puffin::puffin_manager::{PuffinManager, PuffinWriter};
use crate::sst::index::fulltext_index::creator::SstIndexCreator as FulltextIndexer;
use crate::sst::index::inverted_index::creator::SstIndexCreator as InvertedIndexer;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount};
use crate::sst::index::{IndexOutput, Indexer, InvertedIndexOutput};
use crate::sst::index::{FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput};
impl Indexer {
pub(crate) async fn do_finish(&mut self) -> IndexOutput {
let mut output = IndexOutput::default();
let Some(mut writer) = self.puffin_writer.take() else {
let Some(mut writer) = self.build_puffin_writer().await else {
self.do_abort().await;
return output;
};
@@ -36,10 +38,41 @@ impl Indexer {
return IndexOutput::default();
}
let success = self
.do_finish_fulltext_index(&mut writer, &mut output)
.await;
if !success {
self.do_abort().await;
return IndexOutput::default();
}
output.file_size = self.do_finish_puffin_writer(writer).await;
output
}
async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
let puffin_manager = self.puffin_manager.take()?;
let err = match puffin_manager.writer(&self.file_path).await {
Ok(writer) => return Some(writer),
Err(err) => err,
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to create puffin writer, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to create puffin writer, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
None
}
async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> ByteCount {
let err = match writer.finish().await {
Ok(size) => return size,
@@ -99,6 +132,43 @@ impl Indexer {
false
}
async fn do_finish_fulltext_index(
&mut self,
puffin_writer: &mut SstPuffinWriter,
index_output: &mut IndexOutput,
) -> bool {
let Some(mut indexer) = self.fulltext_indexer.take() else {
return true;
};
let err = match indexer.finish(puffin_writer).await {
Ok((row_count, byte_count)) => {
self.fill_fulltext_index_output(
&mut index_output.fulltext_index,
row_count,
byte_count,
&indexer,
);
return true;
}
Err(err) => err,
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to finish full-text index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to finish full-text index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
false
}
fn fill_inverted_index_output(
&mut self,
output: &mut InvertedIndexOutput,
@@ -115,4 +185,21 @@ impl Indexer {
output.row_count = row_count;
output.columns = indexer.column_ids().collect();
}
fn fill_fulltext_index_output(
&mut self,
output: &mut FulltextIndexOutput,
row_count: RowCount,
byte_count: ByteCount,
indexer: &FulltextIndexer,
) {
debug!(
"Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
self.region_id, self.file_id, byte_count, row_count
);
output.index_size = byte_count;
output.row_count = row_count;
output.columns = indexer.column_ids().collect();
}
}
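
The "delay building puffin writer" change relies on `Option::take`: the manager is consumed the first time a writer is requested, so a writer is only created when `do_finish` actually needs one, and a second request yields nothing. The sketch below shows that pattern with hypothetical stand-in types (`Manager`, `Writer`, and this simplified `Indexer` are assumptions, not the crate's types) and synchronous calls instead of async ones.

```rust
/// Hypothetical stand-in for the puffin manager; `fail` forces an error.
struct Manager {
    fail: bool,
}

/// Hypothetical stand-in for the puffin writer.
struct Writer {
    path: String,
}

impl Manager {
    fn writer(&self, path: &str) -> Result<Writer, String> {
        if self.fail {
            Err(format!("cannot open {path}"))
        } else {
            Ok(Writer { path: path.to_string() })
        }
    }
}

struct Indexer {
    file_path: String,
    manager: Option<Manager>,
}

impl Indexer {
    /// Builds the writer lazily. The manager is taken out of `self`, so a
    /// second call (or a call after abort cleared it) returns `None`.
    fn build_writer(&mut self) -> Option<Writer> {
        let manager = self.manager.take()?;
        match manager.writer(&self.file_path) {
            Ok(writer) => Some(writer),
            Err(err) => {
                // The diff logs a warning here (and panics under test).
                eprintln!("failed to create writer for {}: {err}", self.file_path);
                None
            }
        }
    }
}
```

One consequence of this shape is that an indexer which aborts before finishing never opens the index file at all.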


@@ -26,6 +26,9 @@ impl Indexer {
if !self.do_update_inverted_index(batch).await {
self.do_abort().await;
}
if !self.do_update_fulltext_index(batch).await {
self.do_abort().await;
}
}
/// Returns false if the update failed.
@@ -52,4 +55,29 @@ impl Indexer {
false
}
/// Returns false if the update failed.
async fn do_update_fulltext_index(&mut self, batch: &Batch) -> bool {
let Some(creator) = self.fulltext_indexer.as_mut() else {
return true;
};
let Err(err) = creator.update(batch).await else {
return true;
};
if cfg!(any(test, feature = "test")) {
panic!(
"Failed to update full-text index, region_id: {}, file_id: {}, err: {}",
self.region_id, self.file_id, err
);
} else {
warn!(
err; "Failed to update full-text index, region_id: {}, file_id: {}",
self.region_id, self.file_id,
);
}
false
}
}


@@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::path::PathBuf;
use common_telemetry::warn;
use object_store::util::{self, normalize_dir};
use store_api::storage::RegionId;
use store_api::storage::{ColumnId, RegionId};
use uuid::Uuid;
use crate::access_layer::new_fs_object_store;
@@ -27,6 +29,7 @@ const INTERMEDIATE_DIR: &str = "__intm";
/// `IntermediateManager` provides a store for accessing intermediate files.
#[derive(Clone)]
pub struct IntermediateManager {
base_dir: PathBuf,
store: InstrumentedStore,
}
@@ -42,7 +45,10 @@ impl IntermediateManager {
warn!(err; "Failed to remove garbage intermediate files");
}
Ok(Self { store })
Ok(Self {
base_dir: PathBuf::from(aux_path.as_ref()),
store,
})
}
/// Set the write buffer size for the store.
@@ -56,11 +62,20 @@ impl IntermediateManager {
&self.store
}
#[cfg(test)]
pub(crate) fn new(store: object_store::ObjectStore) -> Self {
Self {
store: InstrumentedStore::new(store),
}
/// Returns the intermediate directory path for building fulltext index.
/// The format is `{aux_path}/__intm/{region_id}/{sst_file_id}/fulltext-{column_id}-{uuid}`.
pub(crate) fn fulltext_path(
&self,
region_id: &RegionId,
sst_file_id: &FileId,
column_id: ColumnId,
) -> PathBuf {
let uuid = Uuid::new_v4();
self.base_dir
.join(INTERMEDIATE_DIR)
.join(region_id.as_u64().to_string())
.join(sst_file_id.to_string())
.join(format!("fulltext-{column_id}-{uuid}"))
}
}
@@ -163,4 +178,30 @@ mod tests {
format!("{INTERMEDIATE_DIR}/0/{sst_file_id}/{uuid}/{column_id}/{im_file_id}.im")
);
}
#[tokio::test]
async fn test_fulltext_intm_path() {
let temp_dir = temp_dir::create_temp_dir("test_fulltext_intm_path_");
let aux_path = temp_dir.path().to_string_lossy().to_string();
let manager = IntermediateManager::init_fs(&aux_path).await.unwrap();
let region_id = RegionId::new(0, 0);
let sst_file_id = FileId::random();
let column_id = 0;
let fulltext_path = manager.fulltext_path(&region_id, &sst_file_id, column_id);
if cfg!(windows) {
let p = fulltext_path.to_string_lossy().to_string();
let r = Regex::new(&format!(
"{aux_path}\\\\{INTERMEDIATE_DIR}\\\\0\\\\{sst_file_id}\\\\fulltext-0-\\w{{8}}-\\w{{4}}-\\w{{4}}-\\w{{4}}-\\w{{12}}",
)).unwrap();
assert!(r.is_match(&p));
} else {
let p = fulltext_path.to_string_lossy().to_string();
let r = Regex::new(&format!(
"{aux_path}/{INTERMEDIATE_DIR}/0/{sst_file_id}/fulltext-0-\\w{{8}}-\\w{{4}}-\\w{{4}}-\\w{{4}}-\\w{{12}}",
)).unwrap();
assert!(r.is_match(&p));
}
}
}
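
The layout documented on `fulltext_path`, `{aux_path}/__intm/{region_id}/{sst_file_id}/fulltext-{column_id}-{uuid}`, can be reproduced with only `std::path`. In this sketch the region id, file id, and unique suffix are plain values passed in by the caller; that is an assumption made to keep the example free of external crates, whereas the diff derives them from `RegionId`, `FileId`, and `Uuid::new_v4()`.

```rust
use std::path::{Path, PathBuf};

const INTERMEDIATE_DIR: &str = "__intm";

/// Builds `{aux_path}/__intm/{region_id}/{sst_file_id}/fulltext-{column_id}-{unique}`.
///
/// `unique` stands in for the random UUID used in the diff; it keeps two
/// builds for the same column and SST file from sharing a directory.
fn fulltext_path(
    aux_path: &Path,
    region_id: u64,
    sst_file_id: &str,
    column_id: u32,
    unique: &str,
) -> PathBuf {
    aux_path
        .join(INTERMEDIATE_DIR)
        .join(region_id.to_string())
        .join(sst_file_id)
        .join(format!("fulltext-{column_id}-{unique}"))
}
```

Comparing the result component by component (via `Path::iter`) rather than as one string avoids a separate Windows branch for the path separator, which the regex-based test in the diff has to handle explicitly.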


@@ -313,8 +313,8 @@ mod tests {
ObjectStore::new(Memory::default()).unwrap().finish()
}
fn mock_intm_mgr() -> IntermediateManager {
IntermediateManager::new(mock_object_store())
async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
IntermediateManager::init_fs(path).await.unwrap()
}
fn mock_region_metadata() -> RegionMetadataRef {
@@ -387,7 +387,7 @@ mod tests {
let file_path = location::index_file_path(&region_dir, sst_file_id);
let object_store = mock_object_store();
let region_metadata = mock_region_metadata();
let intm_mgr = mock_intm_mgr();
let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
let memory_threshold = None;
let segment_row_count = 2;


@@ -45,7 +45,6 @@ use log_store::raft_engine::log_store::RaftEngineLogStore;
use log_store::test_util::log_store_util;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::services::Fs;
use object_store::util::join_dir;
use object_store::ObjectStore;
use rskafka::client::partition::{Compression, UnknownTopicHandling};
use rskafka::client::{Client, ClientBuilder};
@@ -603,8 +602,6 @@ impl TestEnv {
local_store: ObjectStore,
capacity: ReadableSize,
) -> WriteCacheRef {
let data_home = self.data_home().display().to_string();
let index_aux_path = self.data_home.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None)
.await


@@ -837,7 +837,14 @@ write_buffer_size = "8MiB"
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "64.0MiB"
mem_threshold_on_create = "auto"
compress = true
[region_engine.mito.fulltext_index]
create_on_flush = "auto"
create_on_compaction = "auto"
apply_on_query = "auto"
mem_threshold_on_create = "auto"
compress = true
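
The commit message says `mem_threshold_on_create` now accepts `"auto"` and `"unlimited"` in addition to a size. The parsing code is not part of this hunk, so the following is only a sketch of how such a three-way setting could be modelled. `MemoryThreshold`, `parse_threshold`, and `resolve` are hypothetical names, the size parser accepts only whole numbers with `B`/`KiB`/`MiB`/`GiB` suffixes, and what `auto` resolves to is left to a caller-supplied value because the diff does not show it.

```rust
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
enum MemoryThreshold {
    Auto,
    Unlimited,
    Size(u64),
}

/// Parses a whole-number size such as "64MiB" into bytes.
fn parse_size(s: &str) -> Option<u64> {
    // Longer suffixes are checked first because they all end in "B".
    let (digits, factor) = if let Some(n) = s.strip_suffix("GiB") {
        (n, 1u64 << 30)
    } else if let Some(n) = s.strip_suffix("MiB") {
        (n, 1u64 << 20)
    } else if let Some(n) = s.strip_suffix("KiB") {
        (n, 1u64 << 10)
    } else if let Some(n) = s.strip_suffix('B') {
        (n, 1u64)
    } else {
        return None;
    };
    digits.trim().parse::<u64>().ok()?.checked_mul(factor)
}

fn parse_threshold(s: &str) -> Option<MemoryThreshold> {
    match s.trim() {
        "auto" => Some(MemoryThreshold::Auto),
        "unlimited" => Some(MemoryThreshold::Unlimited),
        other => parse_size(other).map(MemoryThreshold::Size),
    }
}

/// Returns the limit in bytes, or `None` for no limit.
/// `auto_bytes` is whatever the caller decides "auto" should mean.
fn resolve(threshold: MemoryThreshold, auto_bytes: u64) -> Option<u64> {
    match threshold {
        MemoryThreshold::Auto => Some(auto_bytes),
        MemoryThreshold::Unlimited => None,
        MemoryThreshold::Size(bytes) => Some(bytes),
    }
}
```

Keeping `Unlimited` as its own variant, rather than a very large number, lets callers skip memory accounting entirely when no limit applies.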
[region_engine.mito.memtable]