feat: add a flag to enable the experimental flat format (#6976)

* feat: add enable_experimental_flat_format flag to enable flat format

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: extract build_scan_input for CompactionSstReaderBuilder

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: add compact memtable cost to flush metrics

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: Sets compact dispatcher for bulk memtable

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: Cast dictionary to target type in FlatProjectionMapper

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: add time index to FlatProjectionMapper::batch_schema

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: update config toml

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: pass flat_format to ProjectionMapper in CompactionSstReaderBuilder

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-09-16 17:33:12 +08:00
committed by GitHub
parent db42ad42dc
commit b8e0c49cb4
14 changed files with 122 additions and 43 deletions

View File

@@ -151,6 +151,7 @@
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
@@ -543,6 +544,7 @@
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable experimental flat format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/><br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |

View File

@@ -497,6 +497,9 @@ allow_stale_entries = false
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## Whether to enable experimental flat format.
enable_experimental_flat_format = false
## The options for index in Mito engine.
[region_engine.mito.index]

View File

@@ -576,6 +576,9 @@ allow_stale_entries = false
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
## Whether to enable experimental flat format.
enable_experimental_flat_format = false
## The options for index in Mito engine.
[region_engine.mito.index]

View File

@@ -68,6 +68,7 @@ pub struct Metrics {
pub(crate) update_index: Duration,
pub(crate) upload_parquet: Duration,
pub(crate) upload_puffin: Duration,
pub(crate) compact_memtable: Duration,
}
impl Metrics {
@@ -79,6 +80,7 @@ impl Metrics {
update_index: Default::default(),
upload_parquet: Default::default(),
upload_puffin: Default::default(),
compact_memtable: Default::default(),
}
}
@@ -89,6 +91,7 @@ impl Metrics {
self.update_index += other.update_index;
self.upload_parquet += other.upload_parquet;
self.upload_puffin += other.upload_puffin;
self.compact_memtable += other.compact_memtable;
self
}
@@ -110,6 +113,11 @@ impl Metrics {
FLUSH_ELAPSED
.with_label_values(&["upload_puffin"])
.observe(self.upload_puffin.as_secs_f64());
// Report the memtable compaction cost only when it is non-zero.
if !self.compact_memtable.is_zero() {
    FLUSH_ELAPSED
        .with_label_values(&["compact_memtable"])
        // Observe the compact_memtable duration itself; the upload_puffin
        // duration is already reported under its own label.
        .observe(self.compact_memtable.as_secs_f64());
}
}
WriteType::Compaction => {
COMPACTION_STAGE_ELAPSED

View File

@@ -638,25 +638,7 @@ struct CompactionSstReaderBuilder<'a> {
impl CompactionSstReaderBuilder<'_> {
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(self) -> Result<BoxedBatchReader> {
let mut scan_input = ScanInput::new(
self.sst_layer,
ProjectionMapper::all(&self.metadata, false)?,
)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
// We use special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode);
// This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into predicate.
if let Some(time_range) = self.time_range {
scan_input =
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
let scan_input = self.build_scan_input(false)?;
SeqScan::new(scan_input, true)
.build_reader_for_compaction()
@@ -665,17 +647,27 @@ impl CompactionSstReaderBuilder<'_> {
/// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
async fn build_flat_sst_reader(self) -> Result<BoxedRecordBatchStream> {
let mut scan_input =
ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata, true)?)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
// We use special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode)
.with_flat_format(true);
let scan_input = self.build_scan_input(true)?;
SeqScan::new(scan_input, true)
.build_flat_reader_for_compaction()
.await
}
fn build_scan_input(self, flat_format: bool) -> Result<ScanInput> {
let mut scan_input = ScanInput::new(
self.sst_layer,
ProjectionMapper::all(&self.metadata, flat_format)?,
)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
// We use special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
.with_merge_mode(self.merge_mode)
.with_flat_format(flat_format);
// This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into predicate.
@@ -684,9 +676,7 @@ impl CompactionSstReaderBuilder<'_> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
SeqScan::new(scan_input, true)
.build_flat_reader_for_compaction()
.await
Ok(scan_input)
}
}

View File

@@ -335,8 +335,6 @@ impl Compactor for DefaultCompactor {
let region_id = compaction_region.region_id;
let cache_manager = compaction_region.cache_manager.clone();
let storage = compaction_region.region_options.storage.clone();
// TODO: Set flat_format from mito config
let flat_format = false;
let index_options = compaction_region
.current_version
.options
@@ -344,6 +342,9 @@ impl Compactor for DefaultCompactor {
.clone();
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
let flat_format = compaction_region
.engine_config
.enable_experimental_flat_format;
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config =

View File

@@ -141,6 +141,10 @@ pub struct MitoConfig {
/// To align with the old behavior, the default value is 0 (no restrictions).
#[serde(with = "humantime_serde")]
pub min_compaction_interval: Duration,
/// Whether to enable experimental flat format.
/// When enabled, forces using BulkMemtable and BulkMemtableBuilder.
pub enable_experimental_flat_format: bool,
}
impl Default for MitoConfig {
@@ -177,6 +181,7 @@ impl Default for MitoConfig {
bloom_filter_index: BloomFilterConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
enable_experimental_flat_format: false,
};
// Adjust buffer and cache size according to system memory if we can.

View File

@@ -689,8 +689,7 @@ impl EngineInner {
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
.with_start_time(query_start)
// TODO(yingwen): Enable it after flat format is supported.
.with_flat_format(false);
.with_flat_format(self.config.enable_experimental_flat_format);
#[cfg(feature = "enterprise")]
let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

View File

@@ -444,6 +444,7 @@ impl RegionFlushTask {
common_telemetry::error!(e; "Failed to compact memtable before flush");
}
let compact_cost = compact_start.elapsed();
flush_metrics.compact_memtable += compact_cost;
let mem_ranges = mem.ranges(None, PredicateGroup::default(), None)?;
let num_mem_ranges = mem_ranges.ranges.len();

View File

@@ -33,6 +33,7 @@ use store_api::storage::{ColumnId, SequenceNumber};
use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
@@ -325,6 +326,7 @@ impl Drop for AllocTracker {
pub(crate) struct MemtableBuilderProvider {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: Arc<MitoConfig>,
compact_dispatcher: Option<Arc<CompactDispatcher>>,
}
impl MemtableBuilderProvider {
@@ -332,9 +334,14 @@ impl MemtableBuilderProvider {
write_buffer_manager: Option<WriteBufferManagerRef>,
config: Arc<MitoConfig>,
) -> Self {
let compact_dispatcher = config
.enable_experimental_flat_format
.then(|| Arc::new(CompactDispatcher::new(config.max_background_compactions)));
Self {
write_buffer_manager,
config,
compact_dispatcher,
}
}
@@ -344,6 +351,22 @@ impl MemtableBuilderProvider {
dedup: bool,
merge_mode: MergeMode,
) -> MemtableBuilderRef {
if self.config.enable_experimental_flat_format {
common_telemetry::info!(
"Overriding memtable config, use BulkMemtable under flat format"
);
return Arc::new(
BulkMemtableBuilder::new(
self.write_buffer_manager.clone(),
!dedup, // append_mode: true if not dedup, false if dedup
merge_mode,
)
// Safety: We create the dispatcher if flat_format is enabled.
.with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()),
);
}
match options {
Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
self.write_buffer_manager.clone(),
@@ -367,6 +390,18 @@ impl MemtableBuilderProvider {
}
fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
if self.config.enable_experimental_flat_format {
return Arc::new(
BulkMemtableBuilder::new(
self.write_buffer_manager.clone(),
!dedup, // append_mode: true if not dedup, false if dedup
merge_mode,
)
// Safety: We create the dispatcher if flat_format is enabled.
.with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()),
);
}
match &self.config.memtable {
MemtableConfig::PartitionTree(config) => {
let mut config = config.clone();

View File

@@ -19,7 +19,7 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::RecordBatch;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
use datatypes::arrow::datatypes::Field;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
@@ -49,6 +49,8 @@ pub struct FlatProjectionMapper {
column_ids: Vec<ColumnId>,
/// Ids and DataTypes of columns of the expected batch.
/// We can use this to check if the batch is compatible with the expected schema.
///
/// It doesn't contain internal columns but always contains the time index column.
batch_schema: Vec<(ColumnId, ConcreteDataType)>,
/// `true` if the original projection is empty.
is_empty_projection: bool,
@@ -188,11 +190,13 @@ impl FlatProjectionMapper {
}
/// Returns ids of columns of the batch that the mapper expects to convert.
#[allow(dead_code)]
pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
&self.batch_schema
}
/// Returns the input arrow schema from sources.
///
/// The merge reader can use this schema.
pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
self.input_arrow_schema.clone()
}
@@ -212,7 +216,6 @@ impl FlatProjectionMapper {
/// Converts a flat format [RecordBatch] to a normal [RecordBatch].
///
/// The batch must match the `projection` using to build the mapper.
#[allow(dead_code)]
pub(crate) fn convert(
&self,
batch: &datatypes::arrow::record_batch::RecordBatch,
@@ -223,7 +226,15 @@ impl FlatProjectionMapper {
let mut columns = Vec::with_capacity(self.output_schema.num_columns());
for index in &self.batch_indices {
let array = batch.column(*index).clone();
let mut array = batch.column(*index).clone();
// Casts dictionary values to the target type.
if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
array.data_type()
{
let casted = datatypes::arrow::compute::cast(&array, value_type)
.context(ArrowComputeSnafu)?;
array = casted;
}
let vector = Helper::try_into_vector(array)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
@@ -235,11 +246,22 @@ impl FlatProjectionMapper {
}
/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
///
/// It adds the time index column if it isn't present in the projection.
pub(crate) fn flat_projected_columns(
metadata: &RegionMetadata,
format_projection: &FormatProjection,
) -> Vec<(ColumnId, ConcreteDataType)> {
let mut schema = vec![None; format_projection.column_id_to_projected_index.len()];
let time_index = metadata.time_index_column();
let num_columns = if format_projection
.column_id_to_projected_index
.contains_key(&time_index.column_id)
{
format_projection.column_id_to_projected_index.len()
} else {
format_projection.column_id_to_projected_index.len() + 1
};
let mut schema = vec![None; num_columns];
for (column_id, index) in &format_projection.column_id_to_projected_index {
// Safety: FormatProjection ensures the id is valid.
schema[*index] = Some((
@@ -252,6 +274,12 @@ pub(crate) fn flat_projected_columns(
.clone(),
));
}
if num_columns != format_projection.column_id_to_projected_index.len() {
schema[num_columns - 1] = Some((
time_index.column_id,
time_index.column_schema.data_type.clone(),
));
}
// Safety: FormatProjection ensures all indices can be unwrapped.
schema.into_iter().map(|id_type| id_type.unwrap()).collect()

View File

@@ -749,7 +749,8 @@ mod tests {
assert_eq!(
[
(1, ConcreteDataType::int64_datatype()),
(4, ConcreteDataType::int64_datatype())
(4, ConcreteDataType::int64_datatype()),
(0, ConcreteDataType::timestamp_millisecond_datatype())
],
mapper.as_flat().unwrap().batch_schema()
);

View File

@@ -732,6 +732,8 @@ pub(crate) struct FormatProjection {
pub(crate) projection_indices: Vec<usize>,
/// Column id to their index in the projected schema (
/// the schema after projection).
///
/// It doesn't contain time index column if it is not present in the projection.
pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

View File

@@ -1443,6 +1443,7 @@ parallel_scan_channel_size = 32
max_concurrent_scan_files = 384
allow_stale_entries = false
min_compaction_interval = "0s"
enable_experimental_flat_format = false
[region_engine.mito.index]
aux_path = ""