greptimedb/tests-integration/tests/repartition.rs
Yingwen 7840aa1bb4 refactor(mito2)!: remove PartitionTreeMemtable (#8080)
* feat: switch partition tree to bulk

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: keep partition tree memtable for migration test

Restore PartitionTreeMemtable construction when memtable.type=partition_tree
is explicit, and move the sparse-encoding bulk override into the default
(no explicit memtable.type) arm so phase 2's memtable.type=bulk wins on
reopen. Rewrite test_reopen_time_series_sparse_memtable_with_bulk to use a
metric-engine-shaped schema and sparse-encoded rows with WriteHint::Sparse,
so the test actually exercises a PartitionTreeMemtable in phase 1 and
verifies WAL replay into the new BulkMemtable on reopen without flushing.

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: drop partition tree memtable from runtime

Re-apply the unconditional sparse-encoding override in
`MemtableBuilderProvider::builder_for_options` and route the
`MemtableOptions::PartitionTree` arm to `BulkMemtable` with a deprecation
warning. After this change, `PartitionTreeMemtableBuilder` is no longer
reachable from the engine runtime; benchmarks still reference the type.

Remove `test_reopen_time_series_sparse_memtable_with_bulk` and the
`put_sparse_rows` helper added in the previous commit — that test only
existed to validate the PartitionTree -> Bulk reopen migration and is
unnecessary now that the override is in place.

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor(mito2): move timestamp_array_to_i64_slice into read module

Relocate the timestamp_array_to_i64_slice helper from
memtable/partition_tree/data.rs to the read module so that the read
path no longer depends on the partition_tree internals. All call sites
(both inside and outside the partition_tree module) now import from
crate::read.

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor(mito2): use TimeSeriesMemtableBuilder in time_partition tests

The time_partition tests use the memtable builder purely as a generic
backend for the TimePartitions write/scan paths; nothing in them is
specific to the partition-tree memtable. Switch the seven affected
tests to TimeSeriesMemtableBuilder so the tests no longer depend on
PartitionTreeMemtableBuilder.

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore(mito2): delete PartitionTreeMemtable implementation

The runtime already falls back to BulkMemtable for the PartitionTree
variant. Drop the now-unreachable implementation, its metrics, the
partition_tree benchmarks, the metric-engine Unsupported fallback in
bulk_insert.rs, and the test helpers that only existed for the deleted
module.

MemtableOptions::PartitionTree, its parsing, the runtime fallback, the
store-api MEMTABLE_PARTITION_TREE_* constants, and the SQL fixtures
remain so existing region options keep round-tripping.

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor(mito-codec): drop skip_partition_column parameter

PartitionTreeMemtable was the only caller passing
skip_partition_column=true; every other caller passes false. Now that
the partition_tree module is gone, the parameter is uniformly false
and the guard branch is dead. Drop the parameter from the trait method
and both impls, remove the guard and the is_partition_column helper,
and update the four remaining call sites in mito2 plus the bench.

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore(mito2): remove unused MemtableConfig enum

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: fmt code

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: remove unused variant

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update test_config_api

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: remove unused memtable test helpers

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: address review comment

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: support bulk memtable options

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: sanitize config

Signed-off-by: evenyag <realevenyag@gmail.com>

* feat: remove partition tree options from region options

Move primary_key_encoding to the top level

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: make ssts test datetime replaced text stable

Signed-off-by: evenyag <realevenyag@gmail.com>

* test: update sqlness result

Signed-off-by: evenyag <realevenyag@gmail.com>

* chore: validate_enum_options consider bulk memtable

Signed-off-by: evenyag <realevenyag@gmail.com>

* refactor: pass region id when parsing region options

Replace the `TryFrom<&HashMap>` impl for `RegionOptions` with
`try_from_options(region_id, options_map)` so the legacy partition_tree
fallback can log the affected region. The fallback now also overrides
the SST format to flat in addition to clearing the memtable type.

Signed-off-by: evenyag <realevenyag@gmail.com>

* fix: align sst_format with bulk memtable on parse and open

Signed-off-by: evenyag <realevenyag@gmail.com>

---------

Signed-off-by: evenyag <realevenyag@gmail.com>
2026-05-15 11:49:27 +00:00


// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;

use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::table_name::TableNameKey;
use common_procedure::{ProcedureWithId, watcher};
use common_query::Output;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::DatanodeWalConfig;
use frontend::instance::Instance;
use meta_srv::gc::{self, BatchGcProcedure, GcSchedulerOptions, GcTickerRef};
use meta_srv::metasrv::Metasrv;
use mito2::gc::GcConfig;
use servers::error::Result as ServerResult;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::RegionId;
use tests_integration::cluster::GreptimeDbClusterBuilder;
use tests_integration::test_util::{StorageType, get_test_store_config};
use tokio::sync::oneshot;

#[macro_export]
macro_rules! repartition_tests {
    ($($service:ident),*) => {
        $(
            paste::item! {
                mod [<integration_repartition_ $service:lower _test>] {
                    #[tokio::test(flavor = "multi_thread")]
                    async fn [< test_repartition_mito >]() {
                        let store_type = tests_integration::test_util::StorageType::$service;
                        if store_type.test_on() {
                            common_telemetry::init_default_ut_logging();
                            // Cover both storage formats for repartition behavior.
                            // for flat format
                            $crate::repartition::test_repartition_mito(store_type, true).await;
                            // for primary key format
                            $crate::repartition::test_repartition_mito(store_type, false).await;
                        }
                    }

                    #[tokio::test(flavor = "multi_thread")]
                    async fn [< test_repartition_metric >]() {
                        let store_type = tests_integration::test_util::StorageType::$service;
                        if store_type.test_on() {
                            use store_api::codec::PrimaryKeyEncoding;
                            common_telemetry::init_default_ut_logging();
                            // Exercise format + primary key encoding matrix for metric engine.
                            // for flat format with sparse primary key encoding
                            $crate::repartition::test_repartition_metric(store_type, true, PrimaryKeyEncoding::Sparse).await;
                            // for flat format with dense primary key encoding
                            $crate::repartition::test_repartition_metric(store_type, true, PrimaryKeyEncoding::Dense).await;
                            // for primary key format with sparse primary key encoding
                            $crate::repartition::test_repartition_metric(store_type, false, PrimaryKeyEncoding::Sparse).await;
                            // for primary key format with dense primary key encoding
                            $crate::repartition::test_repartition_metric(store_type, false, PrimaryKeyEncoding::Dense).await;
                        }
                    }
                }
            }
        )*
    };
}

async fn trigger_table_gc(metasrv: &Arc<Metasrv>, table_name: &str) {
    info!("triggering table gc for table: {}", table_name);
    let table_metadata_manager = metasrv.table_metadata_manager();
    let table_id = table_metadata_manager
        .table_name_manager()
        .get(TableNameKey::new(
            DEFAULT_CATALOG_NAME,
            DEFAULT_SCHEMA_NAME,
            table_name,
        ))
        .await
        .unwrap()
        .unwrap()
        .table_id();
    let (_, table_route_value) = table_metadata_manager
        .table_route_manager()
        .get_physical_table_route(table_id)
        .await
        .unwrap();
    let region_ids = table_route_value
        .region_routes
        .iter()
        .map(|r| r.region.id)
        .collect::<Vec<_>>();

    let procedure = BatchGcProcedure::new(
        metasrv.mailbox().clone(),
        metasrv.table_metadata_manager().clone(),
        metasrv.options().grpc.server_addr.clone(),
        region_ids.clone(),
        false,                   // full_file_listing
        Duration::from_secs(10), // timeout
        Default::default(),
    );

    // Submit the procedure to the procedure manager and wait for it to finish.
    let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));
    let mut watcher = metasrv
        .procedure_manager()
        .submit(procedure_with_id)
        .await
        .unwrap();
    watcher::wait(&mut watcher).await.unwrap();
}
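
// Sketch of the invariant behind the repeated
// `assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests)` checks in the
// tests: after GC, every SST file still on storage should be referenced by some
// region manifest, and every file a manifest references should still exist.
// Assumptions: file identifiers are modeled as strings in ordered sets, and
// `sst_file_diff` is a hypothetical helper. The real `list_sst_files_*` cluster
// helpers may return a different type.
#[allow(dead_code)]
fn sst_file_diff(
    on_storage: &std::collections::BTreeSet<String>,
    in_manifests: &std::collections::BTreeSet<String>,
) -> (Vec<String>, Vec<String>) {
    // Files GC should have deleted: present on storage, unknown to manifests.
    let orphans = on_storage.difference(in_manifests).cloned().collect();
    // Files a manifest still needs but storage no longer has (would mean data loss).
    let missing = in_manifests.difference(on_storage).cloned().collect();
    (orphans, missing)
}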

async fn trigger_full_gc(ticker: &GcTickerRef) {
    info!("triggering full gc");
    let (tx, rx) = oneshot::channel();
    ticker
        .sender
        .send(gc::Event::Manually {
            sender: tx,
            region_ids: None,
            full_file_listing: None,
            timeout: None,
        })
        .await
        .unwrap();
    let _ = rx.await.unwrap().unwrap();
}
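
// Sketch of the request/reply shape used by `trigger_full_gc`: the caller sends
// an event that carries a one-shot reply channel, then waits for the worker to
// answer. Assumptions: this swaps tokio for std threads and `std::sync::mpsc`.
// `ManualGcRequest`, `spawn_gc_worker`, and the `usize` reply (number of regions
// scheduled) are hypothetical stand-ins for `gc::Event::Manually` and its real
// report type.
#[allow(dead_code)]
struct ManualGcRequest {
    // Reply channel, playing the role of the tokio oneshot sender.
    reply: std::sync::mpsc::Sender<Result<usize, String>>,
    // `None` means "all known regions", as in `trigger_full_gc`.
    region_ids: Option<Vec<u64>>,
}

#[allow(dead_code)]
fn spawn_gc_worker(known_regions: Vec<u64>) -> std::sync::mpsc::Sender<ManualGcRequest> {
    let (tx, rx) = std::sync::mpsc::channel::<ManualGcRequest>();
    std::thread::spawn(move || {
        // Serve requests until every sender has been dropped.
        for request in rx {
            let scheduled = request
                .region_ids
                .map_or(known_regions.len(), |ids| ids.len());
            // The requester may have given up; ignore a closed reply channel.
            let _ = request.reply.send(Ok(scheduled));
        }
    });
    tx
}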

fn query_partitions_sql(table_name: &str) -> String {
    // We query information_schema.partitions to assert repartition results across engines,
    // rather than relying on SHOW CREATE TABLE formatting differences.
    format!(
        "\
SELECT table_catalog, table_schema, table_name, partition_name, partition_expression, \
partition_description, greptime_partition_id, partition_ordinal_position \
FROM information_schema.partitions \
WHERE table_name = '{}' \
ORDER BY partition_ordinal_position;",
        table_name
    )
}
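
// Sketch for reading the `greptime_partition_id` column asserted in the tests.
// Assumption: the id is the u64 form of the partition's region id, laid out as
// `(table_id << 32) | region_number`. This is inferred from the test data
// (4398046511104 == 1024 << 32, matching the `RegionId::new(1024, n)` lookups)
// and not taken from the `RegionId` docs. `split_partition_id` is a
// hypothetical helper, not part of `store_api`.
#[allow(dead_code)]
fn split_partition_id(partition_id: u64) -> (u32, u32) {
    // High 32 bits: table id; low 32 bits: region number within the table.
    let table_id = (partition_id >> 32) as u32;
    let region_number = (partition_id & 0xFFFF_FFFF) as u32;
    (table_id, region_number)
}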

pub async fn test_repartition_mito(store_type: StorageType, flat_format: bool) {
    info!(
        "test_repartition_mito: store_type: {:?}, flat_format: {:?}",
        store_type, flat_format
    );
    let cluster_name = "test_repartition_mito";
    let (store_config, _guard) = get_test_store_config(&store_type);
    let datanodes = 3u64;
    let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await;
    if matches!(store_type, StorageType::File) {
        let home_dir = create_temp_dir("test_repartition_mito_data_home");
        builder = builder.with_shared_home_dir(Arc::new(home_dir));
    }
    let cluster = builder
        .with_datanodes(datanodes as u32)
        .with_store_config(store_config)
        .with_datanode_wal_config(DatanodeWalConfig::Noop)
        .with_metasrv_gc_config(GcSchedulerOptions {
            enable: true,
            gc_cooldown_period: Duration::from_nanos(1),
            ..Default::default()
        })
        .with_datanode_gc_config(GcConfig {
            enable: true,
            lingering_time: Some(Duration::from_secs(0)),
            unknown_file_lingering_time: Duration::from_secs(0),
            ..Default::default()
        })
        .build(true)
        .await;
    let metasrv = &cluster.metasrv;
    let ticker = metasrv.gc_ticker().unwrap();
    let query_ctx = QueryContext::arc();
    let instance = cluster.fe_instance();

    // 1. Setup: create a partitioned table (the SST format varies by test case).
    let sql = if flat_format {
        r#"
CREATE TABLE `repartition_mito_table`(
`id` INT,
`city` STRING,
`ts` TIMESTAMP TIME INDEX,
PRIMARY KEY(`id`, `city`)
) PARTITION ON COLUMNS (`id`) (
`id` < 10,
`id` >= 10 AND `id` < 20,
`id` >= 20
) ENGINE = mito
WITH (
'sst_format' = 'flat'
);
"#
    } else {
        r#"
CREATE TABLE `repartition_mito_table`(
`id` INT,
`city` STRING,
`ts` TIMESTAMP TIME INDEX,
PRIMARY KEY(`id`, `city`)
) PARTITION ON COLUMNS (`id`) (
`id` < 10,
`id` >= 10 AND `id` < 20,
`id` >= 20
) ENGINE = mito;
"#
    };
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();

    let sql = r#"
INSERT INTO `repartition_mito_table` VALUES
(1, 'New York', '2022-01-01 00:00:00'),
(5, 'London', '2022-01-01 00:00:00'),
(10, 'Paris', '2022-01-01 00:00:00'),
(15, 'Tokyo', '2022-01-01 00:00:00'),
(20, 'Beijing', '2022-01-01 00:00:00'),
(25, 'Shanghai', '2022-01-01 00:00:00');
"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();

    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected = "\
+----+----------+---------------------+
| id | city     | ts                  |
+----+----------+---------------------+
| 1  | New York | 2022-01-01T00:00:00 |
| 5  | London   | 2022-01-01T00:00:00 |
| 10 | Paris    | 2022-01-01T00:00:00 |
| 15 | Tokyo    | 2022-01-01T00:00:00 |
| 20 | Beijing  | 2022-01-01T00:00:00 |
| 25 | Shanghai | 2022-01-01T00:00:00 |
+----+----------+---------------------+";
    check_output_stream(result.data, expected).await;

    // 2. Split Partition
    let sql = r#"
ALTER TABLE `repartition_mito_table` SPLIT PARTITION (
`id` < 10
) INTO (
`id` < 5,
`id` >= 5 AND `id` < 10
);
"#;
    let _result = run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    // Wait for cache invalidation.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected = "\
+----+----------+---------------------+
| id | city     | ts                  |
+----+----------+---------------------+
| 1  | New York | 2022-01-01T00:00:00 |
| 5  | London   | 2022-01-01T00:00:00 |
| 10 | Paris    | 2022-01-01T00:00:00 |
| 15 | Tokyo    | 2022-01-01T00:00:00 |
| 20 | Beijing  | 2022-01-01T00:00:00 |
| 25 | Shanghai | 2022-01-01T00:00:00 |
+----+----------+---------------------+";
    check_output_stream(result.data, expected).await;

    trigger_table_gc(metasrv, "repartition_mito_table").await;
    // Reads should still succeed before compaction.
    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected).await;
    let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
    let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
    assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);

    // Compacting the table after the partition split should succeed.
    let compact_sql = "ADMIN COMPACT_TABLE('repartition_mito_table', 'swcs', '3600')";
    let _result = run_sql(instance, compact_sql, query_ctx.clone())
        .await
        .unwrap();
    // The data should be unchanged after compaction.
    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected).await;

    // Trigger GC to clean up the compacted files.
    trigger_table_gc(metasrv, "repartition_mito_table").await;
    // The data should be unchanged after GC.
    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected).await;
    let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
    let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
    assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);

    let result = run_sql(
        instance,
        &query_partitions_sql("repartition_mito_table"),
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected_create_table_after_split = r#"+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| table_catalog | table_schema | table_name             | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position |
+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| greptime      | public       | repartition_mito_table | p0             | id                   | id < 5                | 4398046511104         | 1                          |
| greptime      | public       | repartition_mito_table | p1             | id                   | id >= 10 AND id < 20  | 4398046511105         | 2                          |
| greptime      | public       | repartition_mito_table | p2             | id                   | id >= 20              | 4398046511106         | 3                          |
| greptime      | public       | repartition_mito_table | p3             | id                   | id >= 5 AND id < 10   | 4398046511107         | 4                          |
+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+"#;
    check_output_stream(result.data, expected_create_table_after_split).await;

    let sql =
        r#"INSERT INTO `repartition_mito_table` VALUES (2, 'Split1', '2022-01-02 00:00:00');"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let sql =
        r#"INSERT INTO `repartition_mito_table` VALUES (7, 'Split2', '2022-01-02 00:00:00');"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` WHERE `id` IN (2, 7) ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected_split_inserts = "\
+----+--------+---------------------+
| id | city   | ts                  |
+----+--------+---------------------+
| 2  | Split1 | 2022-01-02T00:00:00 |
| 7  | Split2 | 2022-01-02T00:00:00 |
+----+--------+---------------------+";
    check_output_stream(result.data, expected_split_inserts).await;

    // 3. Merge Partition
    let sql = r#"
ALTER TABLE `repartition_mito_table` MERGE PARTITION (
`id` >= 10 AND `id` < 20,
`id` >= 20
);
"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    // Wait for cache invalidation.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected_all = "\
+----+----------+---------------------+
| id | city     | ts                  |
+----+----------+---------------------+
| 1  | New York | 2022-01-01T00:00:00 |
| 2  | Split1   | 2022-01-02T00:00:00 |
| 5  | London   | 2022-01-01T00:00:00 |
| 7  | Split2   | 2022-01-02T00:00:00 |
| 10 | Paris    | 2022-01-01T00:00:00 |
| 15 | Tokyo    | 2022-01-01T00:00:00 |
| 20 | Beijing  | 2022-01-01T00:00:00 |
| 25 | Shanghai | 2022-01-01T00:00:00 |
+----+----------+---------------------+";
    check_output_stream(result.data, expected_all).await;

    trigger_table_gc(metasrv, "repartition_mito_table").await;
    // Trigger GC to clean up the compacted files.
    trigger_full_gc(&ticker).await;
    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected_all).await;
    let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
    let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
    assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);

    // Compacting the table after the partition merge should succeed.
    let compact_sql = "ADMIN COMPACT_TABLE('repartition_mito_table', 'swcs', '3600')";
    let _result = run_sql(instance, compact_sql, query_ctx.clone())
        .await
        .unwrap();
    // The data should be unchanged after compaction.
    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected_all).await;

    trigger_table_gc(metasrv, "repartition_mito_table").await;
    trigger_full_gc(&ticker).await;
    // The data should be unchanged after GC.
    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected_all).await;
    let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
    let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
    assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);

    let result = run_sql(
        instance,
        &query_partitions_sql("repartition_mito_table"),
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected_create_table_after_merge = r#"+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| table_catalog | table_schema | table_name             | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position |
+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| greptime      | public       | repartition_mito_table | p0             | id                   | id < 5                | 4398046511104         | 1                          |
| greptime      | public       | repartition_mito_table | p1             | id                   | id >= 10              | 4398046511105         | 2                          |
| greptime      | public       | repartition_mito_table | p2             | id                   | id >= 5 AND id < 10   | 4398046511107         | 3                          |
+---------------+--------------+------------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+"#;
    check_output_stream(result.data, expected_create_table_after_merge).await;

    let sql =
        r#"INSERT INTO `repartition_mito_table` VALUES (12, 'Merge1', '2022-01-03 00:00:00');"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let sql =
        r#"INSERT INTO `repartition_mito_table` VALUES (30, 'Merge2', '2022-01-03 00:00:00');"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let result = run_sql(
        instance,
        "SELECT * FROM `repartition_mito_table` WHERE `id` IN (12, 30) ORDER BY `id`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected_merge_inserts = "\
+----+--------+---------------------+
| id | city   | ts                  |
+----+--------+---------------------+
| 12 | Merge1 | 2022-01-03T00:00:00 |
| 30 | Merge2 | 2022-01-03T00:00:00 |
+----+--------+---------------------+";
    check_output_stream(result.data, expected_merge_inserts).await;

    run_sql(
        instance,
        "DROP TABLE `repartition_mito_table`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
}
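
// Sketch of the row routing that the INSERT statements in the tests rely on
// after a split or merge. Assumption: each partition behaves like a half-open
// range `[lower, upper)` over the partition column, and a row belongs to the
// rule whose range contains it. `RangeRule` and `route_to_partition` are
// hypothetical helpers. The real router evaluates the partition expressions
// listed in information_schema.partitions.
#[allow(dead_code)]
struct RangeRule<T> {
    name: &'static str,
    // Inclusive lower bound; `None` means unbounded.
    lower: Option<T>,
    // Exclusive upper bound; `None` means unbounded.
    upper: Option<T>,
}

#[allow(dead_code)]
fn route_to_partition<T: PartialOrd>(rules: &[RangeRule<T>], value: &T) -> Option<&'static str> {
    rules
        .iter()
        .find(|rule| {
            rule.lower.as_ref().map_or(true, |lo| value >= lo)
                && rule.upper.as_ref().map_or(true, |hi| value < hi)
        })
        .map(|rule| rule.name)
}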

pub async fn test_repartition_metric(
    store_type: StorageType,
    flat_format: bool,
    primary_key_encoding: PrimaryKeyEncoding,
) {
    info!(
        "test_repartition_metric: store_type: {:?}, flat_format: {:?}, primary_key_encoding: {:?}",
        store_type, flat_format, primary_key_encoding
    );
    let cluster_name = "test_repartition_metric";
    let (store_config, _guard) = get_test_store_config(&store_type);
    let datanodes = 3u64;
    let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await;
    if matches!(store_type, StorageType::File) {
        let home_dir = create_temp_dir("test_repartition_metric_data_home");
        builder = builder.with_shared_home_dir(Arc::new(home_dir));
    }
    let cluster = builder
        .with_datanodes(datanodes as u32)
        .with_store_config(store_config)
        .with_datanode_wal_config(DatanodeWalConfig::Noop)
        .with_metasrv_gc_config(GcSchedulerOptions {
            enable: true,
            gc_cooldown_period: Duration::from_nanos(1),
            ..Default::default()
        })
        .with_datanode_gc_config(GcConfig {
            enable: true,
            lingering_time: Some(Duration::from_secs(0)),
            unknown_file_lingering_time: Duration::from_secs(0),
            ..Default::default()
        })
        .build(true)
        .await;
    let metasrv = &cluster.metasrv;
    let ticker = metasrv.gc_ticker().unwrap();
    let query_ctx = QueryContext::arc();
    let instance = cluster.fe_instance();

    // Explicitly configure sst_format and primary key encoding to cover the matrix.
    let sst_format = if flat_format { "flat" } else { "primary_key" };
    let primary_key_encoding = match primary_key_encoding {
        PrimaryKeyEncoding::Dense => "dense",
        PrimaryKeyEncoding::Sparse => "sparse",
    };
    let sql = format!(
        r#"
CREATE TABLE `repart_phy_metric`(
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
`host` STRING PRIMARY KEY
) PARTITION ON COLUMNS (`host`) (
`host` < 'm',
`host` >= 'm'
) ENGINE = metric
WITH (
"physical_metric_table" = "",
'sst_format' = '{sst_format}',
"primary_key_encoding" = "{primary_key_encoding}",
"index.type" = "inverted",
);
"#
    );
    run_sql(instance, &sql, query_ctx.clone()).await.unwrap();

    // A second logical table exercises repartition behavior across multiple logical tables
    // sharing the same physical metric table.
    let sql = r#"
CREATE TABLE `repart_log_metric`(
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
`host` STRING PRIMARY KEY
) ENGINE = metric WITH ("on_physical_table" = "repart_phy_metric");
"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let sql = r#"
CREATE TABLE `repart_log_metric_job`(
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
`job` STRING PRIMARY KEY
) ENGINE = metric WITH ("on_physical_table" = "repart_phy_metric");
"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();

    let sql = r#"
INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES
('a_host', '2022-01-01 00:00:00', 1),
('z_host', '2022-01-01 00:00:00', 2);
"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected = "\
+--------+---------------------+-----+
| host   | ts                  | val |
+--------+---------------------+-----+
| a_host | 2022-01-01T00:00:00 | 1.0 |
| z_host | 2022-01-01T00:00:00 | 2.0 |
+--------+---------------------+-----+";
    check_output_stream(result.data, expected).await;

    // Split the physical table partition.
    let sql = r#"
ALTER TABLE `repart_phy_metric` SPLIT PARTITION (
`host` < 'm'
) INTO (
`host` < 'g',
`host` >= 'g' AND `host` < 'm'
);
"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    // Wait for cache invalidation.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let result = run_sql(
        instance,
        &query_partitions_sql("repart_phy_metric"),
        query_ctx.clone(),
    )
    .await
    .unwrap();
    // Partition ids and order are expected to be stable within a single test run.
    let expected_create_table_after_split = r#"+---------------+--------------+-------------------+----------------+----------------------+------------------------+-----------------------+----------------------------+
| table_catalog | table_schema | table_name        | partition_name | partition_expression | partition_description  | greptime_partition_id | partition_ordinal_position |
+---------------+--------------+-------------------+----------------+----------------------+------------------------+-----------------------+----------------------------+
| greptime      | public       | repart_phy_metric | p0             | host                 | host < g               | 4398046511104         | 1                          |
| greptime      | public       | repart_phy_metric | p1             | host                 | host >= m              | 4398046511105         | 2                          |
| greptime      | public       | repart_phy_metric | p2             | host                 | host >= g AND host < m | 4398046511106         | 3                          |
+---------------+--------------+-------------------+----------------+----------------------+------------------------+-----------------------+----------------------------+"#;
    check_output_stream(result.data, expected_create_table_after_split).await;

    let regions = cluster.list_all_regions().await;
    let region0 = regions.get(&RegionId::new(1024, 0)).unwrap();
    let region2 = regions.get(&RegionId::new(1024, 2)).unwrap();
    let primary_keys_in_region_0 = region0
        .metadata()
        .primary_key_columns()
        .cloned()
        .collect::<Vec<_>>();
    let primary_keys_in_region_2 = region2
        .metadata()
        .primary_key_columns()
        .cloned()
        .collect::<Vec<_>>();
    info!("primary_keys_in_region_0: {:?}", primary_keys_in_region_0);
    info!("primary_keys_in_region_2: {:?}", primary_keys_in_region_2);
    assert_eq!(primary_keys_in_region_0, primary_keys_in_region_2);

    let sql = r#"
ALTER TABLE `repart_log_metric_job` ADD COLUMN `cpu` STRING PRIMARY KEY;
"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected = "\
+--------+---------------------+-----+
| host   | ts                  | val |
+--------+---------------------+-----+
| a_host | 2022-01-01T00:00:00 | 1.0 |
| z_host | 2022-01-01T00:00:00 | 2.0 |
+--------+---------------------+-----+";
    check_output_stream(result.data, expected).await;

    trigger_table_gc(metasrv, "repart_phy_metric").await;
    // Reads should still succeed before compaction.
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected).await;
    let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
    let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
    assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);

    // Compacting the table after the partition split should succeed.
    let compact_sql = "ADMIN COMPACT_TABLE('repart_phy_metric', 'swcs', '3600')";
    let _result = run_sql(instance, compact_sql, query_ctx.clone())
        .await
        .unwrap();
    // The data should be unchanged after compaction.
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected).await;

    // Trigger GC to clean up the compacted files.
    trigger_table_gc(metasrv, "repart_phy_metric").await;
    // The data should be unchanged after GC.
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected).await;
    let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
    let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
    assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);

    let sql = r#"INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('b_host', '2022-01-02 00:00:00', 3.0);"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let sql = r#"INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('h_host', '2022-01-02 00:00:00', 4.0);"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` WHERE `host` IN ('b_host', 'h_host') ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected = "\
+--------+---------------------+-----+
| host   | ts                  | val |
+--------+---------------------+-----+
| b_host | 2022-01-02T00:00:00 | 3.0 |
| h_host | 2022-01-02T00:00:00 | 4.0 |
+--------+---------------------+-----+";
    check_output_stream(result.data, expected).await;

    let sql = r#"
ALTER TABLE `repart_phy_metric` MERGE PARTITION (
`host` < 'g',
`host` >= 'g' AND `host` < 'm'
);
"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    // Wait for cache invalidation.
    tokio::time::sleep(Duration::from_millis(500)).await;

    let result = run_sql(
        instance,
        &query_partitions_sql("repart_phy_metric"),
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected_create_table_after_merge = r#"+---------------+--------------+-------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| table_catalog | table_schema | table_name        | partition_name | partition_expression | partition_description | greptime_partition_id | partition_ordinal_position |
+---------------+--------------+-------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+
| greptime      | public       | repart_phy_metric | p0             | host                 | host < m              | 4398046511104         | 1                          |
| greptime      | public       | repart_phy_metric | p1             | host                 | host >= m             | 4398046511105         | 2                          |
+---------------+--------------+-------------------+----------------+----------------------+-----------------------+-----------------------+----------------------------+"#;
    check_output_stream(result.data, expected_create_table_after_merge).await;

    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected = "\
+--------+---------------------+-----+
| host   | ts                  | val |
+--------+---------------------+-----+
| a_host | 2022-01-01T00:00:00 | 1.0 |
| b_host | 2022-01-02T00:00:00 | 3.0 |
| h_host | 2022-01-02T00:00:00 | 4.0 |
| z_host | 2022-01-01T00:00:00 | 2.0 |
+--------+---------------------+-----+";
    check_output_stream(result.data, expected).await;

    trigger_table_gc(metasrv, "repart_phy_metric").await;
    trigger_full_gc(&ticker).await;
    // The data should be unchanged after GC.
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected).await;
    let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
    let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
    assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);

    // Compacting the table after the partition merge should succeed.
    let compact_sql = "ADMIN COMPACT_TABLE('repart_phy_metric', 'swcs', '3600')";
    let _result = run_sql(instance, compact_sql, query_ctx.clone())
        .await
        .unwrap();
    // The data should be unchanged after compaction.
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected).await;

    // Trigger GC to clean up the compacted files.
    trigger_table_gc(metasrv, "repart_phy_metric").await;
    trigger_full_gc(&ticker).await;
    // The data should be unchanged after GC.
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` ORDER BY `host`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    check_output_stream(result.data, expected).await;
    let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
    let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
    assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);

    let sql = r#"INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('c_host', '2022-01-03 00:00:00', 5.0);"#;
    run_sql(instance, sql, query_ctx.clone()).await.unwrap();
    let result = run_sql(
        instance,
        "SELECT * FROM `repart_log_metric` WHERE `host` = 'c_host'",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    let expected = "\
+--------+---------------------+-----+
| host   | ts                  | val |
+--------+---------------------+-----+
| c_host | 2022-01-03T00:00:00 | 5.0 |
+--------+---------------------+-----+";
    check_output_stream(result.data, expected).await;

    run_sql(
        instance,
        "DROP TABLE `repart_log_metric_job`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    run_sql(
        instance,
        "DROP TABLE `repart_log_metric`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
    run_sql(
        instance,
        "DROP TABLE `repart_phy_metric`",
        query_ctx.clone(),
    )
    .await
    .unwrap();
}

async fn run_sql(
    instance: &Arc<Instance>,
    sql: &str,
    query_ctx: QueryContextRef,
) -> ServerResult<Output> {
    info!("Run SQL: {sql}");
    instance.do_query(sql, query_ctx).await.remove(0)
}
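
// Sketch of the table layout that `check_output_stream` compares against. The
// expected strings in this file are pretty-printed tables in which every column
// is padded to its widest cell (header included) and cells are left-aligned.
// Assumption: `render_ascii_table` and `render_row` are hypothetical std-only
// helpers for reading or drafting expected outputs. They are not the formatter
// the tests actually call.
#[allow(dead_code)]
fn render_ascii_table(headers: &[&str], rows: &[Vec<&str>]) -> String {
    // Width of each column = longest cell in that column, header included.
    let widths: Vec<usize> = (0..headers.len())
        .map(|i| {
            rows.iter()
                .map(|row| row[i].chars().count())
                .chain(std::iter::once(headers[i].chars().count()))
                .max()
                .unwrap_or(0)
        })
        .collect();
    // Border segments are two dashes wider than the column to cover the cell padding.
    let border: String = widths
        .iter()
        .map(|w| format!("{}+", "-".repeat(w + 2)))
        .fold(String::from("+"), |acc, seg| acc + &seg);
    let mut lines = vec![border.clone(), render_row(headers, &widths), border.clone()];
    for row in rows {
        lines.push(render_row(row, &widths));
    }
    lines.push(border);
    lines.join("\n")
}

// Renders one `| cell | cell |` line, left-aligning each cell to its column width.
#[allow(dead_code)]
fn render_row(cells: &[&str], widths: &[usize]) -> String {
    let mut line = String::from("|");
    for (cell, width) in cells.iter().zip(widths) {
        line.push_str(&format!(" {:<width$} |", cell, width = *width));
    }
    line
}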