feat: support changing table's append_mode to true (#7669)

* feat: support alter append_mode to true

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add sqlness test

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fix compiler errors

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: clear merge mode in mito when setting append mode

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: sanitize open request and options with both append/merge mode

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: clear merge mode when append mode is true

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2026-02-25 12:11:23 +08:00
committed by GitHub
parent ffcd41adf8
commit 42ad842434
15 changed files with 890 additions and 11 deletions

View File

@@ -39,6 +39,7 @@ use crate::engine::MitoEngine;
use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
use crate::error;
use crate::sst::FormatType;
use crate::test_util::batch_util::sort_batches_and_print;
use crate::test_util::{
CreateRequestBuilder, TestEnv, build_rows, build_rows_for_key, flush_region, put_rows,
rows_schema,
@@ -1229,3 +1230,283 @@ async fn test_alter_region_sst_format_without_flush() {
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected_all_data, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_alter_region_append_mode_with_flush() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
// Create a region with append_mode=false (default)
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let column_schemas = rows_schema(&request);
let table_dir = request.table_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let check_append_mode = |engine: &MitoEngine, expected: bool| {
let append_mode = engine
.get_region(region_id)
.unwrap()
.version()
.options
.append_mode;
assert_eq!(append_mode, expected);
};
check_append_mode(&engine, false);
// Inserts some data before alter (memtable not empty, alter will trigger flush)
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Alters append_mode from false to true (this triggers internal flush)
let alter_request = RegionAlterRequest {
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::AppendMode(true)],
},
};
engine
.handle_request(region_id, RegionRequest::Alter(alter_request))
.await
.unwrap();
check_append_mode(&engine, true);
// Inserts duplicate data after alter (same as rows 0, 1, 2)
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Flushes again
flush_region(&engine, region_id, None).await;
// After append_mode=true, duplicates should be preserved
let expected_all_data = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
expected_all_data,
sort_batches_and_print(&batches, &["tag_0", "ts"])
);
// Reopens region to verify append_mode persists
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
table_dir,
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
}),
)
.await
.unwrap();
check_append_mode(&engine, true);
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
expected_all_data,
sort_batches_and_print(&batches, &["tag_0", "ts"])
);
}
#[tokio::test]
async fn test_alter_region_append_mode_without_flush() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
// Create a region with append_mode=false (default)
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let column_schemas = rows_schema(&request);
let table_dir = request.table_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let check_append_mode = |engine: &MitoEngine, expected: bool| {
let append_mode = engine
.get_region(region_id)
.unwrap()
.version()
.options
.append_mode;
assert_eq!(append_mode, expected);
};
check_append_mode(&engine, false);
// Alters append_mode from false to true immediately (no data, no flush needed)
let alter_request = RegionAlterRequest {
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::AppendMode(true)],
},
};
engine
.handle_request(region_id, RegionRequest::Alter(alter_request))
.await
.unwrap();
check_append_mode(&engine, true);
// Inserts duplicate data
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Insert same data again
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Flushes
flush_region(&engine, region_id, None).await;
// Duplicates should be preserved
let expected_all_data = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
expected_all_data,
sort_batches_and_print(&batches, &["tag_0", "ts"])
);
// Reopens region to verify append_mode persists
let engine = env.reopen_engine(engine, MitoConfig::default()).await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
table_dir,
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
}),
)
.await
.unwrap();
check_append_mode(&engine, true);
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(
expected_all_data,
sort_batches_and_print(&batches, &["tag_0", "ts"])
);
}
#[tokio::test]
async fn test_alter_region_append_mode_invalid() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
// Create a region with append_mode=true
let request = CreateRequestBuilder::new()
.insert_option("append_mode", "true")
.build();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
let check_append_mode = |engine: &MitoEngine, expected: bool| {
let append_mode = engine
.get_region(region_id)
.unwrap()
.version()
.options
.append_mode;
assert_eq!(append_mode, expected);
};
check_append_mode(&engine, true);
// Try to alter append_mode from true to false (should fail)
let alter_request = RegionAlterRequest {
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::AppendMode(false)],
},
};
engine
.handle_request(region_id, RegionRequest::Alter(alter_request))
.await
.unwrap_err();
// append_mode should still be true
check_append_mode(&engine, true);
}

View File

@@ -14,10 +14,15 @@
//! Tests for append mode.
use std::collections::HashMap;
use api::v1::Rows;
use common_recordbatch::RecordBatches;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionCompactRequest, RegionRequest};
use store_api::region_request::{
AlterKind, PathType, RegionAlterRequest, RegionCompactRequest, RegionOpenRequest,
RegionRequest, SetRegionOption,
};
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
@@ -221,6 +226,146 @@ async fn test_append_mode_compaction_with_format(flat_format: bool) {
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
}
#[tokio::test]
async fn test_alter_append_mode_clears_merge_mode() {
test_alter_append_mode_clears_merge_mode_with_format(false).await;
test_alter_append_mode_clears_merge_mode_with_format(true).await;
}
async fn test_alter_append_mode_clears_merge_mode_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
// Create a region with merge_mode=last_non_null.
let request = CreateRequestBuilder::new()
.insert_option("merge_mode", "last_non_null")
.build();
let column_schemas = rows_schema(&request);
let table_dir = request.table_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Verify initial options.
let options = &engine.get_region(region_id).unwrap().version().options;
assert!(!options.append_mode);
assert!(options.merge_mode.is_some());
// Insert some data.
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Alter append_mode to true (triggers flush since memtable is not empty).
let alter_request = RegionAlterRequest {
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::AppendMode(true)],
},
};
engine
.handle_request(region_id, RegionRequest::Alter(alter_request))
.await
.unwrap();
// Verify append_mode is true and merge_mode is cleared.
let options = &engine.get_region(region_id).unwrap().version().options;
assert!(options.append_mode);
assert!(options.merge_mode.is_none());
// Insert duplicate data (should be preserved in append mode).
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Flush to persist.
flush_region(&engine, region_id, None).await;
let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
let stream = engine
.scan_to_stream(region_id, ScanRequest::default())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
// Reopen engine and region to verify persistence.
let engine = env
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: flat_format,
..Default::default()
},
)
.await;
let mut options = HashMap::default();
options.insert("append_mode".to_string(), "true".to_string());
options.insert("merge_mode".to_string(), "last_non_null".to_string());
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
table_dir,
path_type: PathType::Bare,
options,
skip_wal_replay: false,
checkpoint: None,
}),
)
.await
.unwrap();
// Verify options persist after reopen.
let options = &engine.get_region(region_id).unwrap().version().options;
assert!(options.append_mode);
assert!(options.merge_mode.is_none());
// Verify data persists (duplicates preserved).
let stream = engine
.scan_to_stream(region_id, ScanRequest::default())
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, sort_batches_and_print(&batches, &["tag_0", "ts"]));
}
#[tokio::test]
async fn test_put_single_range() {
test_put_single_range_with_format(false).await;

View File

@@ -543,6 +543,7 @@ async fn test_apply_staging_manifest_change_edit_different_columns_fails_with_fo
RegionMetaAction::Change(RegionChange {
metadata: Arc::new(changed_metadata),
sst_format: FormatType::PrimaryKey,
append_mode: None,
}),
RegionMetaAction::Edit(RegionEdit {
files_to_add: Vec::new(),

View File

@@ -925,6 +925,7 @@ async fn test_staging_exit_conflict_partition_expr_change_and_change_with_format
RegionMetaAction::Change(RegionChange {
metadata: Arc::new(changed_metadata),
sst_format: FormatType::PrimaryKey,
append_mode: None,
}),
RegionMetaAction::Edit(RegionEdit {
files_to_add: Vec::new(),

View File

@@ -78,6 +78,9 @@ pub struct RegionChange {
/// Format of the SST.
#[serde(default)]
pub sst_format: FormatType,
/// Whether the region is in append mode.
#[serde(default)]
pub append_mode: Option<bool>,
}
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
@@ -160,6 +163,9 @@ pub struct RegionManifest {
/// Format of the SST file.
#[serde(default)]
pub sst_format: FormatType,
/// Whether the region is in append mode.
#[serde(default)]
pub append_mode: Option<bool>,
}
#[cfg(test)]
@@ -188,6 +194,7 @@ pub struct RegionManifestBuilder {
compaction_time_window: Option<Duration>,
committed_sequence: Option<SequenceNumber>,
sst_format: FormatType,
append_mode: Option<bool>,
}
impl RegionManifestBuilder {
@@ -205,6 +212,7 @@ impl RegionManifestBuilder {
compaction_time_window: s.compaction_time_window,
committed_sequence: s.committed_sequence,
sst_format: s.sst_format,
append_mode: s.append_mode,
}
} else {
Default::default()
@@ -215,6 +223,8 @@ impl RegionManifestBuilder {
self.metadata = Some(change.metadata);
self.manifest_version = manifest_version;
self.sst_format = change.sst_format;
// Only update append_mode if the change specifies a value.
self.append_mode = change.append_mode.or(self.append_mode);
}
/// Applies a partition-expression-only metadata change.
@@ -344,6 +354,7 @@ impl RegionManifestBuilder {
truncated_entry_id: self.truncated_entry_id,
compaction_time_window: self.compaction_time_window,
sst_format: self.sst_format,
append_mode: self.append_mode,
})
}
}
@@ -896,6 +907,7 @@ mod tests {
}],
},
sst_format: FormatType::PrimaryKey,
append_mode: None,
};
let json = serde_json::to_string(&manifest).unwrap();
@@ -998,6 +1010,7 @@ mod tests {
truncated_entry_id: None,
compaction_time_window: None,
sst_format: FormatType::PrimaryKey,
append_mode: None,
}
);
@@ -1012,6 +1025,7 @@ mod tests {
truncated_entry_id: None,
compaction_time_window: None,
sst_format: FormatType::PrimaryKey,
append_mode: None,
};
let json = serde_json::to_string(&new_manifest).unwrap();
let old_from_new: RegionManifestV1 = serde_json::from_str(&json).unwrap();
@@ -1111,6 +1125,7 @@ mod tests {
let region_change = RegionChange {
metadata: region_change.metadata.clone(),
sst_format: FormatType::Flat,
append_mode: None,
};
let serialized = serde_json::to_string(&region_change).unwrap();

View File

@@ -194,6 +194,7 @@ impl RegionManifestManager {
RegionChange {
metadata: metadata.clone(),
sst_format,
append_mode: None,
},
);
let manifest = manifest_builder.try_build()?;
@@ -207,6 +208,7 @@ impl RegionManifestManager {
let mut actions = vec![RegionMetaAction::Change(RegionChange {
metadata,
sst_format,
append_mode: None,
})];
if flushed_entry_id > 0 {
actions.push(RegionMetaAction::Edit(RegionEdit {
@@ -893,6 +895,7 @@ mod test {
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
sst_format: FormatType::PrimaryKey,
append_mode: None,
}));
let current_version = manager.update(action_list, false).await.unwrap();
@@ -956,6 +959,7 @@ mod test {
RegionMetaActionList::with_action(RegionMetaAction::Change(RegionChange {
metadata: new_metadata.clone(),
sst_format: FormatType::PrimaryKey,
append_mode: None,
}));
let current_version = manager.update(action_list, false).await.unwrap();
@@ -1005,6 +1009,6 @@ mod test {
// get manifest size again
let manifest_size = manager.manifest_usage();
assert_eq!(manifest_size, 1378);
assert_eq!(manifest_size, 1397);
}
}

View File

@@ -492,6 +492,7 @@ impl MitoRegion {
let action = RegionMetaAction::Change(RegionChange {
metadata: current_meta.clone(),
sst_format: current_version.options.sst_format.unwrap_or_default(),
append_mode: None,
});
let result = manager
.update(RegionMetaActionList::with_action(action), false)
@@ -1518,6 +1519,7 @@ mod tests {
RegionMetaAction::Change(RegionChange {
metadata: Arc::new(changed_metadata),
sst_format: FormatType::PrimaryKey,
append_mode: None,
}),
RegionMetaAction::Edit(empty_edit()),
]),
@@ -1556,6 +1558,7 @@ mod tests {
RegionMetaAction::Change(RegionChange {
metadata: Arc::new(changed_metadata),
sst_format: FormatType::PrimaryKey,
append_mode: None,
}),
RegionMetaAction::Edit(empty_edit()),
]),
@@ -1588,6 +1591,7 @@ mod tests {
RegionMetaAction::Change(RegionChange {
metadata: Arc::new(changed_metadata),
sst_format: FormatType::PrimaryKey,
append_mode: None,
}),
RegionMetaAction::Edit(empty_edit()),
]),

View File

@@ -619,6 +619,36 @@ pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut R
);
options.sst_format = Some(manifest.sst_format);
}
if let Some(manifest_append_mode) = manifest.append_mode
&& options.append_mode != manifest_append_mode
{
common_telemetry::warn!(
"Overriding append_mode from {} to {} for region {}",
options.append_mode,
manifest_append_mode,
manifest.metadata.region_id,
);
options.append_mode = manifest_append_mode;
}
if options.append_mode && options.merge_mode.take().is_some() {
common_telemetry::warn!(
"Ignoring merge_mode because append_mode is enabled for region {}",
manifest.metadata.region_id,
);
}
}
/// Sanitizes open request options before parsing.
pub(crate) fn sanitize_open_request_options(options: &mut HashMap<String, String>) {
let append_mode_enabled = options
.get("append_mode")
.is_some_and(|v| matches!(v.trim().to_ascii_lowercase().as_str(), "true" | "1"));
if append_mode_enabled && options.remove("merge_mode").is_some() {
common_telemetry::warn!(
"Ignoring merge_mode in open request options because append_mode is enabled"
);
}
}
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.

View File

@@ -115,6 +115,7 @@ impl RemapManifest {
truncated_entry_id: None,
compaction_time_window: None,
sst_format,
append_mode: None,
};
new_manifests.insert(*region_id, manifest);
@@ -464,6 +465,7 @@ mod tests {
compaction_time_window: None,
committed_sequence: None,
sst_format: FormatType::PrimaryKey,
append_mode: None,
}
}

View File

@@ -161,13 +161,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
};
// Persist the metadata to region's manifest.
let options = new_options.as_ref().unwrap_or(&version.options);
let change = RegionChange {
metadata: new_meta,
sst_format: new_options
.as_ref()
.unwrap_or(&version.options)
.sst_format
.unwrap_or_default(),
sst_format: options.sst_format.unwrap_or_default(),
append_mode: Some(options.append_mode),
};
self.handle_manifest_region_change(region, change, need_index, new_options, sender);
}
@@ -229,6 +227,22 @@ impl<S: LogStore> RegionWorkerLoop<S> {
);
}
}
SetRegionOption::AppendMode(new_append_mode) => {
// If the append mode is unchanged, we consider the option is altered.
if new_append_mode != current_options.append_mode {
// Validates: only allow changing from false to true.
ensure!(
!current_options.append_mode && new_append_mode,
store_api::metadata::InvalidRegionRequestSnafu {
region_id: region.region_id,
err: "Only allow changing append_mode from false to true",
}
);
// Clear merge_mode since it's incompatible with append_mode.
current_options.merge_mode = None;
all_options_altered = false;
}
}
}
}
region.version_control.alter_options(current_options);
@@ -264,6 +278,13 @@ fn new_region_options_on_empty_memtable(
current_options.sst_format = Some(new_format);
}
SetRegionOption::AppendMode(new_append_mode) => {
// Safety: handle_alter_region_options_fast() has validated this.
assert!(*new_append_mode && !current_options.append_mode);
current_options.append_mode = true;
current_options.merge_mode = None;
}
}
}
Some(current_options)

View File

@@ -28,7 +28,7 @@ use table::requests::STORAGE_KEY;
use crate::error::{
ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
};
use crate::region::opener::RegionOpener;
use crate::region::opener::{RegionOpener, sanitize_open_request_options};
use crate::request::OptionOutputTx;
use crate::sst::location::region_dir_from_table_dir;
use crate::wal::entry_distributor::WalEntryReceiver;
@@ -73,7 +73,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
pub(crate) async fn handle_open_request(
&mut self,
region_id: RegionId,
request: RegionOpenRequest,
mut request: RegionOpenRequest,
wal_entry_receiver: Option<WalEntryReceiver>,
sender: OptionOutputTx,
) {
@@ -92,6 +92,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
info!("Try to open region {}, worker: {}", region_id, self.id);
sanitize_open_request_options(&mut request.options);
// Open region from specific region dir.
let opener = match RegionOpener::new(

View File

@@ -52,7 +52,8 @@ use crate::metadata::{
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::metrics;
use crate::mito_engine_options::{
SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
APPEND_MODE_KEY, SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
TWCS_TRIGGER_FILE_NUM,
};
use crate::path_utils::table_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};
@@ -1316,6 +1317,8 @@ pub enum SetRegionOption {
Twsc(String, String),
// Modifying the SST format.
Format(String),
// Modifying the append mode.
AppendMode(bool),
}
impl TryFrom<&PbOption> for SetRegionOption {
@@ -1334,6 +1337,12 @@ impl TryFrom<&PbOption> for SetRegionOption {
Ok(Self::Twsc(key.clone(), value.clone()))
}
SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
APPEND_MODE_KEY => {
let append_mode = value
.parse::<bool>()
.map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
Ok(Self::AppendMode(append_mode))
}
_ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
}
}

View File

@@ -28,7 +28,9 @@ use derive_builder::Builder;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY};
use store_api::mito_engine_options::{
APPEND_MODE_KEY, COMPACTION_TYPE, COMPACTION_TYPE_TWCS, MERGE_MODE_KEY, SST_FORMAT_KEY,
};
use store_api::region_request::{SetRegionOption, UnsetRegionOption};
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId};
@@ -364,6 +366,14 @@ impl TableMeta {
.extra_options
.insert(SST_FORMAT_KEY.to_string(), value.clone());
}
SetRegionOption::AppendMode(value) => {
new_options
.extra_options
.insert(APPEND_MODE_KEY.to_string(), value.to_string());
if *value {
new_options.extra_options.remove(MERGE_MODE_KEY);
}
}
}
}
let mut builder = self.new_meta_builder();
@@ -1503,6 +1513,85 @@ mod tests {
assert_eq!(&[1, 2, 4], &new_meta.value_indices[..]);
}
#[test]
fn test_set_append_mode_true_clears_merge_mode_option() {
let schema = Arc::new(new_test_schema());
let mut table_options = TableOptions::default();
table_options
.extra_options
.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
let meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.options(table_options)
.build()
.unwrap();
let alter_kind = AlterKind::SetTableOptions {
options: vec![SetRegionOption::AppendMode(true)],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();
assert_eq!(
Some("true"),
new_meta
.options
.extra_options
.get(APPEND_MODE_KEY)
.map(String::as_str)
);
assert!(!new_meta.options.extra_options.contains_key(MERGE_MODE_KEY));
}
#[test]
fn test_set_append_mode_false_keeps_merge_mode_option() {
let schema = Arc::new(new_test_schema());
let mut table_options = TableOptions::default();
table_options
.extra_options
.insert(MERGE_MODE_KEY.to_string(), "last_non_null".to_string());
let meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices(vec![0])
.engine("engine")
.next_column_id(3)
.options(table_options)
.build()
.unwrap();
let alter_kind = AlterKind::SetTableOptions {
options: vec![SetRegionOption::AppendMode(false)],
};
let new_meta = meta
.builder_with_alter_kind("my_table", &alter_kind)
.unwrap()
.build()
.unwrap();
assert_eq!(
Some("false"),
new_meta
.options
.extra_options
.get(APPEND_MODE_KEY)
.map(String::as_str)
);
assert_eq!(
Some("last_non_null"),
new_meta
.options
.extra_options
.get(MERGE_MODE_KEY)
.map(String::as_str)
);
}
#[test]
fn test_add_columns_multiple_times() {
let schema = Arc::new(new_test_schema());

View File

@@ -0,0 +1,192 @@
-- Test altering append_mode from false to true
-- Create a table with append_mode=false (default)
CREATE TABLE test_alter_append_mode(
host STRING,
ts TIMESTAMP TIME INDEX,
cpu DOUBLE,
PRIMARY KEY(host)
) ENGINE=mito;
Affected Rows: 0
-- Insert some data
INSERT INTO test_alter_append_mode VALUES ('host1', 0, 1.0), ('host2', 1, 2.0);
Affected Rows: 2
-- Insert duplicate data (should be deduplicated since append_mode=false)
INSERT INTO test_alter_append_mode VALUES ('host1', 0, 10.0), ('host2', 1, 20.0);
Affected Rows: 2
-- Query should show deduplicated data (latest values)
SELECT * FROM test_alter_append_mode ORDER BY host, ts;
+-------+-------------------------+------+
| host | ts | cpu |
+-------+-------------------------+------+
| host1 | 1970-01-01T00:00:00 | 10.0 |
| host2 | 1970-01-01T00:00:00.001 | 20.0 |
+-------+-------------------------+------+
-- Alter append_mode from false to true
ALTER TABLE test_alter_append_mode SET 'append_mode' = 'true';
Affected Rows: 0
-- Verify append_mode is set via SHOW CREATE TABLE
SHOW CREATE TABLE test_alter_append_mode;
+------------------------+-------------------------------------------------------+
| Table | Create Table |
+------------------------+-------------------------------------------------------+
| test_alter_append_mode | CREATE TABLE IF NOT EXISTS "test_alter_append_mode" ( |
| | "host" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "cpu" DOUBLE NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | append_mode = 'true' |
| | ) |
+------------------------+-------------------------------------------------------+
-- Insert duplicate data again (should be preserved since append_mode=true now)
INSERT INTO test_alter_append_mode VALUES ('host1', 0, 100.0), ('host2', 1, 200.0);
Affected Rows: 2
-- Query should show the new duplicates preserved
SELECT * FROM test_alter_append_mode ORDER BY host, ts, cpu;
+-------+-------------------------+-------+
| host | ts | cpu |
+-------+-------------------------+-------+
| host1 | 1970-01-01T00:00:00 | 10.0 |
| host1 | 1970-01-01T00:00:00 | 100.0 |
| host2 | 1970-01-01T00:00:00.001 | 20.0 |
| host2 | 1970-01-01T00:00:00.001 | 200.0 |
+-------+-------------------------+-------+
-- Try to alter append_mode from true to false (should fail)
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
ALTER TABLE test_alter_append_mode SET 'append_mode' = 'false';
Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing append_mode from false to true
-- Clean up
DROP TABLE test_alter_append_mode;
Affected Rows: 0
-- Test creating with append_mode=true and trying to alter to false
CREATE TABLE test_append_mode_true(
host STRING,
ts TIMESTAMP TIME INDEX,
cpu DOUBLE,
PRIMARY KEY(host)
) ENGINE=mito WITH('append_mode'='true');
Affected Rows: 0
-- Try to alter append_mode from true to false (should fail)
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
ALTER TABLE test_append_mode_true SET 'append_mode' = 'false';
Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing append_mode from false to true
-- Altering to the same value should succeed
ALTER TABLE test_append_mode_true SET 'append_mode' = 'true';
Affected Rows: 0
-- Clean up
DROP TABLE test_append_mode_true;
Affected Rows: 0
-- Test altering append_mode on a table with merge_mode set
CREATE TABLE test_alter_append_with_merge(
host STRING,
ts TIMESTAMP TIME INDEX,
cpu DOUBLE,
PRIMARY KEY(host)
) ENGINE=mito WITH('merge_mode'='last_non_null');
Affected Rows: 0
-- Verify merge_mode is set
SHOW CREATE TABLE test_alter_append_with_merge;
+------------------------------+-------------------------------------------------------------+
| Table | Create Table |
+------------------------------+-------------------------------------------------------------+
| test_alter_append_with_merge | CREATE TABLE IF NOT EXISTS "test_alter_append_with_merge" ( |
| | "host" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "cpu" DOUBLE NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | merge_mode = 'last_non_null' |
| | ) |
+------------------------------+-------------------------------------------------------------+
-- Insert some data
INSERT INTO test_alter_append_with_merge VALUES ('host1', 0, 1.0), ('host2', 1, 2.0);
Affected Rows: 2
-- Alter append_mode to true (should succeed and clear merge_mode)
ALTER TABLE test_alter_append_with_merge SET 'append_mode' = 'true';
Affected Rows: 0
-- Verify merge_mode is cleared and append_mode is set
SHOW CREATE TABLE test_alter_append_with_merge;
+------------------------------+-------------------------------------------------------------+
| Table | Create Table |
+------------------------------+-------------------------------------------------------------+
| test_alter_append_with_merge | CREATE TABLE IF NOT EXISTS "test_alter_append_with_merge" ( |
| | "host" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "cpu" DOUBLE NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | |
| | ENGINE=mito |
| | WITH( |
| | append_mode = 'true' |
| | ) |
+------------------------------+-------------------------------------------------------------+
-- Insert duplicate data (should be preserved since append_mode=true now)
INSERT INTO test_alter_append_with_merge VALUES ('host1', 0, 100.0), ('host2', 1, 200.0);
Affected Rows: 2
-- Query should show the new duplicates preserved
SELECT * FROM test_alter_append_with_merge ORDER BY host, ts, cpu;
+-------+-------------------------+-------+
| host | ts | cpu |
+-------+-------------------------+-------+
| host1 | 1970-01-01T00:00:00 | 1.0 |
| host1 | 1970-01-01T00:00:00 | 100.0 |
| host2 | 1970-01-01T00:00:00.001 | 2.0 |
| host2 | 1970-01-01T00:00:00.001 | 200.0 |
+-------+-------------------------+-------+
-- Clean up
DROP TABLE test_alter_append_with_merge;
Affected Rows: 0

View File

@@ -0,0 +1,84 @@
-- Test altering append_mode from false to true
-- Create a table with append_mode=false (default)
CREATE TABLE test_alter_append_mode(
host STRING,
ts TIMESTAMP TIME INDEX,
cpu DOUBLE,
PRIMARY KEY(host)
) ENGINE=mito;
-- Insert some data
INSERT INTO test_alter_append_mode VALUES ('host1', 0, 1.0), ('host2', 1, 2.0);
-- Insert duplicate data (should be deduplicated since append_mode=false)
INSERT INTO test_alter_append_mode VALUES ('host1', 0, 10.0), ('host2', 1, 20.0);
-- Query should show deduplicated data (latest values)
SELECT * FROM test_alter_append_mode ORDER BY host, ts;
-- Alter append_mode from false to true
ALTER TABLE test_alter_append_mode SET 'append_mode' = 'true';
-- Verify append_mode is set via SHOW CREATE TABLE
SHOW CREATE TABLE test_alter_append_mode;
-- Insert duplicate data again (should be preserved since append_mode=true now)
INSERT INTO test_alter_append_mode VALUES ('host1', 0, 100.0), ('host2', 1, 200.0);
-- Query should show the new duplicates preserved
SELECT * FROM test_alter_append_mode ORDER BY host, ts, cpu;
-- Try to alter append_mode from true to false (should fail)
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
ALTER TABLE test_alter_append_mode SET 'append_mode' = 'false';
-- Clean up
DROP TABLE test_alter_append_mode;
-- Test creating with append_mode=true and trying to alter to false
CREATE TABLE test_append_mode_true(
host STRING,
ts TIMESTAMP TIME INDEX,
cpu DOUBLE,
PRIMARY KEY(host)
) ENGINE=mito WITH('append_mode'='true');
-- Try to alter append_mode from true to false (should fail)
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
ALTER TABLE test_append_mode_true SET 'append_mode' = 'false';
-- Altering to the same value should succeed
ALTER TABLE test_append_mode_true SET 'append_mode' = 'true';
-- Clean up
DROP TABLE test_append_mode_true;
-- Test altering append_mode on a table with merge_mode set
CREATE TABLE test_alter_append_with_merge(
host STRING,
ts TIMESTAMP TIME INDEX,
cpu DOUBLE,
PRIMARY KEY(host)
) ENGINE=mito WITH('merge_mode'='last_non_null');
-- Verify merge_mode is set
SHOW CREATE TABLE test_alter_append_with_merge;
-- Insert some data
INSERT INTO test_alter_append_with_merge VALUES ('host1', 0, 1.0), ('host2', 1, 2.0);
-- Alter append_mode to true (should succeed and clear merge_mode)
ALTER TABLE test_alter_append_with_merge SET 'append_mode' = 'true';
-- Verify merge_mode is cleared and append_mode is set
SHOW CREATE TABLE test_alter_append_with_merge;
-- Insert duplicate data (should be preserved since append_mode=true now)
INSERT INTO test_alter_append_with_merge VALUES ('host1', 0, 100.0), ('host2', 1, 200.0);
-- Query should show the new duplicates preserved
SELECT * FROM test_alter_append_with_merge ORDER BY host, ts, cpu;
-- Clean up
DROP TABLE test_alter_append_with_merge;