feat: support altering sst format for a table (#7206)

* refactor: remove memtable_builder from MitoRegion

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: add alter format

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support changing the format and memtable

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: support changing sst format via table options

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: set scanner and memtable builder with correct format

Signed-off-by: evenyag <realevenyag@gmail.com>

* style: fix clippy

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: fix incorrect metadata in version after alter

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: add sqlness test

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: replace region_id in sqlness result

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: create correct memtable when setting sst_format explicitly

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: sqlness alter_format test set sst_format to primary_key

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: remove verbose log

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
This commit is contained in:
Yingwen
2025-11-11 21:19:00 +08:00
committed by GitHub
parent 49c6812e98
commit bb6a3a2ff3
19 changed files with 956 additions and 153 deletions

View File

@@ -959,8 +959,7 @@ impl EngineInner {
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
.with_ignore_fulltext_index(self.config.fulltext_index.apply_on_query.disabled())
.with_ignore_bloom_filter(self.config.bloom_filter_index.apply_on_query.disabled())
.with_start_time(query_start)
.with_flat_format(self.config.default_experimental_flat_format);
.with_start_time(query_start);
#[cfg(feature = "enterprise")]
let scan_region = self.maybe_fill_extension_range_provider(scan_region, region);

View File

@@ -38,6 +38,7 @@ use crate::config::MitoConfig;
use crate::engine::MitoEngine;
use crate::engine::listener::{AlterFlushListener, NotifyRegionChangeResultListener};
use crate::error;
use crate::sst::FormatType;
use crate::test_util::{
CreateRequestBuilder, TestEnv, build_rows, build_rows_for_key, flush_region, put_rows,
rows_schema,
@@ -902,13 +903,13 @@ async fn test_alter_region_ttl_options_with_format(flat_format: bool) {
#[tokio::test]
async fn test_write_stall_on_altering() {
common_telemetry::init_default_ut_logging();
test_write_stall_on_altering_with_format(false).await;
test_write_stall_on_altering_with_format(true).await;
}
async fn test_write_stall_on_altering_with_format(flat_format: bool) {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let listener = Arc::new(NotifyRegionChangeResultListener::default());
let engine = env
@@ -980,3 +981,247 @@ async fn test_write_stall_on_altering_with_format(flat_format: bool) {
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_alter_region_sst_format_with_flush() {
common_telemetry::init_default_ut_logging();
let mut env = TestEnv::new().await;
let engine = env
.create_engine(MitoConfig {
default_experimental_flat_format: false,
..Default::default()
})
.await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let column_schemas = rows_schema(&request);
let table_dir = request.table_dir.clone();
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Inserts some data before alter
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 3),
};
put_rows(&engine, region_id, rows).await;
// Flushes to create SST files with primary_key format
flush_region(&engine, region_id, None).await;
let expected_data = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected_data, batches.pretty_print().unwrap());
// Alters sst_format from primary_key to flat
let alter_format_request = RegionAlterRequest {
kind: AlterKind::SetRegionOptions {
options: vec![SetRegionOption::Format("flat".to_string())],
},
};
engine
.handle_request(region_id, RegionRequest::Alter(alter_format_request))
.await
.unwrap();
// Inserts more data after alter
let rows = Rows {
schema: column_schemas.clone(),
rows: build_rows(3, 6),
};
put_rows(&engine, region_id, rows).await;
// Flushes to create SST files with flat format
flush_region(&engine, region_id, None).await;
let expected_all_data = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 0 | 0.0 | 1970-01-01T00:00:00 |
| 1 | 1.0 | 1970-01-01T00:00:01 |
| 2 | 2.0 | 1970-01-01T00:00:02 |
| 3 | 3.0 | 1970-01-01T00:00:03 |
| 4 | 4.0 | 1970-01-01T00:00:04 |
| 5 | 5.0 | 1970-01-01T00:00:05 |
+-------+---------+---------------------+";
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected_all_data, batches.pretty_print().unwrap());
// Reopens region to verify format persists
let engine = env
.reopen_engine(
engine,
MitoConfig {
default_experimental_flat_format: false,
..Default::default()
},
)
.await;
engine
.handle_request(
region_id,
RegionRequest::Open(RegionOpenRequest {
engine: String::new(),
table_dir,
path_type: PathType::Bare,
options: HashMap::default(),
skip_wal_replay: false,
checkpoint: None,
}),
)
.await
.unwrap();
let request = ScanRequest::default();
let stream = engine.scan_to_stream(region_id, request).await.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected_all_data, batches.pretty_print().unwrap());
}
/// Alters a region's SST format from `primary_key` to `flat` without flushing, so rows
/// written before and after the alter are still unflushed when the region is reopened.
///
/// Verifies the format recorded in the region options after create, after the alter and
/// after reopening, and that all rows are readable at each scan.
#[tokio::test]
async fn test_alter_region_sst_format_without_flush() {
    common_telemetry::init_default_ut_logging();

    let mut env = TestEnv::new().await;
    let engine = env
        .create_engine(MitoConfig {
            default_experimental_flat_format: false,
            ..Default::default()
        })
        .await;
    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new().build();

    env.get_schema_metadata_manager()
        .register_region_table_info(
            region_id.table_id(),
            "test_table",
            "test_catalog",
            "test_schema",
            None,
            env.get_kv_backend(),
        )
        .await;

    let column_schemas = rows_schema(&request);
    let table_dir = request.table_dir.clone();
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    // Asserts the SST format stored in the options of the region's current version.
    let check_format = |engine: &MitoEngine, expected: Option<FormatType>| {
        let current_format = engine
            .get_region(region_id)
            .unwrap()
            .version()
            .options
            .sst_format;
        assert_eq!(current_format, expected);
    };
    check_format(&engine, Some(FormatType::PrimaryKey));

    // Inserts some data before alter
    let rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows(0, 3),
    };
    put_rows(&engine, region_id, rows).await;

    // Alters sst_format from primary_key to flat
    let alter_format_request = RegionAlterRequest {
        kind: AlterKind::SetRegionOptions {
            options: vec![SetRegionOption::Format("flat".to_string())],
        },
    };
    engine
        .handle_request(region_id, RegionRequest::Alter(alter_format_request))
        .await
        .unwrap();
    check_format(&engine, Some(FormatType::Flat));

    // Inserts more data after alter
    let rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows(3, 6),
    };
    put_rows(&engine, region_id, rows).await;

    // The table lines stay at column 0: any leading whitespace would become part of
    // the string literal.
    let expected_all_data = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| 0     | 0.0     | 1970-01-01T00:00:00 |
| 1     | 1.0     | 1970-01-01T00:00:01 |
| 2     | 2.0     | 1970-01-01T00:00:02 |
| 3     | 3.0     | 1970-01-01T00:00:03 |
| 4     | 4.0     | 1970-01-01T00:00:04 |
| 5     | 5.0     | 1970-01-01T00:00:05 |
+-------+---------+---------------------+";
    let request = ScanRequest::default();
    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected_all_data, batches.pretty_print().unwrap());

    // Reopens region to verify format persists
    let engine = env
        .reopen_engine(
            engine,
            MitoConfig {
                default_experimental_flat_format: false,
                ..Default::default()
            },
        )
        .await;
    engine
        .handle_request(
            region_id,
            RegionRequest::Open(RegionOpenRequest {
                engine: String::new(),
                table_dir,
                path_type: PathType::Bare,
                options: HashMap::default(),
                skip_wal_replay: false,
                checkpoint: None,
            }),
        )
        .await
        .unwrap();

    // The open request carries no `sst_format` option, so a `flat` result here can only
    // come from what was persisted by the alter.
    check_format(&engine, Some(FormatType::Flat));

    let request = ScanRequest::default();
    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected_all_data, batches.pretty_print().unwrap());
}

View File

@@ -23,7 +23,7 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::region::options::MemtableOptions;
use crate::test_util::{
CreateRequestBuilder, TestEnv, build_rows, put_rows, reopen_region, rows_schema,
CreateRequestBuilder, TestEnv, build_rows, flush_region, put_rows, reopen_region, rows_schema,
};
#[tokio::test]
@@ -380,3 +380,77 @@ async fn create_with_partition_expr_persists_manifest_with_format(flat_format: b
let manifest = region.manifest_ctx.manifest().await;
assert_eq!(manifest.metadata.partition_expr.as_deref(), Some(expr_json));
}
/// Runs the create-with-format scenario for each explicit `sst_format` value combined
/// with each setting of the engine-wide default flat format flag.
#[tokio::test]
async fn test_engine_create_with_format() {
    common_telemetry::init_default_ut_logging();

    // (sst_format passed in the create request, default_experimental_flat_format)
    let cases = [
        ("primary_key", false),
        ("primary_key", true),
        ("flat", false),
        ("flat", true),
    ];
    for (create_format, default_flat_format) in cases {
        test_engine_create_with_format_one_case(create_format, default_flat_format).await;
    }
}
/// Creates a region with an explicit `sst_format` option on an engine whose
/// `default_experimental_flat_format` is `default_flat_format`, writes three rows and
/// checks they can be scanned both before and after a flush.
///
/// * `create_format` - value passed as the `sst_format` option of the create request.
/// * `default_flat_format` - engine-level default flat format flag.
async fn test_engine_create_with_format_one_case(create_format: &str, default_flat_format: bool) {
    common_telemetry::info!(
        "Test engine create with format, create_format: {}, default_flat_format: {}",
        create_format,
        default_flat_format
    );

    let mut env = TestEnv::new().await;
    let engine = env
        .create_engine(MitoConfig {
            default_experimental_flat_format: default_flat_format,
            ..Default::default()
        })
        .await;
    let region_id = RegionId::new(1, 1);
    let request = CreateRequestBuilder::new()
        .insert_option("sst_format", create_format)
        .build();

    env.get_schema_metadata_manager()
        .register_region_table_info(
            region_id.table_id(),
            "test_table",
            "test_catalog",
            "test_schema",
            None,
            env.get_kv_backend(),
        )
        .await;

    let column_schemas = rows_schema(&request);
    engine
        .handle_request(region_id, RegionRequest::Create(request))
        .await
        .unwrap();

    let rows = Rows {
        schema: column_schemas.clone(),
        rows: build_rows(0, 3),
    };
    put_rows(&engine, region_id, rows).await;

    // The table lines stay at column 0: any leading whitespace would become part of
    // the string literal.
    let expected = "\
+-------+---------+---------------------+
| tag_0 | field_0 | ts                  |
+-------+---------+---------------------+
| 0     | 0.0     | 1970-01-01T00:00:00 |
| 1     | 1.0     | 1970-01-01T00:00:01 |
| 2     | 2.0     | 1970-01-01T00:00:02 |
+-------+---------+---------------------+";

    // Scan while the rows are still unflushed.
    let request = ScanRequest::default();
    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, batches.pretty_print().unwrap());

    flush_region(&engine, region_id, None).await;

    // Scan again after the flush; the result must be unchanged.
    let request = ScanRequest::default();
    let stream = engine.scan_to_stream(region_id, request).await.unwrap();
    let batches = RecordBatches::try_collect(stream).await.unwrap();
    assert_eq!(expected, batches.pretty_print().unwrap());
}

View File

@@ -422,6 +422,7 @@ impl MemtableBuilderProvider {
);
}
// The format is not flat.
match &options.memtable {
Some(MemtableOptions::TimeSeries) => Arc::new(TimeSeriesMemtableBuilder::new(
self.write_buffer_manager.clone(),
@@ -440,22 +441,15 @@ impl MemtableBuilderProvider {
self.write_buffer_manager.clone(),
))
}
None => self.default_memtable_builder(dedup, merge_mode),
None => self.default_primary_key_memtable_builder(dedup, merge_mode),
}
}
fn default_memtable_builder(&self, dedup: bool, merge_mode: MergeMode) -> MemtableBuilderRef {
if self.config.default_experimental_flat_format {
return Arc::new(
BulkMemtableBuilder::new(
self.write_buffer_manager.clone(),
!dedup, // append_mode: true if not dedup, false if dedup
merge_mode,
)
.with_compact_dispatcher(self.compact_dispatcher.clone()),
);
}
fn default_primary_key_memtable_builder(
&self,
dedup: bool,
merge_mode: MergeMode,
) -> MemtableBuilderRef {
match &self.config.memtable {
MemtableConfig::PartitionTree(config) => {
let mut config = config.clone();

View File

@@ -353,12 +353,13 @@ impl TimePartitions {
.builder
.build(inner.alloc_memtable_id(), &self.metadata);
debug!(
"Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}",
"Create time partition {:?} for region {}, duration: {:?}, memtable_id: {}, parts_total: {}, metadata: {:?}",
range,
self.metadata.region_id,
self.part_duration,
memtable.id(),
inner.parts.len() + 1
inner.parts.len() + 1,
self.metadata,
);
let pos = inner.parts.len();
inner.parts.push(TimePartition {
@@ -454,6 +455,11 @@ impl TimePartitions {
self.part_duration
}
/// Returns a reference to the memtable builder held by this partition list.
///
/// Callers that need an owned handle clone the returned [MemtableBuilderRef].
pub(crate) fn memtable_builder(&self) -> &MemtableBuilderRef {
&self.builder
}
/// Returns memory usage.
pub(crate) fn memory_usage(&self) -> usize {
let inner = self.inner.lock().unwrap();
@@ -488,12 +494,16 @@ impl TimePartitions {
/// Creates a new empty partition list from this list and a `part_duration`.
/// It falls back to the old partition duration if `part_duration` is `None`.
pub(crate) fn new_with_part_duration(&self, part_duration: Option<Duration>) -> Self {
pub(crate) fn new_with_part_duration(
&self,
part_duration: Option<Duration>,
memtable_builder: Option<MemtableBuilderRef>,
) -> Self {
debug_assert!(self.is_empty());
Self::new(
self.metadata.clone(),
self.builder.clone(),
memtable_builder.unwrap_or_else(|| self.builder.clone()),
self.next_memtable_id(),
Some(part_duration.unwrap_or(self.part_duration)),
)
@@ -941,17 +951,17 @@ mod tests {
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)));
let new_parts = partitions.new_with_part_duration(Some(Duration::from_secs(5)), None);
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
assert_eq!(0, new_parts.next_memtable_id());
// Won't update the duration if it's None.
let new_parts = new_parts.new_with_part_duration(None);
let new_parts = new_parts.new_with_part_duration(None, None);
assert_eq!(Duration::from_secs(5), new_parts.part_duration());
// Don't need to create new memtables.
assert_eq!(0, new_parts.next_memtable_id());
let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)));
let new_parts = new_parts.new_with_part_duration(Some(Duration::from_secs(10)), None);
assert_eq!(Duration::from_secs(10), new_parts.part_duration());
// Don't need to create new memtables.
assert_eq!(0, new_parts.next_memtable_id());
@@ -959,7 +969,7 @@ mod tests {
let builder = Arc::new(PartitionTreeMemtableBuilder::default());
let partitions = TimePartitions::new(metadata.clone(), builder.clone(), 0, None);
// Need to build a new memtable as duration is still None.
let new_parts = partitions.new_with_part_duration(None);
let new_parts = partitions.new_with_part_duration(None, None);
assert_eq!(INITIAL_TIME_WINDOW, new_parts.part_duration());
assert_eq!(0, new_parts.next_memtable_id());
}

View File

@@ -82,7 +82,7 @@ impl MemtableVersion {
}
// Update the time window.
let mutable = self.mutable.new_with_part_duration(time_window);
let mutable = self.mutable.new_with_part_duration(time_window, None);
common_telemetry::debug!(
"Freeze empty memtable, update partition duration from {:?} to {:?}",
self.mutable.part_duration(),

View File

@@ -60,6 +60,7 @@ use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::FormatType;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
@@ -221,8 +222,6 @@ pub(crate) struct ScanRegion {
/// Whether to filter out the deleted rows.
/// Usually true for normal read, and false for scan for compaction.
filter_deleted: bool,
/// Whether to use flat format.
flat_format: bool,
#[cfg(feature = "enterprise")]
extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
@@ -247,7 +246,6 @@ impl ScanRegion {
ignore_bloom_filter: false,
start_time: None,
filter_deleted: true,
flat_format: false,
#[cfg(feature = "enterprise")]
extension_range_provider: None,
}
@@ -304,13 +302,6 @@ impl ScanRegion {
self.filter_deleted = filter_deleted;
}
/// Sets whether to use flat format.
#[must_use]
pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
self.flat_format = flat_format;
self
}
#[cfg(feature = "enterprise")]
pub(crate) fn set_extension_range_provider(
&mut self,
@@ -385,18 +376,24 @@ impl ScanRegion {
self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
}
/// Returns true if the region uses the flat format.
///
/// The format is read from the region options of the scanned version; an unset
/// `sst_format` falls back to the default [FormatType].
fn use_flat_format(&self) -> bool {
    let format = self.version.options.sst_format.unwrap_or_default();
    matches!(format, FormatType::Flat)
}
/// Creates a scan input.
async fn scan_input(mut self) -> Result<ScanInput> {
let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
let time_range = self.build_time_range_predicate();
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
let flat_format = self.use_flat_format();
// The mapper always computes projected column ids as the schema of SSTs may change.
let mapper = match &self.request.projection {
Some(p) => {
ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)?
ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
}
None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?,
None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
};
let ssts = &self.version.ssts;
@@ -463,13 +460,14 @@ impl ScanRegion {
let region_id = self.region_id();
debug!(
"Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
"Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
region_id,
self.request,
time_range,
mem_range_builders.len(),
files.len(),
self.version.options.append_mode,
flat_format,
);
let (non_field_filters, field_filters) = self.partition_by_field_filters();
@@ -487,7 +485,7 @@ impl ScanRegion {
];
let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
if self.flat_format {
if flat_format {
// The batch is already large enough so we use a small channel size here.
self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
}
@@ -509,7 +507,7 @@ impl ScanRegion {
.with_merge_mode(self.version.options.merge_mode())
.with_series_row_selector(self.request.series_row_selector)
.with_distribution(self.request.distribution)
.with_flat_format(self.flat_format);
.with_flat_format(flat_format);
#[cfg(feature = "enterprise")]
let input = if let Some(provider) = self.extension_range_provider {

View File

@@ -47,10 +47,8 @@ use crate::manifest::action::{
RegionChange, RegionManifest, RegionMetaAction, RegionMetaActionList,
};
use crate::manifest::manager::RegionManifestManager;
use crate::memtable::MemtableBuilderRef;
use crate::region::version::{VersionControlRef, VersionRef};
use crate::request::{OnFailure, OptionOutputTx};
use crate::sst::FormatType;
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location::{index_file_path, sst_file_path};
use crate::time_provider::TimeProviderRef;
@@ -140,10 +138,6 @@ pub struct MitoRegion {
pub(crate) topic_latest_entry_id: AtomicU64,
/// The total bytes written to the region.
pub(crate) written_bytes: Arc<AtomicU64>,
/// Memtable builder for the region.
pub(crate) memtable_builder: MemtableBuilderRef,
/// Format type of the SST file.
pub(crate) sst_format: FormatType,
/// manifest stats
stats: ManifestStats,
}
@@ -200,11 +194,6 @@ impl MitoRegion {
self.last_compaction_millis.load(Ordering::Relaxed)
}
/// Returns format type of the SST file.
pub(crate) fn sst_format(&self) -> FormatType {
self.sst_format
}
/// Update compaction time to current time.
pub(crate) fn update_compaction_millis(&self) {
let now = self.time_provider.current_time_millis();
@@ -460,11 +449,12 @@ impl MitoRegion {
if self.state() == RegionRoleState::Leader(RegionLeaderState::Writable) {
// Persist backfilled metadata if manifest is missing fields (e.g., partition_expr)
let manifest_meta = &manager.manifest().metadata;
let current_meta = &self.version().metadata;
let current_version = self.version();
let current_meta = &current_version.metadata;
if manifest_meta.partition_expr.is_none() && current_meta.partition_expr.is_some() {
let action = RegionMetaAction::Change(RegionChange {
metadata: current_meta.clone(),
sst_format: self.sst_format(),
sst_format: current_version.options.sst_format.unwrap_or_default(),
});
let result = manager
.update(
@@ -1220,7 +1210,6 @@ mod tests {
use crate::sst::FormatType;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::test_util::memtable_util::EmptyMemtableBuilder;
use crate::test_util::scheduler_util::SchedulerEnv;
use crate::test_util::version_util::VersionControlBuilder;
use crate::time_provider::StdTimeProvider;
@@ -1391,8 +1380,6 @@ mod tests {
time_provider: Arc::new(StdTimeProvider),
topic_latest_entry_id: Default::default(),
written_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder: Arc::new(EmptyMemtableBuilder::default()),
sst_format: FormatType::PrimaryKey,
stats: ManifestStats::default(),
};

View File

@@ -53,7 +53,7 @@ use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, Rem
use crate::manifest::storage::manifest_compress_type;
use crate::memtable::MemtableBuilderProvider;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::time_partition::TimePartitions;
use crate::memtable::time_partition::{TimePartitions, TimePartitionsRef};
use crate::metrics::{CACHE_FILL_DOWNLOADED_FILES, CACHE_FILL_PENDING_FILES};
use crate::region::options::RegionOptions;
use crate::region::version::{VersionBuilder, VersionControl, VersionControlRef};
@@ -65,7 +65,7 @@ use crate::request::OptionOutputTx;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::FormatType;
use crate::sst::file::RegionFileId;
use crate::sst::file_purger::create_local_file_purger;
use crate::sst::file_purger::{FilePurgerRef, create_local_file_purger};
use crate::sst::file_ref::FileReferenceManagerRef;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
@@ -255,17 +255,19 @@ impl RegionOpener {
}
}
// Safety: must be set before calling this method.
let options = self.options.take().unwrap();
let mut options = self.options.take().unwrap();
let object_store = get_object_store(&options.storage, &self.object_store_manager)?;
let provider = self.provider::<S>(&options.wal_options)?;
let metadata = Arc::new(metadata);
// Set the sst_format based on options or flat_format flag
// Sets the sst_format based on options or flat_format flag
let sst_format = if let Some(format) = options.sst_format {
format
} else if config.default_experimental_flat_format {
options.sst_format = Some(FormatType::Flat);
FormatType::Flat
} else {
// Default to PrimaryKeyParquet for newly created regions
options.sst_format = Some(FormatType::PrimaryKey);
FormatType::PrimaryKey
};
// Create a manifest manager for this region and writes regions to the manifest file.
@@ -293,7 +295,10 @@ impl RegionOpener {
part_duration,
));
debug!("Create region {} with options: {:?}", region_id, options);
debug!(
"Create region {} with options: {:?}, default_flat_format: {}",
region_id, options, config.default_experimental_flat_format
);
let version = VersionBuilder::new(metadata, mutable)
.options(options)
@@ -328,9 +333,7 @@ impl RegionOpener {
last_compaction_millis: AtomicI64::new(now),
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(0),
memtable_builder,
written_bytes: Arc::new(AtomicU64::new(0)),
sst_format,
stats: self.stats,
}))
}
@@ -402,7 +405,7 @@ impl RegionOpener {
wal: &Wal<S>,
) -> Result<Option<MitoRegionRef>> {
let now = Instant::now();
let region_options = self.options.as_ref().unwrap().clone();
let mut region_options = self.options.as_ref().unwrap().clone();
let region_manifest_options = Self::manifest_options(
config,
@@ -432,6 +435,8 @@ impl RegionOpener {
} else {
manifest.metadata.clone()
};
// Updates the region options with the manifest.
sanitize_region_options(&manifest, &mut region_options);
let region_id = self.region_id;
let provider = self.provider::<S>(&region_options.wal_options)?;
@@ -460,6 +465,7 @@ impl RegionOpener {
self.cache_manager.clone(),
self.file_ref_manager.clone(),
);
// We should sanitize the region options before creating a new memtable.
let memtable_builder = self
.memtable_builder_provider
.builder_for_options(&region_options);
@@ -476,14 +482,16 @@ impl RegionOpener {
0,
part_duration,
));
let version = VersionBuilder::new(metadata, mutable)
.add_files(file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.flushed_sequence(manifest.flushed_sequence)
.truncated_entry_id(manifest.truncated_entry_id)
.compaction_time_window(manifest.compaction_time_window)
.options(region_options)
.build();
// Updates region options by manifest before creating version.
let version_builder = version_builder_from_manifest(
&manifest,
metadata,
file_purger.clone(),
mutable,
region_options,
);
let version = version_builder.build();
let flushed_entry_id = version.flushed_entry_id;
let version_control = Arc::new(VersionControl::new(version));
@@ -545,8 +553,6 @@ impl RegionOpener {
}
let now = self.time_provider.current_time_millis();
// Read sst_format from manifest
let sst_format = manifest.sst_format;
let region = MitoRegion {
region_id: self.region_id,
@@ -564,8 +570,6 @@ impl RegionOpener {
time_provider: self.time_provider.clone(),
topic_latest_entry_id: AtomicU64::new(topic_latest_entry_id),
written_bytes: Arc::new(AtomicU64::new(0)),
memtable_builder,
sst_format,
stats: self.stats.clone(),
};
@@ -599,6 +603,37 @@ impl RegionOpener {
}
}
/// Builds a [VersionBuilder] seeded from a region manifest.
///
/// The builder starts from `metadata` and the `mutable` memtables, registers every file
/// listed in the manifest with `file_purger`, copies the flushed/truncated positions and
/// the compaction time window from the manifest, and finally applies `region_options`.
pub(crate) fn version_builder_from_manifest(
    manifest: &RegionManifest,
    metadata: RegionMetadataRef,
    file_purger: FilePurgerRef,
    mutable: TimePartitionsRef,
    region_options: RegionOptions,
) -> VersionBuilder {
    // Files recorded in the manifest.
    let manifest_files = manifest.files.values().cloned();
    let with_files = VersionBuilder::new(metadata, mutable).add_files(file_purger, manifest_files);

    // Progress markers persisted in the manifest.
    let with_progress = with_files
        .flushed_entry_id(manifest.flushed_entry_id)
        .flushed_sequence(manifest.flushed_sequence)
        .truncated_entry_id(manifest.truncated_entry_id)
        .compaction_time_window(manifest.compaction_time_window);

    with_progress.options(region_options)
}
/// Aligns the region options with the options persisted in the manifest.
///
/// The SST format recorded in the manifest is authoritative for an existing region, so
/// `options.sst_format` is always set to it. A warning is logged when the format the
/// options would otherwise resolve to (the explicit value, or the default [FormatType]
/// when unset) differs from the manifest.
///
/// Storing the value unconditionally keeps the options of an opened region in the same
/// shape as those of a newly created one, which always carry `Some(format)`, and keeps an
/// unset option from being resolved against some other default later on.
pub(crate) fn sanitize_region_options(manifest: &RegionManifest, options: &mut RegionOptions) {
    let option_format = options.sst_format.unwrap_or_default();
    if option_format != manifest.sst_format {
        common_telemetry::warn!(
            "Overriding SST format from {:?} to {:?} for region {}",
            option_format,
            manifest.sst_format,
            manifest.metadata.region_id,
        );
    }
    // Pin the persisted format even when it matches, so the option is never left unset.
    options.sst_format = Some(manifest.sst_format);
}
/// Returns an object store corresponding to `name`. If `name` is `None`, this method returns the default object store.
pub fn get_object_store(
name: &Option<String>,

View File

@@ -161,13 +161,14 @@ impl VersionControl {
}
/// Mark all opened files as deleted and set the delete marker in [VersionControlData]
pub(crate) fn mark_dropped(&self, memtable_builder: &MemtableBuilderRef) {
pub(crate) fn mark_dropped(&self) {
let version = self.current().version;
let part_duration = Some(version.memtables.mutable.part_duration());
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let memtable_builder = version.memtables.mutable.memtable_builder().clone();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder.clone(),
memtable_builder,
next_memtable_id,
part_duration,
));
@@ -185,13 +186,14 @@ impl VersionControl {
///
/// It replaces existing mutable memtable with a memtable that uses the
/// new schema. Memtables of the version must be empty.
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef, builder: &MemtableBuilderRef) {
pub(crate) fn alter_schema(&self, metadata: RegionMetadataRef) {
let version = self.current().version;
let part_duration = Some(version.memtables.mutable.part_duration());
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let memtable_builder = version.memtables.mutable.memtable_builder().clone();
let new_mutable = Arc::new(TimePartitions::new(
metadata.clone(),
builder.clone(),
memtable_builder,
next_memtable_id,
part_duration,
));
@@ -208,19 +210,50 @@ impl VersionControl {
version_data.version = new_version;
}
/// Truncate current version.
pub(crate) fn truncate(
/// Alter schema and format of the region.
///
/// It replaces existing mutable memtable with a memtable that uses the
/// new format. Memtables of the version must be empty.
pub(crate) fn alter_schema_and_format(
&self,
truncate_kind: TruncateKind,
memtable_builder: &MemtableBuilderRef,
metadata: RegionMetadataRef,
options: RegionOptions,
memtable_builder: MemtableBuilderRef,
) {
let version = self.current().version;
let part_duration = Some(version.memtables.mutable.part_duration());
let next_memtable_id = version.memtables.mutable.next_memtable_id();
// Use the new metadata to build `TimePartitions`.
let new_mutable = Arc::new(TimePartitions::new(
metadata.clone(),
memtable_builder,
next_memtable_id,
part_duration,
));
debug_assert!(version.memtables.mutable.is_empty());
debug_assert!(version.memtables.immutables().is_empty());
let new_version = Arc::new(
VersionBuilder::from_version(version)
.metadata(metadata)
.options(options)
.memtables(MemtableVersion::new(new_mutable))
.build(),
);
let mut version_data = self.data.write().unwrap();
version_data.version = new_version;
}
/// Truncate current version.
pub(crate) fn truncate(&self, truncate_kind: TruncateKind) {
let version = self.current().version;
let part_duration = version.memtables.mutable.part_duration();
let next_memtable_id = version.memtables.mutable.next_memtable_id();
let memtable_builder = version.memtables.mutable.memtable_builder().clone();
let new_mutable = Arc::new(TimePartitions::new(
version.metadata.clone(),
memtable_builder.clone(),
memtable_builder,
next_memtable_id,
Some(part_duration),
));
@@ -230,7 +263,9 @@ impl VersionControl {
truncated_sequence,
} => {
let new_version = Arc::new(
VersionBuilder::new(version.metadata.clone(), new_mutable)
VersionBuilder::from_version(version)
.memtables(MemtableVersion::new(new_mutable))
.clear_files()
.flushed_entry_id(truncated_entry_id)
.flushed_sequence(truncated_sequence)
.truncated_entry_id(Some(truncated_entry_id))
@@ -456,6 +491,12 @@ impl VersionBuilder {
self
}
/// Clears all files in the builder.
///
/// Replaces the builder's SST version with a newly constructed [SstVersion], dropping
/// whatever file set the builder carried before, and returns the builder for chaining.
pub(crate) fn clear_files(mut self) -> Self {
self.ssts = Arc::new(SstVersion::new());
self
}
/// Builds a new [Version] from the builder.
/// It overwrites the window size by compaction option.
pub(crate) fn build(self) -> Version {

View File

@@ -50,6 +50,7 @@ use crate::manifest::action::{RegionEdit, TruncateKind};
use crate::memtable::MemtableId;
use crate::memtable::bulk::part::BulkPart;
use crate::metrics::COMPACTION_ELAPSED_TOTAL;
use crate::region::options::RegionOptions;
use crate::sst::file::FileMeta;
use crate::sst::index::IndexBuildType;
use crate::wal::EntryId;
@@ -948,6 +949,8 @@ pub(crate) struct RegionChangeResult {
pub(crate) result: Result<()>,
/// Used for index build in schema change.
pub(crate) need_index: bool,
/// New options for the region.
pub(crate) new_options: Option<RegionOptions>,
}
/// Request to edit a region directly.

View File

@@ -21,7 +21,7 @@ use common_base::readable_size::ReadableSize;
use common_telemetry::info;
use common_telemetry::tracing::warn;
use humantime_serde::re::humantime;
use snafu::ResultExt;
use snafu::{ResultExt, ensure};
use store_api::metadata::{
InvalidSetRegionOptionRequestSnafu, MetadataError, RegionMetadata, RegionMetadataBuilder,
RegionMetadataRef,
@@ -35,9 +35,10 @@ use crate::flush::FlushReason;
use crate::manifest::action::RegionChange;
use crate::region::MitoRegionRef;
use crate::region::options::CompactionOptions::Twcs;
use crate::region::options::TwcsOptions;
use crate::region::options::{RegionOptions, TwcsOptions};
use crate::region::version::VersionRef;
use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
use crate::sst::FormatType;
use crate::worker::RegionWorkerLoop;
impl<S> RegionWorkerLoop<S> {
@@ -57,36 +58,38 @@ impl<S> RegionWorkerLoop<S> {
info!("Try to alter region: {}, request: {:?}", region_id, request);
// Get the version before alter.
let version = region.version();
// Gets the version before alter.
let mut version = region.version();
// fast path for memory state changes like options.
match request.kind {
AlterKind::SetRegionOptions { options } => {
match self.handle_alter_region_options(region, version, options) {
Ok(_) => sender.send(Ok(0)),
Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
}
return;
}
let set_options = match &request.kind {
AlterKind::SetRegionOptions { options } => options.clone(),
AlterKind::UnsetRegionOptions { keys } => {
// Converts the keys to SetRegionOption.
//
// It passes an empty string to achieve the purpose of unset
match self.handle_alter_region_options(
region,
version,
keys.iter().map(Into::into).collect(),
) {
Ok(_) => sender.send(Ok(0)),
Err(e) => sender.send(Err(e).context(InvalidMetadataSnafu)),
}
return;
keys.iter().map(Into::into).collect()
}
_ => Vec::new(),
};
if !set_options.is_empty() {
match self.handle_alter_region_options_fast(&region, version, set_options) {
Ok(new_version) => {
let Some(new_version) = new_version else {
// We don't have options to alter after flush.
sender.send(Ok(0));
return;
};
version = new_version;
}
Err(e) => {
sender.send(Err(e).context(InvalidMetadataSnafu));
return;
}
}
_ => {}
}
// Validate request.
// Validates request.
if let Err(e) = request.validate(&version.metadata) {
// Invalid request.
sender.send(Err(e).context(InvalidRegionRequestSnafu));
@@ -132,14 +135,15 @@ impl<S> RegionWorkerLoop<S> {
}
info!(
"Try to alter region {}, version.metadata: {:?}, request: {:?}",
region_id, version.metadata, request,
"Try to alter region {}, version.metadata: {:?}, version.options: {:?}, request: {:?}",
region_id, version.metadata, version.options, request,
);
self.handle_alter_region_metadata(region, version, request, sender);
self.handle_alter_region_with_empty_memtable(region, version, request, sender);
}
/// Handles region metadata changes.
fn handle_alter_region_metadata(
// TODO(yingwen): Optional new options and sst format.
/// Handles region metadata and format changes when the region memtable is empty.
fn handle_alter_region_with_empty_memtable(
&mut self,
region: MitoRegionRef,
version: VersionRef,
@@ -147,6 +151,7 @@ impl<S> RegionWorkerLoop<S> {
sender: OptionOutputTx,
) {
let need_index = need_change_index(&request.kind);
let new_options = new_region_options_on_empty_memtable(&version.options, &request.kind);
let new_meta = match metadata_after_alteration(&version.metadata, request) {
Ok(new_meta) => new_meta,
Err(e) => {
@@ -157,19 +162,30 @@ impl<S> RegionWorkerLoop<S> {
// Persist the metadata to region's manifest.
let change = RegionChange {
metadata: new_meta,
sst_format: region.sst_format(),
sst_format: new_options
.as_ref()
.unwrap_or(&version.options)
.sst_format
.unwrap_or_default(),
};
self.handle_manifest_region_change(region, change, need_index, sender);
self.handle_manifest_region_change(region, change, need_index, new_options, sender);
}
/// Handles requests that change region options, like TTL. It only affects memory state
/// since changes are persisted in the `DatanodeTableValue` in metasrv.
fn handle_alter_region_options(
///
/// If the options require empty memtable, it only does validation.
///
/// Returns a new version with the updated options if it needs further alteration.
fn handle_alter_region_options_fast(
&mut self,
region: MitoRegionRef,
region: &MitoRegionRef,
version: VersionRef,
options: Vec<SetRegionOption>,
) -> std::result::Result<(), MetadataError> {
) -> std::result::Result<Option<VersionRef>, MetadataError> {
assert!(!options.is_empty());
let mut all_options_altered = true;
let mut current_options = version.options.clone();
for option in options {
match option {
@@ -190,13 +206,68 @@ impl<S> RegionWorkerLoop<S> {
region.region_id,
)?;
}
SetRegionOption::Format(format_str) => {
let new_format = format_str.parse::<FormatType>().map_err(|_| {
store_api::metadata::InvalidRegionRequestSnafu {
region_id: region.region_id,
err: format!("Invalid format type: {}", format_str),
}
.build()
})?;
// If the format is unchanged, we also consider the option is altered.
if new_format != current_options.sst_format.unwrap_or_default() {
all_options_altered = false;
// Validates the format type.
ensure!(
new_format == FormatType::Flat,
store_api::metadata::InvalidRegionRequestSnafu {
region_id: region.region_id,
err: "Only allow changing format type to flat",
}
);
}
}
}
}
region.version_control.alter_options(current_options);
Ok(())
if all_options_altered {
Ok(None)
} else {
Ok(Some(region.version()))
}
}
}
/// Returns the new region options if there are updates to the options.
///
/// Only [AlterKind::SetRegionOptions] with a non-empty option list produces
/// new options; any other request kind returns `None`.
///
/// `Ttl` and `Twsc` options are ignored here (presumably they are already
/// applied by `handle_alter_region_options_fast()`). A `Format` option that
/// differs from the region's current format is written to `sst_format` of
/// the returned options. A `Format` option equal to the current format is a
/// no-op, matching the validation in `handle_alter_region_options_fast()`.
///
/// # Panics
///
/// Panics if a format string cannot be parsed or if a changed format is not
/// [FormatType::Flat]. Both are rejected by
/// `handle_alter_region_options_fast()` before this function is reached.
fn new_region_options_on_empty_memtable(
    current_options: &RegionOptions,
    kind: &AlterKind,
) -> Option<RegionOptions> {
    let AlterKind::SetRegionOptions { options } = kind else {
        return None;
    };

    if options.is_empty() {
        return None;
    }

    // Format of the region before this request. The validation step compares
    // every `Format` option against this value, so we do the same here.
    let original_format = current_options.sst_format.unwrap_or_default();
    let mut current_options = current_options.clone();
    for option in options {
        match option {
            SetRegionOption::Ttl(_) | SetRegionOption::Twsc(_, _) => (),
            SetRegionOption::Format(format_str) => {
                // Safety: handle_alter_region_options_fast() has validated this.
                let new_format = format_str
                    .parse::<FormatType>()
                    .expect("format is validated by handle_alter_region_options_fast()");
                if new_format == original_format {
                    // Validation accepts an unchanged format of any type, so
                    // it must not trip the flat-only assertion below.
                    continue;
                }
                assert_eq!(FormatType::Flat, new_format);
                current_options.sst_format = Some(new_format);
            }
        }
    }
    Some(current_options)
}
/// Creates a metadata after applying the alter `request` to the old `metadata`.
///
/// Returns an error if the `request` is invalid.

View File

@@ -89,9 +89,7 @@ where
.await;
// Marks region version as dropped
region
.version_control
.mark_dropped(&region.memtable_builder);
region.version_control.mark_dropped();
info!(
"Region {} is dropped logically, but some files are not deleted yet",
region_id

View File

@@ -22,6 +22,7 @@ use std::sync::Arc;
use common_telemetry::{info, warn};
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::RegionId;
use crate::cache::CacheManagerRef;
@@ -31,8 +32,11 @@ use crate::error::{RegionBusySnafu, RegionNotFoundSnafu, Result};
use crate::manifest::action::{
RegionChange, RegionEdit, RegionMetaAction, RegionMetaActionList, RegionTruncate,
};
use crate::memtable::MemtableBuilderProvider;
use crate::metrics::WRITE_CACHE_INFLIGHT_DOWNLOAD;
use crate::region::version::VersionBuilder;
use crate::region::opener::{sanitize_region_options, version_builder_from_manifest};
use crate::region::options::RegionOptions;
use crate::region::version::VersionControlRef;
use crate::region::{MitoRegionRef, RegionLeaderState, RegionRoleState};
use crate::request::{
BackgroundNotify, BuildIndexRequest, OptionOutputTx, RegionChangeResult, RegionEditRequest,
@@ -102,15 +106,12 @@ impl<S: LogStore> RegionWorkerLoop<S> {
};
if change_result.result.is_ok() {
// Apply the metadata to region's version.
region
.version_control
.alter_schema(change_result.new_meta, &region.memtable_builder);
let version = region.version();
info!(
"Region {} is altered, metadata is {:?}, options: {:?}",
region.region_id, version.metadata, version.options,
// Updates the region metadata and format.
Self::update_region_version(
&region.version_control,
change_result.new_meta,
change_result.new_options,
&self.memtable_builder_provider,
);
}
@@ -164,6 +165,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
}
};
let version = region.version();
let mut region_options = version.options.clone();
let old_format = region_options.sst_format.unwrap_or_default();
// Updates the region options with the manifest.
sanitize_region_options(&manifest, &mut region_options);
if !version.memtables.is_empty() {
let current = region.version_control.current();
warn!(
@@ -171,23 +176,35 @@ impl<S: LogStore> RegionWorkerLoop<S> {
region.region_id, manifest.manifest_version, current.last_entry_id
);
}
let region_options = version.options.clone();
// We should sanitize the region options before creating a new memtable.
let memtable_builder = if old_format != region_options.sst_format.unwrap_or_default() {
// Format changed, also needs to replace the memtable builder.
Some(
self.memtable_builder_provider
.builder_for_options(&region_options),
)
} else {
None
};
let new_mutable = Arc::new(
region
.version()
.memtables
.mutable
.new_with_part_duration(version.compaction_time_window),
.new_with_part_duration(version.compaction_time_window, memtable_builder),
);
// Here it assumes the leader has backfilled the partition_expr of the metadata.
let metadata = manifest.metadata.clone();
let version = VersionBuilder::new(metadata, new_mutable)
.add_files(region.file_purger.clone(), manifest.files.values().cloned())
.flushed_entry_id(manifest.flushed_entry_id)
.flushed_sequence(manifest.flushed_sequence)
.truncated_entry_id(manifest.truncated_entry_id)
.compaction_time_window(manifest.compaction_time_window)
.options(region_options)
.build();
let version_builder = version_builder_from_manifest(
&manifest,
metadata,
region.file_purger.clone(),
new_mutable,
region_options,
);
let version = version_builder.build();
region.version_control.overwrite_current(Arc::new(version));
let updated = manifest.manifest_version > original_manifest_version;
@@ -364,6 +381,7 @@ impl<S> RegionWorkerLoop<S> {
region: MitoRegionRef,
change: RegionChange,
need_index: bool,
new_options: Option<RegionOptions>,
sender: OptionOutputTx,
) {
// Marks the region as altering.
@@ -391,6 +409,7 @@ impl<S> RegionWorkerLoop<S> {
result,
new_meta,
need_index,
new_options,
}),
};
listener
@@ -408,6 +427,32 @@ impl<S> RegionWorkerLoop<S> {
}
});
}
/// Applies the result of a region change to the region's version.
///
/// When `new_options` is present, a memtable builder is created for those
/// options and the schema, options and memtable are replaced together;
/// otherwise only the schema is altered. The resulting metadata and options
/// are logged afterwards.
fn update_region_version(
    version_control: &VersionControlRef,
    new_meta: RegionMetadataRef,
    new_options: Option<RegionOptions>,
    memtable_builder_provider: &MemtableBuilderProvider,
) {
    // Capture what the log line needs before the arguments are moved.
    let region_id = new_meta.region_id;
    let options_changed = new_options.is_some();

    match new_options {
        Some(options) => {
            // The new options may select a different memtable type, so the
            // builder has to be derived from them.
            let builder = memtable_builder_provider.builder_for_options(&options);
            version_control.alter_schema_and_format(new_meta, options, builder);
        }
        // Only changes the schema.
        None => version_control.alter_schema(new_meta),
    }

    let version = version_control.current().version;
    info!(
        "Region {} is altered, metadata is {:?}, options: {:?}, options_changed: {}",
        region_id, version.metadata, version.options, options_changed,
    );
}
}
/// Checks the edit, writes and applies it.

View File

@@ -129,7 +129,7 @@ impl<S: LogStore> RegionWorkerLoop<S> {
// Applies the truncate action to the region.
region
.version_control
.truncate(truncate_result.kind.clone(), &region.memtable_builder);
.truncate(truncate_result.kind.clone());
}
Err(e) => {
// Unable to truncate the region.

View File

@@ -51,7 +51,7 @@ use crate::metadata::{
use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use crate::metrics;
use crate::mito_engine_options::{
TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
};
use crate::path_utils::table_dir;
use crate::storage::{ColumnId, RegionId, ScanRequest};
@@ -881,11 +881,7 @@ impl AlterKind {
AlterKind::ModifyColumnTypes { columns } => columns
.iter()
.any(|col_to_change| col_to_change.need_alter(metadata)),
AlterKind::SetRegionOptions { .. } => {
// we need to update region options for `ChangeTableOptions`.
// todo: we need to check if ttl has ever changed.
true
}
AlterKind::SetRegionOptions { .. } => true,
AlterKind::UnsetRegionOptions { .. } => true,
AlterKind::SetIndexes { options, .. } => options
.iter()
@@ -1258,6 +1254,8 @@ pub enum SetRegionOption {
Ttl(Option<TimeToLive>),
// Modifying TwscOptions with values as (option name, new value).
Twsc(String, String),
// Modifying the SST format.
Format(String),
}
impl TryFrom<&PbOption> for SetRegionOption {
@@ -1275,6 +1273,7 @@ impl TryFrom<&PbOption> for SetRegionOption {
TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
Ok(Self::Twsc(key.clone(), value.clone()))
}
SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
_ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
}
}

View File

@@ -29,7 +29,7 @@ use derive_builder::Builder;
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS};
use store_api::mito_engine_options::{COMPACTION_TYPE, COMPACTION_TYPE_TWCS, SST_FORMAT_KEY};
use store_api::region_request::{SetRegionOption, UnsetRegionOption};
use store_api::storage::{ColumnDescriptor, ColumnDescriptorBuilder, ColumnId, RegionId};
@@ -292,6 +292,11 @@ impl TableMeta {
new_options.extra_options.remove(key.as_str());
}
}
SetRegionOption::Format(value) => {
new_options
.extra_options
.insert(SST_FORMAT_KEY.to_string(), value.clone());
}
}
}
let mut builder = self.new_meta_builder();

View File

@@ -0,0 +1,218 @@
CREATE TABLE test_alt_format(h INTEGER, i INTEGER DEFAULT 0, j TIMESTAMP TIME INDEX, PRIMARY KEY (h)) WITH ('sst_format' = 'primary_key');
Affected Rows: 0
ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key';
Affected Rows: 0
INSERT INTO test_alt_format (h, j) VALUES (10, 0);
Affected Rows: 1
ALTER TABLE test_alt_format ADD COLUMN k INTEGER;
Affected Rows: 0
INSERT INTO test_alt_format (h, j) VALUES (11, 1);
Affected Rows: 1
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
+----+---+-------------------------+---+
| h | i | j | k |
+----+---+-------------------------+---+
| 10 | 0 | 1970-01-01T00:00:00 | |
| 11 | 0 | 1970-01-01T00:00:00.001 | |
+----+---+-------------------------+---+
-- SQLNESS SORT_RESULT 3 1
SELECT i, h FROM test_alt_format;
+---+----+
| i | h |
+---+----+
| 0 | 10 |
| 0 | 11 |
+---+----+
ALTER TABLE test_alt_format SET 'sst_format' = 'flat';
Affected Rows: 0
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
+----+---+-------------------------+---+
| h | i | j | k |
+----+---+-------------------------+---+
| 10 | 0 | 1970-01-01T00:00:00 | |
| 11 | 0 | 1970-01-01T00:00:00.001 | |
+----+---+-------------------------+---+
INSERT INTO test_alt_format (h, j) VALUES (12, 2);
Affected Rows: 1
INSERT INTO test_alt_format (h, j, i, k) VALUES (13, 3, 23, 33);
Affected Rows: 1
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
+----+----+-------------------------+----+
| h | i | j | k |
+----+----+-------------------------+----+
| 10 | 0 | 1970-01-01T00:00:00 | |
| 11 | 0 | 1970-01-01T00:00:00.001 | |
| 12 | 0 | 1970-01-01T00:00:00.002 | |
| 13 | 23 | 1970-01-01T00:00:00.003 | 33 |
+----+----+-------------------------+----+
-- SQLNESS SORT_RESULT 3 1
SELECT i, h FROM test_alt_format;
+----+----+
| i | h |
+----+----+
| 0 | 10 |
| 0 | 11 |
| 0 | 12 |
| 23 | 13 |
+----+----+
ADMIN flush_table('test_alt_format');
+--------------------------------------+
| ADMIN flush_table('test_alt_format') |
+--------------------------------------+
| 0 |
+--------------------------------------+
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
+----+----+-------------------------+----+
| h | i | j | k |
+----+----+-------------------------+----+
| 10 | 0 | 1970-01-01T00:00:00 | |
| 11 | 0 | 1970-01-01T00:00:00.001 | |
| 12 | 0 | 1970-01-01T00:00:00.002 | |
| 13 | 23 | 1970-01-01T00:00:00.003 | 33 |
+----+----+-------------------------+----+
-- SQLNESS SORT_RESULT 3 1
SELECT i, h FROM test_alt_format;
+----+----+
| i | h |
+----+----+
| 0 | 10 |
| 0 | 11 |
| 0 | 12 |
| 23 | 13 |
+----+----+
-- not allow to change from flat to primary_key
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key';
Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing format type to flat
DROP TABLE test_alt_format;
Affected Rows: 0
CREATE TABLE alt_format_phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "sst_format" = "primary_key");
Affected Rows: 0
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "alt_format_phy");
Affected Rows: 0
INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:00', 1.23, 'example.com'),
('2022-01-01 00:00:00', 1.23, 'hello.com'),
('2022-01-02 00:00:00', 4.56, 'example.com');
Affected Rows: 3
ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key';
Affected Rows: 0
ALTER TABLE t1 SET 'sst_format' = 'primary_key';
Error: 1004(InvalidArguments), Alter logical tables invalid arguments: Only support add columns operation
ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY;
Affected Rows: 0
SELECT * FROM t1 ORDER BY ts ASC;
+-------------+---+---------------------+------+
| host | k | ts | val |
+-------------+---+---------------------+------+
| example.com | | 2022-01-01T00:00:00 | 1.23 |
| hello.com | | 2022-01-01T00:00:00 | 1.23 |
| example.com | | 2022-01-02T00:00:00 | 4.56 |
+-------------+---+---------------------+------+
ALTER TABLE alt_format_phy SET 'sst_format' = 'flat';
Affected Rows: 0
SELECT * FROM t1 ORDER BY ts ASC;
+-------------+---+---------------------+------+
| host | k | ts | val |
+-------------+---+---------------------+------+
| example.com | | 2022-01-01T00:00:00 | 1.23 |
| hello.com | | 2022-01-01T00:00:00 | 1.23 |
| example.com | | 2022-01-02T00:00:00 | 4.56 |
+-------------+---+---------------------+------+
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
+-------------+---------------------+------+
| host | ts | val |
+-------------+---------------------+------+
| example.com | 2022-01-01T00:00:00 | 1.23 |
| example.com | 2022-01-02T00:00:00 | 4.56 |
+-------------+---------------------+------+
INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:01', 3.0, 'example.com'),
('2022-01-01 00:00:01', 4.0, 'hello.com');
Affected Rows: 2
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
+-------------+---------------------+------+
| host | ts | val |
+-------------+---------------------+------+
| example.com | 2022-01-01T00:00:00 | 1.23 |
| example.com | 2022-01-01T00:00:01 | 3.0 |
| example.com | 2022-01-02T00:00:00 | 4.56 |
+-------------+---------------------+------+
-- not allow to change from flat to primary_key
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key';
Error: 1004(InvalidArguments), Invalid region request, region_id: REDACTED, err: Only allow changing format type to flat
DROP TABLE t1;
Affected Rows: 0
DROP TABLE alt_format_phy;
Affected Rows: 0

View File

@@ -0,0 +1,81 @@
CREATE TABLE test_alt_format(h INTEGER, i INTEGER DEFAULT 0, j TIMESTAMP TIME INDEX, PRIMARY KEY (h)) WITH ('sst_format' = 'primary_key');
ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key';
INSERT INTO test_alt_format (h, j) VALUES (10, 0);
ALTER TABLE test_alt_format ADD COLUMN k INTEGER;
INSERT INTO test_alt_format (h, j) VALUES (11, 1);
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
-- SQLNESS SORT_RESULT 3 1
SELECT i, h FROM test_alt_format;
ALTER TABLE test_alt_format SET 'sst_format' = 'flat';
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
INSERT INTO test_alt_format (h, j) VALUES (12, 2);
INSERT INTO test_alt_format (h, j, i, k) VALUES (13, 3, 23, 33);
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
-- SQLNESS SORT_RESULT 3 1
SELECT i, h FROM test_alt_format;
ADMIN flush_table('test_alt_format');
-- SQLNESS SORT_RESULT 3 1
SELECT * FROM test_alt_format;
-- SQLNESS SORT_RESULT 3 1
SELECT i, h FROM test_alt_format;
-- not allow to change from flat to primary_key
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
ALTER TABLE test_alt_format SET 'sst_format' = 'primary_key';
DROP TABLE test_alt_format;
CREATE TABLE alt_format_phy (ts timestamp time index, val double) engine=metric with ("physical_metric_table" = "", "sst_format" = "primary_key");
CREATE TABLE t1 (ts timestamp time index, val double, host string primary key) engine = metric with ("on_physical_table" = "alt_format_phy");
INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:00', 1.23, 'example.com'),
('2022-01-01 00:00:00', 1.23, 'hello.com'),
('2022-01-02 00:00:00', 4.56, 'example.com');
ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key';
ALTER TABLE t1 SET 'sst_format' = 'primary_key';
ALTER TABLE t1 ADD COLUMN k STRING PRIMARY KEY;
SELECT * FROM t1 ORDER BY ts ASC;
ALTER TABLE alt_format_phy SET 'sst_format' = 'flat';
SELECT * FROM t1 ORDER BY ts ASC;
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
INSERT INTO t1 (ts, val, host) VALUES
('2022-01-01 00:00:01', 3.0, 'example.com'),
('2022-01-01 00:00:01', 4.0, 'hello.com');
SELECT host, ts, val FROM t1 where host = 'example.com' ORDER BY ts ASC;
-- not allow to change from flat to primary_key
-- SQLNESS REPLACE \d+\(\d+,\s+\d+\) REDACTED
ALTER TABLE alt_format_phy SET 'sst_format' = 'primary_key';
DROP TABLE t1;
DROP TABLE alt_format_phy;