From 713a73e9b262d8b7c1a5f3c744918bd8aa7f9fea Mon Sep 17 00:00:00 2001
From: evenyag
Date: Thu, 6 Feb 2025 15:04:54 +0800
Subject: [PATCH] feat: encode bulk before writing wal

* store bulk in wal
* write the BulkPart to the memtable directly
---
 src/mito2/src/memtable/bulk/part.rs      |  2 +-
 src/mito2/src/memtable/time_partition.rs | 14 +++++-
 src/mito2/src/region_write_ctx.rs        | 55 +++++++++++++++++++++---
 src/mito2/src/worker/handle_write.rs     | 14 ++++++
 4 files changed, 77 insertions(+), 8 deletions(-)

diff --git a/src/mito2/src/memtable/bulk/part.rs b/src/mito2/src/memtable/bulk/part.rs
index 646f5046e6..e2fd61144e 100644
--- a/src/mito2/src/memtable/bulk/part.rs
+++ b/src/mito2/src/memtable/bulk/part.rs
@@ -139,7 +139,7 @@ impl BulkPartEncoder {
 
 impl BulkPartEncoder {
     /// Encodes mutations to a [BulkPart], returns true if encoded data has been written to `dest`.
-    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
+    pub(crate) fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
         let Some((arrow_record_batch, min_ts, max_ts)) =
             mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
         else {
diff --git a/src/mito2/src/memtable/time_partition.rs b/src/mito2/src/memtable/time_partition.rs
index 4a49d9c031..3fc1d03c0e 100644
--- a/src/mito2/src/memtable/time_partition.rs
+++ b/src/mito2/src/memtable/time_partition.rs
@@ -29,7 +29,7 @@ use store_api::metadata::RegionMetadataRef;
 use crate::error::{InvalidRequestSnafu, Result};
 use crate::memtable::key_values::KeyValue;
 use crate::memtable::version::SmallMemtableVec;
-use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
+use crate::memtable::{BulkPart, KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
 
 /// A partition holds rows with timestamps between `[min, max)`.
 #[derive(Debug, Clone)]
@@ -141,6 +141,18 @@ impl TimePartitions {
         self.write_multi_parts(kvs, &parts)
     }
 
+    /// Write bulk to the memtables.
+    ///
+    /// It creates new partitions if necessary.
+    pub fn write_bulk(&self, bulk_part: BulkPart) -> Result<()> {
+        // Get all parts.
+        let parts = self.list_partitions();
+
+        // TODO(yingwen): Now we never flush so we always have a partition.
+        let last_part = parts.last().unwrap();
+        last_part.memtable.write_bulk(bulk_part)
+    }
+
     /// Append memtables in partitions to `memtables`.
     pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
         let inner = self.inner.lock().unwrap();
diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs
index 2aa1e48880..316a021806 100644
--- a/src/mito2/src/region_write_ctx.rs
+++ b/src/mito2/src/region_write_ctx.rs
@@ -16,15 +16,18 @@
 use std::mem;
 use std::sync::Arc;
 use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
+use futures::future::try_join_all;
 use snafu::ResultExt;
 use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
 use store_api::storage::{RegionId, SequenceNumber};
 
-use crate::error::{Error, Result, WriteGroupSnafu};
-use crate::memtable::KeyValues;
+use crate::error::{Error, JoinSnafu, Result, WriteGroupSnafu};
+use crate::memtable::bulk::part::BulkPartEncoder;
+use crate::memtable::BulkPart;
 use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
 use crate::request::OptionOutputTx;
+use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
 use crate::wal::{EntryId, WalWriter};
 
 /// Notifier to notify write result on drop.
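
The two pieces above are meant to compose: `encode_mutations` turns a slice of mutations into an encoded `BulkPart`, and `TimePartitions::write_bulk` hands that part straight to the latest partition's memtable, bypassing the row-by-row `KeyValues` path. A minimal sketch of that flow, assuming only the signatures visible in the hunks above; the helper name and its bindings are hypothetical and not part of this patch:

fn bulk_write_sketch(
    metadata: store_api::metadata::RegionMetadataRef,
    mutations: &[api::v1::Mutation],
    partitions: &crate::memtable::time_partition::TimePartitions,
) -> crate::error::Result<()> {
    use crate::memtable::bulk::part::BulkPartEncoder;
    use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;

    // Encode the mutations into a single BulkPart (dedup enabled, default row
    // group size), mirroring the arguments used by encode_bulks() below.
    let encoder = BulkPartEncoder::new(metadata, true, DEFAULT_ROW_GROUP_SIZE);
    if let Some(part) = encoder.encode_mutations(mutations)? {
        // Write the encoded part directly into the latest time partition's memtable.
        partitions.write_bulk(part)?;
    }
    Ok(())
}
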
@@ -93,6 +96,8 @@ pub(crate) struct RegionWriteCtx {
     notifiers: Vec<WriteNotify>,
     /// The write operation is failed and we should not write to the mutable memtable.
     failed: bool,
+    /// Bulk parts to write to the memtable.
+    bulk_parts: Vec<Option<BulkPart>>,
 
     // Metrics:
     /// Rows to put.
@@ -125,6 +130,7 @@ impl RegionWriteCtx {
             provider,
             notifiers: Vec::new(),
             failed: false,
+            bulk_parts: Vec::new(),
             put_num: 0,
             delete_num: 0,
         }
@@ -208,15 +214,21 @@ impl RegionWriteCtx {
         let mutable = &self.version.memtables.mutable;
 
         // Takes mutations from the wal entry.
-        let mutations = mem::take(&mut self.wal_entry.mutations);
-        for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) {
+        let bulk_parts = mem::take(&mut self.bulk_parts);
+        for (bulk_part, notify) in bulk_parts.into_iter().zip(&mut self.notifiers) {
             // Write mutation to the memtable.
-            let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
+            let Some(bulk_part) = bulk_part else {
                 continue;
             };
-            if let Err(e) = mutable.write(&kvs) {
+            if let Err(e) = mutable.write_bulk(bulk_part) {
                 notify.err = Some(Arc::new(e));
             }
+            // let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
+            //     continue;
+            // };
+            // if let Err(e) = mutable.write(&kvs) {
+            //     notify.err = Some(Arc::new(e));
+            // }
         }
 
         // Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
@@ -224,4 +236,35 @@ impl RegionWriteCtx {
         self.version_control
             .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
     }
+
+    /// Encodes mutations into bulks and clears rows.
+    pub(crate) async fn encode_bulks(&mut self) -> Result<()> {
+        let mut tasks = Vec::with_capacity(self.wal_entry.mutations.len());
+        for mutation in self.wal_entry.mutations.drain(..) {
+            let metadata = self.version.metadata.clone();
+            let task = common_runtime::spawn_blocking_global(move || {
+                let encoder = BulkPartEncoder::new(metadata, true, DEFAULT_ROW_GROUP_SIZE);
+                let mutations = [mutation];
+                let part_opt = encoder.encode_mutations(&mutations)?;
+                let [mut mutation] = mutations;
+                // TODO(yingwen): This requires cloning the data; we should avoid it.
+                mutation.bulk = part_opt
+                    .as_ref()
+                    .map(|part| part.data.to_vec())
+                    .unwrap_or_default();
+                mutation.rows = None;
+                Ok((part_opt, mutation))
+            });
+            tasks.push(task);
+        }
+
+        let results = try_join_all(tasks).await.context(JoinSnafu)?;
+        for result in results {
+            let (part_opt, mutation) = result?;
+            self.wal_entry.mutations.push(mutation);
+            self.bulk_parts.push(part_opt);
+        }
+
+        Ok(())
+    }
 }
diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs
index aa84be93fc..7ac9cd1076 100644
--- a/src/mito2/src/worker/handle_write.rs
+++ b/src/mito2/src/worker/handle_write.rs
@@ -69,6 +69,20 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             self.prepare_region_write_ctx(write_requests)
         };
 
+        // Encodes all data into bulk parts before writing to the WAL.
+        {
+            let _timer = WRITE_STAGE_ELAPSED
+                .with_label_values(&["encode_bulk"])
+                .start_timer();
+            for region_ctx in region_ctxs.values_mut() {
+                // TODO(yingwen): We don't do region level parallelism as we only test
+                // one region now.
+                if let Err(e) = region_ctx.encode_bulks().await.map_err(Arc::new) {
+                    region_ctx.set_error(e);
+                }
+            }
+        }
+
         // Write to WAL.
         {
             let _timer = WRITE_STAGE_ELAPSED
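
Taken together, the worker now runs the write path in three stages: encode bulk parts, write the WAL, then apply to the memtable. A minimal sketch of that ordering, assuming `region_ctxs` is the RegionId-to-RegionWriteCtx map built by `prepare_region_write_ctx` in the hunk above (stage timers elided; the WAL-write and memtable-apply steps are outside the visible hunks and only paraphrased in comments):

async fn write_stage_ordering_sketch(
    region_ctxs: &mut std::collections::HashMap<
        store_api::storage::RegionId,
        crate::region_write_ctx::RegionWriteCtx,
    >,
) {
    // Stage 1: encode every mutation into a BulkPart before touching the WAL.
    // encode_bulks() moves the encoded bytes into mutation.bulk, clears
    // mutation.rows, and buffers the BulkPart inside the RegionWriteCtx.
    for region_ctx in region_ctxs.values_mut() {
        if let Err(e) = region_ctx.encode_bulks().await.map_err(std::sync::Arc::new) {
            region_ctx.set_error(e);
        }
    }
    // Stage 2: write the bulk-encoded WalEntry of each region to the log store.
    // Stage 3: on success, the memtable-apply step (the `@@ -208,15` hunk above)
    // drains the buffered bulk parts and calls write_bulk() on the mutable
    // memtable instead of building KeyValues from rows.
}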