Merge remote-tracking branch 'GreptimeTeam/poc-write-path' into poc-write-path

This commit is contained in:
Lei, HUANG
2025-02-06 08:55:39 +00:00
4 changed files with 77 additions and 8 deletions

View File

@@ -139,7 +139,7 @@ impl BulkPartEncoder {
impl BulkPartEncoder {
/// Encodes mutations to a [BulkPart], returning `Ok(Some(part))` when there was data to encode.
fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
pub(crate) fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
let Some((arrow_record_batch, min_ts, max_ts)) =
mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
else {

View File

@@ -29,7 +29,7 @@ use store_api::metadata::RegionMetadataRef;
use crate::error::{InvalidRequestSnafu, Result};
use crate::memtable::key_values::KeyValue;
use crate::memtable::version::SmallMemtableVec;
use crate::memtable::{KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
use crate::memtable::{BulkPart, KeyValues, MemtableBuilderRef, MemtableId, MemtableRef};
/// A partition holds rows with timestamps between `[min, max)`.
#[derive(Debug, Clone)]
@@ -141,6 +141,18 @@ impl TimePartitions {
self.write_multi_parts(kvs, &parts)
}
/// Writes an encoded bulk part into the memtable of the newest partition.
///
/// NOTE(review): despite the name of the surrounding type, this does not
/// create new partitions yet — it assumes at least one partition exists.
pub fn write_bulk(&self, bulk_part: BulkPart) -> Result<()> {
// Snapshot the current set of partitions.
let partitions = self.list_partitions();
// TODO(yingwen): Now we never flush so we always have a partition.
let newest = partitions.last().unwrap();
newest.memtable.write_bulk(bulk_part)
}
/// Append memtables in partitions to `memtables`.
pub fn list_memtables(&self, memtables: &mut Vec<MemtableRef>) {
let inner = self.inner.lock().unwrap();

View File

@@ -16,15 +16,18 @@ use std::mem;
use std::sync::Arc;
use api::v1::{Mutation, OpType, Rows, WalEntry, WriteHint};
use futures::future::try_join_all;
use snafu::ResultExt;
use store_api::logstore::provider::Provider;
use store_api::logstore::LogStore;
use store_api::storage::{RegionId, SequenceNumber};
use crate::error::{Error, Result, WriteGroupSnafu};
use crate::memtable::KeyValues;
use crate::error::{Error, JoinSnafu, Result, WriteGroupSnafu};
use crate::memtable::bulk::part::BulkPartEncoder;
use crate::memtable::BulkPart;
use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
use crate::request::OptionOutputTx;
use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
use crate::wal::{EntryId, WalWriter};
/// Notifier to notify write result on drop.
@@ -93,6 +96,8 @@ pub(crate) struct RegionWriteCtx {
notifiers: Vec<WriteNotify>,
/// The write operation is failed and we should not write to the mutable memtable.
failed: bool,
/// Bulk parts to write to the memtable.
bulk_parts: Vec<Option<BulkPart>>,
// Metrics:
/// Rows to put.
@@ -125,6 +130,7 @@ impl RegionWriteCtx {
provider,
notifiers: Vec::new(),
failed: false,
bulk_parts: Vec::new(),
put_num: 0,
delete_num: 0,
}
@@ -208,15 +214,21 @@ impl RegionWriteCtx {
let mutable = &self.version.memtables.mutable;
// Takes mutations from the wal entry.
let mutations = mem::take(&mut self.wal_entry.mutations);
for (mutation, notify) in mutations.into_iter().zip(&mut self.notifiers) {
let bulk_parts = mem::take(&mut self.bulk_parts);
for (bulk_part, notify) in bulk_parts.into_iter().zip(&mut self.notifiers) {
// Write mutation to the memtable.
let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
let Some(bulk_part) = bulk_part else {
continue;
};
if let Err(e) = mutable.write(&kvs) {
if let Err(e) = mutable.write_bulk(bulk_part) {
notify.err = Some(Arc::new(e));
}
// let Some(kvs) = KeyValues::new(&self.version.metadata, mutation) else {
// continue;
// };
// if let Err(e) = mutable.write(&kvs) {
// notify.err = Some(Arc::new(e));
// }
}
// Updates region sequence and entry id. Since we stores last sequence and entry id in region, we need
@@ -224,4 +236,35 @@ impl RegionWriteCtx {
self.version_control
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
}
/// Encodes mutations into bulks and clears rows.
///
/// Each mutation is encoded on a blocking runtime task (encoding is CPU
/// heavy), then pushed back into `wal_entry.mutations` with its rows cleared
/// and its `bulk` payload filled. `bulk_parts` is kept index-aligned with the
/// re-pushed mutations because `try_join_all` preserves input order.
pub(crate) async fn encode_bulks(&mut self) -> Result<()> {
// Move the mutations out so each one can be sent to a blocking task.
let pending: Vec<_> = self.wal_entry.mutations.drain(..).collect();
let mut handles = Vec::with_capacity(pending.len());
for mutation in pending {
let metadata = self.version.metadata.clone();
handles.push(common_runtime::spawn_blocking_global(move || {
let encoder = BulkPartEncoder::new(metadata, true, DEFAULT_ROW_GROUP_SIZE);
let batch = [mutation];
let maybe_part = encoder.encode_mutations(&batch)?;
let [mut mutation] = batch;
// TODO(yingwen): This require clone the data, we should avoid this.
mutation.bulk = maybe_part
.as_ref()
.map(|part| part.data.to_vec())
.unwrap_or_default();
mutation.rows = None;
Ok((maybe_part, mutation))
}));
}
// Join in submission order; the first join or encode error aborts.
for joined in try_join_all(handles).await.context(JoinSnafu)? {
let (maybe_part, mutation) = joined?;
self.wal_entry.mutations.push(mutation);
self.bulk_parts.push(maybe_part);
}
Ok(())
}
}

View File

@@ -69,6 +69,20 @@ impl<S: LogStore> RegionWorkerLoop<S> {
self.prepare_region_write_ctx(write_requests)
};
// Encodes all data into bulk parts before writing into the wal.
{
let _timer = WRITE_STAGE_ELAPSED
.with_label_values(&["encode_bulk"])
.start_timer();
for region_ctx in region_ctxs.values_mut() {
// TODO(yingwen): We don't do region level parallelism as we only test
// one region now.
if let Err(e) = region_ctx.encode_bulks().await.map_err(Arc::new) {
region_ctx.set_error(e);
}
}
}
// Write to WAL.
{
let _timer = WRITE_STAGE_ELAPSED