From ff5fa40b858662df37fcd5fb9f6661c23cb05116 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 4 Nov 2024 19:12:41 +0800 Subject: [PATCH] feat: skip wal --- src/mito2/src/config.rs | 4 ++++ src/mito2/src/worker/handle_write.rs | 28 +++++++++++++++------------- tests-integration/tests/http.rs | 1 + 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 001d5ffaa8..f36d9dd221 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -128,6 +128,9 @@ pub struct MitoConfig { /// To align with the old behavior, the default value is 0 (no restrictions). #[serde(with = "humantime_serde")] pub min_compaction_interval: Duration, + + /// Skip wal + pub skip_wal: bool, } impl Default for MitoConfig { @@ -161,6 +164,7 @@ impl Default for MitoConfig { fulltext_index: FulltextIndexConfig::default(), memtable: MemtableConfig::default(), min_compaction_interval: Duration::from_secs(0), + skip_wal: false, }; // Adjust buffer and cache size according to system memory if we can. diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 85ce49f315..92bdc556e4 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -79,21 +79,23 @@ impl RegionWorkerLoop { region_ctx.set_error(e); } } - match wal_writer.write_to_wal().await.map_err(Arc::new) { - Ok(response) => { - for (region_id, region_ctx) in region_ctxs.iter_mut() { - // Safety: the log store implementation ensures that either the `write_to_wal` fails and no - // response is returned or the last entry ids for each region do exist. - let last_entry_id = response.last_entry_ids.get(region_id).unwrap(); - region_ctx.set_next_entry_id(last_entry_id + 1); + if !self.config.skip_wal { + match wal_writer.write_to_wal().await.map_err(Arc::new) { + Ok(response) => { + for (region_id, region_ctx) in region_ctxs.iter_mut() { + // Safety: the log store implementation ensures that either the `write_to_wal` fails and no + // response is returned or the last entry ids for each region do exist. + let last_entry_id = response.last_entry_ids.get(region_id).unwrap(); + region_ctx.set_next_entry_id(last_entry_id + 1); + } } - } - Err(e) => { - // Failed to write wal. - for mut region_ctx in region_ctxs.into_values() { - region_ctx.set_error(e.clone()); + Err(e) => { + // Failed to write wal. + for mut region_ctx in region_ctxs.into_values() { + region_ctx.set_error(e.clone()); + } + return; } - return; } } } diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 638734faba..314cd05e72 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -897,6 +897,7 @@ sst_write_buffer_size = "8MiB" parallel_scan_channel_size = 32 allow_stale_entries = false min_compaction_interval = "0s" +skip_wal = false [region_engine.mito.index] aux_path = ""