diff --git a/config/config.md b/config/config.md
index dfb4db1cf1..35d78a0788 100644
--- a/config/config.md
+++ b/config/config.md
@@ -193,11 +193,6 @@
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the bloom filter on compaction.
- `auto`: automatically (default)
- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the bloom filter on query
- `auto`: automatically (default)
- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for bloom filter creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold |
-| `region_engine.mito.memtable` | -- | -- | -- |
-| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) |
-| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. |
-| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.
Only available for `partition_tree` memtable. |
-| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.
Only available for `partition_tree` memtable. |
| `region_engine.file` | -- | -- | Enable the file engine. |
| `region_engine.metric` | -- | -- | Metric engine options. |
| `region_engine.metric.sparse_primary_key_encoding` | Bool | `true` | Whether to use sparse primary key encoding. |
@@ -585,11 +580,6 @@
| `region_engine.mito.bloom_filter_index.create_on_compaction` | String | `auto` | Whether to create the index on compaction.
- `auto`: automatically (default)
- `disable`: never |
| `region_engine.mito.bloom_filter_index.apply_on_query` | String | `auto` | Whether to apply the index on query
- `auto`: automatically (default)
- `disable`: never |
| `region_engine.mito.bloom_filter_index.mem_threshold_on_create` | String | `auto` | Memory threshold for the index creation.
- `auto`: automatically determine the threshold based on the system memory size (default)
- `unlimited`: no memory limit
- `[size]` e.g. `64MB`: fixed memory threshold |
-| `region_engine.mito.memtable` | -- | -- | -- |
-| `region_engine.mito.memtable.type` | String | `time_series` | Memtable type.
- `time_series`: time-series memtable
- `partition_tree`: partition tree memtable (experimental) |
-| `region_engine.mito.memtable.index_max_keys_per_shard` | Integer | `8192` | The max number of keys in one shard.
Only available for `partition_tree` memtable. |
-| `region_engine.mito.memtable.data_freeze_threshold` | Integer | `32768` | The max rows of data inside the actively writing buffer in one shard.
Only available for `partition_tree` memtable. |
-| `region_engine.mito.memtable.fork_dictionary_bytes` | String | `1GiB` | Max dictionary bytes.
Only available for `partition_tree` memtable. |
| `region_engine.mito.gc` | -- | -- | -- |
| `region_engine.mito.gc.enable` | Bool | `false` | Whether GC is enabled. Need to be the same with metasrv's `gc.enable` or unexpected behavior will occur |
| `region_engine.mito.gc.lingering_time` | String | `1m` | Lingering time before deleting files.
Should be long enough to allow long running queries to finish.
If set to None, then unused files will be deleted immediately. |
diff --git a/config/datanode.example.toml b/config/datanode.example.toml
index fbe205c17a..170045a090 100644
--- a/config/datanode.example.toml
+++ b/config/datanode.example.toml
@@ -646,24 +646,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
-[region_engine.mito.memtable]
-## Memtable type.
-## - `time_series`: time-series memtable
-## - `partition_tree`: partition tree memtable (experimental)
-type = "time_series"
-
-## The max number of keys in one shard.
-## Only available for `partition_tree` memtable.
-index_max_keys_per_shard = 8192
-
-## The max rows of data inside the actively writing buffer in one shard.
-## Only available for `partition_tree` memtable.
-data_freeze_threshold = 32768
-
-## Max dictionary bytes.
-## Only available for `partition_tree` memtable.
-fork_dictionary_bytes = "1GiB"
-
[region_engine.mito.gc]
## Whether GC is enabled. Need to be the same with metasrv's `gc.enable` or unexpected behavior will occur
enable = false
diff --git a/config/standalone.example.toml b/config/standalone.example.toml
index aa745bb6ba..5e790749fe 100644
--- a/config/standalone.example.toml
+++ b/config/standalone.example.toml
@@ -762,24 +762,6 @@ apply_on_query = "auto"
## - `[size]` e.g. `64MB`: fixed memory threshold
mem_threshold_on_create = "auto"
-[region_engine.mito.memtable]
-## Memtable type.
-## - `time_series`: time-series memtable
-## - `partition_tree`: partition tree memtable (experimental)
-type = "time_series"
-
-## The max number of keys in one shard.
-## Only available for `partition_tree` memtable.
-index_max_keys_per_shard = 8192
-
-## The max rows of data inside the actively writing buffer in one shard.
-## Only available for `partition_tree` memtable.
-data_freeze_threshold = 32768
-
-## Max dictionary bytes.
-## Only available for `partition_tree` memtable.
-fork_dictionary_bytes = "1GiB"
-
[[region_engine]]
## Enable the file engine.
[region_engine.file]
diff --git a/src/metric-engine/src/engine/bulk_insert.rs b/src/metric-engine/src/engine/bulk_insert.rs
index 24c9e7934c..6d33d39149 100644
--- a/src/metric-engine/src/engine/bulk_insert.rs
+++ b/src/metric-engine/src/engine/bulk_insert.rs
@@ -14,20 +14,14 @@
use std::collections::HashSet;
-use api::v1::{ArrowIpc, ColumnDataType, SemanticType};
+use api::v1::{ArrowIpc, SemanticType};
use bytes::Bytes;
-use common_error::ext::ErrorExt;
-use common_error::status_code::StatusCode;
use common_grpc::flight::{FlightEncoder, FlightMessage};
-use common_query::prelude::{greptime_timestamp, greptime_value};
-use datatypes::arrow::array::{Array, Float64Array, StringArray, TimestampMillisecondArray};
use datatypes::arrow::record_batch::RecordBatch;
use snafu::{OptionExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
-use store_api::region_request::{
- AffectedRows, RegionBulkInsertsRequest, RegionPutRequest, RegionRequest,
-};
+use store_api::region_request::{AffectedRows, RegionBulkInsertsRequest, RegionRequest};
use store_api::storage::RegionId;
use crate::batch_modifier::{TagColumnInfo, modify_batch_sparse};
@@ -42,8 +36,7 @@ impl MetricEngineInner {
/// **Logical region path:** The request payload is a logical `RecordBatch`
/// (timestamp, value and tag columns). It is transformed to physical format
/// via `modify_batch_sparse`, encoded to Arrow IPC, and forwarded as a
- /// `BulkInserts` request to the data region. If mito reports
- /// `StatusCode::Unsupported`, the request is transparently retried as a `Put`.
+ /// `BulkInserts` request to the data region.
///
/// **Physical region path:** The request payload is already in physical format
/// (produced by the batcher's `flush_batch_physical`). It is forwarded directly
@@ -134,27 +127,9 @@ impl MetricEngineInner {
},
partition_expr_version,
};
- match self
- .data_region
+ self.data_region
.write_data(data_region_id, RegionRequest::BulkInserts(request))
.await
- {
- Ok(affected_rows) => Ok(affected_rows),
- Err(err) if err.status_code() == StatusCode::Unsupported => {
- // todo(hl): fallback path for PartitionTreeMemtable, remove this once we remove it
- let rows = record_batch_to_rows(&batch, region_id)?;
- self.put_region(
- region_id,
- RegionPutRequest {
- rows,
- hint: None,
- partition_expr_version,
- },
- )
- .await
- }
- Err(err) => Err(err),
- }
}
fn resolve_tag_columns_from_metadata(
@@ -214,174 +189,6 @@ impl MetricEngineInner {
}
}
-fn record_batch_to_rows(batch: &RecordBatch, logical_region_id: RegionId) -> Result {
- let schema_ref = batch.schema();
- let fields = schema_ref.fields();
-
- let mut ts_idx = None;
- let mut val_idx = None;
- let mut tag_indices = Vec::new();
-
- for (idx, field) in fields.iter().enumerate() {
- if field.name() == greptime_timestamp() {
- ts_idx = Some(idx);
- if !matches!(
- field.data_type(),
- datatypes::arrow::datatypes::DataType::Timestamp(
- datatypes::arrow::datatypes::TimeUnit::Millisecond,
- _
- )
- ) {
- return error::UnexpectedRequestSnafu {
- reason: format!(
- "Timestamp column '{}' in region {:?} has incompatible type: {:?}",
- field.name(),
- logical_region_id,
- field.data_type()
- ),
- }
- .fail();
- }
- } else if field.name() == greptime_value() {
- val_idx = Some(idx);
- if !matches!(
- field.data_type(),
- datatypes::arrow::datatypes::DataType::Float64
- ) {
- return error::UnexpectedRequestSnafu {
- reason: format!(
- "Value column '{}' in region {:?} has incompatible type: {:?}",
- field.name(),
- logical_region_id,
- field.data_type()
- ),
- }
- .fail();
- }
- } else {
- if !matches!(
- field.data_type(),
- datatypes::arrow::datatypes::DataType::Utf8
- ) {
- return error::UnexpectedRequestSnafu {
- reason: format!(
- "Tag column '{}' in region {:?} must be Utf8, found: {:?}",
- field.name(),
- logical_region_id,
- field.data_type()
- ),
- }
- .fail();
- }
- tag_indices.push(idx);
- }
- }
-
- let ts_idx = ts_idx.with_context(|| error::UnexpectedRequestSnafu {
- reason: format!(
- "Timestamp column '{}' not found in RecordBatch for region {:?}",
- greptime_timestamp(),
- logical_region_id
- ),
- })?;
- let val_idx = val_idx.with_context(|| error::UnexpectedRequestSnafu {
- reason: format!(
- "Value column '{}' not found in RecordBatch for region {:?}",
- greptime_value(),
- logical_region_id
- ),
- })?;
-
- let mut schema = Vec::with_capacity(2 + tag_indices.len());
- schema.push(api::v1::ColumnSchema {
- column_name: greptime_timestamp().to_string(),
- datatype: ColumnDataType::TimestampMillisecond as i32,
- semantic_type: SemanticType::Timestamp as i32,
- datatype_extension: None,
- options: None,
- });
- schema.push(api::v1::ColumnSchema {
- column_name: greptime_value().to_string(),
- datatype: ColumnDataType::Float64 as i32,
- semantic_type: SemanticType::Field as i32,
- datatype_extension: None,
- options: None,
- });
- for &idx in &tag_indices {
- let field = &fields[idx];
- schema.push(api::v1::ColumnSchema {
- column_name: field.name().clone(),
- datatype: ColumnDataType::String as i32,
- semantic_type: SemanticType::Tag as i32,
- datatype_extension: None,
- options: None,
- });
- }
-
- let ts_array = batch
- .column(ts_idx)
- .as_any()
- .downcast_ref::()
- .expect("validated as TimestampMillisecond");
- let val_array = batch
- .column(val_idx)
- .as_any()
- .downcast_ref::()
- .expect("validated as Float64");
- let tag_arrays: Vec<&StringArray> = tag_indices
- .iter()
- .map(|&idx| {
- batch
- .column(idx)
- .as_any()
- .downcast_ref::()
- .expect("validated as Utf8")
- })
- .collect();
-
- let num_rows = batch.num_rows();
- let mut rows = Vec::with_capacity(num_rows);
- for row_idx in 0..num_rows {
- let mut values = Vec::with_capacity(2 + tag_arrays.len());
-
- if ts_array.is_null(row_idx) {
- values.push(api::v1::Value { value_data: None });
- } else {
- values.push(api::v1::Value {
- value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(
- ts_array.value(row_idx),
- )),
- });
- }
-
- if val_array.is_null(row_idx) {
- values.push(api::v1::Value { value_data: None });
- } else {
- values.push(api::v1::Value {
- value_data: Some(api::v1::value::ValueData::F64Value(
- val_array.value(row_idx),
- )),
- });
- }
-
- for arr in &tag_arrays {
- if arr.is_null(row_idx) {
- values.push(api::v1::Value { value_data: None });
- } else {
- values.push(api::v1::Value {
- value_data: Some(api::v1::value::ValueData::StringValue(
- arr.value(row_idx).to_string(),
- )),
- });
- }
- }
-
- rows.push(api::v1::Row { values });
- }
-
- Ok(api::v1::Rows { schema, rows })
-}
-
fn record_batch_to_ipc(record_batch: &RecordBatch) -> Result<(Bytes, Bytes, Bytes)> {
let mut encoder = FlightEncoder::default();
let schema = encoder.encode_schema(record_batch.schema().as_ref());
@@ -422,7 +229,7 @@ mod tests {
use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use mito2::config::MitoConfig;
- use store_api::metric_engine_consts::MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING;
+ use store_api::metric_engine_consts::PRIMARY_KEY_ENCODING;
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionBulkInsertsRequest, RegionPutRequest, RegionRequest};
@@ -483,10 +290,7 @@ mod tests {
env.create_physical_region(
physical_region_id,
&TestEnv::default_table_dir(),
- vec![(
- MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
- "dense".to_string(),
- )],
+ vec![(PRIMARY_KEY_ENCODING.to_string(), "dense".to_string())],
)
.await;
@@ -810,65 +614,4 @@ mod tests {
assert_eq!(put_output, bulk_output);
}
-
- #[test]
- fn test_record_batch_to_rows_with_null_values() {
- use datatypes::arrow::array::{Float64Array, StringArray, TimestampMillisecondArray};
- use datatypes::arrow::datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit};
- use datatypes::arrow::record_batch::RecordBatch;
- use store_api::storage::RegionId;
-
- use crate::engine::bulk_insert::record_batch_to_rows;
-
- let schema = Arc::new(ArrowSchema::new(vec![
- Field::new(
- greptime_timestamp(),
- DataType::Timestamp(TimeUnit::Millisecond, None),
- true,
- ),
- Field::new(greptime_value(), DataType::Float64, true),
- Field::new("job", DataType::Utf8, true),
- Field::new("host", DataType::Utf8, true),
- ]));
-
- let ts_array = TimestampMillisecondArray::from(vec![Some(1000), None, Some(3000)]);
- let val_array = Float64Array::from(vec![Some(1.0), Some(2.0), None]);
- let job_array = StringArray::from(vec![Some("job1"), None, Some("job3")]);
- let host_array = StringArray::from(vec![None, Some("host2"), Some("host3")]);
-
- let batch = RecordBatch::try_new(
- schema,
- vec![
- Arc::new(ts_array),
- Arc::new(val_array),
- Arc::new(job_array),
- Arc::new(host_array),
- ],
- )
- .unwrap();
-
- let region_id = RegionId::new(1, 1);
- let rows = record_batch_to_rows(&batch, region_id).unwrap();
-
- assert_eq!(rows.rows.len(), 3);
- assert_eq!(rows.schema.len(), 4);
-
- // Row 0: all non-null except host
- assert!(rows.rows[0].values[0].value_data.is_some());
- assert!(rows.rows[0].values[1].value_data.is_some());
- assert!(rows.rows[0].values[2].value_data.is_some());
- assert!(rows.rows[0].values[3].value_data.is_none());
-
- // Row 1: null timestamp, null job
- assert!(rows.rows[1].values[0].value_data.is_none());
- assert!(rows.rows[1].values[1].value_data.is_some());
- assert!(rows.rows[1].values[2].value_data.is_none());
- assert!(rows.rows[1].values[3].value_data.is_some());
-
- // Row 2: null value
- assert!(rows.rows[2].values[0].value_data.is_some());
- assert!(rows.rows[2].values[1].value_data.is_none());
- assert!(rows.rows[2].values[2].value_data.is_some());
- assert!(rows.rows[2].values[3].value_data.is_some());
- }
}
diff --git a/src/metric-engine/src/engine/options.rs b/src/metric-engine/src/engine/options.rs
index 232d3e93c5..7008325bc5 100644
--- a/src/metric-engine/src/engine/options.rs
+++ b/src/metric-engine/src/engine/options.rs
@@ -22,9 +22,14 @@ use store_api::metric_engine_consts::{
METRIC_ENGINE_INDEX_SKIPPING_INDEX_FALSE_POSITIVE_RATE_OPTION_DEFAULT,
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION,
METRIC_ENGINE_INDEX_SKIPPING_INDEX_GRANULARITY_OPTION_DEFAULT, METRIC_ENGINE_INDEX_TYPE_OPTION,
+ PRIMARY_KEY_ENCODING,
};
use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, TWCS_TIME_WINDOW};
+/// Prefix for legacy `memtable.partition_tree.*` option keys. These keys are
+/// silently dropped by the metric engine; the partition tree memtable has been removed.
+const LEGACY_PARTITION_TREE_OPTION_PREFIX: &str = "memtable.partition_tree.";
+
use crate::error::{Error, ParseRegionOptionsSnafu, Result};
/// The empirical value for the seg row count of the metric data region.
@@ -66,16 +71,27 @@ pub fn set_data_region_options(
"index.inverted_index.segment_row_count".to_string(),
SEG_ROW_COUNT_FOR_DATA_REGION.to_string(),
);
- // Set memtable options for the data region.
- options.insert("memtable.type".to_string(), "partition_tree".to_string());
- if sparse_primary_key_encoding_if_absent
- && !options.contains_key(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING)
- {
- options.insert(
- MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
- "sparse".to_string(),
- );
+
+ // Extract primary key encoding from the legacy nested key before dropping
+ // all `memtable.partition_tree.*` keys.
+ let legacy_encoding = options.remove(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING);
+ options.retain(|k, _| !k.starts_with(LEGACY_PARTITION_TREE_OPTION_PREFIX));
+
+ // Set memtable options for the data region. Bulk memtable produces
+ // flat-encoded ranges, so the SST format must be flat to match.
+ options.insert("memtable.type".to_string(), "bulk".to_string());
+ options.insert("sst_format".to_string(), "flat".to_string());
+
+ // Decide the top-level primary key encoding: caller-supplied top-level key wins,
+ // then extracted legacy value, then the `sparse` default if requested.
+ if !options.contains_key(PRIMARY_KEY_ENCODING) {
+ if let Some(encoding) = legacy_encoding {
+ options.insert(PRIMARY_KEY_ENCODING.to_string(), encoding);
+ } else if sparse_primary_key_encoding_if_absent {
+ options.insert(PRIMARY_KEY_ENCODING.to_string(), "sparse".to_string());
+ }
}
+
if !options.contains_key(TWCS_TIME_WINDOW) {
options.insert(
COMPACTION_TYPE.to_string(),
@@ -213,6 +229,8 @@ mod tests {
let mut options = HashMap::new();
set_data_region_options(&mut options, false);
+ assert_eq!(options.get("memtable.type"), Some(&"bulk".to_string()));
+ assert_eq!(options.get("sst_format"), Some(&"flat".to_string()));
assert_eq!(
options.get(COMPACTION_TYPE),
Some(&COMPACTION_TYPE_TWCS.to_string())
@@ -220,6 +238,58 @@ mod tests {
assert_eq!(options.get(TWCS_TIME_WINDOW), Some(&"1d".to_string()));
}
+ #[test]
+ fn test_set_data_region_options_sparse_primary_key_encoding() {
+ let mut options = HashMap::new();
+ set_data_region_options(&mut options, true);
+
+ assert_eq!(options.get("memtable.type"), Some(&"bulk".to_string()));
+ assert_eq!(options.get("sst_format"), Some(&"flat".to_string()));
+ assert_eq!(
+ options.get(PRIMARY_KEY_ENCODING),
+ Some(&"sparse".to_string())
+ );
+ assert!(!options.contains_key(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING));
+ }
+
+ #[test]
+ fn test_set_data_region_options_migrates_legacy_partition_tree_options() {
+ let mut options = HashMap::new();
+ options.insert("memtable.type".to_string(), "partition_tree".to_string());
+ options.insert(
+ MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
+ "sparse".to_string(),
+ );
+ options.insert(
+ "memtable.partition_tree.index_max_keys_per_shard".to_string(),
+ "2048".to_string(),
+ );
+ set_data_region_options(&mut options, false);
+
+ assert_eq!(options.get("memtable.type"), Some(&"bulk".to_string()));
+ assert_eq!(options.get("sst_format"), Some(&"flat".to_string()));
+ assert_eq!(
+ options.get(PRIMARY_KEY_ENCODING),
+ Some(&"sparse".to_string())
+ );
+ // All legacy partition-tree-specific keys should be stripped.
+ assert!(!options.contains_key(MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING));
+ assert!(!options.contains_key("memtable.partition_tree.index_max_keys_per_shard"));
+ }
+
+ #[test]
+ fn test_set_data_region_options_preserves_existing_top_level_encoding() {
+ let mut options = HashMap::new();
+ options.insert(PRIMARY_KEY_ENCODING.to_string(), "dense".to_string());
+ // Sparse flag is on but caller already specified dense.
+ set_data_region_options(&mut options, true);
+
+ assert_eq!(
+ options.get(PRIMARY_KEY_ENCODING),
+ Some(&"dense".to_string())
+ );
+ }
+
#[test]
fn test_set_data_region_options_respects_user_compaction_time_window() {
// Test that user-specified time window is preserved
diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs
index 07adfae120..438a8fcad3 100644
--- a/src/metric-engine/src/engine/put.rs
+++ b/src/metric-engine/src/engine/put.rs
@@ -709,8 +709,7 @@ mod tests {
use partition::expr::col;
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
- DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME,
- MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING,
+ DATA_SCHEMA_TABLE_ID_COLUMN_NAME, DATA_SCHEMA_TSID_COLUMN_NAME, PRIMARY_KEY_ENCODING,
};
use store_api::path_utils::table_dir;
use store_api::region_engine::RegionEngine;
@@ -1282,10 +1281,7 @@ mod tests {
run_batch_write_with_schema_variants(
&env,
physical_region_id,
- vec![(
- MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
- "sparse".to_string(),
- )],
+ vec![(PRIMARY_KEY_ENCODING.to_string(), "sparse".to_string())],
true,
)
.await;
@@ -1299,10 +1295,7 @@ mod tests {
run_batch_write_with_schema_variants(
&env,
physical_region_id,
- vec![(
- MEMTABLE_PARTITION_TREE_PRIMARY_KEY_ENCODING.to_string(),
- "dense".to_string(),
- )],
+ vec![(PRIMARY_KEY_ENCODING.to_string(), "dense".to_string())],
false,
)
.await;
diff --git a/src/mito-codec/benches/bench_primary_key_filter.rs b/src/mito-codec/benches/bench_primary_key_filter.rs
index 528c374761..ee06ac28e1 100644
--- a/src/mito-codec/benches/bench_primary_key_filter.rs
+++ b/src/mito-codec/benches/bench_primary_key_filter.rs
@@ -247,12 +247,12 @@ fn bench_primary_key_filter(c: &mut Criterion) {
let dense_pk = encode_dense_pk(&metadata, &row);
let dense_codec = DensePrimaryKeyCodec::new(&metadata);
- let mut dense_fast = dense_codec.primary_key_filter(&metadata, filters.clone(), false);
+ let mut dense_fast = dense_codec.primary_key_filter(&metadata, filters.clone());
let mut dense_offsets = Vec::new();
let sparse_pk = encode_sparse_pk(&metadata, &row);
let sparse_codec = SparsePrimaryKeyCodec::new(&metadata);
- let mut sparse_fast = sparse_codec.primary_key_filter(&metadata, filters.clone(), false);
+ let mut sparse_fast = sparse_codec.primary_key_filter(&metadata, filters.clone());
let mut sparse_offsets = SparseOffsetsCache::new();
let mut group = c.benchmark_group(format!("primary_key_filter/{case_name}"));
diff --git a/src/mito-codec/src/primary_key_filter.rs b/src/mito-codec/src/primary_key_filter.rs
index 2450a6e44a..f6f9d78b6b 100644
--- a/src/mito-codec/src/primary_key_filter.rs
+++ b/src/mito-codec/src/primary_key_filter.rs
@@ -21,7 +21,6 @@ use datatypes::value::Value;
use memcomparable::Serializer;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
-use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::storage::ColumnId;
use crate::error::{EvaluateFilterSnafu, Result};
@@ -29,11 +28,6 @@ use crate::row_converter::{
DensePrimaryKeyCodec, PrimaryKeyFilter, SortField, SparseOffsetsCache, SparsePrimaryKeyCodec,
};
-/// Returns true if this is a partition column for metrics in the memtable.
-pub fn is_partition_column(name: &str) -> bool {
- name == DATA_SCHEMA_TABLE_ID_COLUMN_NAME
-}
-
#[derive(Clone)]
struct PrimaryKeyFilterInner {
filters: Arc>,
@@ -41,12 +35,8 @@ struct PrimaryKeyFilterInner {
}
impl PrimaryKeyFilterInner {
- fn new(
- metadata: RegionMetadataRef,
- filters: Arc>,
- skip_partition_column: bool,
- ) -> Self {
- let compiled_filters = Self::compile_filters(&metadata, &filters, skip_partition_column);
+ fn new(metadata: RegionMetadataRef, filters: Arc>) -> Self {
+ let compiled_filters = Self::compile_filters(&metadata, &filters);
Self {
filters,
compiled_filters,
@@ -56,7 +46,6 @@ impl PrimaryKeyFilterInner {
fn compile_filters(
metadata: &RegionMetadataRef,
filters: &[SimpleFilterEvaluator],
- skip_partition_column: bool,
) -> Vec {
if filters.is_empty() || metadata.primary_key.is_empty() {
return Vec::new();
@@ -64,10 +53,6 @@ impl PrimaryKeyFilterInner {
let mut compiled_filters = Vec::with_capacity(filters.len());
for (filter_idx, filter) in filters.iter().enumerate() {
- if skip_partition_column && is_partition_column(filter.column_name()) {
- continue;
- }
-
let Some(column) = metadata.column_by_name(filter.column_name()) else {
continue;
};
@@ -256,10 +241,9 @@ impl DensePrimaryKeyFilter {
metadata: RegionMetadataRef,
filters: Arc>,
codec: DensePrimaryKeyCodec,
- skip_partition_column: bool,
) -> Self {
Self {
- inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column),
+ inner: PrimaryKeyFilterInner::new(metadata, filters),
codec,
offsets_buf: Vec::new(),
}
@@ -310,10 +294,9 @@ impl SparsePrimaryKeyFilter {
metadata: RegionMetadataRef,
filters: Arc>,
codec: SparsePrimaryKeyCodec,
- skip_partition_column: bool,
) -> Self {
Self {
- inner: PrimaryKeyFilterInner::new(metadata, filters, skip_partition_column),
+ inner: PrimaryKeyFilterInner::new(metadata, filters),
codec,
offsets_cache: SparseOffsetsCache::new(),
}
@@ -513,7 +496,7 @@ mod tests {
)]);
let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
let codec = SparsePrimaryKeyCodec::new(&metadata);
- let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
+ let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
assert!(filter.matches(&pk).unwrap());
}
@@ -526,7 +509,7 @@ mod tests {
)]);
let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
let codec = SparsePrimaryKeyCodec::new(&metadata);
- let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
+ let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
assert!(!filter.matches(&pk).unwrap());
}
@@ -539,7 +522,7 @@ mod tests {
)]);
let pk = encode_sparse_pk(&metadata, 1, 0, create_test_row());
let codec = SparsePrimaryKeyCodec::new(&metadata);
- let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec, false);
+ let mut filter = SparsePrimaryKeyFilter::new(metadata, filters, codec);
assert!(filter.matches(&pk).unwrap());
}
@@ -552,7 +535,7 @@ mod tests {
)]);
let pk = encode_dense_pk(&metadata, create_test_row());
let codec = DensePrimaryKeyCodec::new(&metadata);
- let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
+ let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
assert!(filter.matches(&pk).unwrap());
}
@@ -565,7 +548,7 @@ mod tests {
)]);
let pk = encode_dense_pk(&metadata, create_test_row());
let codec = DensePrimaryKeyCodec::new(&metadata);
- let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
+ let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
assert!(!filter.matches(&pk).unwrap());
}
@@ -578,7 +561,7 @@ mod tests {
)]);
let pk = encode_dense_pk(&metadata, create_test_row());
let codec = DensePrimaryKeyCodec::new(&metadata);
- let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
+ let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
assert!(filter.matches(&pk).unwrap());
}
@@ -597,8 +580,7 @@ mod tests {
for (op, value, expected) in cases {
let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
- let mut filter =
- DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone(), false);
+ let mut filter = DensePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
assert_eq!(expected, filter.matches(&pk).unwrap());
}
}
@@ -618,8 +600,7 @@ mod tests {
for (op, value, expected) in cases {
let filters = Arc::new(vec![create_filter_with_op("pod", op, value)]);
- let mut filter =
- SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone(), false);
+ let mut filter = SparsePrimaryKeyFilter::new(metadata.clone(), filters, codec.clone());
assert_eq!(expected, filter.matches(&pk).unwrap());
}
}
@@ -652,7 +633,7 @@ mod tests {
.unwrap();
let filters = Arc::new(vec![create_filter_with_op("f", Operator::Eq, 0.0_f64)]);
- let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
+ let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
assert!(filter.matches(&pk).unwrap());
}
@@ -674,29 +655,7 @@ mod tests {
Operator::Eq,
42_u32,
)]);
- let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, false);
-
- assert!(filter.matches(&pk).unwrap());
- }
-
- #[test]
- fn test_dense_primary_key_filter_can_skip_partition_column() {
- let metadata = setup_partitioned_metadata();
- let codec = DensePrimaryKeyCodec::new(&metadata);
- let mut pk = Vec::new();
- codec
- .encode_to_vec(
- [ValueRef::UInt32(42), ValueRef::String("host-a")].into_iter(),
- &mut pk,
- )
- .unwrap();
-
- let filters = Arc::new(vec![create_filter_with_op(
- DATA_SCHEMA_TABLE_ID_COLUMN_NAME,
- Operator::Eq,
- 7_u32,
- )]);
- let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec, true);
+ let mut filter = DensePrimaryKeyFilter::new(metadata, filters, codec);
assert!(filter.matches(&pk).unwrap());
}
diff --git a/src/mito-codec/src/row_converter.rs b/src/mito-codec/src/row_converter.rs
index 0a3205ce9e..fae2997182 100644
--- a/src/mito-codec/src/row_converter.rs
+++ b/src/mito-codec/src/row_converter.rs
@@ -124,7 +124,6 @@ pub trait PrimaryKeyCodec: Send + Sync + Debug {
&self,
metadata: &RegionMetadataRef,
filters: Arc>,
- skip_partition_column: bool,
) -> Box;
/// Returns the estimated size of the primary key.
diff --git a/src/mito-codec/src/row_converter/dense.rs b/src/mito-codec/src/row_converter/dense.rs
index 4bc774c941..6cc70feaea 100644
--- a/src/mito-codec/src/row_converter/dense.rs
+++ b/src/mito-codec/src/row_converter/dense.rs
@@ -556,13 +556,11 @@ impl PrimaryKeyCodec for DensePrimaryKeyCodec {
&self,
metadata: &RegionMetadataRef,
filters: Arc>,
- skip_partition_column: bool,
) -> Box {
Box::new(DensePrimaryKeyFilter::new(
metadata.clone(),
filters,
self.clone(),
- skip_partition_column,
))
}
diff --git a/src/mito-codec/src/row_converter/sparse.rs b/src/mito-codec/src/row_converter/sparse.rs
index 9c0e488576..37e1813470 100644
--- a/src/mito-codec/src/row_converter/sparse.rs
+++ b/src/mito-codec/src/row_converter/sparse.rs
@@ -507,13 +507,11 @@ impl PrimaryKeyCodec for SparsePrimaryKeyCodec {
&self,
metadata: &RegionMetadataRef,
filters: Arc>,
- skip_partition_column: bool,
) -> Box {
Box::new(SparsePrimaryKeyFilter::new(
metadata.clone(),
filters,
self.clone(),
- skip_partition_column,
))
}
diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs
index 8336625e3c..23ba1411f5 100644
--- a/src/mito2/benches/memtable_bench.rs
+++ b/src/mito2/benches/memtable_bench.rs
@@ -28,7 +28,6 @@ use mito2::memtable::bulk::context::BulkIterContext;
use mito2::memtable::bulk::part::BulkPartConverter;
use mito2::memtable::bulk::part_reader::BulkPartBatchIter;
use mito2::memtable::bulk::{BulkMemtable, BulkMemtableConfig};
-use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
use mito2::memtable::time_series::TimeSeriesMemtable;
use mito2::memtable::{IterBuilder, Memtable, RangesOptions};
use mito2::read::flat_merge::FlatMergeIterator;
@@ -45,21 +44,6 @@ fn write_rows(c: &mut Criterion) {
// Note that this test only generate one time series.
let mut group = c.benchmark_group("write");
- group.bench_function("partition_tree", |b| {
- let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
- let memtable = PartitionTreeMemtable::new(
- 1,
- codec,
- metadata.clone(),
- None,
- &PartitionTreeConfig::default(),
- );
- let kvs =
- memtable_util::build_key_values(&metadata, "hello".to_string(), 42, ×tamps, 1);
- b.iter(|| {
- memtable.write(&kvs).unwrap();
- });
- });
group.bench_function("time_series", |b| {
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true, MergeMode::LastRow);
let kvs =
@@ -73,26 +57,11 @@ fn write_rows(c: &mut Criterion) {
/// Scans all rows.
fn full_scan(c: &mut Criterion) {
let metadata = Arc::new(cpu_metadata());
- let config = PartitionTreeConfig::default();
let start_sec = 1710043200;
let generator = CpuDataGenerator::new(metadata.clone(), 4000, start_sec, start_sec + 3600 * 2);
let mut group = c.benchmark_group("full_scan");
group.sample_size(10);
- group.bench_function("partition_tree", |b| {
- let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
- let memtable = PartitionTreeMemtable::new(1, codec, metadata.clone(), None, &config);
- for kvs in generator.iter() {
- memtable.write(&kvs).unwrap();
- }
-
- b.iter(|| {
- let iter = memtable.iter(None, None, None).unwrap();
- for batch in iter {
- let _batch = batch.unwrap();
- }
- });
- });
group.bench_function("time_series", |b| {
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true, MergeMode::LastRow);
for kvs in generator.iter() {
@@ -115,27 +84,11 @@ fn full_scan(c: &mut Criterion) {
/// Filters 1 host.
fn filter_1_host(c: &mut Criterion) {
let metadata = Arc::new(cpu_metadata());
- let config = PartitionTreeConfig::default();
let start_sec = 1710043200;
let generator = CpuDataGenerator::new(metadata.clone(), 4000, start_sec, start_sec + 3600 * 2);
let mut group = c.benchmark_group("filter_1_host");
group.sample_size(10);
- group.bench_function("partition_tree", |b| {
- let codec = Arc::new(DensePrimaryKeyCodec::new(&metadata));
- let memtable = PartitionTreeMemtable::new(1, codec, metadata.clone(), None, &config);
- for kvs in generator.iter() {
- memtable.write(&kvs).unwrap();
- }
- let predicate = generator.random_host_filter();
-
- b.iter(|| {
- let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap();
- for batch in iter {
- let _batch = batch.unwrap();
- }
- });
- });
group.bench_function("time_series", |b| {
let memtable = TimeSeriesMemtable::new(metadata.clone(), 1, None, true, MergeMode::LastRow);
for kvs in generator.iter() {
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 1d1151177d..4b8ef71a56 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -42,7 +42,7 @@ use datafusion_expr::Expr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
-use store_api::storage::{RegionId, TableId};
+use store_api::storage::RegionId;
use task::MAX_PARALLEL_COMPACTION;
use tokio::sync::mpsc::{self, Sender};
@@ -452,7 +452,7 @@ impl CompactionScheduler {
) -> Result