diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs
index aac02db91e..c134def6aa 100644
--- a/src/mito2/src/engine/flush_test.rs
+++ b/src/mito2/src/engine/flush_test.rs
@@ -402,3 +402,74 @@ async fn test_auto_flush_engine() {
 +-------+---------+---------------------+";
     assert_eq!(expected, batches.pretty_print().unwrap());
 }
+
+/// With two workers, a flush triggered by a write to region 0 should also flush
+/// region 1, even though region 1 receives no writes after flushing is enabled.
+#[tokio::test]
+async fn test_flush_workers() {
+    let mut env = TestEnv::new();
+    let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
+    let listener = Arc::new(FlushListener::default());
+    let engine = env
+        .create_engine_with(
+            MitoConfig {
+                num_workers: 2,
+                ..Default::default()
+            },
+            Some(write_buffer_manager.clone()),
+            Some(listener.clone()),
+        )
+        .await;
+
+    let region_id0 = RegionId::new(1, 0);
+    let region_id1 = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new().region_dir("r0").build();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id0, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    let request = CreateRequestBuilder::new().region_dir("r1").build();
+    engine
+        .handle_request(region_id1, RegionRequest::Create(request))
+        .await
+        .unwrap();
+
+    // Prepares rows for flush.
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows_for_key("a", 0, 2, 0),
+    };
+    put_rows(&engine, region_id0, rows.clone()).await;
+    put_rows(&engine, region_id1, rows).await;
+
+    write_buffer_manager.set_should_flush(true);
+
+    // Writes to the mutable memtable and triggers flush for region 0.
+    let rows = Rows {
+        schema: column_schemas,
+        rows: build_rows_for_key("b", 0, 2, 0),
+    };
+    put_rows(&engine, region_id0, rows).await;
+
+    // Waits until two flushes have succeeded (one per region is expected).
+    while listener.success_count() < 2 {
+        listener.wait().await;
+    }
+
+    // Scans region 1.
+    let request = ScanRequest::default();
+    let scanner = engine.scanner(region_id1, request).unwrap();
+    assert_eq!(0, scanner.num_memtables());
+    assert_eq!(1, scanner.num_files());
+    let stream = scanner.scan().await.unwrap();
+    let batches = RecordBatches::try_collect(stream).await.unwrap();
+    let expected = "\
++-------+---------+---------------------+
+| tag_0 | field_0 | ts                  |
++-------+---------+---------------------+
+| a     | 0.0     | 1970-01-01T00:00:00 |
+| a     | 1.0     | 1970-01-01T00:00:01 |
++-------+---------+---------------------+";
+    assert_eq!(expected, batches.pretty_print().unwrap());
+}
diff --git a/src/mito2/src/engine/listener.rs b/src/mito2/src/engine/listener.rs
index a79cb6eafd..83679f96e4 100644
--- a/src/mito2/src/engine/listener.rs
+++ b/src/mito2/src/engine/listener.rs
@@ -14,6 +14,7 @@
 
 //! Engine event listener for tests.
 
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -77,6 +78,7 @@ pub type EventListenerRef = Arc<dyn EventListener>;
 #[derive(Default)]
 pub struct FlushListener {
     notify: Notify,
+    success_count: AtomicUsize,
 }
 
 impl FlushListener {
@@ -84,6 +86,11 @@ impl FlushListener {
     pub async fn wait(&self) {
         self.notify.notified().await;
     }
+
+    /// Returns how many successful flushes this listener has observed so far.
+    pub fn success_count(&self) -> usize {
+        self.success_count.load(Ordering::Relaxed)
+    }
 }
 
 #[async_trait]
@@ -91,7 +98,8 @@ impl EventListener for FlushListener {
     fn on_flush_success(&self, region_id: RegionId) {
         info!("Region {} flush successfully", region_id);
 
-        self.notify.notify_one()
+        self.success_count.fetch_add(1, Ordering::Relaxed);
+        self.notify.notify_one();
     }
 }
 
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 0f872e24e4..c2fbc80982 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -670,6 +670,11 @@ impl RegionWorkerLoop {
                         // The channel is disconnected.
                         break;
                     } else {
+                        // Also flush this worker when another worker triggers a flush: this worker
+                        // may hold a large memtable that never gets a chance to be flushed if
+                        // nothing is written to this worker, so flushing only the other workers
+                        // may not release enough memory.
+                        self.maybe_flush_worker();
                         // A flush job is finished, handles stalled requests.
                         self.handle_stalled_requests().await;
                         continue;