From 6341fb86c727637fad6aad9d1122247479063155 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Thu, 6 Feb 2025 17:29:57 +0800 Subject: [PATCH] feat: write memtable in parallel (#5456) * feat: write memtable in parallel Signed-off-by: Ruihang Xia * some comments Signed-off-by: Ruihang Xia * remove unwrap Signed-off-by: Ruihang Xia * fix clippy Signed-off-by: Ruihang Xia * unwrap spawn result Signed-off-by: Ruihang Xia * use FuturesUnordered Signed-off-by: Ruihang Xia --------- Signed-off-by: Ruihang Xia --- src/mito2/src/region/opener.rs | 2 +- src/mito2/src/region_write_ctx.rs | 43 +++++++++++++++++++++------- src/mito2/src/worker/handle_write.rs | 31 ++++++++++++++++++-- 3 files changed, 61 insertions(+), 15 deletions(-) diff --git a/src/mito2/src/region/opener.rs b/src/mito2/src/region/opener.rs index 1071a2ffb2..2992a475ab 100644 --- a/src/mito2/src/region/opener.rs +++ b/src/mito2/src/region/opener.rs @@ -554,7 +554,7 @@ where // set next_entry_id and write to memtable. region_write_ctx.set_next_entry_id(last_entry_id + 1); - region_write_ctx.write_memtable(); + region_write_ctx.write_memtable().await; } // TODO(weny): We need to update `flushed_entry_id` in the region manifest diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs index 0047822b4d..2a1f935245 100644 --- a/src/mito2/src/region_write_ctx.rs +++ b/src/mito2/src/region_write_ctx.rs @@ -16,6 +16,7 @@ use std::mem; use std::sync::Arc; use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint}; +use futures::stream::{FuturesUnordered, StreamExt}; use snafu::ResultExt; use store_api::logstore::provider::Provider; use store_api::logstore::LogStore; @@ -197,23 +198,43 @@ impl RegionWriteCtx { } /// Consumes mutations and writes them into mutable memtable. - pub(crate) fn write_memtable(&mut self) { + pub(crate) async fn write_memtable(&mut self) { debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len()); if self.failed { return; } - let mutable = &self.version.memtables.mutable; - // Takes mutations from the wal entry. - let mutations = mem::take(&mut self.wal_entry.mutations); - for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) { - // Write mutation to the memtable. - let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else { - continue; - }; - if let Err(e) = mutable.write(&kvs) { - notify.err = Some(Arc::new(e)); + let mutable = self.version.memtables.mutable.clone(); + let mutations = mem::take(&mut self.wal_entry.mutations) + .into_iter() + .enumerate() + .filter_map(|(i, mutation)| { + let kvs = KeyValues::new(&self.version.metadata, mutation)?; + Some((i, kvs)) + }) + .collect::>(); + + if mutations.len() == 1 { + if let Err(err) = mutable.write(&mutations[0].1) { + self.notifiers[mutations[0].0].err = Some(Arc::new(err)); + } + } else { + let mut tasks = FuturesUnordered::new(); + for (i, kvs) in mutations { + let mutable = mutable.clone(); + // use tokio runtime to schedule tasks. + tasks.push(common_runtime::spawn_blocking_global(move || { + (i, mutable.write(&kvs)) + })); + } + + while let Some(result) = tasks.next().await { + // first unwrap the result from `spawn` above + let (i, result) = result.unwrap(); + if let Err(err) = result { + self.notifiers[i].err = Some(Arc::new(err)); + } } } diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index 5f9fdd698c..2804aabf1a 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -18,7 +18,7 @@ use std::collections::{hash_map, HashMap}; use std::sync::Arc; use api::v1::OpType; -use common_telemetry::debug; +use common_telemetry::{debug, error}; use snafu::ensure; use store_api::codec::PrimaryKeyEncoding; use store_api::logstore::LogStore; @@ -105,10 +105,35 @@ impl RegionWorkerLoop { let _timer = WRITE_STAGE_ELAPSED .with_label_values(&["write_memtable"]) .start_timer(); - for mut region_ctx in region_ctxs.into_values() { - region_ctx.write_memtable(); + if region_ctxs.len() == 1 { + // fast path for single region. + let mut region_ctx = region_ctxs.into_values().next().unwrap(); + region_ctx.write_memtable().await; put_rows += region_ctx.put_num; delete_rows += region_ctx.delete_num; + } else { + let region_write_task = region_ctxs + .into_values() + .map(|mut region_ctx| { + // use tokio runtime to schedule tasks. + common_runtime::spawn_global(async move { + region_ctx.write_memtable().await; + (region_ctx.put_num, region_ctx.delete_num) + }) + }) + .collect::>(); + + for result in futures::future::join_all(region_write_task).await { + match result { + Ok((put, delete)) => { + put_rows += put; + delete_rows += delete; + } + Err(e) => { + error!(e; "unexpected error when joining region write tasks"); + } + } + } } } WRITE_ROWS_TOTAL