feat: write memtable in parallel (#5456)

* feat: write memtable in parallel

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* some comments

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove unwrap

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix clippy

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* unwrap spawn result

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* use FuturesUnordered

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
Author: Ruihang Xia
Date: 2025-02-06 17:29:57 +08:00
Committed by: GitHub
Parent: fa09e181be
Commit: 6341fb86c7
3 changed files with 61 additions and 15 deletions


@@ -554,7 +554,7 @@ where
         // set next_entry_id and write to memtable.
         region_write_ctx.set_next_entry_id(last_entry_id + 1);
-        region_write_ctx.write_memtable();
+        region_write_ctx.write_memtable().await;
     }

     // TODO(weny): We need to update `flushed_entry_id` in the region manifest
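This hunk is only a call-site adjustment: `write_memtable` becomes `async` in this change (see the `RegionWriteCtx` hunk below), so the WAL replay path has to await it. A minimal sketch of that signature change, using a hypothetical `WriteCtx` in place of the real `RegionWriteCtx`:

// Hypothetical stand-in for RegionWriteCtx, only to show the new signature.
struct WriteCtx;

impl WriteCtx {
    // Previously a plain `fn`; now async, so every caller must await it.
    async fn write_memtable(&mut self) {
        // Stand-in body; the real method fans writes out to blocking tasks.
    }
}

#[tokio::main]
async fn main() {
    let mut ctx = WriteCtx;
    // The replay path gains exactly one `.await` here.
    ctx.write_memtable().await;
}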


@@ -16,6 +16,7 @@ use std::mem;
 use std::sync::Arc;

 use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
+use futures::stream::{FuturesUnordered, StreamExt};
 use snafu::ResultExt;
 use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
@@ -197,23 +198,43 @@ impl RegionWriteCtx {
     }

     /// Consumes mutations and writes them into mutable memtable.
-    pub(crate) fn write_memtable(&mut self) {
+    pub(crate) async fn write_memtable(&mut self) {
         debug_assert_eq!(self.notifiers.len(), self.wal_entry.mutations.len());

         if self.failed {
             return;
         }

-        let mutable = &self.version.memtables.mutable;
+        let mutable = self.version.memtables.mutable.clone();
         // Takes mutations from the wal entry.
-        let mutations = mem::take(&mut self.wal_entry.mutations);
-        for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) {
-            // Write mutation to the memtable.
-            let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
-                continue;
-            };
-            if let Err(e) = mutable.write(&kvs) {
-                notify.err = Some(Arc::new(e));
-            }
-        }
+        let mutations = mem::take(&mut self.wal_entry.mutations)
+            .into_iter()
+            .enumerate()
+            .filter_map(|(i, mutation)| {
+                let kvs = KeyValues::new(&self.version.metadata, mutation)?;
+                Some((i, kvs))
+            })
+            .collect::<Vec<_>>();
+
+        if mutations.len() == 1 {
+            if let Err(err) = mutable.write(&mutations[0].1) {
+                self.notifiers[mutations[0].0].err = Some(Arc::new(err));
+            }
+        } else {
+            let mut tasks = FuturesUnordered::new();
+            for (i, kvs) in mutations {
+                let mutable = mutable.clone();
+                // use tokio runtime to schedule tasks.
+                tasks.push(common_runtime::spawn_blocking_global(move || {
+                    (i, mutable.write(&kvs))
+                }));
+            }
+            while let Some(result) = tasks.next().await {
+                // first unwrap the result from `spawn` above
+                let (i, result) = result.unwrap();
+                if let Err(err) = result {
+                    self.notifiers[i].err = Some(Arc::new(err));
+                }
+            }
+        }
     }
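The per-mutation fan-out above spawns one blocking task per `KeyValues` batch and drains completions through a `FuturesUnordered`, using the carried index to route any error back to the matching notifier. A self-contained sketch of that pattern, with a hypothetical `fake_write` in place of the memtable write and plain `tokio::task::spawn_blocking` standing in for `common_runtime::spawn_blocking_global`:

use futures::stream::{FuturesUnordered, StreamExt};

// Hypothetical stand-in for a memtable write that may fail.
fn fake_write(batch: &str) -> Result<(), String> {
    if batch.is_empty() {
        Err("empty batch".to_string())
    } else {
        Ok(())
    }
}

#[tokio::main]
async fn main() {
    let batches = vec!["a".to_string(), String::new(), "c".to_string()];
    // One error slot per batch, mirroring `notifiers` in the diff.
    let mut errors: Vec<Option<String>> = vec![None; batches.len()];

    // Spawn one blocking task per batch and carry its index along.
    let mut tasks = FuturesUnordered::new();
    for (i, batch) in batches.into_iter().enumerate() {
        tasks.push(tokio::task::spawn_blocking(move || (i, fake_write(&batch))));
    }

    // Completions arrive in arbitrary order; the index routes each
    // error back to the right slot.
    while let Some(joined) = tasks.next().await {
        let (i, result) = joined.expect("blocking task panicked");
        if let Err(err) = result {
            errors[i] = Some(err);
        }
    }

    assert!(errors[1].is_some());
}

Because each write is synchronous and CPU-bound, it goes to the blocking thread pool instead of stalling the async worker, and `FuturesUnordered` lets the loop observe completions in whatever order they finish rather than in spawn order.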


@@ -18,7 +18,7 @@ use std::collections::{hash_map, HashMap};
 use std::sync::Arc;

 use api::v1::OpType;
-use common_telemetry::debug;
+use common_telemetry::{debug, error};
 use snafu::ensure;
 use store_api::codec::PrimaryKeyEncoding;
 use store_api::logstore::LogStore;
@@ -105,10 +105,35 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             let _timer = WRITE_STAGE_ELAPSED
                 .with_label_values(&["write_memtable"])
                 .start_timer();
-            for mut region_ctx in region_ctxs.into_values() {
-                region_ctx.write_memtable();
-                put_rows += region_ctx.put_num;
-                delete_rows += region_ctx.delete_num;
+            if region_ctxs.len() == 1 {
+                // fast path for single region.
+                let mut region_ctx = region_ctxs.into_values().next().unwrap();
+                region_ctx.write_memtable().await;
+                put_rows += region_ctx.put_num;
+                delete_rows += region_ctx.delete_num;
+            } else {
+                let region_write_task = region_ctxs
+                    .into_values()
+                    .map(|mut region_ctx| {
+                        // use tokio runtime to schedule tasks.
+                        common_runtime::spawn_global(async move {
+                            region_ctx.write_memtable().await;
+                            (region_ctx.put_num, region_ctx.delete_num)
+                        })
+                    })
+                    .collect::<Vec<_>>();
+
+                for result in futures::future::join_all(region_write_task).await {
+                    match result {
+                        Ok((put, delete)) => {
+                            put_rows += put;
+                            delete_rows += delete;
+                        }
+                        Err(e) => {
+                            error!(e; "unexpected error when joining region write tasks");
+                        }
+                    }
+                }
             }
         }
         WRITE_ROWS_TOTAL
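At the worker level the same idea is applied across regions: every `RegionWriteCtx` moves into its own task, and the worker awaits all of them while summing the per-region row counts. A rough sketch of that aggregation, using plain `tokio::spawn` and a hypothetical `RegionCtx` instead of `common_runtime::spawn_global` and the real context:

use futures::future::join_all;

// Hypothetical per-region context carrying the row counters summed below.
struct RegionCtx {
    put_num: usize,
    delete_num: usize,
}

impl RegionCtx {
    async fn write_memtable(&mut self) {
        // Stand-in for the real memtable write.
    }
}

#[tokio::main]
async fn main() {
    let region_ctxs = vec![
        RegionCtx { put_num: 3, delete_num: 1 },
        RegionCtx { put_num: 5, delete_num: 0 },
    ];

    // One task per region; each returns its row counts when it finishes.
    let tasks: Vec<_> = region_ctxs
        .into_iter()
        .map(|mut ctx| {
            tokio::spawn(async move {
                ctx.write_memtable().await;
                (ctx.put_num, ctx.delete_num)
            })
        })
        .collect();

    let (mut put_rows, mut delete_rows) = (0, 0);
    // The tasks run concurrently; join_all just awaits and collects them.
    for joined in join_all(tasks).await {
        match joined {
            Ok((put, delete)) => {
                put_rows += put;
                delete_rows += delete;
            }
            // A JoinError only means the task panicked or was aborted.
            Err(e) => eprintln!("region write task failed: {e}"),
        }
    }

    assert_eq!((put_rows, delete_rows), (8, 1));
}

A join error only occurs when a task panics or is cancelled, which is why the real code logs it and keeps aggregating instead of failing the whole write batch.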