mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
feat: flush other workers if still need flush (#4746)
This commit is contained in:
@@ -402,3 +402,72 @@ async fn test_auto_flush_engine() {
|
||||
+-------+---------+---------------------+";
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_flush_workers() {
|
||||
let mut env = TestEnv::new();
|
||||
let write_buffer_manager = Arc::new(MockWriteBufferManager::default());
|
||||
let listener = Arc::new(FlushListener::default());
|
||||
let engine = env
|
||||
.create_engine_with(
|
||||
MitoConfig {
|
||||
num_workers: 2,
|
||||
..Default::default()
|
||||
},
|
||||
Some(write_buffer_manager.clone()),
|
||||
Some(listener.clone()),
|
||||
)
|
||||
.await;
|
||||
|
||||
let region_id0 = RegionId::new(1, 0);
|
||||
let region_id1 = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().region_dir("r0").build();
|
||||
let column_schemas = rows_schema(&request);
|
||||
engine
|
||||
.handle_request(region_id0, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
let request = CreateRequestBuilder::new().region_dir("r1").build();
|
||||
engine
|
||||
.handle_request(region_id1, RegionRequest::Create(request.clone()))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Prepares rows for flush.
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("a", 0, 2, 0),
|
||||
};
|
||||
put_rows(&engine, region_id0, rows.clone()).await;
|
||||
put_rows(&engine, region_id1, rows).await;
|
||||
|
||||
write_buffer_manager.set_should_flush(true);
|
||||
|
||||
// Writes to the mutable memtable and triggers flush for region 0.
|
||||
let rows = Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows_for_key("b", 0, 2, 0),
|
||||
};
|
||||
put_rows(&engine, region_id0, rows).await;
|
||||
|
||||
// Waits until flush is finished.
|
||||
while listener.success_count() < 2 {
|
||||
listener.wait().await;
|
||||
}
|
||||
|
||||
// Scans region 1.
|
||||
let request = ScanRequest::default();
|
||||
let scanner = engine.scanner(region_id1, request).unwrap();
|
||||
assert_eq!(0, scanner.num_memtables());
|
||||
assert_eq!(1, scanner.num_files());
|
||||
let stream = scanner.scan().await.unwrap();
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| a | 0.0 | 1970-01-01T00:00:00 |
|
||||
| a | 1.0 | 1970-01-01T00:00:01 |
|
||||
+-------+---------+---------------------+";
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@
|
||||
|
||||
//! Engine event listener for tests.
|
||||
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -77,6 +78,7 @@ pub type EventListenerRef = Arc<dyn EventListener>;
|
||||
#[derive(Default)]
|
||||
pub struct FlushListener {
|
||||
notify: Notify,
|
||||
success_count: AtomicUsize,
|
||||
}
|
||||
|
||||
impl FlushListener {
|
||||
@@ -84,6 +86,11 @@ impl FlushListener {
|
||||
pub async fn wait(&self) {
|
||||
self.notify.notified().await;
|
||||
}
|
||||
|
||||
/// Returns the success count.
|
||||
pub fn success_count(&self) -> usize {
|
||||
self.success_count.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -91,7 +98,8 @@ impl EventListener for FlushListener {
|
||||
fn on_flush_success(&self, region_id: RegionId) {
|
||||
info!("Region {} flush successfully", region_id);
|
||||
|
||||
self.notify.notify_one()
|
||||
self.success_count.fetch_add(1, Ordering::Relaxed);
|
||||
self.notify.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -670,6 +670,11 @@ impl<S: LogStore> RegionWorkerLoop<S> {
|
||||
// The channel is disconnected.
|
||||
break;
|
||||
} else {
|
||||
// Also flush this worker if other workers trigger flush as this worker may have
|
||||
// a large memtable to flush. We may not have chance to flush that memtable if we
|
||||
// never write to this worker. So only flushing other workers may not release enough
|
||||
// memory.
|
||||
self.maybe_flush_worker();
|
||||
// A flush job is finished, handles stalled requests.
|
||||
self.handle_stalled_requests().await;
|
||||
continue;
|
||||
|
||||
Reference in New Issue
Block a user