diff --git a/config/config.md b/config/config.md
index b600ec0de6..d992e5f647 100644
--- a/config/config.md
+++ b/config/config.md
@@ -151,6 +151,7 @@
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
+| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable the experimental flat format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
@@ -543,6 +544,7 @@
| `region_engine.mito.max_concurrent_scan_files` | Integer | `384` | Maximum number of SST files to scan concurrently. |
| `region_engine.mito.allow_stale_entries` | Bool | `false` | Whether to allow stale WAL entries read during replay. |
| `region_engine.mito.min_compaction_interval` | String | `0m` | Minimum time interval between two compactions.<br/>To align with the old behavior, the default value is 0 (no restrictions). |
+| `region_engine.mito.enable_experimental_flat_format` | Bool | `false` | Whether to enable the experimental flat format. |
| `region_engine.mito.index` | -- | -- | The options for index in Mito engine. |
| `region_engine.mito.index.aux_path` | String | `""` | Auxiliary directory path for the index in filesystem, used to store intermediate files for<br/>creating the index and staging files for searching the index, defaults to `{data_home}/index_intermediate`.<br/>The default name for this directory is `index_intermediate` for backward compatibility.<br/>This path contains two subdirectories:<br/>- `__intm`: for storing intermediate files used during creating index.<br/>- `staging`: for storing staging files used during searching index. |
| `region_engine.mito.index.staging_size` | String | `2GB` | The max capacity of the staging directory. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index 7e04748059..a7cea4a655 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -497,6 +497,9 @@ allow_stale_entries = false
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
+## Whether to enable the experimental flat format.
+enable_experimental_flat_format = false
+
## The options for index in Mito engine.
[region_engine.mito.index]
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index 22dd8105a9..efc38ffab1 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -576,6 +576,9 @@ allow_stale_entries = false
## To align with the old behavior, the default value is 0 (no restrictions).
min_compaction_interval = "0m"
+## Whether to enable the experimental flat format.
+enable_experimental_flat_format = false
+
## The options for index in Mito engine.
[region_engine.mito.index]
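Editor's note: a minimal sketch (not part of this patch) of enabling the new option programmatically. It assumes `MitoConfig` is reachable as `mito2::config::MitoConfig`, as in `src/mito2/src/config.rs` below; operators would normally just set `enable_experimental_flat_format = true` under `[region_engine.mito]` in the TOML files above.

```rust
use std::time::Duration;

use mito2::config::MitoConfig;

/// Builds a MitoConfig that opts in to the experimental flat format.
fn flat_format_config() -> MitoConfig {
    let mut config = MitoConfig::default();
    // Off by default; flip it explicitly to opt in.
    config.enable_experimental_flat_format = true;
    // Unrelated options keep their documented defaults,
    // e.g. no minimum compaction interval.
    assert_eq!(config.min_compaction_interval, Duration::from_secs(0));
    config
}
```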
diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs
index e65bec34c2..a2f2eeeb84 100644
--- a/src/mito2/src/access_layer.rs
+++ b/src/mito2/src/access_layer.rs
@@ -68,6 +68,7 @@ pub struct Metrics {
pub(crate) update_index: Duration,
pub(crate) upload_parquet: Duration,
pub(crate) upload_puffin: Duration,
+ pub(crate) compact_memtable: Duration,
}
impl Metrics {
@@ -79,6 +80,7 @@ impl Metrics {
update_index: Default::default(),
upload_parquet: Default::default(),
upload_puffin: Default::default(),
+ compact_memtable: Default::default(),
}
}
@@ -89,6 +91,7 @@ impl Metrics {
self.update_index += other.update_index;
self.upload_parquet += other.upload_parquet;
self.upload_puffin += other.upload_puffin;
+ self.compact_memtable += other.compact_memtable;
self
}
@@ -110,6 +113,11 @@ impl Metrics {
FLUSH_ELAPSED
.with_label_values(&["upload_puffin"])
.observe(self.upload_puffin.as_secs_f64());
+ if !self.compact_memtable.is_zero() {
+ FLUSH_ELAPSED
+ .with_label_values(&["compact_memtable"])
+ .observe(self.compact_memtable.as_secs_f64());
+ }
}
WriteType::Compaction => {
COMPACTION_STAGE_ELAPSED
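Editor's sketch (illustrative, not from the patch): the new flush stage above is only observed when a pre-flush memtable compaction actually ran. The same conditional-observation pattern, with a local `prometheus` histogram standing in for the crate's `FLUSH_ELAPSED`:

```rust
use std::time::Duration;

use prometheus::{HistogramOpts, HistogramVec};

fn observe_compact_memtable(compact_memtable: Duration) {
    // Stand-in for the crate's FLUSH_ELAPSED metric.
    let flush_elapsed = HistogramVec::new(
        HistogramOpts::new("flush_elapsed_sketch", "Elapsed time of flush stages."),
        &["stage"],
    )
    .unwrap();
    // Only record the stage when it actually happened, so the histogram is not
    // filled with zero-duration samples for flushes that skipped the compaction.
    if !compact_memtable.is_zero() {
        flush_elapsed
            .with_label_values(&["compact_memtable"])
            .observe(compact_memtable.as_secs_f64());
    }
}
```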
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 99eb8e7056..45b568fc4e 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -638,25 +638,7 @@ struct CompactionSstReaderBuilder<'a> {
impl CompactionSstReaderBuilder<'_> {
/// Builds [BoxedBatchReader] that reads all SST files and yields batches in primary key order.
async fn build_sst_reader(self) -> Result {
- let mut scan_input = ScanInput::new(
- self.sst_layer,
- ProjectionMapper::all(&self.metadata, false)?,
- )
- .with_files(self.inputs.to_vec())
- .with_append_mode(self.append_mode)
- // We use special cache strategy for compaction.
- .with_cache(CacheStrategy::Compaction(self.cache))
- .with_filter_deleted(self.filter_deleted)
- // We ignore file not found error during compaction.
- .with_ignore_file_not_found(true)
- .with_merge_mode(self.merge_mode);
-
- // This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
- // by converting time ranges into predicate.
- if let Some(time_range) = self.time_range {
- scan_input =
- scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
- }
+ let scan_input = self.build_scan_input(false)?;
SeqScan::new(scan_input, true)
.build_reader_for_compaction()
@@ -665,17 +647,27 @@ impl CompactionSstReaderBuilder<'_> {
/// Builds [BoxedRecordBatchStream] that reads all SST files and yields batches in flat format for compaction.
async fn build_flat_sst_reader(self) -> Result {
- let mut scan_input =
- ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata, true)?)
- .with_files(self.inputs.to_vec())
- .with_append_mode(self.append_mode)
- // We use special cache strategy for compaction.
- .with_cache(CacheStrategy::Compaction(self.cache))
- .with_filter_deleted(self.filter_deleted)
- // We ignore file not found error during compaction.
- .with_ignore_file_not_found(true)
- .with_merge_mode(self.merge_mode)
- .with_flat_format(true);
+ let scan_input = self.build_scan_input(true)?;
+
+ SeqScan::new(scan_input, true)
+ .build_flat_reader_for_compaction()
+ .await
+ }
+
+ fn build_scan_input(self, flat_format: bool) -> Result {
+ let mut scan_input = ScanInput::new(
+ self.sst_layer,
+ ProjectionMapper::all(&self.metadata, flat_format)?,
+ )
+ .with_files(self.inputs.to_vec())
+ .with_append_mode(self.append_mode)
+ // We use special cache strategy for compaction.
+ .with_cache(CacheStrategy::Compaction(self.cache))
+ .with_filter_deleted(self.filter_deleted)
+ // We ignore file not found error during compaction.
+ .with_ignore_file_not_found(true)
+ .with_merge_mode(self.merge_mode)
+ .with_flat_format(flat_format);
// This serves as a workaround of https://github.com/GreptimeTeam/greptimedb/issues/3944
// by converting time ranges into predicate.
@@ -684,9 +676,7 @@ impl CompactionSstReaderBuilder<'_> {
scan_input.with_predicate(time_range_to_predicate(time_range, &self.metadata)?);
}
- SeqScan::new(scan_input, true)
- .build_flat_reader_for_compaction()
- .await
+ Ok(scan_input)
}
}
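Editor's sketch (stand-in types, not the crate's API): the refactor above keeps one construction path for both readers and only varies the output format. Reduced to its shape:

```rust
/// Stand-in for the options the real helper sets on a ScanInput.
struct ScanSettings {
    flat_format: bool,
    // Compaction always tolerates missing SST files, as in the helper above.
    ignore_file_not_found: bool,
}

fn build_scan_settings(flat_format: bool) -> ScanSettings {
    ScanSettings {
        flat_format,
        ignore_file_not_found: true,
    }
}

fn settings_for_sst_reader() -> ScanSettings {
    // Primary-key ordered reader: flat format off.
    build_scan_settings(false)
}

fn settings_for_flat_sst_reader() -> ScanSettings {
    // Flat-format reader used for compaction.
    build_scan_settings(true)
}
```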
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index 4fe7cae839..e28d29c152 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -335,8 +335,6 @@ impl Compactor for DefaultCompactor {
let region_id = compaction_region.region_id;
let cache_manager = compaction_region.cache_manager.clone();
let storage = compaction_region.region_options.storage.clone();
- // TODO: Set flat_format from mito config
- let flat_format = false;
let index_options = compaction_region
.current_version
.options
@@ -344,6 +342,9 @@ impl Compactor for DefaultCompactor {
.clone();
let append_mode = compaction_region.current_version.options.append_mode;
let merge_mode = compaction_region.current_version.options.merge_mode();
+ let flat_format = compaction_region
+ .engine_config
+ .enable_experimental_flat_format;
let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
let bloom_filter_index_config =
diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs
index 9a88ba96f8..966edfdf00 100644
--- a/src/mito2/src/config.rs
+++ b/src/mito2/src/config.rs
@@ -141,6 +141,10 @@ pub struct MitoConfig {
/// To align with the old behavior, the default value is 0 (no restrictions).
#[serde(with = "humantime_serde")]
pub min_compaction_interval: Duration,
+
+ /// Whether to enable the experimental flat format.
+ /// When enabled, the engine always uses BulkMemtable built by BulkMemtableBuilder,
+ /// overriding the configured memtable options.
+ pub enable_experimental_flat_format: bool,
}
impl Default for MitoConfig {
@@ -177,6 +181,7 @@ impl Default for MitoConfig {
bloom_filter_index: BloomFilterConfig::default(),
memtable: MemtableConfig::default(),
min_compaction_interval: Duration::from_secs(0),
+ enable_experimental_flat_format: false,
};
// Adjust buffer and cache size according to system memory if we can.
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 7b325ab309..cef9c6853c 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -689,8 +689,7 @@ impl EngineInner {
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
.with_start_time(query_start)
- // TODO(yingwen): Enable it after flat format is supported.
- .with_flat_format(false);
+ .with_flat_format(self.config.enable_experimental_flat_format);
#[cfg(feature = "enterprise")]
let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 3e2099beea..010fb88946 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -444,6 +444,7 @@ impl RegionFlushTask {
common_telemetry::error!(e; "Failed to compact memtable before flush");
}
let compact_cost = compact_start.elapsed();
+ flush_metrics.compact_memtable += compact_cost;
let mem_ranges = mem.ranges(None, PredicateGroup::default(), None)?;
let num_mem_ranges = mem_ranges.ranges.len();
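Editor's sketch (names are illustrative): the hunk above is what feeds the `compact_memtable` stage recorded in `access_layer.rs`; it times the pre-flush memtable compaction and accumulates the cost into the flush metrics:

```rust
use std::time::{Duration, Instant};

#[derive(Default)]
struct FlushStageMetrics {
    compact_memtable: Duration,
}

fn time_memtable_compaction<F: FnOnce()>(metrics: &mut FlushStageMetrics, compact: F) {
    let compact_start = Instant::now();
    compact();
    // Accumulate rather than overwrite, matching the `+=` in the hunk above.
    metrics.compact_memtable += compact_start.elapsed();
}
```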
diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs
index 434c23650d..77b7d54f99 100644
--- a/src/mito2/src/memtable.rs
+++ b/src/mito2/src/memtable.rs
@@ -33,6 +33,7 @@ use store_api::storage::{ColumnId, SequenceNumber};
use crate::config::MitoConfig;
use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
+use crate::memtable::bulk::{BulkMemtableBuilder, CompactDispatcher};
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::metrics::WRITE_BUFFER_BYTES;
@@ -325,6 +326,7 @@ impl Drop for AllocTracker {
pub(crate) struct MemtableBuilderProvider {
write_buffer_manager: Option,
config: Arc,
+ compact_dispatcher: Option>,
}
impl MemtableBuilderProvider {
@@ -332,9 +334,14 @@ impl MemtableBuilderProvider {
write_buffer_manager: Option,
config: Arc,
) -> Self {
+ let compact_dispatcher = config
+ .enable_experimental_flat_format
+ .then(|| Arc::new(CompactDispatcher::new(config.max_background_compactions)));
+
Self {
write_buffer_manager,
config,
+ compact_dispatcher,
}
}
@@ -344,6 +351,22 @@ impl MemtableBuilderProvider {
dedup: bool,
merge_mode: MergeMode,
) -> MemtableBuilderRef {
+ if self.config.enable_experimental_flat_format {
+ common_telemetry::info!(
+ "Overriding memtable config, use BulkMemtable under flat format"
+ );
+
+ return Arc::new(
+ BulkMemtableBuilder::new(
+ self.write_buffer_manager.clone(),
+ !dedup, // append_mode: true if not dedup, false if dedup
+ merge_mode,
+ )
+ // Safety: We create the dispatcher if flat_format is enabled.
+ .with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()),
+ );
+ }
+
match options {
Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
self.write_buffer_manager.clone(),
@@ -367,6 +390,18 @@ impl MemtableBuilderProvider {
}
fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
+ if self.config.enable_experimental_flat_format {
+ return Arc::new(
+ BulkMemtableBuilder::new(
+ self.write_buffer_manager.clone(),
+ !dedup, // append_mode: true if not dedup, false if dedup
+ merge_mode,
+ )
+ // Safety: We create the dispatcher if flat_format is enabled.
+ .with_compact_dispatcher(self.compact_dispatcher.clone().unwrap()),
+ );
+ }
+
match &self.config.memtable {
MemtableConfig::PartitionTree(config) => {
let mut config = config.clone();
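Editor's sketch (stand-in types): why the `unwrap()` in both branches above is safe. The dispatcher is created in the provider's constructor exactly when `enable_experimental_flat_format` is set, sized by the existing `max_background_compactions` option, so any branch gated on the same flag can assume it exists:

```rust
use std::sync::Arc;

/// Stand-in for CompactDispatcher.
struct Dispatcher {
    max_jobs: usize,
}

/// Stand-in for MemtableBuilderProvider.
struct Provider {
    flat_format: bool,
    dispatcher: Option<Arc<Dispatcher>>,
}

impl Provider {
    fn new(flat_format: bool, max_background_compactions: usize) -> Self {
        // Created if and only if the flat format is enabled.
        let dispatcher = flat_format.then(|| {
            Arc::new(Dispatcher {
                max_jobs: max_background_compactions,
            })
        });
        Self {
            flat_format,
            dispatcher,
        }
    }

    fn dispatcher_for_bulk_memtable(&self) -> Option<Arc<Dispatcher>> {
        // Same invariant as the "Safety" comments above: when the flag is set,
        // `new` created the dispatcher, so the unwrap cannot panic.
        self.flat_format.then(|| self.dispatcher.clone().unwrap())
    }
}
```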
diff --git a/src/mito2/src/read/flat_projection.rs b/src/mito2/src/read/flat_projection.rs
index e44d5335ef..69e82f33d3 100644
--- a/src/mito2/src/read/flat_projection.rs
+++ b/src/mito2/src/read/flat_projection.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::RecordBatch;
-use common_recordbatch::error::ExternalSnafu;
+use common_recordbatch::error::{ArrowComputeSnafu, ExternalSnafu};
use datatypes::arrow::datatypes::Field;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
@@ -49,6 +49,8 @@ pub struct FlatProjectionMapper {
column_ids: Vec,
/// Ids and DataTypes of columns of the expected batch.
/// We can use this to check if the batch is compatible with the expected schema.
+ ///
+ /// It doesn't contain internal columns but always contains the time index column.
batch_schema: Vec<(ColumnId, ConcreteDataType)>,
/// `true` If the original projection is empty.
is_empty_projection: bool,
@@ -188,11 +190,13 @@ impl FlatProjectionMapper {
}
/// Returns ids of columns of the batch that the mapper expects to convert.
- #[allow(dead_code)]
pub(crate) fn batch_schema(&self) -> &[(ColumnId, ConcreteDataType)] {
&self.batch_schema
}
+ /// Returns the input arrow schema from sources.
+ ///
+ /// The merge reader can use this schema.
pub(crate) fn input_arrow_schema(&self) -> datatypes::arrow::datatypes::SchemaRef {
self.input_arrow_schema.clone()
}
@@ -212,7 +216,6 @@ impl FlatProjectionMapper {
/// Converts a flat format [RecordBatch] to a normal [RecordBatch].
///
/// The batch must match the `projection` using to build the mapper.
- #[allow(dead_code)]
pub(crate) fn convert(
&self,
batch: &datatypes::arrow::record_batch::RecordBatch,
@@ -223,7 +226,15 @@ impl FlatProjectionMapper {
let mut columns = Vec::with_capacity(self.output_schema.num_columns());
for index in &self.batch_indices {
- let array = batch.column(*index).clone();
+ let mut array = batch.column(*index).clone();
+ // Casts dictionary values to the target type.
+ if let datatypes::arrow::datatypes::DataType::Dictionary(_key_type, value_type) =
+ array.data_type()
+ {
+ let casted = datatypes::arrow::compute::cast(&array, value_type)
+ .context(ArrowComputeSnafu)?;
+ array = casted;
+ }
let vector = Helper::try_into_vector(array)
.map_err(BoxedError::new)
.context(ExternalSnafu)?;
@@ -235,11 +246,22 @@ impl FlatProjectionMapper {
}
/// Returns ids and datatypes of columns of the output batch after applying the `projection`.
+///
+/// It adds the time index column if it is not present in the projection.
pub(crate) fn flat_projected_columns(
metadata: &RegionMetadata,
format_projection: &FormatProjection,
) -> Vec<(ColumnId, ConcreteDataType)> {
- let mut schema = vec![None; format_projection.column_id_to_projected_index.len()];
+ let time_index = metadata.time_index_column();
+ let num_columns = if format_projection
+ .column_id_to_projected_index
+ .contains_key(&time_index.column_id)
+ {
+ format_projection.column_id_to_projected_index.len()
+ } else {
+ format_projection.column_id_to_projected_index.len() + 1
+ };
+ let mut schema = vec![None; num_columns];
for (column_id, index) in &format_projection.column_id_to_projected_index {
// Safety: FormatProjection ensures the id is valid.
schema[*index] = Some((
@@ -252,6 +274,12 @@ pub(crate) fn flat_projected_columns(
.clone(),
));
}
+ if num_columns != format_projection.column_id_to_projected_index.len() {
+ schema[num_columns - 1] = Some((
+ time_index.column_id,
+ time_index.column_schema.data_type.clone(),
+ ));
+ }
// Safety: FormatProjection ensures all indices can be unwrapped.
schema.into_iter().map(|id_type| id_type.unwrap()).collect()
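Editor's sketch (illustrative helper, not from the patch): the `convert` hunk above un-dictionary-encodes projected columns before building vectors; extracted on its own, using the same `datatypes::arrow` re-export the file already imports:

```rust
use datatypes::arrow::array::ArrayRef;
use datatypes::arrow::compute::cast;
use datatypes::arrow::datatypes::DataType;
use datatypes::arrow::error::ArrowError;

/// Removes dictionary encoding, e.g. turns a Dictionary<UInt32, Utf8> column
/// into a plain Utf8 array so the following vector conversion sees the value type.
fn flatten_dictionary(array: ArrayRef) -> Result<ArrayRef, ArrowError> {
    if let DataType::Dictionary(_key_type, value_type) = array.data_type() {
        // Cast dictionary values to the target type, as in `convert()` above.
        cast(&array, value_type)
    } else {
        Ok(array)
    }
}
```

The `flat_projected_columns` hunk above pairs with this: when the projection omits the time index, the column is appended at the end of the schema, which the updated `projection.rs` test asserts below.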
diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs
index aef5180e55..adc0290c23 100644
--- a/src/mito2/src/read/projection.rs
+++ b/src/mito2/src/read/projection.rs
@@ -749,7 +749,8 @@ mod tests {
assert_eq!(
[
(1, ConcreteDataType::int64_datatype()),
- (4, ConcreteDataType::int64_datatype())
+ (4, ConcreteDataType::int64_datatype()),
+ (0, ConcreteDataType::timestamp_millisecond_datatype())
],
mapper.as_flat().unwrap().batch_schema()
);
diff --git a/src/mito2/src/sst/parquet/format.rs b/src/mito2/src/sst/parquet/format.rs
index 626b7baa4a..9943ec5e62 100644
--- a/src/mito2/src/sst/parquet/format.rs
+++ b/src/mito2/src/sst/parquet/format.rs
@@ -732,6 +732,8 @@ pub(crate) struct FormatProjection {
pub(crate) projection_indices: Vec,
/// Column id to their index in the projected schema (
/// the schema after projection).
+ ///
+ /// It doesn't contain the time index column unless the projection selects it.
pub(crate) column_id_to_projected_index: HashMap,
}
diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs
index 8e965dd9b7..410d19547f 100644
--- a/tests-integration/tests/http.rs
+++ b/tests-integration/tests/http.rs
@@ -1443,6 +1443,7 @@ parallel_scan_channel_size = 32
max_concurrent_scan_files = 384
allow_stale_entries = false
min_compaction_interval = "0s"
+enable_experimental_flat_format = false
[region_engine.mito.index]
aux_path = ""