From bb6a3a2ff3466ee6f73a549a45c82acaaa51f20d Mon Sep 17 00:00:00 2001 From: Yingwen Date: Tue, 11 Nov 2025 21:19:00 +0800 Subject: [PATCH] feat: support altering sst format for a table (#7206) * refactor: remove memtable_builder from MitoRegion Signed-off-by: evenyag * chore: add alter format Signed-off-by: evenyag * feat: support changing the format and memtable Signed-off-by: evenyag * feat: support changing sst format via table options Signed-off-by: evenyag * fix: set scanner and memtable builder with correct format Signed-off-by: evenyag * style: fix clippy Signed-off-by: evenyag * fix: fix incorrect metadata in version after alter Signed-off-by: evenyag * test: add sqlness test Signed-off-by: evenyag * test: replace region_id in sqlness result Signed-off-by: evenyag * fix: create correct memtable when setting sst_format explicitly Signed-off-by: evenyag * test: sqlness alter_format test set sst_format to primary_key Signed-off-by: evenyag * chore: remove verbose log Signed-off-by: evenyag --------- Signed-off-by: evenyag --- src/mito2/src/engine.rs | 3 +- src/mito2/src/engine/alter_test.rs | 249 +++++++++++++++++- src/mito2/src/engine/create_test.rs | 76 +++++- src/mito2/src/memtable.rs | 20 +- src/mito2/src/memtable/time_partition.rs | 26 +- src/mito2/src/memtable/version.rs | 2 +- src/mito2/src/read/scan_region.rs | 28 +- src/mito2/src/region.rs | 19 +- src/mito2/src/region/opener.rs | 75 ++++-- src/mito2/src/region/version.rs | 61 ++++- src/mito2/src/request.rs | 3 + src/mito2/src/worker/handle_alter.rs | 139 +++++++--- src/mito2/src/worker/handle_drop.rs | 4 +- src/mito2/src/worker/handle_manifest.rs | 85 ++++-- src/mito2/src/worker/handle_truncate.rs | 2 +- src/store-api/src/region_request.rs | 11 +- src/table/src/metadata.rs | 7 +- .../common/alter/alter_format.result | 218 +++++++++++++++ .../standalone/common/alter/alter_format.sql | 81 ++++++ 19 files changed, 956 insertions(+), 153 deletions(-) create mode 100644 tests/cases/standalone/common/alter/alter_format.result create mode 100644 tests/cases/standalone/common/alter/alter_format.sql diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index 22d55b7a57..587552d02f 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -959,8 +959,7 @@ impl EngineInner { .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) .with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled()) .with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled()) - .with_start_time(query_start) - .with_flat_format(self.config.default_experimental_flat_format); + .with_start_time(query_start); #[cfg(feature = "enterprise")] let scan_region = self.maybe_fill_extension_range_provider(scan_region, region); diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs index 63e7c029ae..7717bbceb7 100644 --- a/src/mito2/src/engine/alter_test.rs +++ b/src/mito2/src/engine/alter_test.rs @@ -38,6 +38,7 @@ use crate::config::MitoConfig; use crate::engine::MitoEngine; use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener}; use crate::error; +use crate::sst::FormatType; use crate::test_util::{ CreateRequestBuilder, TestEnv, build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, @@ -902,13 +903,13 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) { #[tokio::test] async fn test_write_stall_on_altering() { + common_telemetry::init_default_ut_logging(); + test_write_stall_on_altering_with_format(false).await; test_write_stall_on_altering_with_format(true).await; } async fn test_write_stall_on_altering_with_format(flat_format: bool) { - common_telemetry::init_default_ut_logging(); - let mut env = TestEnv::new().await; let listener = Arc::new(NotifyRegionChangeResultListener::default()); let engine = env @@ -980,3 +981,247 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) { let batches = RecordBatches::try_collect(stream).await.unwrap(); assert_eq!(expected, batches.pretty_print().unwrap()); } + +#[tokio::test] +async fn test_alter_region_sst_format_with_flush() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: false, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let column_schemas = rows_schema(&request); + let table_dir = request.table_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + // Inserts some data before alter + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Flushes to create SST files with primary_key format + flush_region(&engine, region_id, None).await; + + let expected_data = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected_data, batches.pretty_print().unwrap()); + + // Alters sst_format from primary_key to flat + let alter_format_request = RegionAlterRequest { + kind: AlterKind::SetRegionOptions { + options: vec![SetRegionOption::Format("flat".to_string())], + }, + }; + engine + .handle_request(region_id, RegionRequest::Alter(alter_format_request)) + .await + .unwrap(); + + // Inserts more data after alter + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(3, 6), + }; + put_rows(&engine, region_id, rows).await; + + // Flushes to create SST files with flat format + flush_region(&engine, region_id, None).await; + + let expected_all_data = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | +| 5 | 5.0 | 1970-01-01T00:00:05 | ++-------+---------+---------------------+"; + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected_all_data, batches.pretty_print().unwrap()); + + // Reopens region to verify format persists + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: false, + ..Default::default() + }, + ) + .await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + table_dir, + path_type: PathType::Bare, + options: HashMap::default(), + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected_all_data, batches.pretty_print().unwrap()); +} + +#[tokio::test] +async fn test_alter_region_sst_format_without_flush() { + common_telemetry::init_default_ut_logging(); + + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: false, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new().build(); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let column_schemas = rows_schema(&request); + let table_dir = request.table_dir.clone(); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let check_format = |engine: &MitoEngine, expected: Option| { + let current_format = engine + .get_region(region_id) + .unwrap() + .version() + .options + .sst_format; + assert_eq!(current_format, expected); + }; + check_format(&engine, Some(FormatType::PrimaryKey)); + + // Inserts some data before alter + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + // Alters sst_format from primary_key to flat + let alter_format_request = RegionAlterRequest { + kind: AlterKind::SetRegionOptions { + options: vec![SetRegionOption::Format("flat".to_string())], + }, + }; + engine + .handle_request(region_id, RegionRequest::Alter(alter_format_request)) + .await + .unwrap(); + + check_format(&engine, Some(FormatType::Flat)); + + // Inserts more data after alter + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(3, 6), + }; + put_rows(&engine, region_id, rows).await; + + let expected_all_data = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | +| 3 | 3.0 | 1970-01-01T00:00:03 | +| 4 | 4.0 | 1970-01-01T00:00:04 | +| 5 | 5.0 | 1970-01-01T00:00:05 | ++-------+---------+---------------------+"; + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected_all_data, batches.pretty_print().unwrap()); + + // Reopens region to verify format persists + let engine = env + .reopen_engine( + engine, + MitoConfig { + default_experimental_flat_format: false, + ..Default::default() + }, + ) + .await; + engine + .handle_request( + region_id, + RegionRequest::Open(RegionOpenRequest { + engine: String::new(), + table_dir, + path_type: PathType::Bare, + options: HashMap::default(), + skip_wal_replay: false, + checkpoint: None, + }), + ) + .await + .unwrap(); + + check_format(&engine, Some(FormatType::Flat)); + + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected_all_data, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/engine/create_test.rs b/src/mito2/src/engine/create_test.rs index 7ba7aab225..e5980d9442 100644 --- a/src/mito2/src/engine/create_test.rs +++ b/src/mito2/src/engine/create_test.rs @@ -23,7 +23,7 @@ use store_api::storage::{RegionId, ScanRequest}; use crate::config::MitoConfig; use crate::region::options::MemtableOptions; use crate::test_util::{ - CreateRequestBuilder, TestEnv, build_rows, put_rows, reopen_region, rows_schema, + CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, reopen_region, rows_schema, }; #[tokio::test] @@ -380,3 +380,77 @@ async fn create_with_partition_expr_persists_manifest_with_format(flat_format: b let manifest = region.manifest_ctx.manifest().await; assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json)); } + +#[tokio::test] +async fn test_engine_create_with_format() { + common_telemetry::init_default_ut_logging(); + + test_engine_create_with_format_one_case("primary_key", false).await; + test_engine_create_with_format_one_case("primary_key", true).await; + test_engine_create_with_format_one_case("flat", false).await; + test_engine_create_with_format_one_case("flat", true).await; +} + +async fn test_engine_create_with_format_one_case(create_format: &str, default_flat_format: bool) { + common_telemetry::info!( + "Test engine create with format, create_format: {}, default_flat_format: {}", + create_format, + default_flat_format + ); + + let mut env = TestEnv::new().await; + let engine = env + .create_engine(MitoConfig { + default_experimental_flat_format: default_flat_format, + ..Default::default() + }) + .await; + + let region_id = RegionId::new(1, 1); + let request = CreateRequestBuilder::new() + .insert_option("sst_format", create_format) + .build(); + + env.get_schema_metadata_manager() + .register_region_table_info( + region_id.table_id(), + "test_table", + "test_catalog", + "test_schema", + None, + env.get_kv_backend(), + ) + .await; + + let column_schemas = rows_schema(&request); + engine + .handle_request(region_id, RegionRequest::Create(request)) + .await + .unwrap(); + + let rows = Rows { + schema: column_schemas.clone(), + rows: build_rows(0, 3), + }; + put_rows(&engine, region_id, rows).await; + + let expected = "\ ++-------+---------+---------------------+ +| tag_0 | field_0 | ts | ++-------+---------+---------------------+ +| 0 | 0.0 | 1970-01-01T00:00:00 | +| 1 | 1.0 | 1970-01-01T00:00:01 | +| 2 | 2.0 | 1970-01-01T00:00:02 | ++-------+---------+---------------------+"; + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, batches.pretty_print().unwrap()); + + flush_region(&engine, region_id, None).await; + + let request = ScanRequest::default(); + let stream = engine.scan_to_stream(region_id, request).await.unwrap(); + let batches = RecordBatches::try_collect(stream).await.unwrap(); + assert_eq!(expected, batches.pretty_print().unwrap()); +} diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index 2d430b2356..ea3875ac7a 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -422,6 +422,7 @@ impl MemtableBuilderProvider { ); } + // The format is not flat. match &options.memtable { Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new( self.write_buffer_manager.clone(), @@ -440,22 +441,15 @@ impl MemtableBuilderProvider { self.write_buffer_manager.clone(), )) } - None => self.default_memtable_builder(dedup, merge_mode), + None => self.default_primary_key_memtable_builder(dedup, merge_mode), } } - fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef { - if self.config.default_experimental_flat_format { - return Arc::new( - BulkMemtableBuilder::new( - self.write_buffer_manager.clone(), - !dedup, // append_mode: true if not dedup, false if dedup - merge_mode, - ) - .with_compact_dispatcher(self.compact_dispatcher.clone()), - ); - } - + fn default_primary_key_memtable_builder( + &self, + dedup: bool, + merge_mode: MergeMode, + ) -> MemtableBuilderRef { match &self.config.memtable { MemtableConfig::PartitionTree(config) => { let mut config = config.clone(); diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs index 6038d5cd20..9131de32a5 100644 --- a/src/mito2/src/memtable/time_partition.rs +++ b/src/mito2/src/memtable/time_partition.rs @@ -353,12 +353,13 @@ impl TimePartitions { .builder .build(inner.alloc_memtable_id(), &self.metadata); debug!( - "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}", + "Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}, metadata: {:?}", range, self.metadata.region_id, self.part_duration, memtable.id(), - inner.parts.len() + 1 + inner.parts.len() + 1, + self.metadata, ); let pos = inner.parts.len(); inner.parts.push(TimePartition { @@ -454,6 +455,11 @@ impl TimePartitions { self.part_duration } + /// Returns the memtable builder. + pub(crate) fn memtable_builder(&self) -> &MemtableBuilderRef { + &self.builder + } + /// Returns memory usage. pub(crate) fn memory_usage(&self) -> usize { let inner = self.inner.lock().unwrap(); @@ -488,12 +494,16 @@ impl TimePartitions { /// Creates a new empty partition list from this list and a `part_duration`. /// It falls back to the old partition duration if `part_duration` is `None`. - pub(crate) fn new_with_part_duration(&self, part_duration: Option) -> Self { + pub(crate) fn new_with_part_duration( + &self, + part_duration: Option, + memtable_builder: Option, + ) -> Self { debug_assert!(self.is_empty()); Self::new( self.metadata.clone(), - self.builder.clone(), + memtable_builder.unwrap_or_else(|| self.builder.clone()), self.next_memtable_id(), Some(part_duration.unwrap_or(self.part_duration)), ) @@ -941,17 +951,17 @@ mod tests { let builder = Arc::new(PartitionTreeMemtableBuilder::default()); let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None); - let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5))); + let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)), None); assert_eq!(Duration::from_secs(5), new_parts.part_duration()); assert_eq!(0, new_parts.next_memtable_id()); // Won't update the duration if it's None. - let new_parts = new_parts.new_with_part_duration(None); + let new_parts = new_parts.new_with_part_duration(None, None); assert_eq!(Duration::from_secs(5), new_parts.part_duration()); // Don't need to create new memtables. assert_eq!(0, new_parts.next_memtable_id()); - let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10))); + let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)), None); assert_eq!(Duration::from_secs(10), new_parts.part_duration()); // Don't need to create new memtables. assert_eq!(0, new_parts.next_memtable_id()); @@ -959,7 +969,7 @@ mod tests { let builder = Arc::new(PartitionTreeMemtableBuilder::default()); let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None); // Need to build a new memtable as duration is still None. - let new_parts = partitions.new_with_part_duration(None); + let new_parts = partitions.new_with_part_duration(None, None); assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration()); assert_eq!(0, new_parts.next_memtable_id()); } diff --git a/src/mito2/src/memtable/version.rs b/src/mito2/src/memtable/version.rs index 537332d3b7..25c7535927 100644 --- a/src/mito2/src/memtable/version.rs +++ b/src/mito2/src/memtable/version.rs @@ -82,7 +82,7 @@ impl MemtableVersion { } // Update the time window. - let mutable = self.mutable.new_with_part_duration(time_window); + let mutable = self.mutable.new_with_part_duration(time_window, None); common_telemetry::debug!( "Freeze empty memtable, update partition duration from {:?} to {:?}", self.mutable.part_duration(), diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index aa158389e0..babdd43b0b 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -60,6 +60,7 @@ use crate::read::unordered_scan::UnorderedScan; use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source}; use crate::region::options::MergeMode; use crate::region::version::VersionRef; +use crate::sst::FormatType; use crate::sst::file::FileHandle; use crate::sst::index::bloom_filter::applier::{ BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef, @@ -221,8 +222,6 @@ pub(crate) struct ScanRegion { /// Whether to filter out the deleted rows. /// Usually true for normal read, and false for scan for compaction. filter_deleted: bool, - /// Whether to use flat format. - flat_format: bool, #[cfg(feature = "enterprise")] extension_range_provider: Option, } @@ -247,7 +246,6 @@ impl ScanRegion { ignore_bloom_filter: false, start_time: None, filter_deleted: true, - flat_format: false, #[cfg(feature = "enterprise")] extension_range_provider: None, } @@ -304,13 +302,6 @@ impl ScanRegion { self.filter_deleted = filter_deleted; } - /// Sets whether to use flat format. - #[must_use] - pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self { - self.flat_format = flat_format; - self - } - #[cfg(feature = "enterprise")] pub(crate) fn set_extension_range_provider( &mut self, @@ -385,18 +376,24 @@ impl ScanRegion { self.request.distribution == Some(TimeSeriesDistribution::PerSeries) } + /// Returns true if the region use flat format. + fn use_flat_format(&self) -> bool { + self.version.options.sst_format.unwrap_or_default() == FormatType::Flat + } + /// Creates a scan input. async fn scan_input(mut self) -> Result { let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new); let time_range = self.build_time_range_predicate(); let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?; + let flat_format = self.use_flat_format(); // The mapper always computes projected column ids as the schema of SSTs may change. let mapper = match &self.request.projection { Some(p) => { - ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)? + ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)? } - None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?, + None => ProjectionMapper::all(&self.version.metadata, flat_format)?, }; let ssts = &self.version.ssts; @@ -463,13 +460,14 @@ impl ScanRegion { let region_id = self.region_id(); debug!( - "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}", + "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}", region_id, self.request, time_range, mem_range_builders.len(), files.len(), self.version.options.append_mode, + flat_format, ); let (non_field_filters, field_filters) = self.partition_by_field_filters(); @@ -487,7 +485,7 @@ impl ScanRegion { ]; let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?; - if self.flat_format { + if flat_format { // The batch is already large enough so we use a small channel size here. self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE; } @@ -509,7 +507,7 @@ impl ScanRegion { .with_merge_mode(self.version.options.merge_mode()) .with_series_row_selector(self.request.series_row_selector) .with_distribution(self.request.distribution) - .with_flat_format(self.flat_format); + .with_flat_format(flat_format); #[cfg(feature = "enterprise")] let input = if let Some(provider) = self.extension_range_provider { diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs index df63b5e4d8..76ff739351 100644 --- a/src/mito2/src/region.rs +++ b/src/mito2/src/region.rs @@ -47,10 +47,8 @@ use crate::manifest::action::{ RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList, }; use crate::manifest::manager::RegionManifestManager; -use crate::memtable::MemtableBuilderRef; use crate::region::version::{VersionControlRef, VersionRef}; use crate::request::{OnFailure, OptionOutputTx}; -use crate::sst::FormatType; use crate::sst::file_purger::FilePurgerRef; use crate::sst::location::{index_file_path, sst_file_path}; use crate::time_provider::TimeProviderRef; @@ -140,10 +138,6 @@ pub struct MitoRegion { pub(crate) topic_latest_entry_id: AtomicU64, /// The total bytes written to the region. pub(crate) written_bytes: Arc, - /// Memtable builder for the region. - pub(crate) memtable_builder: MemtableBuilderRef, - /// Format type of the SST file. - pub(crate) sst_format: FormatType, /// manifest stats stats: ManifestStats, } @@ -200,11 +194,6 @@ impl MitoRegion { self.last_compaction_millis.load(Ordering::Relaxed) } - /// Returns format type of the SST file. - pub(crate) fn sst_format(&self) -> FormatType { - self.sst_format - } - /// Update compaction time to current time. pub(crate) fn update_compaction_millis(&self) { let now = self.time_provider.current_time_millis(); @@ -460,11 +449,12 @@ impl MitoRegion { if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) { // Persist backfilled metadata if manifest is missing fields (e.g., partition_expr) let manifest_meta = &manager.manifest().metadata; - let current_meta = &self.version().metadata; + let current_version = self.version(); + let current_meta = ¤t_version.metadata; if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() { let action = RegionMetaAction::Change(RegionChange { metadata: current_meta.clone(), - sst_format: self.sst_format(), + sst_format: current_version.options.sst_format.unwrap_or_default(), }); let result = manager .update( @@ -1220,7 +1210,6 @@ mod tests { use crate::sst::FormatType; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; - use crate::test_util::memtable_util::EmptyMemtableBuilder; use crate::test_util::scheduler_util::SchedulerEnv; use crate::test_util::version_util::VersionControlBuilder; use crate::time_provider::StdTimeProvider; @@ -1391,8 +1380,6 @@ mod tests { time_provider: Arc::new(StdTimeProvider), topic_latest_entry_id: Default::default(), written_bytes: Arc::new(AtomicU64::new(0)), - memtable_builder: Arc::new(EmptyMemtableBuilder::default()), - sst_format: FormatType::PrimaryKey, stats: ManifestStats::default(), }; diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index a278e068af..06e603d613 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -53,7 +53,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, Rem use crate::manifest::storage::manifest_compress_type; use crate::memtable::MemtableBuilderProvider; use crate::memtable::bulk::part::BulkPart; -use crate::memtable::time_partition::TimePartitions; +use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef}; use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES}; use crate::region::options::RegionOptions; use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef}; @@ -65,7 +65,7 @@ use crate::request::OptionOutputTx; use crate::schedule::scheduler::SchedulerRef; use crate::sst::FormatType; use crate::sst::file::RegionFileId; -use crate::sst::file_purger::create_local_file_purger; +use crate::sst::file_purger::{FilePurgerRef, create_local_file_purger}; use crate::sst::file_ref::FileReferenceManagerRef; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::puffin_manager::PuffinManagerFactory; @@ -255,17 +255,19 @@ impl RegionOpener { } } // Safety: must be set before calling this method. - let options = self.options.take().unwrap(); + let mut options = self.options.take().unwrap(); let object_store = get_object_store(&options.storage, &self.object_store_manager)?; let provider = self.provider::(&options.wal_options)?; let metadata = Arc::new(metadata); - // Set the sst_format based on options or flat_format flag + // Sets the sst_format based on options or flat_format flag let sst_format = if let Some(format) = options.sst_format { format } else if config.default_experimental_flat_format { + options.sst_format = Some(FormatType::Flat); FormatType::Flat } else { // Default to PrimaryKeyParquet for newly created regions + options.sst_format = Some(FormatType::PrimaryKey); FormatType::PrimaryKey }; // Create a manifest manager for this region and writes regions to the manifest file. @@ -293,7 +295,10 @@ impl RegionOpener { part_duration, )); - debug!("Create region {} with options: {:?}", region_id, options); + debug!( + "Create region {} with options: {:?}, default_flat_format: {}", + region_id, options, config.default_experimental_flat_format + ); let version = VersionBuilder::new(metadata, mutable) .options(options) @@ -328,9 +333,7 @@ impl RegionOpener { last_compaction_millis: AtomicI64::new(now), time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(0), - memtable_builder, written_bytes: Arc::new(AtomicU64::new(0)), - sst_format, stats: self.stats, })) } @@ -402,7 +405,7 @@ impl RegionOpener { wal: &Wal, ) -> Result> { let now = Instant::now(); - let region_options = self.options.as_ref().unwrap().clone(); + let mut region_options = self.options.as_ref().unwrap().clone(); let region_manifest_options = Self::manifest_options( config, @@ -432,6 +435,8 @@ impl RegionOpener { } else { manifest.metadata.clone() }; + // Updates the region options with the manifest. + sanitize_region_options(&manifest, &mut region_options); let region_id = self.region_id; let provider = self.provider::(®ion_options.wal_options)?; @@ -460,6 +465,7 @@ impl RegionOpener { self.cache_manager.clone(), self.file_ref_manager.clone(), ); + // We should sanitize the region options before creating a new memtable. let memtable_builder = self .memtable_builder_provider .builder_for_options(®ion_options); @@ -476,14 +482,16 @@ impl RegionOpener { 0, part_duration, )); - let version = VersionBuilder::new(metadata, mutable) - .add_files(file_purger.clone(), manifest.files.values().cloned()) - .flushed_entry_id(manifest.flushed_entry_id) - .flushed_sequence(manifest.flushed_sequence) - .truncated_entry_id(manifest.truncated_entry_id) - .compaction_time_window(manifest.compaction_time_window) - .options(region_options) - .build(); + + // Updates region options by manifest before creating version. + let version_builder = version_builder_from_manifest( + &manifest, + metadata, + file_purger.clone(), + mutable, + region_options, + ); + let version = version_builder.build(); let flushed_entry_id = version.flushed_entry_id; let version_control = Arc::new(VersionControl::new(version)); @@ -545,8 +553,6 @@ impl RegionOpener { } let now = self.time_provider.current_time_millis(); - // Read sst_format from manifest - let sst_format = manifest.sst_format; let region = MitoRegion { region_id: self.region_id, @@ -564,8 +570,6 @@ impl RegionOpener { time_provider: self.time_provider.clone(), topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id), written_bytes: Arc::new(AtomicU64::new(0)), - memtable_builder, - sst_format, stats: self.stats.clone(), }; @@ -599,6 +603,37 @@ impl RegionOpener { } } +/// Creates a version builder from a region manifest. +pub(crate) fn version_builder_from_manifest( + manifest: &RegionManifest, + metadata: RegionMetadataRef, + file_purger: FilePurgerRef, + mutable: TimePartitionsRef, + region_options: RegionOptions, +) -> VersionBuilder { + VersionBuilder::new(metadata, mutable) + .add_files(file_purger, manifest.files.values().cloned()) + .flushed_entry_id(manifest.flushed_entry_id) + .flushed_sequence(manifest.flushed_sequence) + .truncated_entry_id(manifest.truncated_entry_id) + .compaction_time_window(manifest.compaction_time_window) + .options(region_options) +} + +/// Updates region options by persistent options. +pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) { + let option_format = options.sst_format.unwrap_or_default(); + if option_format != manifest.sst_format { + common_telemetry::warn!( + "Overriding SST format from {:?} to {:?} for region {}", + option_format, + manifest.sst_format, + manifest.metadata.region_id, + ); + options.sst_format = Some(manifest.sst_format); + } +} + /// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store. pub fn get_object_store( name: &Option, diff --git a/src/mito2/src/region/version.rs b/src/mito2/src/region/version.rs index c7438b196a..79391e324d 100644 --- a/src/mito2/src/region/version.rs +++ b/src/mito2/src/region/version.rs @@ -161,13 +161,14 @@ impl VersionControl { } /// Mark all opened files as deleted and set the delete marker in [VersionControlData] - pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) { + pub(crate) fn mark_dropped(&self) { let version = self.current().version; let part_duration = Some(version.memtables.mutable.part_duration()); let next_memtable_id = version.memtables.mutable.next_memtable_id(); + let memtable_builder = version.memtables.mutable.memtable_builder().clone(); let new_mutable = Arc::new(TimePartitions::new( version.metadata.clone(), - memtable_builder.clone(), + memtable_builder, next_memtable_id, part_duration, )); @@ -185,13 +186,14 @@ impl VersionControl { /// /// It replaces existing mutable memtable with a memtable that uses the /// new schema. Memtables of the version must be empty. - pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) { + pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef) { let version = self.current().version; let part_duration = Some(version.memtables.mutable.part_duration()); let next_memtable_id = version.memtables.mutable.next_memtable_id(); + let memtable_builder = version.memtables.mutable.memtable_builder().clone(); let new_mutable = Arc::new(TimePartitions::new( metadata.clone(), - builder.clone(), + memtable_builder, next_memtable_id, part_duration, )); @@ -208,19 +210,50 @@ impl VersionControl { version_data.version = new_version; } - /// Truncate current version. - pub(crate) fn truncate( + /// Alter schema and format of the region. + /// + /// It replaces existing mutable memtable with a memtable that uses the + /// new format. Memtables of the version must be empty. + pub(crate) fn alter_schema_and_format( &self, - truncate_kind: TruncateKind, - memtable_builder: &MemtableBuilderRef, + metadata: RegionMetadataRef, + options: RegionOptions, + memtable_builder: MemtableBuilderRef, ) { let version = self.current().version; + let part_duration = Some(version.memtables.mutable.part_duration()); + let next_memtable_id = version.memtables.mutable.next_memtable_id(); + // Use the new metadata to build `TimePartitions`. + let new_mutable = Arc::new(TimePartitions::new( + metadata.clone(), + memtable_builder, + next_memtable_id, + part_duration, + )); + debug_assert!(version.memtables.mutable.is_empty()); + debug_assert!(version.memtables.immutables().is_empty()); + let new_version = Arc::new( + VersionBuilder::from_version(version) + .metadata(metadata) + .options(options) + .memtables(MemtableVersion::new(new_mutable)) + .build(), + ); + + let mut version_data = self.data.write().unwrap(); + version_data.version = new_version; + } + + /// Truncate current version. + pub(crate) fn truncate(&self, truncate_kind: TruncateKind) { + let version = self.current().version; let part_duration = version.memtables.mutable.part_duration(); let next_memtable_id = version.memtables.mutable.next_memtable_id(); + let memtable_builder = version.memtables.mutable.memtable_builder().clone(); let new_mutable = Arc::new(TimePartitions::new( version.metadata.clone(), - memtable_builder.clone(), + memtable_builder, next_memtable_id, Some(part_duration), )); @@ -230,7 +263,9 @@ impl VersionControl { truncated_sequence, } => { let new_version = Arc::new( - VersionBuilder::new(version.metadata.clone(), new_mutable) + VersionBuilder::from_version(version) + .memtables(MemtableVersion::new(new_mutable)) + .clear_files() .flushed_entry_id(truncated_entry_id) .flushed_sequence(truncated_sequence) .truncated_entry_id(Some(truncated_entry_id)) @@ -456,6 +491,12 @@ impl VersionBuilder { self } + /// Clear all files in the builder. + pub(crate) fn clear_files(mut self) -> Self { + self.ssts = Arc::new(SstVersion::new()); + self + } + /// Builds a new [Version] from the builder. /// It overwrites the window size by compaction option. pub(crate) fn build(self) -> Version { diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs index 3fe888decc..65a1fff9ef 100644 --- a/src/mito2/src/request.rs +++ b/src/mito2/src/request.rs @@ -50,6 +50,7 @@ use crate::manifest::action::{RegionEdit, TruncateKind}; use crate::memtable::MemtableId; use crate::memtable::bulk::part::BulkPart; use crate::metrics::COMPACTION_ELAPSED_TOTAL; +use crate::region::options::RegionOptions; use crate::sst::file::FileMeta; use crate::sst::index::IndexBuildType; use crate::wal::EntryId; @@ -948,6 +949,8 @@ pub(crate) struct RegionChangeResult { pub(crate) result: Result<()>, /// Used for index build in schema change. pub(crate) need_index: bool, + /// New options for the region. + pub(crate) new_options: Option, } /// Request to edit a region directly. diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs index e02c4cd33c..39a1fa665a 100644 --- a/src/mito2/src/worker/handle_alter.rs +++ b/src/mito2/src/worker/handle_alter.rs @@ -21,7 +21,7 @@ use common_base::readable_size::ReadableSize; use common_telemetry::info; use common_telemetry::tracing::warn; use humantime_serde::re::humantime; -use snafu::ResultExt; +use snafu::{ResultExt, ensure}; use store_api::metadata::{ InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder, RegionMetadataRef, @@ -35,9 +35,10 @@ use crate::flush::FlushReason; use crate::manifest::action::RegionChange; use crate::region::MitoRegionRef; use crate::region::options::CompactionOptions::Twcs; -use crate::region::options::TwcsOptions; +use crate::region::options::{RegionOptions, TwcsOptions}; use crate::region::version::VersionRef; use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest}; +use crate::sst::FormatType; use crate::worker::RegionWorkerLoop; impl RegionWorkerLoop { @@ -57,36 +58,38 @@ impl RegionWorkerLoop { info!("Try to alter region: {}, request: {:?}", region_id, request); - // Get the version before alter. - let version = region.version(); + // Gets the version before alter. + let mut version = region.version(); // fast path for memory state changes like options. - match request.kind { - AlterKind::SetRegionOptions { options } => { - match self.handle_alter_region_options(region, version, options) { - Ok(_) => sender.send(Ok(0)), - Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)), - } - return; - } + let set_options = match &request.kind { + AlterKind::SetRegionOptions { options } => options.clone(), AlterKind::UnsetRegionOptions { keys } => { // Converts the keys to SetRegionOption. // // It passes an empty string to achieve the purpose of unset - match self.handle_alter_region_options( - region, - version, - keys.iter().map(Into::into).collect(), - ) { - Ok(_) => sender.send(Ok(0)), - Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)), - } - return; + keys.iter().map(Into::into).collect() + } + _ => Vec::new(), + }; + if !set_options.is_empty() { + match self.handle_alter_region_options_fast(®ion, version, set_options) { + Ok(new_version) => { + let Some(new_version) = new_version else { + // We don't have options to alter after flush. + sender.send(Ok(0)); + return; + }; + version = new_version; + } + Err(e) => { + sender.send(Err(e).context(InvalidMetadataSnafu)); + return; + } } - _ => {} } - // Validate request. + // Validates request. if let Err(e) = request.validate(&version.metadata) { // Invalid request. sender.send(Err(e).context(InvalidRegionRequestSnafu)); @@ -132,14 +135,15 @@ impl RegionWorkerLoop { } info!( - "Try to alter region {}, version.metadata: {:?}, request: {:?}", - region_id, version.metadata, request, + "Try to alter region {}, version.metadata: {:?}, version.options: {:?}, request: {:?}", + region_id, version.metadata, version.options, request, ); - self.handle_alter_region_metadata(region, version, request, sender); + self.handle_alter_region_with_empty_memtable(region, version, request, sender); } - /// Handles region metadata changes. - fn handle_alter_region_metadata( + // TODO(yingwen): Optional new options and sst format. + /// Handles region metadata and format changes when the region memtable is empty. + fn handle_alter_region_with_empty_memtable( &mut self, region: MitoRegionRef, version: VersionRef, @@ -147,6 +151,7 @@ impl RegionWorkerLoop { sender: OptionOutputTx, ) { let need_index = need_change_index(&request.kind); + let new_options = new_region_options_on_empty_memtable(&version.options, &request.kind); let new_meta = match metadata_after_alteration(&version.metadata, request) { Ok(new_meta) => new_meta, Err(e) => { @@ -157,19 +162,30 @@ impl RegionWorkerLoop { // Persist the metadata to region's manifest. let change = RegionChange { metadata: new_meta, - sst_format: region.sst_format(), + sst_format: new_options + .as_ref() + .unwrap_or(&version.options) + .sst_format + .unwrap_or_default(), }; - self.handle_manifest_region_change(region, change, need_index, sender); + self.handle_manifest_region_change(region, change, need_index, new_options, sender); } /// Handles requests that changes region options, like TTL. It only affects memory state /// since changes are persisted in the `DatanodeTableValue` in metasrv. - fn handle_alter_region_options( + /// + /// If the options require empty memtable, it only does validation. + /// + /// Returns a new version with the updated options if it needs further alteration. + fn handle_alter_region_options_fast( &mut self, - region: MitoRegionRef, + region: &MitoRegionRef, version: VersionRef, options: Vec, - ) -> std::result::Result<(), MetadataError> { + ) -> std::result::Result, MetadataError> { + assert!(!options.is_empty()); + + let mut all_options_altered = true; let mut current_options = version.options.clone(); for option in options { match option { @@ -190,13 +206,68 @@ impl RegionWorkerLoop { region.region_id, )?; } + SetRegionOption::Format(format_str) => { + let new_format = format_str.parse::().map_err(|_| { + store_api::metadata::InvalidRegionRequestSnafu { + region_id: region.region_id, + err: format!("Invalid format type: {}", format_str), + } + .build() + })?; + // If the format is unchanged, we also consider the option is altered. + if new_format != current_options.sst_format.unwrap_or_default() { + all_options_altered = false; + + // Validates the format type. + ensure!( + new_format == FormatType::Flat, + store_api::metadata::InvalidRegionRequestSnafu { + region_id: region.region_id, + err: "Only allow changing format type to flat", + } + ); + } + } } } region.version_control.alter_options(current_options); - Ok(()) + if all_options_altered { + Ok(None) + } else { + Ok(Some(region.version())) + } } } +/// Returns the new region options if there are updates to the options. +fn new_region_options_on_empty_memtable( + current_options: &RegionOptions, + kind: &AlterKind, +) -> Option { + let AlterKind::SetRegionOptions { options } = kind else { + return None; + }; + + if options.is_empty() { + return None; + } + + let mut current_options = current_options.clone(); + for option in options { + match option { + SetRegionOption::Ttl(_) | SetRegionOption::Twsc(_, _) => (), + SetRegionOption::Format(format_str) => { + // Safety: handle_alter_region_options_fast() has validated this. + let new_format = format_str.parse::().unwrap(); + assert_eq!(FormatType::Flat, new_format); + + current_options.sst_format = Some(new_format); + } + } + } + Some(current_options) +} + /// Creates a metadata after applying the alter `request` to the old `metadata`. /// /// Returns an error if the `request` is invalid. diff --git a/src/mito2/src/worker/handle_drop.rs b/src/mito2/src/worker/handle_drop.rs index 2786126076..84337bd9d0 100644 --- a/src/mito2/src/worker/handle_drop.rs +++ b/src/mito2/src/worker/handle_drop.rs @@ -89,9 +89,7 @@ where .await; // Marks region version as dropped - region - .version_control - .mark_dropped(®ion.memtable_builder); + region.version_control.mark_dropped(); info!( "Region {} is dropped logically, but some files are not deleted yet", region_id diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs index 78a4b16210..c91c7adc6b 100644 --- a/src/mito2/src/worker/handle_manifest.rs +++ b/src/mito2/src/worker/handle_manifest.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use common_telemetry::{info, warn}; use store_api::logstore::LogStore; +use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use crate::cache::CacheManagerRef; @@ -31,8 +32,11 @@ use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result}; use crate::manifest::action::{ RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate, }; +use crate::memtable::MemtableBuilderProvider; use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD; -use crate::region::version::VersionBuilder; +use crate::region::opener::{sanitize_region_options, version_builder_from_manifest}; +use crate::region::options::RegionOptions; +use crate::region::version::VersionControlRef; use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState}; use crate::request::{ BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest, @@ -102,15 +106,12 @@ impl RegionWorkerLoop { }; if change_result.result.is_ok() { - // Apply the metadata to region's version. - region - .version_control - .alter_schema(change_result.new_meta, ®ion.memtable_builder); - - let version = region.version(); - info!( - "Region {} is altered, metadata is {:?}, options: {:?}", - region.region_id, version.metadata, version.options, + // Updates the region metadata and format. + Self::update_region_version( + ®ion.version_control, + change_result.new_meta, + change_result.new_options, + &self.memtable_builder_provider, ); } @@ -164,6 +165,10 @@ impl RegionWorkerLoop { } }; let version = region.version(); + let mut region_options = version.options.clone(); + let old_format = region_options.sst_format.unwrap_or_default(); + // Updates the region options with the manifest. + sanitize_region_options(&manifest, &mut region_options); if !version.memtables.is_empty() { let current = region.version_control.current(); warn!( @@ -171,23 +176,35 @@ impl RegionWorkerLoop { region.region_id, manifest.manifest_version, current.last_entry_id ); } - let region_options = version.options.clone(); + + // We should sanitize the region options before creating a new memtable. + let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() { + // Format changed, also needs to replace the memtable builder. + Some( + self.memtable_builder_provider + .builder_for_options(®ion_options), + ) + } else { + None + }; let new_mutable = Arc::new( region .version() .memtables .mutable - .new_with_part_duration(version.compaction_time_window), + .new_with_part_duration(version.compaction_time_window, memtable_builder), ); + // Here it assumes the leader has backfilled the partition_expr of the metadata. let metadata = manifest.metadata.clone(); - let version = VersionBuilder::new(metadata, new_mutable) - .add_files(region.file_purger.clone(), manifest.files.values().cloned()) - .flushed_entry_id(manifest.flushed_entry_id) - .flushed_sequence(manifest.flushed_sequence) - .truncated_entry_id(manifest.truncated_entry_id) - .compaction_time_window(manifest.compaction_time_window) - .options(region_options) - .build(); + + let version_builder = version_builder_from_manifest( + &manifest, + metadata, + region.file_purger.clone(), + new_mutable, + region_options, + ); + let version = version_builder.build(); region.version_control.overwrite_current(Arc::new(version)); let updated = manifest.manifest_version > original_manifest_version; @@ -364,6 +381,7 @@ impl RegionWorkerLoop { region: MitoRegionRef, change: RegionChange, need_index: bool, + new_options: Option, sender: OptionOutputTx, ) { // Marks the region as altering. @@ -391,6 +409,7 @@ impl RegionWorkerLoop { result, new_meta, need_index, + new_options, }), }; listener @@ -408,6 +427,32 @@ impl RegionWorkerLoop { } }); } + + fn update_region_version( + version_control: &VersionControlRef, + new_meta: RegionMetadataRef, + new_options: Option, + memtable_builder_provider: &MemtableBuilderProvider, + ) { + let options_changed = new_options.is_some(); + let region_id = new_meta.region_id; + if let Some(new_options) = new_options { + // Needs to update the region with new format and memtables. + // Creates a new memtable builder for the new options as it may change the memtable type. + let new_memtable_builder = memtable_builder_provider.builder_for_options(&new_options); + version_control.alter_schema_and_format(new_meta, new_options, new_memtable_builder); + } else { + // Only changes the schema. + version_control.alter_schema(new_meta); + } + + let version_data = version_control.current(); + let version = version_data.version; + info!( + "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}", + region_id, version.metadata, version.options, options_changed, + ); + } } /// Checks the edit, writes and applies it. diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs index 64f8488da1..0867560a7b 100644 --- a/src/mito2/src/worker/handle_truncate.rs +++ b/src/mito2/src/worker/handle_truncate.rs @@ -129,7 +129,7 @@ impl RegionWorkerLoop { // Applies the truncate action to the region. region .version_control - .truncate(truncate_result.kind.clone(), ®ion.memtable_builder); + .truncate(truncate_result.kind.clone()); } Err(e) => { // Unable to truncate the region. diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 695b175a5f..7fb4b7fda0 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -51,7 +51,7 @@ use crate::metadata::{ use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; use crate::metrics; use crate::mito_engine_options::{ - TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM, + SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM, }; use crate::path_utils::table_dir; use crate::storage::{ColumnId, RegionId, ScanRequest}; @@ -881,11 +881,7 @@ impl AlterKind { AlterKind::ModifyColumnTypes { columns } => columns .iter() .any(|col_to_change| col_to_change.need_alter(metadata)), - AlterKind::SetRegionOptions { .. } => { - // we need to update region options for `ChangeTableOptions`. - // todo: we need to check if ttl has ever changed. - true - } + AlterKind::SetRegionOptions { .. } => true, AlterKind::UnsetRegionOptions { .. } => true, AlterKind::SetIndexes { options, .. } => options .iter() @@ -1258,6 +1254,8 @@ pub enum SetRegionOption { Ttl(Option), // Modifying TwscOptions with values as (option name, new value). Twsc(String, String), + // Modifying the SST format. + Format(String), } impl TryFrom<&PbOption> for SetRegionOption { @@ -1275,6 +1273,7 @@ impl TryFrom<&PbOption> for SetRegionOption { TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => { Ok(Self::Twsc(key.clone(), value.clone())) } + SST_FORMAT_KEY => Ok(Self::Format(value.clone())), _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(), } } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index b8537fc93e..f86a1edbb5 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -29,7 +29,7 @@ use derive_builder::Builder; use serde::{Deserialize, Deserializer, Serialize}; use snafu::{OptionExt, ResultExt, ensure}; use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY; -use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS}; +use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY}; use store_api::region_request::{SetRegionOption, UnsetRegionOption}; use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId}; @@ -292,6 +292,11 @@ impl TableMeta { new_options.extra_options.remove(key.as_str()); } } + SetRegionOption::Format(value) => { + new_options + .extra_options + .insert(SST_FORMAT_KEY.to_string(), value.clone()); + } } } let mut builder = self.new_meta_builder(); diff --git a/tests/cases/standalone/common/alter/alter_format.result b/tests/cases/standalone/common/alter/alter_format.result new file mode 100644 index 0000000000..d38c63997d --- /dev/null +++ b/tests/cases/standalone/common/alter/alter_format.result @@ -0,0 +1,218 @@ +CREATE TABLE test_alt_format(h INTEGER, i INTEGER DEFAULT 0, j TIMESTAMP TIME INDEX, PRIMARY KEY (h)) WITH ('sst_format' = 'primary_key'); + +Affected Rows: 0 + +ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key'; + +Affected Rows: 0 + +INSERT INTO test_alt_format (h, j) VALUES (10, 0); + +Affected Rows: 1 + +ALTER TABLE test_alt_format ADD COLUMN k INTEGER; + +Affected Rows: 0 + +INSERT INTO test_alt_format (h, j) VALUES (11, 1); + +Affected Rows: 1 + +-- SQLNESS SORT_RESULT 3 1 +SELECT * FROM test_alt_format; + ++----+---+-------------------------+---+ +| h | i | j | k | ++----+---+-------------------------+---+ +| 10 | 0 | 1970-01-01T00:00:00 | | +| 11 | 0 | 1970-01-01T00:00:00.001 | | ++----+---+-------------------------+---+ + +-- SQLNESS SORT_RESULT 3 1 +SELECT i, h FROM test_alt_format; + ++---+----+ +| i | h | ++---+----+ +| 0 | 10 | +| 0 | 11 | ++---+----+ + +ALTER TABLE test_alt_format SET 'sst_format' = 'flat'; + +Affected Rows: 0 + +-- SQLNESS SORT_RESULT 3 1 +SELECT * FROM test_alt_format; + ++----+---+-------------------------+---+ +| h | i | j | k | ++----+---+-------------------------+---+ +| 10 | 0 | 1970-01-01T00:00:00 | | +| 11 | 0 | 1970-01-01T00:00:00.001 | | ++----+---+-------------------------+---+ + +INSERT INTO test_alt_format (h, j) VALUES (12, 2); + +Affected Rows: 1 + +INSERT INTO test_alt_format (h, j, i, k) VALUES (13, 3, 23, 33); + +Affected Rows: 1 + +-- SQLNESS SORT_RESULT 3 1 +SELECT * FROM test_alt_format; + ++----+----+-------------------------+----+ +| h | i | j | k | ++----+----+-------------------------+----+ +| 10 | 0 | 1970-01-01T00:00:00 | | +| 11 | 0 | 1970-01-01T00:00:00.001 | | +| 12 | 0 | 1970-01-01T00:00:00.002 | | +| 13 | 23 | 1970-01-01T00:00:00.003 | 33 | ++----+----+-------------------------+----+ + +-- SQLNESS SORT_RESULT 3 1 +SELECT i, h FROM test_alt_format; + ++----+----+ +| i | h | ++----+----+ +| 0 | 10 | +| 0 | 11 | +| 0 | 12 | +| 23 | 13 | ++----+----+ + +ADMIN flush_table('test_alt_format'); + ++--------------------------------------+ +| ADMIN flush_table('test_alt_format') | ++--------------------------------------+ +| 0 | ++--------------------------------------+ + +-- SQLNESS SORT_RESULT 3 1 +SELECT * FROM test_alt_format; + ++----+----+-------------------------+----+ +| h | i | j | k | ++----+----+-------------------------+----+ +| 10 | 0 | 1970-01-01T00:00:00 | | +| 11 | 0 | 1970-01-01T00:00:00.001 | | +| 12 | 0 | 1970-01-01T00:00:00.002 | | +| 13 | 23 | 1970-01-01T00:00:00.003 | 33 | ++----+----+-------------------------+----+ + +-- SQLNESS SORT_RESULT 3 1 +SELECT i, h FROM test_alt_format; + ++----+----+ +| i | h | ++----+----+ +| 0 | 10 | +| 0 | 11 | +| 0 | 12 | +| 23 | 13 | ++----+----+ + +-- not allow to change from flat to primary_key +-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED +ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key'; + +Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing format type to flat + +DROP TABLE test_alt_format; + +Affected Rows: 0 + +CREATE TABLE alt_format_phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "sst_format" = "primary_key"); + +Affected Rows: 0 + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "alt_format_phy"); + +Affected Rows: 0 + +INSERT INTO t1 (ts, val, host) VALUES + ('2022-01-01 00:00:00', 1.23, 'example.com'), + ('2022-01-01 00:00:00', 1.23, 'hello.com'), + ('2022-01-02 00:00:00', 4.56, 'example.com'); + +Affected Rows: 3 + +ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key'; + +Affected Rows: 0 + +ALTER TABLE t1 SET 'sst_format' = 'primary_key'; + +Error: 1004(InvalidArguments), Alter logical tables invalid arguments: Only support add columns operation + +ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY; + +Affected Rows: 0 + +SELECT * FROM t1 ORDER BY ts ASC; + ++-------------+---+---------------------+------+ +| host | k | ts | val | ++-------------+---+---------------------+------+ +| example.com | | 2022-01-01T00:00:00 | 1.23 | +| hello.com | | 2022-01-01T00:00:00 | 1.23 | +| example.com | | 2022-01-02T00:00:00 | 4.56 | ++-------------+---+---------------------+------+ + +ALTER TABLE alt_format_phy SET 'sst_format' = 'flat'; + +Affected Rows: 0 + +SELECT * FROM t1 ORDER BY ts ASC; + ++-------------+---+---------------------+------+ +| host | k | ts | val | ++-------------+---+---------------------+------+ +| example.com | | 2022-01-01T00:00:00 | 1.23 | +| hello.com | | 2022-01-01T00:00:00 | 1.23 | +| example.com | | 2022-01-02T00:00:00 | 4.56 | ++-------------+---+---------------------+------+ + +SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC; + ++-------------+---------------------+------+ +| host | ts | val | ++-------------+---------------------+------+ +| example.com | 2022-01-01T00:00:00 | 1.23 | +| example.com | 2022-01-02T00:00:00 | 4.56 | ++-------------+---------------------+------+ + +INSERT INTO t1 (ts, val, host) VALUES + ('2022-01-01 00:00:01', 3.0, 'example.com'), + ('2022-01-01 00:00:01', 4.0, 'hello.com'); + +Affected Rows: 2 + +SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC; + ++-------------+---------------------+------+ +| host | ts | val | ++-------------+---------------------+------+ +| example.com | 2022-01-01T00:00:00 | 1.23 | +| example.com | 2022-01-01T00:00:01 | 3.0 | +| example.com | 2022-01-02T00:00:00 | 4.56 | ++-------------+---------------------+------+ + +-- not allow to change from flat to primary_key +-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED +ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key'; + +Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing format type to flat + +DROP TABLE t1; + +Affected Rows: 0 + +DROP TABLE alt_format_phy; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/alter/alter_format.sql b/tests/cases/standalone/common/alter/alter_format.sql new file mode 100644 index 0000000000..e1472d28e1 --- /dev/null +++ b/tests/cases/standalone/common/alter/alter_format.sql @@ -0,0 +1,81 @@ +CREATE TABLE test_alt_format(h INTEGER, i INTEGER DEFAULT 0, j TIMESTAMP TIME INDEX, PRIMARY KEY (h)) WITH ('sst_format' = 'primary_key'); + +ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key'; + +INSERT INTO test_alt_format (h, j) VALUES (10, 0); + +ALTER TABLE test_alt_format ADD COLUMN k INTEGER; + +INSERT INTO test_alt_format (h, j) VALUES (11, 1); + +-- SQLNESS SORT_RESULT 3 1 +SELECT * FROM test_alt_format; + +-- SQLNESS SORT_RESULT 3 1 +SELECT i, h FROM test_alt_format; + +ALTER TABLE test_alt_format SET 'sst_format' = 'flat'; + +-- SQLNESS SORT_RESULT 3 1 +SELECT * FROM test_alt_format; + +INSERT INTO test_alt_format (h, j) VALUES (12, 2); + +INSERT INTO test_alt_format (h, j, i, k) VALUES (13, 3, 23, 33); + +-- SQLNESS SORT_RESULT 3 1 +SELECT * FROM test_alt_format; + +-- SQLNESS SORT_RESULT 3 1 +SELECT i, h FROM test_alt_format; + +ADMIN flush_table('test_alt_format'); + +-- SQLNESS SORT_RESULT 3 1 +SELECT * FROM test_alt_format; + +-- SQLNESS SORT_RESULT 3 1 +SELECT i, h FROM test_alt_format; + +-- not allow to change from flat to primary_key +-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED +ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key'; + +DROP TABLE test_alt_format; + +CREATE TABLE alt_format_phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "sst_format" = "primary_key"); + +CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "alt_format_phy"); + +INSERT INTO t1 (ts, val, host) VALUES + ('2022-01-01 00:00:00', 1.23, 'example.com'), + ('2022-01-01 00:00:00', 1.23, 'hello.com'), + ('2022-01-02 00:00:00', 4.56, 'example.com'); + +ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key'; + +ALTER TABLE t1 SET 'sst_format' = 'primary_key'; + +ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY; + +SELECT * FROM t1 ORDER BY ts ASC; + +ALTER TABLE alt_format_phy SET 'sst_format' = 'flat'; + +SELECT * FROM t1 ORDER BY ts ASC; + +SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC; + +INSERT INTO t1 (ts, val, host) VALUES + ('2022-01-01 00:00:01', 3.0, 'example.com'), + ('2022-01-01 00:00:01', 4.0, 'hello.com'); + +SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC; + +-- not allow to change from flat to primary_key +-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED +ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key'; + +DROP TABLE t1; + +DROP TABLE alt_format_phy;