fix: mistakenly removes compaction inputs on failure (#3635)

* fix: mistakenly removes compaction inputs on failure

* test: add test for compaction failure

---------

Co-authored-by: evenyag <realevenyag@gmail.com>
Lei, HUANG · 2024-04-03 19:54:20 +08:00 · committed by GitHub
parent d33435fa84
commit ddeb73fbb7
9 changed files with 154 additions and 24 deletions


@@ -439,8 +439,11 @@ impl CompactionTask for TwcsCompactionTask {
         let notify = match self.handle_compaction().await {
             Ok((added, deleted)) => {
                 info!(
-                    "Compacted SST files, input: {:?}, output: {:?}, window: {:?}",
-                    deleted, added, self.compaction_time_window
+                    "Compacted SST files, input: {:?}, output: {:?}, window: {:?}, waiter_num: {}",
+                    deleted,
+                    added,
+                    self.compaction_time_window,
+                    self.waiters.len(),
                 );
                 BackgroundNotify::CompactionFinished(CompactionFinished {


@@ -401,6 +401,11 @@ impl MitoEngine {
            }),
        })
    }
+
+    /// Returns the purge scheduler.
+    pub fn purge_scheduler(&self) -> &crate::schedule::scheduler::SchedulerRef {
+        self.inner.workers.purge_scheduler()
+    }
 }
 
 #[cfg(test)]


@@ -13,6 +13,7 @@
 // limitations under the License.
 
 use std::ops::Range;
+use std::sync::Arc;
 
 use api::v1::{ColumnSchema, Rows};
 use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
@@ -23,8 +24,10 @@ use store_api::region_request::{
     RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
 };
 use store_api::storage::{RegionId, ScanRequest};
+use tokio::sync::Notify;
 
 use crate::config::MitoConfig;
+use crate::engine::listener::CompactionListener;
 use crate::engine::MitoEngine;
 use crate::test_util::{
     build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
@@ -145,3 +148,73 @@ async fn test_compaction_region() {
     let vec = collect_stream_ts(stream).await;
     assert_eq!((0..25).map(|v| v * 1000).collect::<Vec<_>>(), vec);
 }
+
+// For issue https://github.com/GreptimeTeam/greptimedb/issues/3633
+#[tokio::test]
+async fn test_readonly_during_compaction() {
+    common_telemetry::init_default_ut_logging();
+    let mut env = TestEnv::new();
+    let listener = Arc::new(CompactionListener::default());
+    let engine = env
+        .create_engine_with(
+            MitoConfig {
+                // Ensure there is only one background worker for purge task.
+                max_background_jobs: 1,
+                ..Default::default()
+            },
+            None,
+            Some(listener.clone()),
+        )
+        .await;
+
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new()
+        .insert_option("compaction.type", "twcs")
+        .insert_option("compaction.twcs.max_active_window_files", "1")
+        .build();
+
+    let column_schemas = request
+        .column_metadatas
+        .iter()
+        .map(column_metadata_to_column_schema)
+        .collect::<Vec<_>>();
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    // Flush 2 SSTs for compaction.
+    put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
+    put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
+
+    // Waits until the engine receives the compaction finished request.
+    listener.wait_handle_finished().await;
+    // Sets the region to read only mode.
+    engine.set_writable(region_id, false).unwrap();
+    // Wakes up the listener.
+    listener.wake();
+
+    let notify = Arc::new(Notify::new());
+    // We already set max background jobs to 1, so we can submit a task to the
+    // purge scheduler to ensure all purge tasks are finished.
+    let job_notify = notify.clone();
+    engine
+        .purge_scheduler()
+        .schedule(Box::pin(async move {
+            job_notify.notify_one();
+        }))
+        .unwrap();
+    notify.notified().await;
+
+    let scanner = engine.scanner(region_id, ScanRequest::default()).unwrap();
+    assert_eq!(
+        2,
+        scanner.num_files(),
+        "unexpected files: {:?}",
+        scanner.file_ids()
+    );
+    let stream = scanner.scan().await.unwrap();
+    let vec = collect_stream_ts(stream).await;
+    assert_eq!((0..20).map(|v| v * 1000).collect::<Vec<_>>(), vec);
+}


@@ -26,13 +26,17 @@ use tokio::sync::Notify;
 #[async_trait]
 pub trait EventListener: Send + Sync {
     /// Notifies the listener that a region is flushed successfully.
-    fn on_flush_success(&self, region_id: RegionId);
+    fn on_flush_success(&self, region_id: RegionId) {
+        let _ = region_id;
+    }
 
     /// Notifies the listener that the engine is stalled.
-    fn on_write_stall(&self);
+    fn on_write_stall(&self) {}
 
     /// Notifies the listener that the region starts to do flush.
-    async fn on_flush_begin(&self, region_id: RegionId);
+    async fn on_flush_begin(&self, region_id: RegionId) {
+        let _ = region_id;
+    }
 
     /// Notifies the listener that the later drop task starts running.
     /// Returns the gc interval if we want to override the default one.
@@ -46,6 +50,12 @@ pub trait EventListener: Send + Sync {
         let _ = region_id;
         let _ = removed;
     }
+
+    /// Notifies the listener that the region is going to handle the compaction
+    /// finished request.
+    async fn on_handle_compaction_finished(&self, region_id: RegionId) {
+        let _ = region_id;
+    }
 }
 
 pub type EventListenerRef = Arc<dyn EventListener>;
@@ -70,10 +80,6 @@ impl EventListener for FlushListener {
         self.notify.notify_one()
     }
-
-    fn on_write_stall(&self) {}
-
-    async fn on_flush_begin(&self, _region_id: RegionId) {}
 }
 
 /// Listener to watch stall events.
@@ -98,8 +104,6 @@ impl EventListener for StallListener {
         self.notify.notify_one();
     }
-
-    async fn on_flush_begin(&self, _region_id: RegionId) {}
 }
 
 /// Listener to watch begin flush events.
@@ -130,10 +134,6 @@ impl FlushTruncateListener {
 #[async_trait]
 impl EventListener for FlushTruncateListener {
-    fn on_flush_success(&self, _region_id: RegionId) {}
-
-    fn on_write_stall(&self) {}
-
     /// Calling this function will block the thread!
     /// Notify the listener to perform a truncate region and block the flush region job.
     async fn on_flush_begin(&self, region_id: RegionId) {
@@ -169,12 +169,6 @@ impl DropListener {
 #[async_trait]
 impl EventListener for DropListener {
-    fn on_flush_success(&self, _region_id: RegionId) {}
-
-    fn on_write_stall(&self) {}
-
-    async fn on_flush_begin(&self, _region_id: RegionId) {}
-
     fn on_later_drop_begin(&self, _region_id: RegionId) -> Option<Duration> {
         Some(self.gc_duration)
     }
@@ -185,3 +179,34 @@ impl EventListener for DropListener {
         self.notify.notify_one();
     }
 }
+
+/// Listener on handling compaction requests.
+#[derive(Default)]
+pub struct CompactionListener {
+    handle_finished_notify: Notify,
+    blocker: Notify,
+}
+
+impl CompactionListener {
+    /// Waits for handling compaction finished request.
+    pub async fn wait_handle_finished(&self) {
+        self.handle_finished_notify.notified().await;
+    }
+
+    /// Wakes up the listener.
+    pub fn wake(&self) {
+        self.blocker.notify_one();
+    }
+}
+
+#[async_trait]
+impl EventListener for CompactionListener {
+    async fn on_handle_compaction_finished(&self, region_id: RegionId) {
+        info!("Handle compaction finished request, region {region_id}");
+        self.handle_finished_notify.notify_one();
+
+        // Blocks current task.
+        self.blocker.notified().await;
+    }
+}


@@ -717,7 +717,7 @@ impl OnFailure for CompactionFinished {
                 region_id: self.region_id,
             }));
         }
-        for file in &self.compacted_files {
+        for file in &self.compaction_outputs {
             warn!(
                 "Cleaning region {} compaction output file: {}",
                 self.region_id, file.file_id


@@ -157,7 +157,7 @@ impl TestEnv {
            .unwrap()
    }
 
-    /// Creates a new engine with specific config and manager/listener under this env.
+    /// Creates a new engine with specific config and manager/listener/purge_scheduler under this env.
     pub async fn create_engine_with(
         &mut self,
         config: MitoConfig,


@@ -292,6 +292,11 @@ impl WorkerGroup {
            cache_manager,
        })
    }
+
+    /// Returns the purge scheduler.
+    pub(crate) fn purge_scheduler(&self) -> &SchedulerRef {
+        &self.purge_scheduler
+    }
 }
 
 fn region_id_to_index(id: RegionId, num_workers: usize) -> usize {
@@ -819,6 +824,15 @@ impl WorkerListener {
         let _ = region_id;
         let _ = removed;
     }
+
+    pub(crate) async fn on_handle_compaction_finished(&self, region_id: RegionId) {
+        #[cfg(any(test, feature = "test"))]
+        if let Some(listener) = &self.listener {
+            listener.on_handle_compaction_finished(region_id).await;
+        }
+        // Avoid compiler warning.
+        let _ = region_id;
+    }
 }
 
 #[cfg(test)]


@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use common_telemetry::{error, info};
+use common_telemetry::{error, info, warn};
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
@@ -55,7 +55,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         region_id: RegionId,
         mut request: CompactionFinished,
     ) {
+        self.listener.on_handle_compaction_finished(region_id).await;
+
         let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
+            warn!(
+                "Unable to finish the compaction task for a read only region {}",
+                region_id
+            );
             return;
         };


@@ -191,6 +191,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         mut request: FlushFinished,
     ) {
         let Some(region) = self.regions.writable_region_or(region_id, &mut request) else {
+            warn!(
+                "Unable to finish the flush task for a read only region {}",
+                region_id
+            );
             return;
         };