test: add rebuild index coverage (#8175)

* test: basic rebuild index test

Signed-off-by: discord9 <discord9@163.com>

* test: address rebuild index review comments

Signed-off-by: discord9 <discord9@163.com>

* test: update build index restart expected output

Signed-off-by: discord9 <discord9@163.com>

---------

Signed-off-by: discord9 <discord9@163.com>
This commit is contained in:
discord9
2026-06-11 22:03:24 +08:00
committed by GitHub
parent 3608c90b2e
commit 2febea6ec3
7 changed files with 1058 additions and 5 deletions

View File

@@ -27,7 +27,7 @@ use store_api::storage::{RegionId, ScanRequest};
use crate::config::{IndexBuildMode, MitoConfig, Mode};
use crate::engine::MitoEngine;
use crate::engine::compaction_test::put_and_flush;
use crate::engine::listener::IndexBuildListener;
use crate::engine::listener::{GateIndexBuildListener, IndexBuildListener};
use crate::read::scan_region::Scanner;
use crate::sst::location;
use crate::test_util::{
@@ -298,6 +298,95 @@ async fn test_index_build_type_schema_change() {
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
}
/// Tests that a schema change (ALTER SetIndexes) triggers index rebuild
/// for all pre-existing SST files, not just one. Covers the scenario
/// where multiple SSTs were flushed before the index was defined:
/// 1. Create region without index, flush 3 files.
/// 2. Reset scheduler state via reopen_region (flush-triggered no-index
/// builds pollute building_files).
/// 3. Verify 3 SST files and 0 index files.
/// 4. ALTER SetIndexes — triggers rebuild of all 3 inconsistent SSTs.
/// 5. Wait for 3 finishes, then verify 3 SST files + 3 index files.
#[tokio::test]
async fn test_index_build_type_schema_change_multiple_files() {
let mut env = TestEnv::with_prefix("test_index_build_type_schema_change_multiple_files_").await;
let listener = Arc::new(IndexBuildListener::default());
let engine = env
.create_engine_with(
async_build_mode_config(true),
None,
Some(listener.clone()),
None,
)
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
// Create a region without index.
let request = CreateRequestBuilder::new().build();
let table_dir = request.table_dir.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 3 SST files without any index defined.
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
put_and_flush(&engine, region_id, &column_schemas, 30..40).await;
// Async flush still schedules index builds for flushed SSTs. Since this
// region has no index metadata yet, those builds are no-ops; if they already
// stopped, reopening is harmless, and if they are still running, reopening
// clears building_files so the subsequent ALTER rebuild schedules cleanly.
reopen_region(&engine, region_id, table_dir, true, HashMap::new()).await;
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 3);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 0);
// Set Index via ALTER — triggers schema-change rebuild of all 3 SSTs.
let set_index_request = RegionAlterRequest {
kind: AlterKind::SetIndexes {
options: vec![SetIndexOption::Inverted {
column_name: "tag_0".to_string(),
}],
},
};
engine
.handle_request(region_id, RegionRequest::Alter(set_index_request))
.await
.unwrap();
// Wait for all 3 schema-change rebuilds to finish.
tokio::time::timeout(std::time::Duration::from_secs(5), listener.wait_finish(3))
.await
.unwrap();
assert_eq!(listener.finish_count(), 3);
// Verify all 3 SST files now have corresponding index files.
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 3);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 3);
}
#[tokio::test]
async fn test_index_build_type_manual_basic() {
let mut env = TestEnv::with_prefix("test_index_build_type_manual_").await;
@@ -465,3 +554,277 @@ async fn test_index_build_type_manual_consistency() {
// Because the file is inconsistent, new index build task is triggered.
assert_listener_counts(&listener, 2, 2);
}
#[tokio::test]
async fn test_gate_index_build_listener_smoke() {
use store_api::storage::{FileId, RegionId};
use crate::engine::listener::{EventListener, GateIndexBuildListener};
use crate::sst::file::RegionFileId;
let gate = Arc::new(GateIndexBuildListener::default());
// Initial counts are zero.
assert_eq!(gate.begin_count(), 0);
assert_eq!(gate.finish_count(), 0);
assert_eq!(gate.abort_count(), 0);
// Spawn a task that will block in on_index_build_begin.
let gate_clone = gate.clone();
let handle = tokio::spawn(async move {
gate_clone
.on_index_build_begin(RegionFileId::new(RegionId::new(1, 1), FileId::random()))
.await;
});
// Wait for begin to arrive.
tokio::time::timeout(std::time::Duration::from_secs(5), gate.wait_begin(1))
.await
.unwrap();
assert_eq!(gate.begin_count(), 1);
assert_eq!(gate.finish_count(), 0);
assert_eq!(gate.abort_count(), 0);
// Release the blocked begin.
gate.release_begin();
// The spawned task should now complete.
tokio::time::timeout(std::time::Duration::from_secs(5), handle)
.await
.unwrap()
.unwrap();
}
#[tokio::test]
async fn test_index_build_type_manual_duplicate_in_flight() {
let mut env = TestEnv::with_prefix("test_index_build_type_manual_duplicate_in_flight_").await;
let gate = Arc::new(GateIndexBuildListener::default());
let engine = Arc::new(
env.create_engine_with(
async_build_mode_config(false), // Disable index file creation on flush.
None,
Some(gate.clone()),
None,
)
.await,
);
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
// Create a region with index metadata.
let request = CreateRequestBuilder::new().build_with_index();
let table_dir = request.table_dir.clone();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush: the flush-triggered index build begins but is blocked by the gate.
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
// Wait for the flush-triggered build to begin (blocked by gate).
tokio::time::timeout(std::time::Duration::from_secs(5), gate.wait_begin(1))
.await
.unwrap();
assert_eq!(gate.begin_count(), 1);
assert_eq!(gate.finish_count(), 0);
assert_eq!(gate.abort_count(), 0);
// Release the gate so the flush-triggered build can complete.
// With create_on_flush=false the build produces no index file (file_size=0),
// so on_index_build_finish is NOT called.
gate.release_begin();
// Reset scheduler state via reopen_region. This ensures the flush-triggered
// build's entry is removed from building_files and the scheduler is clean.
reopen_region(&engine, region_id, table_dir.clone(), true, HashMap::new()).await;
// Verify no index file exists after flush (create_on_flush=false).
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 0);
// Spawn the first manual BuildIndex in background. It will schedule the task,
// the file enters building_files, and the begin is blocked by the gate.
// handle_request blocks until the background collector sends the response,
// so we must spawn it in a separate task.
let engine_clone = engine.clone();
let first_handle = tokio::spawn(async move {
let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
engine_clone.handle_request(region_id, request).await
});
// Wait for the first manual build to begin (blocked by gate).
tokio::time::timeout(std::time::Duration::from_secs(5), gate.wait_begin(2))
.await
.unwrap(); // begin 1 = flush, begin 2 = first manual
assert_eq!(gate.begin_count(), 2);
assert_eq!(gate.finish_count(), 0);
assert_eq!(gate.abort_count(), 0);
// Issue the second manual BuildIndex for the same region/file.
// Since the file is already in building_files (from the first manual build),
// schedule_build detects the duplicate and calls on_index_build_abort.
let request = RegionRequest::BuildIndex(RegionBuildIndexRequest {});
engine.handle_request(region_id, request).await.unwrap();
// The second request should have been aborted as duplicate.
assert_eq!(gate.abort_count(), 1, "duplicate request should be aborted");
assert_eq!(gate.begin_count(), 2, "no new begin for duplicate");
assert_eq!(gate.finish_count(), 0, "first build hasn't finished yet");
// Release the gate to let the first manual build proceed.
gate.release_begin();
tokio::time::timeout(std::time::Duration::from_secs(5), gate.wait_finish(1))
.await
.unwrap(); // first manual build completes
// Final counts: only one successful build, one aborted duplicate.
assert_eq!(gate.begin_count(), 2); // flush + first manual
assert_eq!(gate.finish_count(), 1); // first manual only
assert_eq!(gate.abort_count(), 1); // second manual duplicate abort
// Await the first build's handle_request to ensure it completed cleanly.
tokio::time::timeout(std::time::Duration::from_secs(5), first_handle)
.await
.unwrap()
.unwrap()
.unwrap();
// Verify exactly one SST and one index file.
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
}
/// Tests the race between an in-flight index build and a compaction that
/// removes the source SST. The test blocks all flush-triggered index builds
/// via [`GateIndexBuildListener`] so that at least one old SST build is
/// still at `on_index_build_begin` when compaction completes. After the
/// gate is released, the stale builds find the SSTs gone and abort, while the
/// compaction-triggered build for the new SST succeeds.
///
/// Deterministic orchestration (no sleeps):
/// 1. Flush 3 files — gate blocks all 3 index builds.
/// 2. Flush the 4th file (TWCS trigger_file_num=4) — compaction starts.
/// 3. Wait for 5 begins (4 flush + 1 compaction) — at this point compaction
/// is finished and the old SSTs are removed from the version.
/// 4. Release all 5 blocked begins.
/// 5. Wait for all 5 to stop.
/// 6. Assert: at least one abort, final state = 1 SST + 1 index file.
#[tokio::test]
async fn test_index_build_type_compact_abort_race() {
common_telemetry::init_default_ut_logging();
// We must raise max_background_index_builds because the gate blocks all
// flush-triggered builds at `on_index_build_begin`, causing them to be
// in "building_files" indefinitely. The default limit (cpu/8, often ~2-4)
// would prevent the compaction-triggered build from being scheduled.
// Setting a generous limit ensures all 5 builds can be scheduled.
let mut config = async_build_mode_config(true);
config.max_background_index_builds = 8;
let mut env = TestEnv::with_prefix("test_index_build_type_compact_abort_race_").await;
let gate = Arc::new(GateIndexBuildListener::default());
let engine = env
.create_engine_with(config, None, Some(gate.clone()), None)
.await;
let region_id = RegionId::new(1, 1);
env.get_schema_metadata_manager()
.register_region_table_info(
region_id.table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
env.get_kv_backend(),
)
.await;
let request = CreateRequestBuilder::new()
.insert_option("compaction.type", "twcs")
.insert_option("compaction.twcs.trigger_file_num", "4")
.build_with_index();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
// Flush 3 files — all 3 index builds blocked at begin by the gate.
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
put_and_flush(&engine, region_id, &column_schemas, 35..45).await;
common_telemetry::info!("After flush 3 files, waiting for begins");
tokio::time::timeout(std::time::Duration::from_secs(5), gate.wait_begin(3))
.await
.unwrap();
assert_eq!(gate.begin_count(), 3);
assert_eq!(gate.finish_count(), 0);
assert_eq!(gate.abort_count(), 0);
// Flush 4th file — triggers compaction on the TWCS picker.
put_and_flush(&engine, region_id, &column_schemas, 45..50).await;
common_telemetry::info!("After flush 4th file, waiting for compaction begin");
// Wait for 5 begins: 4 flush-triggered + 1 compaction-triggered.
// The 5th begin indicates compaction has finished and the compacted SST's
// index build is now blocked at begin. All old SST files have been
// removed from the version at this point.
tokio::time::timeout(std::time::Duration::from_secs(5), gate.wait_begin(5))
.await
.unwrap();
common_telemetry::info!("All 5 builds blocked, releasing gates");
// Release all blocked begins — the old SST builds will see their SSTs
// are gone and abort; the compaction SST build will succeed.
for _ in 0..5 {
gate.release_begin();
}
// Wait for all builds to complete (finish or abort).
tokio::time::timeout(std::time::Duration::from_secs(5), gate.wait_stop(5))
.await
.unwrap();
common_telemetry::info!("All builds stopped, checking results");
// Verify the compaction race caused the old SST index builds to abort. In
// this blocked-then-compact scenario, all 4 flush-triggered builds abort
// (their SST files were removed by compaction) and only the compacted SST
// build finishes.
assert_eq!(gate.begin_count(), 5);
assert_eq!(gate.finish_count(), 1);
assert_eq!(gate.abort_count(), 4);
// Final state: all files compacted into 1 SST with 1 index file.
let scanner = engine
.scanner(region_id, ScanRequest::default())
.await
.unwrap();
assert_eq!(scanner.num_files(), 1);
assert_eq!(num_of_index_files(&engine, &scanner, region_id).await, 1);
}

View File

@@ -21,7 +21,7 @@ use std::time::Duration;
use async_trait::async_trait;
use common_telemetry::info;
use store_api::storage::{FileId, RegionId};
use tokio::sync::Notify;
use tokio::sync::{Notify, Semaphore};
use crate::sst::file::RegionFileId;
@@ -394,6 +394,11 @@ impl IndexBuildListener {
pub fn begin_count(&self) -> usize {
self.begin_count.load(Ordering::Relaxed)
}
/// Returns the abort count.
pub fn abort_count(&self) -> usize {
self.abort_count.load(Ordering::Relaxed)
}
}
#[async_trait]
@@ -418,3 +423,132 @@ impl EventListener for IndexBuildListener {
self.stop_notify.notify_one();
}
}
/// A listener that can gate (pause) index build at the begin stage.
///
/// Like [`IndexBuildListener`], it tracks begin/finish/abort counts and exposes
/// wait methods. Additionally, [`GateIndexBuildListener::on_index_build_begin`]
/// blocks until the test calls [`GateIndexBuildListener::release_begin`],
/// enabling deterministic race-condition tests without sleep/flakiness.
///
/// # Usage
///
/// ```ignore
/// let gate = Arc::new(GateIndexBuildListener::default());
/// // pass gate as listener to engine
/// // ... trigger index build ...
/// gate.wait_begin(1).await; // wait for begin to arrive
/// // ... do something while begin is blocked ...
/// gate.release_begin(); // let begin proceed
/// ```
pub struct GateIndexBuildListener {
begin_count: AtomicUsize,
begin_notify: Notify,
/// Blocks `on_index_build_begin` until released by the test.
begin_blocker: Semaphore,
finish_count: AtomicUsize,
finish_notify: Notify,
abort_count: AtomicUsize,
abort_notify: Notify,
/// stop = finish or abort
stop_notify: Notify,
}
impl Default for GateIndexBuildListener {
fn default() -> Self {
Self {
begin_count: AtomicUsize::new(0),
begin_notify: Notify::new(),
begin_blocker: Semaphore::new(0),
finish_count: AtomicUsize::new(0),
finish_notify: Notify::new(),
abort_count: AtomicUsize::new(0),
abort_notify: Notify::new(),
stop_notify: Notify::new(),
}
}
}
impl GateIndexBuildListener {
/// Wait until index build begin is reached for `times` times.
pub async fn wait_begin(&self, times: usize) {
while self.begin_count.load(Ordering::Relaxed) < times {
self.begin_notify.notified().await;
}
}
/// Wait until index build finish is done for `times` times.
pub async fn wait_finish(&self, times: usize) {
while self.finish_count.load(Ordering::Relaxed) < times {
self.finish_notify.notified().await;
}
}
/// Wait until index build is stopped (finished or aborted) for `times` times.
pub async fn wait_stop(&self, times: usize) {
while self.finish_count.load(Ordering::Relaxed) + self.abort_count.load(Ordering::Relaxed)
< times
{
self.stop_notify.notified().await;
}
}
/// Release one blocked `on_index_build_begin` invocation.
///
/// This adds one semaphore permit, so each call releases at most one blocked
/// begin hook. If a test blocks multiple index build tasks, call this method
/// once per task. Permits are accumulated if release happens before a task
/// reaches the gate, but the release order still cannot target a specific
/// region/file.
///
/// Tests should generally call [`Self::wait_begin`] first and then release
/// the gate, so the build is known to be blocked before the test performs
/// the operation under test.
pub fn release_begin(&self) {
self.begin_blocker.add_permits(1);
}
/// Returns the begin count.
pub fn begin_count(&self) -> usize {
self.begin_count.load(Ordering::Relaxed)
}
/// Returns the finish count.
pub fn finish_count(&self) -> usize {
self.finish_count.load(Ordering::Relaxed)
}
/// Returns the abort count.
pub fn abort_count(&self) -> usize {
self.abort_count.load(Ordering::Relaxed)
}
}
#[async_trait]
impl EventListener for GateIndexBuildListener {
async fn on_index_build_begin(&self, region_file_id: RegionFileId) {
info!("Region {} index build begin (gated)", region_file_id);
self.begin_count.fetch_add(1, Ordering::Relaxed);
self.begin_notify.notify_one();
// Block until the test releases the gate.
let _permit = self
.begin_blocker
.acquire()
.await
.expect("gate semaphore should not be closed");
}
async fn on_index_build_finish(&self, region_file_id: RegionFileId) {
info!("Region {} index build successfully", region_file_id);
self.finish_count.fetch_add(1, Ordering::Relaxed);
self.finish_notify.notify_one();
self.stop_notify.notify_one();
}
async fn on_index_build_abort(&self, region_file_id: RegionFileId) {
info!("Region {} index build aborted", region_file_id);
self.abort_count.fetch_add(1, Ordering::Relaxed);
self.abort_notify.notify_one();
self.stop_notify.notify_one();
}
}

View File

@@ -2155,12 +2155,25 @@ mod tests {
region_id: RegionId,
reason: IndexBuildType,
) -> IndexBuildTask {
create_mock_task_for_schedule_with_result(env, file_id, region_id, reason)
.await
.0
}
/// Like [`create_mock_task_for_schedule`] but also returns the result receiver
/// so tests can verify pending task cancellation errors.
async fn create_mock_task_for_schedule_with_result(
env: &SchedulerEnv,
file_id: FileId,
region_id: RegionId,
reason: IndexBuildType,
) -> (IndexBuildTask, mpsc::Receiver<Result<IndexBuildOutcome>>) {
let metadata = Arc::new(sst_region_metadata());
let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
let file_purger = Arc::new(NoopFilePurger {});
let indexer_builder = mock_indexer_builder(metadata, env).await;
let (tx, _rx) = mpsc::channel(4);
let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
let (result_tx, result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
let file_meta = FileMeta {
region_id,
@@ -2171,7 +2184,7 @@ mod tests {
let file = FileHandle::new(file_meta.clone(), file_purger.clone());
IndexBuildTask {
let task = IndexBuildTask {
file,
file_meta,
reason,
@@ -2183,7 +2196,8 @@ mod tests {
indexer_builder,
request_sender: tx,
result_sender: result_tx,
}
};
(task, result_rx)
}
#[tokio::test]
@@ -2337,4 +2351,218 @@ mod tests {
scheduler.on_region_dropped(region_id).await;
assert!(!scheduler.region_status.contains_key(&region_id));
}
/// Helper to set up a scheduler with files_limit=1 and 3 scheduled tasks,
/// returning the scheduler, the two pending-task result receivers, and the
/// version_control (needed for no-op assertion after cleanup).
async fn setup_scheduler_with_pending_tasks(
env: &SchedulerEnv,
) -> (
IndexBuildScheduler,
mpsc::Receiver<Result<IndexBuildOutcome>>,
mpsc::Receiver<Result<IndexBuildOutcome>>,
VersionControlRef,
RegionId,
FileId, // building file_id for no-op assertion
) {
let metadata = Arc::new(sst_region_metadata());
let region_id = metadata.region_id;
let file_purger = Arc::new(NoopFilePurger {});
let file_id1 = FileId::random();
let file_id2 = FileId::random();
let file_id3 = FileId::random();
let files = HashMap::from([
(
file_id1,
FileMeta {
region_id,
file_id: file_id1,
file_size: 100,
..Default::default()
},
),
(
file_id2,
FileMeta {
region_id,
file_id: file_id2,
file_size: 100,
..Default::default()
},
),
(
file_id3,
FileMeta {
region_id,
file_id: file_id3,
file_size: 100,
..Default::default()
},
),
]);
let version_control =
mock_version_control(metadata.clone(), file_purger.clone(), files).await;
let mut scheduler = env.mock_index_build_scheduler(1);
// task1 becomes the "building" task (files_limit=1).
// We intentionally drop its result receiver: the building task's late-stop
// behavior is covered by the manual on_task_stopped no-op assertion below.
let (task1, _rx1) = create_mock_task_for_schedule_with_result(
env,
file_id1,
region_id,
IndexBuildType::Flush,
)
.await;
let (task2, rx2) = create_mock_task_for_schedule_with_result(
env,
file_id2,
region_id,
IndexBuildType::Flush,
)
.await;
let (task3, rx3) = create_mock_task_for_schedule_with_result(
env,
file_id3,
region_id,
IndexBuildType::Flush,
)
.await;
scheduler
.schedule_build(&version_control, task1)
.await
.unwrap();
scheduler
.schedule_build(&version_control, task2)
.await
.unwrap();
scheduler
.schedule_build(&version_control, task3)
.await
.unwrap();
// Verify: 1 building + 2 pending.
assert!(scheduler.region_status.contains_key(&region_id));
let status = scheduler.region_status.get(&region_id).unwrap();
assert_eq!(status.building_files.len(), 1);
assert_eq!(status.pending_tasks.len(), 2);
(scheduler, rx2, rx3, version_control, region_id, file_id1)
}
/// Patternmatches a pendingtask cancellation error: outer **must** be
/// [`crate::error::Error::BuildIndexAsync`] and the inner source **must** be
/// the lifecycle variant named by `expected_source` (`"dropped"`, `"closed"`,
/// or `"truncated"`). Panics with a descriptive message on mismatch.
fn assert_lifecycle_error(
err: crate::error::Error,
expected_source: &str,
lifecycle_name: &str,
) {
let crate::error::Error::BuildIndexAsync { source, .. } = err else {
panic!("[{lifecycle_name}] Expected BuildIndexAsync outer error, got: {err:?}");
};
let actual = match &*source {
crate::error::Error::RegionDropped { .. } => "dropped",
crate::error::Error::RegionClosed { .. } => "closed",
crate::error::Error::RegionTruncated { .. } => "truncated",
other => panic!(
"[{lifecycle_name}] Expected lifecycle source variant (RegionDropped/RegionClosed/RegionTruncated), got: {other:?}"
),
};
assert_eq!(
actual, expected_source,
"[{lifecycle_name}] Source variant mismatch"
);
}
/// Receives a pendingtask error from `rx` with a 5second timeout, then
/// delegates to [`assert_lifecycle_error`].
async fn recv_lifecycle_error(
rx: &mut mpsc::Receiver<Result<IndexBuildOutcome>>,
expected_source: &str,
lifecycle_name: &str,
) {
let result = tokio::time::timeout(std::time::Duration::from_secs(5), rx.recv())
.await
.unwrap_or_else(|_| {
panic!(
"[{lifecycle_name}] Timeout (5s) waiting for lifecycle error from pending task"
)
});
let result = result.unwrap_or_else(|| {
panic!("[{lifecycle_name}] Channel closed without receiving lifecycle error")
});
let err = result.unwrap_err();
assert_lifecycle_error(err, expected_source, lifecycle_name);
}
#[tokio::test]
async fn test_scheduler_lifecycle_cleanup() {
let env = SchedulerEnv::new().await;
// --- on_region_dropped ---
{
let (mut scheduler, mut rx2, mut rx3, version_control, region_id, building_file_id) =
setup_scheduler_with_pending_tasks(&env).await;
scheduler.on_region_dropped(region_id).await;
// region_status is removed.
assert!(
!scheduler.region_status.contains_key(&region_id),
"region_status should be removed after on_region_dropped"
);
// Pending-task receivers get lifecycle errors (with timeout).
recv_lifecycle_error(&mut rx2, "dropped", "on_region_dropped").await;
recv_lifecycle_error(&mut rx3, "dropped", "on_region_dropped").await;
// on_task_stopped after cleanup is a safe no-op.
scheduler.on_task_stopped(region_id, building_file_id, &version_control);
assert!(!scheduler.region_status.contains_key(&region_id));
}
// --- on_region_closed ---
{
let (mut scheduler, mut rx2, mut rx3, version_control, region_id, building_file_id) =
setup_scheduler_with_pending_tasks(&env).await;
scheduler.on_region_closed(region_id).await;
assert!(
!scheduler.region_status.contains_key(&region_id),
"region_status should be removed after on_region_closed"
);
recv_lifecycle_error(&mut rx2, "closed", "on_region_closed").await;
recv_lifecycle_error(&mut rx3, "closed", "on_region_closed").await;
scheduler.on_task_stopped(region_id, building_file_id, &version_control);
assert!(!scheduler.region_status.contains_key(&region_id));
}
// --- on_region_truncated ---
{
let (mut scheduler, mut rx2, mut rx3, version_control, region_id, building_file_id) =
setup_scheduler_with_pending_tasks(&env).await;
scheduler.on_region_truncated(region_id).await;
assert!(
!scheduler.region_status.contains_key(&region_id),
"region_status should be removed after on_region_truncated"
);
recv_lifecycle_error(&mut rx2, "truncated", "on_region_truncated").await;
recv_lifecycle_error(&mut rx3, "truncated", "on_region_truncated").await;
scheduler.on_task_stopped(region_id, building_file_id, &version_control);
assert!(!scheduler.region_status.contains_key(&region_id));
}
}
}

View File

@@ -0,0 +1,16 @@
-- Test error cases for ADMIN BUILD_INDEX
-- Error: wrong number of args (0 args, expected 1)
ADMIN BUILD_INDEX();
Error: 1004(InvalidArguments), Expected 1 args, but actual 0
-- Error: wrong argument type (Int64, expected Utf8)
ADMIN BUILD_INDEX(1);
Error: 1004(InvalidArguments), Failed to build admin function args: failed to cast 1
-- Error: table does not exist
ADMIN BUILD_INDEX('non_existent_table');
Error: 1002(Unexpected), Failed to execute admin function build_index: Execution error: Table not found: greptime.public.non_existent_table

View File

@@ -0,0 +1,10 @@
-- Test error cases for ADMIN BUILD_INDEX
-- Error: wrong number of args (0 args, expected 1)
ADMIN BUILD_INDEX();
-- Error: wrong argument type (Int64, expected Utf8)
ADMIN BUILD_INDEX(1);
-- Error: table does not exist
ADMIN BUILD_INDEX('non_existent_table');

View File

@@ -0,0 +1,205 @@
CREATE TABLE build_index_restart_test (
ts TIMESTAMP TIME INDEX,
msg TEXT,
);
Affected Rows: 0
INSERT INTO build_index_restart_test VALUES
(1,"The quick brown fox jumps over the lazy dog"),
(2,"The quick brown fox jumps over the lazy cat"),
(3,"The quick brown fox jumps over the lazy mouse"),
(4,"The quick brown fox jumps over the lazy rabbit"),
(5,"The quick brown fox jumps over the lazy turtle");
Affected Rows: 5
ADMIN FLUSH_TABLE('build_index_restart_test');
+-----------------------------------------------+
| ADMIN FLUSH_TABLE('build_index_restart_test') |
+-----------------------------------------------+
| 0 |
+-----------------------------------------------+
-- SQLNESS SLEEP 1s
-- No fulltext index yet
SHOW INDEX FROM build_index_restart_test;
+--------------------------+------------+------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment | Visible | Expression |
+--------------------------+------------+------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
| build_index_restart_test | 1 | TIME INDEX | 1 | ts | A | | | | | TIME | | | YES | |
+--------------------------+------------+------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
-- No physical fulltext index metadata yet.
-- ssts_index_meta.index_type uses physical backend names (fulltext_bloom/fulltext_tantivy),
-- unlike SHOW INDEX's greptime-fulltext-index-* display value.
-- One flushed SST with one fulltext column should produce exactly one fulltext metadata entry.
SELECT COUNT(*) AS fulltext_index_meta_count
FROM information_schema.ssts_index_meta
WHERE table_id = (
SELECT table_id
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'build_index_restart_test'
)
AND index_type LIKE 'fulltext%';
+---------------------------+
| fulltext_index_meta_count |
+---------------------------+
| 0 |
+---------------------------+
ALTER TABLE build_index_restart_test MODIFY COLUMN msg SET FULLTEXT INDEX;
Affected Rows: 0
ADMIN BUILD_INDEX('build_index_restart_test');
+-----------------------------------------------+
| ADMIN BUILD_INDEX('build_index_restart_test') |
+-----------------------------------------------+
| 0 |
+-----------------------------------------------+
-- ALTER may already schedule an async schema-change rebuild. If ADMIN BUILD_INDEX
-- races with it, the manual task can be duplicate-aborted immediately while the
-- schema-change task is still finishing, so wait before checking query visibility.
-- SQLNESS SLEEP 1s
-- Fulltext index built, verify via fulltext query
SELECT msg FROM build_index_restart_test WHERE MATCHES(msg, 'fox') ORDER BY ts;
+------------------------------------------------+
| msg |
+------------------------------------------------+
| The quick brown fox jumps over the lazy dog |
| The quick brown fox jumps over the lazy cat |
| The quick brown fox jumps over the lazy mouse |
| The quick brown fox jumps over the lazy rabbit |
| The quick brown fox jumps over the lazy turtle |
+------------------------------------------------+
-- Verify index metadata
SHOW INDEX FROM build_index_restart_test;
+--------------------------+------------+--------------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment | Visible | Expression |
+--------------------------+------------+--------------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
| build_index_restart_test | 1 | FULLTEXT_INDEX_msg | 1 | msg | A | | | | YES | FULLTEXT | | | YES | |
| build_index_restart_test | 1 | TIME INDEX | 1 | ts | A | | | | | TIME | | | YES | |
+--------------------------+------------+--------------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
-- Verify physical fulltext index metadata before restart
SELECT COUNT(*) AS fulltext_index_meta_count
FROM information_schema.ssts_index_meta
WHERE table_id = (
SELECT table_id
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'build_index_restart_test'
)
AND index_type LIKE 'fulltext%';
+---------------------------+
| fulltext_index_meta_count |
+---------------------------+
| 1 |
+---------------------------+
-- SQLNESS ARG restart=true
-- After restart, verify fulltext query still works
SELECT msg FROM build_index_restart_test WHERE MATCHES(msg, 'fox') ORDER BY ts;
+------------------------------------------------+
| msg |
+------------------------------------------------+
| The quick brown fox jumps over the lazy dog |
| The quick brown fox jumps over the lazy cat |
| The quick brown fox jumps over the lazy mouse |
| The quick brown fox jumps over the lazy rabbit |
| The quick brown fox jumps over the lazy turtle |
+------------------------------------------------+
-- Verify index metadata persists
SHOW INDEX FROM build_index_restart_test;
+--------------------------+------------+--------------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment | Visible | Expression |
+--------------------------+------------+--------------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
| build_index_restart_test | 1 | FULLTEXT_INDEX_msg | 1 | msg | A | | | | YES | FULLTEXT | | | YES | |
| build_index_restart_test | 1 | TIME INDEX | 1 | ts | A | | | | | TIME | | | YES | |
+--------------------------+------------+--------------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
-- Verify physical fulltext index metadata persists
SELECT COUNT(*) AS fulltext_index_meta_count
FROM information_schema.ssts_index_meta
WHERE table_id = (
SELECT table_id
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'build_index_restart_test'
)
AND index_type LIKE 'fulltext%';
+---------------------------+
| fulltext_index_meta_count |
+---------------------------+
| 1 |
+---------------------------+
-- Run ADMIN BUILD_INDEX again (idempotency)
ADMIN BUILD_INDEX('build_index_restart_test');
+-----------------------------------------------+
| ADMIN BUILD_INDEX('build_index_restart_test') |
+-----------------------------------------------+
| 0 |
+-----------------------------------------------+
-- SQLNESS SLEEP 1s
-- Fulltext query still works after idempotent rebuild
SELECT msg FROM build_index_restart_test WHERE MATCHES(msg, 'fox') ORDER BY ts;
+------------------------------------------------+
| msg |
+------------------------------------------------+
| The quick brown fox jumps over the lazy dog |
| The quick brown fox jumps over the lazy cat |
| The quick brown fox jumps over the lazy mouse |
| The quick brown fox jumps over the lazy rabbit |
| The quick brown fox jumps over the lazy turtle |
+------------------------------------------------+
-- Verify index metadata still present
SHOW INDEX FROM build_index_restart_test;
+--------------------------+------------+--------------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
| Table | Non_unique | Key_name | Seq_in_index | Column_name | Collation | Cardinality | Sub_part | Packed | Null | Index_type | Comment | Index_comment | Visible | Expression |
+--------------------------+------------+--------------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
| build_index_restart_test | 1 | FULLTEXT_INDEX_msg | 1 | msg | A | | | | YES | FULLTEXT | | | YES | |
| build_index_restart_test | 1 | TIME INDEX | 1 | ts | A | | | | | TIME | | | YES | |
+--------------------------+------------+--------------------+--------------+-------------+-----------+-------------+----------+--------+------+------------+---------+---------------+---------+------------+
-- Verify idempotent rebuild did not duplicate physical fulltext index metadata
SELECT COUNT(*) AS fulltext_index_meta_count
FROM information_schema.ssts_index_meta
WHERE table_id = (
SELECT table_id
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'build_index_restart_test'
)
AND index_type LIKE 'fulltext%';
+---------------------------+
| fulltext_index_meta_count |
+---------------------------+
| 1 |
+---------------------------+
DROP TABLE build_index_restart_test;
Affected Rows: 0

View File

@@ -0,0 +1,97 @@
CREATE TABLE build_index_restart_test (
ts TIMESTAMP TIME INDEX,
msg TEXT,
);
INSERT INTO build_index_restart_test VALUES
(1,"The quick brown fox jumps over the lazy dog"),
(2,"The quick brown fox jumps over the lazy cat"),
(3,"The quick brown fox jumps over the lazy mouse"),
(4,"The quick brown fox jumps over the lazy rabbit"),
(5,"The quick brown fox jumps over the lazy turtle");
ADMIN FLUSH_TABLE('build_index_restart_test');
-- SQLNESS SLEEP 1s
-- No fulltext index yet
SHOW INDEX FROM build_index_restart_test;
-- No physical fulltext index metadata yet.
-- ssts_index_meta.index_type uses physical backend names (fulltext_bloom/fulltext_tantivy),
-- unlike SHOW INDEX's greptime-fulltext-index-* display value.
-- One flushed SST with one fulltext column should produce exactly one fulltext metadata entry.
SELECT COUNT(*) AS fulltext_index_meta_count
FROM information_schema.ssts_index_meta
WHERE table_id = (
SELECT table_id
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'build_index_restart_test'
)
AND index_type LIKE 'fulltext%';
ALTER TABLE build_index_restart_test MODIFY COLUMN msg SET FULLTEXT INDEX;
ADMIN BUILD_INDEX('build_index_restart_test');
-- ALTER may already schedule an async schema-change rebuild. If ADMIN BUILD_INDEX
-- races with it, the manual task can be duplicate-aborted immediately while the
-- schema-change task is still finishing, so wait before checking query visibility.
-- SQLNESS SLEEP 1s
-- Fulltext index built, verify via fulltext query
SELECT msg FROM build_index_restart_test WHERE MATCHES(msg, 'fox') ORDER BY ts;
-- Verify index metadata
SHOW INDEX FROM build_index_restart_test;
-- Verify physical fulltext index metadata before restart
SELECT COUNT(*) AS fulltext_index_meta_count
FROM information_schema.ssts_index_meta
WHERE table_id = (
SELECT table_id
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'build_index_restart_test'
)
AND index_type LIKE 'fulltext%';
-- SQLNESS ARG restart=true
-- After restart, verify fulltext query still works
SELECT msg FROM build_index_restart_test WHERE MATCHES(msg, 'fox') ORDER BY ts;
-- Verify index metadata persists
SHOW INDEX FROM build_index_restart_test;
-- Verify physical fulltext index metadata persists
SELECT COUNT(*) AS fulltext_index_meta_count
FROM information_schema.ssts_index_meta
WHERE table_id = (
SELECT table_id
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'build_index_restart_test'
)
AND index_type LIKE 'fulltext%';
-- Run ADMIN BUILD_INDEX again (idempotency)
ADMIN BUILD_INDEX('build_index_restart_test');
-- SQLNESS SLEEP 1s
-- Fulltext query still works after idempotent rebuild
SELECT msg FROM build_index_restart_test WHERE MATCHES(msg, 'fox') ORDER BY ts;
-- Verify index metadata still present
SHOW INDEX FROM build_index_restart_test;
-- Verify idempotent rebuild did not duplicate physical fulltext index metadata
SELECT COUNT(*) AS fulltext_index_meta_count
FROM information_schema.ssts_index_meta
WHERE table_id = (
SELECT table_id
FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = 'build_index_restart_test'
)
AND index_type LIKE 'fulltext%';
DROP TABLE build_index_restart_test;