test: add integration tests for repartition (#7560)

* test: add integration tests for mito repartition

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: update test result

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add integration tests for metric repartition

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: correct results

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: enable tests for object store

Signed-off-by: WenyXu <wenymedia@gmail.com>

* test: add compaction and gc

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: fix unit test

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* more cases

Signed-off-by: WenyXu <wenymedia@gmail.com>

* fix: file ref also in repart mapping

Signed-off-by: discord9 <discord9@163.com>

* chore: apply suggestions

Signed-off-by: WenyXu <wenymedia@gmail.com>

* chore: set a longer timeout for mock metasrv channel manager

Signed-off-by: WenyXu <wenymedia@gmail.com>

---------

Signed-off-by: WenyXu <wenymedia@gmail.com>
Signed-off-by: discord9 <discord9@163.com>
Co-authored-by: discord9 <discord9@163.com>
This commit is contained in:
Weny Xu
2026-01-22 18:14:40 +08:00
committed by GitHub
parent 38e4a94956
commit 4fb61047cb
18 changed files with 1059 additions and 69 deletions

View File

@@ -305,18 +305,38 @@ impl TableRepartManager {
/// Updates mappings from src region to dst regions.
/// Should be called once repartition is done.
pub async fn update_mappings(&self, src: RegionId, dst: &[RegionId]) -> Result<()> {
let table_id = src.table_id();
pub async fn update_mappings(
&self,
table_id: TableId,
region_mapping: &HashMap<RegionId, Vec<RegionId>>,
) -> Result<()> {
// Get current table repart with raw bytes for CAS operation
let current_table_repart = self
.get_with_raw_bytes(table_id)
.await?
.context(crate::error::TableRepartNotFoundSnafu { table_id })?;
let Some(current_table_repart) = self.get_with_raw_bytes(table_id).await? else {
let mut new_table_repart_value = TableRepartValue::new();
for (src, dsts) in region_mapping.iter() {
new_table_repart_value.update_mappings(*src, dsts);
}
let (txn, _) = self.build_create_txn(table_id, &new_table_repart_value)?;
let result = self.kv_backend.txn(txn).await?;
ensure!(
result.succeeded,
crate::error::MetadataCorruptionSnafu {
err_msg: format!(
"Failed to create table repart for table {}: CAS operation failed",
table_id
),
}
);
return Ok(());
};
// Clone the current repart value and update mappings
let mut new_table_repart_value = current_table_repart.inner.clone();
new_table_repart_value.update_mappings(src, dst);
for (src, dsts) in region_mapping.iter() {
new_table_repart_value.update_mappings(*src, dsts);
}
// Execute atomic update
let (txn, _) =
@@ -339,9 +359,11 @@ impl TableRepartManager {
/// Removes mappings from src region to dst regions.
/// Should be called once files from src region are cleaned up in dst regions.
pub async fn remove_mappings(&self, src: RegionId, dsts: &[RegionId]) -> Result<()> {
let table_id = src.table_id();
pub async fn remove_mappings(
&self,
table_id: TableId,
region_mapping: &HashMap<RegionId, Vec<RegionId>>,
) -> Result<()> {
// Get current table repart with raw bytes for CAS operation
let current_table_repart = self
.get_with_raw_bytes(table_id)
@@ -350,7 +372,9 @@ impl TableRepartManager {
// Clone the current repart value and remove mappings
let mut new_table_repart_value = current_table_repart.inner.clone();
new_table_repart_value.remove_mappings(src, dsts);
for (src, dsts) in region_mapping.iter() {
new_table_repart_value.remove_mappings(*src, dsts);
}
// Execute atomic update
let (txn, _) =
@@ -743,7 +767,11 @@ mod tests {
// Update mappings
let src = RegionId::new(1024, 1);
let dst = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)];
manager.update_mappings(src, &dst).await.unwrap();
let region_mapping = HashMap::from([(src, dst)]);
manager
.update_mappings(1024, &region_mapping)
.await
.unwrap();
// Verify update
let retrieved = manager.get(1024).await.unwrap().unwrap();
@@ -776,7 +804,11 @@ mod tests {
// Remove some mappings
let to_remove = vec![RegionId::new(1024, 2), RegionId::new(1024, 3)];
manager.remove_mappings(src, &to_remove).await.unwrap();
let region_mapping = HashMap::from([(src, to_remove)]);
manager
.remove_mappings(1024, &region_mapping)
.await
.unwrap();
// Verify removal
let retrieved = manager.get(1024).await.unwrap().unwrap();
@@ -831,8 +863,9 @@ mod tests {
let src = RegionId::new(1024, 1);
let dst = vec![RegionId::new(1024, 2)];
// Try to update mappings on non-existent table
let result = manager.update_mappings(src, &dst).await;
// Try to remove mappings on non-existent table
let region_mapping = HashMap::from([(src, dst.clone())]);
let result = manager.remove_mappings(1024, &region_mapping).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
@@ -840,14 +873,12 @@ mod tests {
"{err_msg}"
);
// Try to remove mappings on non-existent table
let result = manager.remove_mappings(src, &dst).await;
assert!(result.is_err());
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Failed to find table repartition metadata for table id 1024"),
"{err_msg}"
);
// Try to update mappings on non-existent table
let region_mapping = HashMap::from([(src, dst)]);
manager
.update_mappings(1024, &region_mapping)
.await
.unwrap();
}
#[tokio::test]

View File

@@ -31,7 +31,7 @@ mod util;
pub use options::GcSchedulerOptions;
pub use procedure::BatchGcProcedure;
pub(crate) use scheduler::{GcScheduler, GcTickerRef};
pub use scheduler::{Event, GcScheduler, GcTickerRef};
/// Mapping from region ID to its associated peers (leader and followers).
pub type Region2Peers = HashMap<RegionId, (Peer, Vec<Peer>)>;

View File

@@ -274,14 +274,40 @@ impl BatchGcProcedure {
/// Clean up region repartition info in kvbackend after GC
/// according to cross reference in `FileRefsManifest`.
async fn cleanup_region_repartition(&self) -> Result<()> {
for (src_region, dst_regions) in self.data.file_refs.cross_region_refs.iter() {
// TODO(discord9): batch update
let mut table_grouped: HashMap<TableId, HashMap<RegionId, HashSet<RegionId>>> =
HashMap::new();
for (src_region, dst_regions) in &self.data.file_refs.cross_region_refs {
table_grouped
.entry(src_region.table_id())
.or_default()
.entry(*src_region)
.or_default()
.extend(dst_regions.iter().copied());
}
// Make sure that regions whose files have tmp refs but no cross-region refs still get an entry here, so that we
// DO NOT clean up their repartition key entry and dropped regions can still keep their region ids.
for src_region in self.data.file_refs.file_refs.keys() {
table_grouped
.entry(src_region.table_id())
.or_default()
.entry(*src_region)
.or_default();
}
for (table_id, region_mappings) in table_grouped {
let region_mapping = region_mappings
.iter()
.map(|(src_region, dst_regions)| {
(*src_region, dst_regions.iter().cloned().collect_vec())
})
.collect::<HashMap<RegionId, Vec<RegionId>>>();
self.table_metadata_manager
.table_repart_manager()
.update_mappings(*src_region, &dst_regions.iter().cloned().collect_vec())
.update_mappings(table_id, &region_mapping)
.await
.context(KvBackendSnafu)?;
}
Ok(())
}

View File

@@ -21,8 +21,8 @@ use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureManagerRef;
use common_telemetry::{error, info};
use store_api::storage::GcReport;
use tokio::sync::Mutex;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{Mutex, oneshot};
use crate::cluster::MetaPeerClientRef;
use crate::define_ticker;
@@ -43,12 +43,15 @@ pub struct GcJobReport {
///
/// Variants:
/// - `Tick`: This event is used to trigger gc periodically.
pub(crate) enum Event {
/// - `Manually`: This event is used to trigger a manual gc run and provides a channel
/// to send back the [`GcJobReport`] for that run.
pub enum Event {
Tick,
Manually(oneshot::Sender<GcJobReport>),
}
#[allow(unused)]
pub(crate) type GcTickerRef = Arc<GcTicker>;
pub type GcTickerRef = Arc<GcTicker>;
define_ticker!(
/// [GcTicker] is used to trigger gc periodically.
@@ -118,9 +121,21 @@ impl GcScheduler {
Event::Tick => {
info!("Received gc tick");
if let Err(e) = self.handle_tick().await {
error!("Failed to handle gc tick: {}", e);
error!(e; "Failed to handle gc tick");
}
}
Event::Manually(sender) => {
info!("Received manually gc request");
match self.handle_tick().await {
Ok(report) => {
// ignore error
let _ = sender.send(report);
}
Err(e) => {
error!(e; "Failed to handle gc tick");
}
};
}
}
}
}

View File

@@ -947,6 +947,10 @@ impl Metasrv {
self.started.clone()
}
/// Returns a clone of the GC ticker handle held by this metasrv, or `None` if it has none.
pub fn gc_ticker(&self) -> Option<GcTickerRef> {
    self.gc_ticker.clone()
}
#[inline]
pub fn new_ctx(&self) -> Context {
let server_addr = self.options().grpc.server_addr.clone();

View File

@@ -131,7 +131,8 @@ pub async fn mock(
});
let config = ChannelConfig::new()
.timeout(Some(Duration::from_secs(10)))
// Use a long timeout to prevent test failures due to slow operations (e.g., when testing with S3).
.timeout(Some(Duration::from_secs(60)))
.connect_timeout(Duration::from_secs(10))
.tcp_nodelay(true);
let channel_manager = ChannelManager::with_config(config, None);

View File

@@ -489,6 +489,25 @@ impl Context {
.context(error::TableMetadataManagerSnafu)
}
/// Updates the table repart mapping.
///
/// Logs the table id, group id and region mapping stored in the persistent context,
/// then passes the table id and region mapping to the table repart manager's
/// `update_mappings`. Any error is wrapped with `TableMetadataManagerSnafu`.
pub async fn update_table_repart_mapping(&self) -> Result<()> {
    let persistent_ctx = &self.persistent_ctx;
    info!(
        "Updating table repart mapping for table: {}, group_id: {}, region mapping: {:?}",
        persistent_ctx.table_id, persistent_ctx.group_id, persistent_ctx.region_mapping
    );

    self.table_metadata_manager
        .table_repart_manager()
        .update_mappings(persistent_ctx.table_id, &persistent_ctx.region_mapping)
        .await
        .context(error::TableMetadataManagerSnafu)
}
/// Returns the next operation timeout.
///
/// If the next operation timeout is not set, it will return `None`.

View File

@@ -99,6 +99,8 @@ impl UpdateMetadata {
});
};
ctx.update_table_repart_mapping().await?;
Ok(())
}
}

View File

@@ -47,7 +47,7 @@ const RECENT_DURATION: Duration = Duration::from_secs(300);
///
/// Variants:
/// - `Tick`: This event is used to trigger region flush trigger periodically.
pub(crate) enum Event {
pub enum Event {
Tick,
}

View File

@@ -28,10 +28,10 @@ macro_rules! define_ticker {
event_value = $event_val:expr
) => {
$(#[$meta])*
pub(crate) struct $name {
pub(crate) tick_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
pub(crate) tick_interval: std::time::Duration,
pub(crate) sender: tokio::sync::mpsc::Sender<$event_ty>,
pub struct $name {
pub tick_handle: std::sync::Mutex<Option<tokio::task::JoinHandle<()>>>,
pub tick_interval: std::time::Duration,
pub sender: tokio::sync::mpsc::Sender<$event_ty>,
}
#[async_trait::async_trait]

View File

@@ -60,28 +60,18 @@ fn should_delete_file(
is_in_tmp_ref: bool,
is_linger: bool,
is_eligible_for_delete: bool,
entry: &Entry,
unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
is_region_dropped: bool,
_entry: &Entry,
_unknown_file_may_linger_until: chrono::DateTime<chrono::Utc>,
) -> bool {
let is_known = is_linger || is_eligible_for_delete;
let is_unknown_linger_time_exceeded = || {
// if the file's expel time is unknown(because not appear in delta manifest), we keep it for a while
// using it's last modified time
// notice unknown files use a different lingering time
entry
.metadata()
.last_modified()
.map(|t| t < unknown_file_may_linger_until)
.unwrap_or(false)
};
!is_in_manifest
&& !is_in_tmp_ref
&& if is_known {
is_eligible_for_delete
} else {
is_unknown_linger_time_exceeded()
!is_in_tmp_ref && is_region_dropped
}
}
@@ -433,6 +423,8 @@ impl LocalGcWorker {
Default::default()
};
let is_region_dropped = region.is_none();
let in_tmp_ref = tmp_ref_files
.iter()
.map(|file_ref| (file_ref.file_id, file_ref.index_version))
@@ -441,6 +433,7 @@ impl LocalGcWorker {
let deletable_files = self
.list_to_be_deleted_files(
region_id,
is_region_dropped,
&in_manifest,
&in_tmp_ref,
recently_removed_files,
@@ -679,8 +672,10 @@ impl LocalGcWorker {
Ok(all_entries)
}
#[allow(clippy::too_many_arguments)]
fn filter_deletable_files(
&self,
is_region_dropped: bool,
entries: Vec<Entry>,
in_manifest: &HashMap<FileId, Option<IndexVersion>>,
in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
@@ -741,6 +736,7 @@ impl LocalGcWorker {
is_in_tmp_ref,
is_linger,
is_eligible_for_delete,
is_region_dropped,
&entry,
unknown_file_may_linger_until,
)
@@ -769,6 +765,7 @@ impl LocalGcWorker {
is_in_tmp_ref,
is_linger,
is_eligible_for_delete,
is_region_dropped,
&entry,
unknown_file_may_linger_until,
)
@@ -804,6 +801,7 @@ impl LocalGcWorker {
pub async fn list_to_be_deleted_files(
&self,
region_id: RegionId,
is_region_dropped: bool,
in_manifest: &HashMap<FileId, Option<IndexVersion>>,
in_tmp_ref: &HashSet<(FileId, Option<IndexVersion>)>,
recently_removed_files: BTreeMap<Timestamp, HashSet<RemovedFile>>,
@@ -888,6 +886,7 @@ impl LocalGcWorker {
// Step 3: Filter files to determine which ones can be deleted
let all_unused_files_ready_for_delete = self.filter_deletable_files(
is_region_dropped,
all_entries,
in_manifest,
in_tmp_ref,

View File

@@ -328,6 +328,7 @@ impl RegionOpener {
)),
file_purger: create_file_purger(
config.gc.enable,
self.path_type,
self.purge_scheduler,
access_layer,
self.cache_manager,
@@ -465,6 +466,7 @@ impl RegionOpener {
));
let file_purger = create_file_purger(
config.gc.enable,
self.path_type,
self.purge_scheduler.clone(),
access_layer.clone(),
self.cache_manager.clone(),

View File

@@ -16,6 +16,7 @@ use std::fmt;
use std::sync::Arc;
use common_telemetry::error;
use store_api::region_request::PathType;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
@@ -67,8 +68,23 @@ impl fmt::Debug for LocalFilePurger {
}
}
pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
sst_layer.object_store().info().scheme() == object_store::Scheme::Fs
#[cfg(not(debug_assertions))]
/// Whether to enable GC for the file purger.
pub fn should_enable_gc(
global_gc_enabled: bool,
object_store_scheme: object_store::Scheme,
) -> bool {
global_gc_enabled && object_store_scheme != object_store::Scheme::Fs
}
#[cfg(debug_assertions)]
/// For debug build, we may use Fs as the object store scheme,
/// so we need to enable GC for local file system.
pub fn should_enable_gc(
global_gc_enabled: bool,
_object_store_scheme: object_store::Scheme,
) -> bool {
global_gc_enabled
}
/// Creates a file purger based on the storage type of the access layer.
@@ -82,12 +98,18 @@ pub fn is_local_fs(sst_layer: &AccessLayerRef) -> bool {
///
pub fn create_file_purger(
gc_enabled: bool,
path_type: PathType,
scheduler: SchedulerRef,
sst_layer: AccessLayerRef,
cache_manager: Option<CacheManagerRef>,
file_ref_manager: FileReferenceManagerRef,
) -> FilePurgerRef {
if gc_enabled && !is_local_fs(&sst_layer) {
// Only enable GC for:
// - object store based storage
// - data or bare path type (metadata region doesn't need to be GCed)
if should_enable_gc(gc_enabled, sst_layer.object_store().info().scheme())
&& matches!(path_type, PathType::Data | PathType::Bare)
{
Arc::new(ObjectStoreFilePurger { file_ref_manager })
} else {
Arc::new(LocalFilePurger::new(scheduler, sst_layer, cache_manager))

View File

@@ -1432,6 +1432,7 @@ impl StatementExecutor {
.chain(into_partition_exprs.clone().into_iter())
.collect();
let new_partition_exprs_len = new_partition_exprs.len();
let from_partition_exprs_len = from_partition_exprs.len();
// Validate the new partition expressions using MultiDimPartitionRule and PartitionChecker.
let _ = MultiDimPartitionRule::try_new(
@@ -1445,14 +1446,6 @@ impl StatementExecutor {
)
.context(InvalidPartitionSnafu)?;
info!(
"Submitting repartition task for table {} (table_id={}), from {} to {} partitions",
table_ref,
table_id,
from_partition_exprs.len(),
new_partition_exprs_len
);
let ddl_options = parse_ddl_options(&request.options)?;
let serialize_exprs = |exprs: Vec<PartitionExpr>| -> Result<Vec<String>> {
let mut json_exprs = Vec::with_capacity(exprs.len());
@@ -1478,6 +1471,16 @@ impl StatementExecutor {
req.wait = ddl_options.wait;
req.timeout = ddl_options.timeout;
info!(
"Submitting repartition task for table {} (table_id={}), from {} to {} partitions, timeout: {:?}, wait: {}",
table_ref,
table_id,
from_partition_exprs_len,
new_partition_exprs_len,
ddl_options.timeout,
ddl_options.wait
);
let response = self
.procedure_executor
.submit_ddl_task(&ExecutorContext::default(), req)

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::env;
use std::net::TcpListener;
use std::ops::RangeInclusive;
@@ -51,6 +51,7 @@ use frontend::frontend::{Frontend, FrontendOptions};
use frontend::instance::Instance as FeInstance;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use futures::TryStreamExt;
use hyper_util::rt::TokioIo;
use meta_client::client::MetaClientBuilder;
use meta_srv::cluster::MetaPeerClientRef;
@@ -90,6 +91,53 @@ impl GreptimeDbCluster {
pub fn fe_instance(&self) -> &Arc<FeInstance> {
&self.frontend.instance
}
/// List all SST files from all datanodes.
///
/// For each datanode, drains the mito engine's `all_ssts_from_storage()` stream and
/// gathers the `file_path` of every entry into a single ordered set.
///
/// Panics if a datanode has no mito engine or if the stream yields an error.
pub async fn list_sst_files_from_all_datanodes(&self) -> BTreeSet<String> {
    let mut paths = BTreeSet::new();
    for datanode in self.datanode_instances.values() {
        let region_server = datanode.region_server();
        let mito = region_server.mito_engine().unwrap();
        let entries = mito
            .all_ssts_from_storage()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        paths.extend(entries.into_iter().map(|entry| entry.file_path));
    }
    paths
}
/// List all SST files from the manifests of all datanodes.
///
/// For each datanode, reads the mito engine's `all_ssts_from_manifest()` entries and
/// gathers every entry's `file_path`, plus its `index_file_path` when one is present,
/// into a single ordered set.
///
/// Panics if a datanode has no mito engine.
pub async fn list_sst_files_from_manifests(&self) -> BTreeSet<String> {
    let mut sst_files = BTreeSet::new();
    for datanode in self.datanode_instances.values() {
        let region_server = datanode.region_server();
        let mito = region_server.mito_engine().unwrap();
        let entries = mito.all_ssts_from_manifest().await;
        // `Option` is itself an iterator of zero or one item, so chaining it after the
        // data file path yields the index path only when it exists; no `unwrap` needed.
        sst_files.extend(
            entries
                .into_iter()
                .flat_map(|e| std::iter::once(e.file_path).chain(e.index_file_path)),
        );
    }
    sst_files
}
}
pub struct GreptimeDbClusterBuilder {

View File

@@ -13,12 +13,15 @@
// limitations under the License.
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use common_meta::key::TableMetadataManagerRef;
use common_procedure::ProcedureWithId;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::DatanodeWalConfig;
use futures::TryStreamExt as _;
use itertools::Itertools;
use meta_srv::gc::{BatchGcProcedure, GcSchedulerOptions, Region2Peers};
@@ -84,8 +87,13 @@ async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirG
let test_name = uuid::Uuid::new_v4().to_string();
let (store_config, guard) = get_test_store_config(store_type);
let builder = GreptimeDbClusterBuilder::new(&test_name)
.await
let mut builder = GreptimeDbClusterBuilder::new(&test_name).await;
if matches!(store_type, StorageType::File) {
let home_dir = create_temp_dir("test_gc_data_home");
builder = builder.with_shared_home_dir(Arc::new(home_dir));
}
let builder = builder
.with_metasrv_gc_config(GcSchedulerOptions {
enable: true,
..Default::default()
@@ -96,6 +104,7 @@ async fn distributed_with_gc(store_type: &StorageType) -> (TestContext, TempDirG
lingering_time: Some(Duration::ZERO),
..Default::default()
})
.with_datanode_wal_config(DatanodeWalConfig::Noop)
.with_store_config(store_config);
(
TestContext::new(MockInstanceBuilder::Distributed(builder)).await,
@@ -110,9 +119,6 @@ async fn test_gc_basic_different_store() {
let store_type = StorageType::build_storage_types_based_on_env();
info!("store type: {:?}", store_type);
for store in store_type {
if store == StorageType::File {
continue; // no point in test gc in fs storage
}
info!("Running GC test with storage type: {}", store);
test_gc_basic(&store).await;
}

View File

@@ -21,6 +21,8 @@ mod jsonbench;
mod sql;
#[macro_use]
mod region_migration;
#[macro_use]
mod repartition;
grpc_tests!(File, S3, S3WithCache, Oss, Azblob, Gcs);
@@ -30,4 +32,7 @@ sql_tests!(File);
region_migration_tests!(File);
repartition_tests!(File);
repartition_tests!(S3, S3WithCache, Oss, Azblob, Gcs);
// TODO(niebayes): add integration tests for remote wal.

View File

@@ -0,0 +1,807 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::time::Duration;
use client::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_meta::key::table_name::TableNameKey;
use common_procedure::{ProcedureWithId, watcher};
use common_query::Output;
use common_telemetry::info;
use common_test_util::recordbatch::check_output_stream;
use common_test_util::temp_dir::create_temp_dir;
use common_wal::config::DatanodeWalConfig;
use frontend::error::Result as FrontendResult;
use frontend::instance::Instance;
use meta_srv::gc::{self, BatchGcProcedure, GcSchedulerOptions, GcTickerRef};
use meta_srv::metasrv::Metasrv;
use mito2::gc::GcConfig;
use servers::query_handler::sql::SqlQueryHandler;
use session::context::{QueryContext, QueryContextRef};
use tests_integration::cluster::GreptimeDbClusterBuilder;
use tests_integration::test_util::{StorageType, get_test_store_config};
use tokio::sync::oneshot;
/// Generates the repartition integration tests for each given storage type.
///
/// For every `$service` identifier this expands to a module named
/// `integration_repartition_<service>_test` (service name lower-cased) that contains
/// two multi-threaded tokio tests: `test_repartition_mito` and `test_repartition_metric`.
/// Each test body only runs when `StorageType::$service.test_on()` returns true;
/// otherwise the test does nothing.
#[macro_export]
macro_rules! repartition_tests {
($($service:ident),*) => {
$(
paste::item! {
mod [<integration_repartition_ $service:lower _test>] {
#[tokio::test(flavor = "multi_thread")]
async fn [< test_repartition_mito >]() {
let store_type = tests_integration::test_util::StorageType::$service;
// Run only when this storage type reports `test_on()`.
if store_type.test_on() {
common_telemetry::init_default_ut_logging();
$crate::repartition::test_repartition_mito(store_type).await
}
}
#[tokio::test(flavor = "multi_thread")]
async fn [< test_repartition_metric >]() {
let store_type = tests_integration::test_util::StorageType::$service;
// Run only when this storage type reports `test_on()`.
if store_type.test_on() {
common_telemetry::init_default_ut_logging();
$crate::repartition::test_repartition_metric(store_type).await
}
}
}
}
)*
};
}
/// Runs a [`BatchGcProcedure`] over every region in the physical table route of
/// `table_name` and waits for the procedure to finish.
///
/// The table is looked up in the default catalog and schema. Panics (via `unwrap`) if
/// the table name or its physical route cannot be resolved, or if submitting or waiting
/// on the procedure returns an error.
async fn trigger_table_gc(metasrv: &Arc<Metasrv>, table_name: &str) {
    info!("triggering table gc for table: {}", table_name);

    let table_metadata_manager = metasrv.table_metadata_manager();

    // Resolve the table name to its table id.
    let name_key = TableNameKey::new(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
    let table_id = table_metadata_manager
        .table_name_manager()
        .get(name_key)
        .await
        .unwrap()
        .unwrap()
        .table_id();

    // Collect the ids of all regions listed in the physical table route.
    let (_, physical_route) = table_metadata_manager
        .table_route_manager()
        .get_physical_table_route(table_id)
        .await
        .unwrap();
    let mut region_ids = Vec::with_capacity(physical_route.region_routes.len());
    for route in &physical_route.region_routes {
        region_ids.push(route.region.id);
    }

    let gc_procedure = BatchGcProcedure::new(
        metasrv.mailbox().clone(),
        metasrv.table_metadata_manager().clone(),
        metasrv.options().grpc.server_addr.clone(),
        region_ids,
        false,                   // full_file_listing
        Duration::from_secs(10), // timeout
        Default::default(),
    );

    // Submit the procedure to the procedure manager and wait for it to complete.
    let mut procedure_watcher = metasrv
        .procedure_manager()
        .submit(ProcedureWithId::with_random_id(Box::new(gc_procedure)))
        .await
        .unwrap();
    watcher::wait(&mut procedure_watcher).await.unwrap();
}
/// Sends a `gc::Event::Manually` event through the ticker's sender and waits for the
/// resulting GC job report.
///
/// Panics if the event cannot be sent, or if the report sender is dropped without a
/// report being delivered.
async fn trigger_full_gc(ticker: &GcTickerRef) {
    info!("triggering full gc");
    let (report_tx, report_rx) = oneshot::channel();
    let manual_event = gc::Event::Manually(report_tx);
    ticker.sender.send(manual_event).await.unwrap();
    // The report itself is not inspected; receiving it only signals that the run finished.
    let _report = report_rx.await.unwrap();
}
pub async fn test_repartition_mito(store_type: StorageType) {
let cluster_name = "test_repartition_mito";
let (store_config, _guard) = get_test_store_config(&store_type);
let datanodes = 3u64;
let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await;
if matches!(store_type, StorageType::File) {
let home_dir = create_temp_dir("test_repartition_mito_data_home");
builder = builder.with_shared_home_dir(Arc::new(home_dir));
}
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_datanode_wal_config(DatanodeWalConfig::Noop)
.with_metasrv_gc_config(GcSchedulerOptions {
enable: true,
gc_cooldown_period: Duration::from_nanos(1),
..Default::default()
})
.with_datanode_gc_config(GcConfig {
enable: true,
lingering_time: Some(Duration::from_secs(0)),
unknown_file_lingering_time: Duration::from_secs(0),
..Default::default()
})
.build(true)
.await;
let metasrv = &cluster.metasrv;
let ticker = metasrv.gc_ticker().unwrap();
let query_ctx = QueryContext::arc();
let instance = cluster.fe_instance();
// 1. Setup: Create a table with partitions
let sql = r#"
CREATE TABLE `repartition_mito_table`(
`id` INT,
`city` STRING,
`ts` TIMESTAMP TIME INDEX,
PRIMARY KEY(`id`, `city`)
) PARTITION ON COLUMNS (`id`) (
`id` < 10,
`id` >= 10 AND `id` < 20,
`id` >= 20
);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let sql = r#"
INSERT INTO `repartition_mito_table` VALUES
(1, 'New York', '2022-01-01 00:00:00'),
(5, 'London', '2022-01-01 00:00:00'),
(10, 'Paris', '2022-01-01 00:00:00'),
(15, 'Tokyo', '2022-01-01 00:00:00'),
(20, 'Beijing', '2022-01-01 00:00:00'),
(25, 'Shanghai', '2022-01-01 00:00:00');
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+----+----------+---------------------+
| id | city | ts |
+----+----------+---------------------+
| 1 | New York | 2022-01-01T00:00:00 |
| 5 | London | 2022-01-01T00:00:00 |
| 10 | Paris | 2022-01-01T00:00:00 |
| 15 | Tokyo | 2022-01-01T00:00:00 |
| 20 | Beijing | 2022-01-01T00:00:00 |
| 25 | Shanghai | 2022-01-01T00:00:00 |
+----+----------+---------------------+";
check_output_stream(result.data, expected).await;
// 2. Split Partition
let sql = r#"
ALTER TABLE `repartition_mito_table` SPLIT PARTITION (
`id` < 10
) INTO (
`id` < 5,
`id` >= 5 AND `id` < 10
);
"#;
let _result = run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+----+----------+---------------------+
| id | city | ts |
+----+----------+---------------------+
| 1 | New York | 2022-01-01T00:00:00 |
| 5 | London | 2022-01-01T00:00:00 |
| 10 | Paris | 2022-01-01T00:00:00 |
| 15 | Tokyo | 2022-01-01T00:00:00 |
| 20 | Beijing | 2022-01-01T00:00:00 |
| 25 | Shanghai | 2022-01-01T00:00:00 |
+----+----------+---------------------+";
check_output_stream(result.data, expected).await;
trigger_table_gc(metasrv, "repartition_mito_table").await;
// Should be ok before compact.
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected).await;
let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);
// It should be ok, if we try to compact the table after split partition.
let compact_sql = "ADMIN COMPACT_TABLE('repartition_mito_table', 'swcs', '3600')";
let _result = run_sql(instance, compact_sql, query_ctx.clone())
.await
.unwrap();
// Should be no change after compact.
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected).await;
// Trigger GC to clean up the compacted files.
trigger_table_gc(metasrv, "repartition_mito_table").await;
// Should be no change after GC.
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected).await;
let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);
let result = run_sql(
instance,
"SHOW CREATE TABLE `repartition_mito_table`",
query_ctx.clone(),
)
.await
.unwrap();
let expected_create_table_after_split = r#"+------------------------+-------------------------------------------------------+
| Table | Create Table |
+------------------------+-------------------------------------------------------+
| repartition_mito_table | CREATE TABLE IF NOT EXISTS "repartition_mito_table" ( |
| | "id" INT NULL, |
| | "city" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("id", "city") |
| | ) |
| | PARTITION ON COLUMNS ("id") ( |
| | id < 5, |
| | id >= 10 AND id < 20, |
| | id >= 20, |
| | id >= 5 AND id < 10 |
| | ) |
| | ENGINE=mito |
| | |
+------------------------+-------------------------------------------------------+"#;
check_output_stream(result.data, expected_create_table_after_split).await;
let sql =
r#"INSERT INTO `repartition_mito_table` VALUES (2, 'Split1', '2022-01-02 00:00:00');"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let sql =
r#"INSERT INTO `repartition_mito_table` VALUES (7, 'Split2', '2022-01-02 00:00:00');"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` WHERE `id` IN (2, 7) ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
let expected_split_inserts = "\
+----+--------+---------------------+
| id | city | ts |
+----+--------+---------------------+
| 2 | Split1 | 2022-01-02T00:00:00 |
| 7 | Split2 | 2022-01-02T00:00:00 |
+----+--------+---------------------+";
check_output_stream(result.data, expected_split_inserts).await;
// 3. Merge Partition
let sql = r#"
ALTER TABLE `repartition_mito_table` MERGE PARTITION (
`id` >= 10 AND `id` < 20,
`id` >= 20
);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
let expected_all = "\
+----+----------+---------------------+
| id | city | ts |
+----+----------+---------------------+
| 1 | New York | 2022-01-01T00:00:00 |
| 2 | Split1 | 2022-01-02T00:00:00 |
| 5 | London | 2022-01-01T00:00:00 |
| 7 | Split2 | 2022-01-02T00:00:00 |
| 10 | Paris | 2022-01-01T00:00:00 |
| 15 | Tokyo | 2022-01-01T00:00:00 |
| 20 | Beijing | 2022-01-01T00:00:00 |
| 25 | Shanghai | 2022-01-01T00:00:00 |
+----+----------+---------------------+";
check_output_stream(result.data, expected_all).await;
trigger_table_gc(metasrv, "repartition_mito_table").await;
// Trigger GC to clean up the compacted files.
trigger_full_gc(&ticker).await;
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected_all).await;
let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);
// It should be ok, if we try to compact the table after merge partition.
let compact_sql = "ADMIN COMPACT_TABLE('repartition_mito_table', 'swcs', '3600')";
let _result = run_sql(instance, compact_sql, query_ctx.clone())
.await
.unwrap();
// Should be no change after compact.
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected_all).await;
trigger_table_gc(metasrv, "repartition_mito_table").await;
trigger_full_gc(&ticker).await;
// Should be no change after GC.
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected_all).await;
let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);
let result = run_sql(
instance,
"SHOW CREATE TABLE `repartition_mito_table`",
query_ctx.clone(),
)
.await
.unwrap();
let expected_create_table_after_merge = r#"+------------------------+-------------------------------------------------------+
| Table | Create Table |
+------------------------+-------------------------------------------------------+
| repartition_mito_table | CREATE TABLE IF NOT EXISTS "repartition_mito_table" ( |
| | "id" INT NULL, |
| | "city" STRING NULL, |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("id", "city") |
| | ) |
| | PARTITION ON COLUMNS ("id") ( |
| | id < 5, |
| | id >= 10 AND id < 20 OR id >= 20, |
| | id >= 5 AND id < 10 |
| | ) |
| | ENGINE=mito |
| | |
+------------------------+-------------------------------------------------------+"#;
check_output_stream(result.data, expected_create_table_after_merge).await;
let sql =
r#"INSERT INTO `repartition_mito_table` VALUES (12, 'Merge1', '2022-01-03 00:00:00');"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let sql =
r#"INSERT INTO `repartition_mito_table` VALUES (30, 'Merge2', '2022-01-03 00:00:00');"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let result = run_sql(
instance,
"SELECT * FROM `repartition_mito_table` WHERE `id` IN (12, 30) ORDER BY `id`",
query_ctx.clone(),
)
.await
.unwrap();
let expected_merge_inserts = "\
+----+--------+---------------------+
| id | city | ts |
+----+--------+---------------------+
| 12 | Merge1 | 2022-01-03T00:00:00 |
| 30 | Merge2 | 2022-01-03T00:00:00 |
+----+--------+---------------------+";
check_output_stream(result.data, expected_merge_inserts).await;
run_sql(
instance,
"DROP TABLE `repartition_mito_table`",
query_ctx.clone(),
)
.await
.unwrap();
}
/// End-to-end repartition scenario for a metric-engine physical table
/// (`repart_phy_metric`) with one logical table (`repart_log_metric`) on top.
///
/// The test builds a 3-datanode cluster with GC enabled on both metasrv and
/// the datanodes, then walks through these steps:
///
/// 1. Create the physical table partitioned on `host` (`< 'm'` / `>= 'm'`),
///    create the logical table, and insert one row per partition.
/// 2. Split `host < 'm'` into `host < 'g'` and `host >= 'g' AND host < 'm'`,
///    then check `SHOW CREATE TABLE` and that the data is unchanged across
///    GC, compaction, and another GC.
/// 3. Insert one row into each of the two split partitions, merge them back,
///    and repeat the same schema / data / GC / compaction checks.
/// 4. Insert one more row into the merged partition and drop both tables.
///
/// After each GC round the SST files listed from the datanodes must equal the
/// SST files listed from the manifests.
///
/// `store_type` selects the store config via `get_test_store_config`; for
/// `StorageType::File` the cluster is additionally given a shared temporary
/// home directory.
///
/// Any failed SQL statement or mismatching output panics (every result is
/// `unwrap`ped).
pub async fn test_repartition_metric(store_type: StorageType) {
let cluster_name = "test_repartition_metric";
// `_guard` is kept alive until the end of the test.
// NOTE(review): presumably it cleans up the test store on drop; confirm.
let (store_config, _guard) = get_test_store_config(&store_type);
let datanodes = 3u64;
let mut builder = GreptimeDbClusterBuilder::new(cluster_name).await;
// Only the file store gets an explicit shared home directory.
// NOTE(review): presumably so that all datanodes see the same files; confirm.
if matches!(store_type, StorageType::File) {
let home_dir = create_temp_dir("test_repartition_metric_data_home");
builder = builder.with_shared_home_dir(Arc::new(home_dir));
}
// GC is enabled on both sides with a 1ns cooldown and zero lingering times.
// NOTE(review): presumably so GC triggered by the test acts immediately
// instead of waiting for the default intervals; confirm.
let cluster = builder
.with_datanodes(datanodes as u32)
.with_store_config(store_config)
.with_datanode_wal_config(DatanodeWalConfig::Noop)
.with_metasrv_gc_config(GcSchedulerOptions {
enable: true,
gc_cooldown_period: Duration::from_nanos(1),
..Default::default()
})
.with_datanode_gc_config(GcConfig {
enable: true,
lingering_time: Some(Duration::from_secs(0)),
unknown_file_lingering_time: Duration::from_secs(0),
..Default::default()
})
.build(true)
.await;
let metasrv = &cluster.metasrv;
// The ticker is used further down to trigger a full GC round.
let ticker = metasrv.gc_ticker().unwrap();
let query_ctx = QueryContext::arc();
let instance = cluster.fe_instance();
// 1. Create the physical table with two partitions on `host`.
let sql = r#"
CREATE TABLE `repart_phy_metric`(
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
`host` STRING PRIMARY KEY
) PARTITION ON COLUMNS (`host`) (
`host` < 'm',
`host` >= 'm'
) ENGINE = metric WITH ("physical_metric_table" = "");
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Create the logical table backed by the physical table above.
let sql = r#"
CREATE TABLE `repart_log_metric`(
`ts` TIMESTAMP TIME INDEX,
`val` DOUBLE,
`host` STRING PRIMARY KEY
) ENGINE = metric WITH ("on_physical_table" = "repart_phy_metric");
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// Seed one row per initial partition: 'a_host' < 'm' and 'z_host' >= 'm'.
let sql = r#"
INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES
('a_host', '2022-01-01 00:00:00', 1),
('z_host', '2022-01-01 00:00:00', 2);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+--------+---------------------+-----+
| host | ts | val |
+--------+---------------------+-----+
| a_host | 2022-01-01T00:00:00 | 1.0 |
| z_host | 2022-01-01T00:00:00 | 2.0 |
+--------+---------------------+-----+";
check_output_stream(result.data, expected).await;
// Split physical table partition
let sql = r#"
ALTER TABLE `repart_phy_metric` SPLIT PARTITION (
`host` < 'm'
) INTO (
`host` < 'g',
`host` >= 'g' AND `host` < 'm'
);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// The table definition must now show three partition rules. In the expected
// output the `host >= 'g' AND host < 'm'` rule is listed last.
let result = run_sql(
instance,
"SHOW CREATE TABLE `repart_phy_metric`",
query_ctx.clone(),
)
.await
.unwrap();
let expected_create_table_after_split = r#"+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| repart_phy_metric | CREATE TABLE IF NOT EXISTS "repart_phy_metric" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "host" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | PARTITION ON COLUMNS ("host") ( |
| | host < 'g', |
| | host >= 'm', |
| | host >= 'g' AND host < 'm' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------------------+--------------------------------------------------+"#;
check_output_stream(result.data, expected_create_table_after_split).await;
// The rows written before the split must still be readable through the
// logical table.
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+--------+---------------------+-----+
| host | ts | val |
+--------+---------------------+-----+
| a_host | 2022-01-01T00:00:00 | 1.0 |
| z_host | 2022-01-01T00:00:00 | 2.0 |
+--------+---------------------+-----+";
check_output_stream(result.data, expected).await;
// First GC round after the split, run before any compaction.
trigger_table_gc(metasrv, "repart_phy_metric").await;
// Should be ok before compact.
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected).await;
// After GC, the SST files on the datanodes must equal the SST files listed
// from the manifests.
let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);
// It should be ok, if we try to compact the table after split partition.
let compact_sql = "ADMIN COMPACT_TABLE('repart_phy_metric', 'swcs', '3600')";
let _result = run_sql(instance, compact_sql, query_ctx.clone())
.await
.unwrap();
// Should be no change after compact.
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected).await;
// Trigger GC to clean up the compacted files.
trigger_table_gc(metasrv, "repart_phy_metric").await;
// Should be no change after GC.
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected).await;
// Same datanode-vs-manifest file check after the post-compaction GC.
let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);
// Write one row into each partition produced by the split:
// 'b_host' < 'g' and 'g' <= 'h_host' < 'm'.
let sql = r#"INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('b_host', '2022-01-02 00:00:00', 3.0);"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let sql = r#"INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('h_host', '2022-01-02 00:00:00', 4.0);"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` WHERE `host` IN ('b_host', 'h_host') ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+--------+---------------------+-----+
| host | ts | val |
+--------+---------------------+-----+
| b_host | 2022-01-02T00:00:00 | 3.0 |
| h_host | 2022-01-02T00:00:00 | 4.0 |
+--------+---------------------+-----+";
check_output_stream(result.data, expected).await;
// Merge the two partitions created by the split back into one.
let sql = r#"
ALTER TABLE `repart_phy_metric` MERGE PARTITION (
`host` < 'g',
`host` >= 'g' AND `host` < 'm'
);
"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
// The merged rule is expected to be rendered as the OR of the two merged
// expressions rather than being simplified back to `host < 'm'`.
let result = run_sql(
instance,
"SHOW CREATE TABLE `repart_phy_metric`",
query_ctx.clone(),
)
.await
.unwrap();
let expected_create_table_after_merge = r#"+-------------------+--------------------------------------------------+
| Table | Create Table |
+-------------------+--------------------------------------------------+
| repart_phy_metric | CREATE TABLE IF NOT EXISTS "repart_phy_metric" ( |
| | "ts" TIMESTAMP(3) NOT NULL, |
| | "val" DOUBLE NULL, |
| | "host" STRING NULL, |
| | TIME INDEX ("ts"), |
| | PRIMARY KEY ("host") |
| | ) |
| | PARTITION ON COLUMNS ("host") ( |
| | host < 'g' OR host >= 'g' AND host < 'm', |
| | host >= 'm' |
| | ) |
| | ENGINE=metric |
| | WITH( |
| | physical_metric_table = '' |
| | ) |
+-------------------+--------------------------------------------------+"#;
check_output_stream(result.data, expected_create_table_after_merge).await;
// All four rows written so far must be visible after the merge.
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+--------+---------------------+-----+
| host | ts | val |
+--------+---------------------+-----+
| a_host | 2022-01-01T00:00:00 | 1.0 |
| b_host | 2022-01-02T00:00:00 | 3.0 |
| h_host | 2022-01-02T00:00:00 | 4.0 |
| z_host | 2022-01-01T00:00:00 | 2.0 |
+--------+---------------------+-----+";
check_output_stream(result.data, expected).await;
// After the merge both the per-table GC and a full GC round are triggered.
trigger_table_gc(metasrv, "repart_phy_metric").await;
trigger_full_gc(&ticker).await;
// Should be no change after GC.
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected).await;
// Datanode files and manifest files must agree after the post-merge GC.
let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);
// It should be ok, if we try to compact the table after merge partition.
let compact_sql = "ADMIN COMPACT_TABLE('repart_phy_metric', 'swcs', '3600')";
let _result = run_sql(instance, compact_sql, query_ctx.clone())
.await
.unwrap();
// Should be no change after compact.
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected).await;
// Trigger GC to clean up the compacted files.
trigger_table_gc(metasrv, "repart_phy_metric").await;
trigger_full_gc(&ticker).await;
// Should be no change after GC.
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` ORDER BY `host`",
query_ctx.clone(),
)
.await
.unwrap();
check_output_stream(result.data, expected).await;
// Final datanode-vs-manifest file check.
let sst_files_after_gc = cluster.list_sst_files_from_all_datanodes().await;
let sst_files_after_gc_manifests = cluster.list_sst_files_from_manifests().await;
assert_eq!(sst_files_after_gc, sst_files_after_gc_manifests);
// The merged partition must still accept writes: 'c_host' falls into
// `host < 'g' OR host >= 'g' AND host < 'm'`.
let sql = r#"INSERT INTO `repart_log_metric` (`host`, `ts`, `val`) VALUES ('c_host', '2022-01-03 00:00:00', 5.0);"#;
run_sql(instance, sql, query_ctx.clone()).await.unwrap();
let result = run_sql(
instance,
"SELECT * FROM `repart_log_metric` WHERE `host` = 'c_host'",
query_ctx.clone(),
)
.await
.unwrap();
let expected = "\
+--------+---------------------+-----+
| host | ts | val |
+--------+---------------------+-----+
| c_host | 2022-01-03T00:00:00 | 5.0 |
+--------+---------------------+-----+";
check_output_stream(result.data, expected).await;
// Clean up: the logical table is dropped before its physical table.
run_sql(
instance,
"DROP TABLE `repart_log_metric`",
query_ctx.clone(),
)
.await
.unwrap();
run_sql(
instance,
"DROP TABLE `repart_phy_metric`",
query_ctx.clone(),
)
.await
.unwrap();
}
/// Runs `sql` on the frontend `instance` and returns the first result.
///
/// The statement text is logged at info level before it is executed.
/// `do_query` yields a list of results; only the first one is handed back and
/// the rest are dropped.
///
/// # Panics
///
/// Panics if `do_query` returns an empty list.
async fn run_sql(
    instance: &Arc<Instance>,
    sql: &str,
    query_ctx: QueryContextRef,
) -> FrontendResult<Output> {
    info!("Run SQL: {sql}");
    let mut outputs = instance.do_query(sql, query_ctx).await;
    // Take ownership of the first result out of the list.
    outputs.remove(0)
}