feat: Implements compaction for bulk memtable (#6923)
* feat: initial bulk memtable compaction implementation
* feat: implement compact for memtable
* test: add tests for bulk memtable compaction
* style: clippy
* chore: address review comments

Signed-off-by: evenyag <realevenyag@gmail.com>
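
At the core of the change is a new optional `compact` hook on the `Memtable` trait (visible in the diff below): the method gets a default no-op body, so existing memtable implementations keep compiling and only `BulkMemtable` overrides it. A standalone sketch of that shape, with simplified hypothetical types in place of the crate's own `Result` and trait:

// Hypothetical, simplified mirror of the trait change below; the real
// trait lives in mito2's memtable module and uses the crate's Result.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

trait Memtable {
    /// Compacts the memtable.
    ///
    /// `for_flush` is true when the flush job calls this method.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush; // default: nothing to compact
        Ok(())
    }
}

// An implementation that does not override `compact` compiles unchanged
// and inherits the no-op default.
struct PlainMemtable;
impl Memtable for PlainMemtable {}

fn main() -> Result<()> {
    PlainMemtable.compact(false)
}
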
@@ -53,8 +53,8 @@ fn random_array(num: usize) -> BulkPart {
        .unwrap();
    BulkPart {
        batch,
        max_ts: max,
        min_ts: min,
        max_timestamp: max,
        min_timestamp: min,
        sequence: 0,
        timestamp_index: 0,
        raw_data: None,
@@ -86,8 +86,8 @@ fn filter_arrow_impl(part: &BulkPart, min: i64, max: i64) -> Option<BulkPart> {
    let batch = arrow::compute::filter_record_batch(&part.batch, &predicate).unwrap();
    Some(BulkPart {
        batch,
        max_ts: max,
        min_ts: min,
        max_timestamp: max,
        min_timestamp: min,
        sequence: 0,
        timestamp_index: part.timestamp_index,
        raw_data: None,

@@ -21,6 +21,7 @@ use std::sync::{Arc, Mutex};
use std::time::Duration;

pub use bulk::part::EncodedBulkPart;
use bytes::Bytes;
use common_time::Timestamp;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::key_values::KeyValue;
@@ -40,6 +41,7 @@ use crate::read::prune::PruneTimeIterator;
use crate::read::scan_region::PredicateGroup;
use crate::region::options::{MemtableOptions, MergeMode};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::SstInfo;

mod builder;
pub mod bulk;
@@ -199,6 +201,14 @@ pub trait Memtable: Send + Sync + fmt::Debug {
    ///
    /// A region must freeze the memtable before invoking this method.
    fn fork(&self, id: MemtableId, metadata: &RegionMetadataRef) -> MemtableRef;

    /// Compacts the memtable.
    ///
    /// The `for_flush` is true when the flush job calls this method.
    fn compact(&self, for_flush: bool) -> Result<()> {
        let _ = for_flush;
        Ok(())
    }
}

pub type MemtableRef = Arc<dyn Memtable>;
@@ -394,6 +404,14 @@ pub(crate) struct MemScanMetricsData {
    pub(crate) scan_cost: Duration,
}

/// Encoded range in the memtable.
pub struct EncodedRange {
    /// Encoded file data.
    pub data: Bytes,
    /// Metadata of the encoded range.
    pub sst_info: SstInfo,
}

/// Builder to build an iterator to read the range.
/// The builder should know the projection and the predicate to build the iterator.
pub trait IterBuilder: Send + Sync {
@@ -416,6 +434,11 @@ pub trait IterBuilder: Send + Sync {
        }
        .fail()
    }

    /// Returns the [EncodedRange] if the range is already encoded into SST.
    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

pub type BoxedIterBuilder = Box<dyn IterBuilder>;

@@ -21,27 +21,35 @@ pub mod part;
pub mod part_reader;
mod row_group_reader;

use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashSet};
use std::sync::atomic::{AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Instant;

use datatypes::arrow::datatypes::SchemaRef;
use mito_codec::key_values::KeyValue;
use rayon::prelude::*;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use store_api::storage::{ColumnId, RegionId, SequenceNumber};
use tokio::sync::Semaphore;

use crate::error::{Result, UnsupportedOperationSnafu};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::bulk::context::BulkIterContext;
use crate::memtable::bulk::part::BulkPart;
use crate::memtable::bulk::part::{BulkPart, BulkPartEncodeMetrics, BulkPartEncoder};
use crate::memtable::bulk::part_reader::BulkPartRecordBatchIter;
use crate::memtable::stats::WriteMetrics;
use crate::memtable::{
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, IterBuilder,
    KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    AllocTracker, BoxedBatchIterator, BoxedRecordBatchIterator, EncodedBulkPart, EncodedRange,
    IterBuilder, KeyValues, MemScanMetrics, Memtable, MemtableBuilder, MemtableId, MemtableRange,
    MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats, PredicateGroup,
};
use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
use crate::read::flat_merge::FlatMergeIterator;
use crate::region::options::MergeMode;
use crate::sst::file::FileId;
use crate::sst::parquet::format::FIXED_POS_COLUMN_NUM;
use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE};
use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

/// All parts in a bulk memtable.
@@ -63,6 +71,166 @@ impl BulkParts {
    fn is_empty(&self) -> bool {
        self.parts.is_empty() && self.encoded_parts.is_empty()
    }

    /// Returns true if the bulk parts should be merged.
    fn should_merge_bulk_parts(&self) -> bool {
        let unmerged_count = self.parts.iter().filter(|wrapper| !wrapper.merging).count();
        // If the total number of unmerged parts is >= 8, start a merge task.
        unmerged_count >= 8
    }

    /// Returns true if the encoded parts should be merged.
    fn should_merge_encoded_parts(&self) -> bool {
        let unmerged_count = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .count();
        // If the total number of unmerged encoded parts is >= 8, start a merge task.
        unmerged_count >= 8
    }

    /// Collects unmerged parts and marks them as being merged.
    /// Returns the collected parts to merge.
    fn collect_bulk_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.parts {
            if !wrapper.merging {
                wrapper.merging = true;
                collected_parts.push(PartToMerge::Bulk {
                    part: wrapper.part.clone(),
                    file_id: wrapper.file_id,
                });
            }
        }
        collected_parts
    }

    /// Collects unmerged encoded parts within size threshold and marks them as being merged.
    /// Returns the collected parts to merge.
    fn collect_encoded_parts_to_merge(&mut self) -> Vec<PartToMerge> {
        // Find minimum size among unmerged parts
        let min_size = self
            .encoded_parts
            .iter()
            .filter(|wrapper| !wrapper.merging)
            .map(|wrapper| wrapper.part.size_bytes())
            .min();

        let Some(min_size) = min_size else {
            return Vec::new();
        };

        let max_allowed_size = min_size.saturating_mul(16).min(4 * 1024 * 1024);
        let mut collected_parts = Vec::new();

        for wrapper in &mut self.encoded_parts {
            if !wrapper.merging {
                let size = wrapper.part.size_bytes();
                if size <= max_allowed_size {
                    wrapper.merging = true;
                    collected_parts.push(PartToMerge::Encoded {
                        part: wrapper.part.clone(),
                        file_id: wrapper.file_id,
                    });
                }
            }
        }
        collected_parts
    }
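
The selection rule above keeps each merge group homogeneous: an encoded part qualifies only if it is at most 16 times the size of the smallest unmerged part, and never larger than 4 MiB. A self-contained sketch of just that arithmetic, with assumed part sizes:

// Mirrors the threshold computed in collect_encoded_parts_to_merge.
fn max_allowed_size(min_size: usize) -> usize {
    min_size.saturating_mul(16).min(4 * 1024 * 1024)
}

fn main() {
    // Smallest unmerged part is 100 KiB -> parts up to 1600 KiB qualify.
    assert_eq!(max_allowed_size(100 * 1024), 1600 * 1024);
    // Smallest part is 512 KiB -> 16x would be 8 MiB, capped at 4 MiB.
    assert_eq!(max_allowed_size(512 * 1024), 4 * 1024 * 1024);
}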

    /// Installs merged encoded parts and removes the original parts by file ids.
    /// Returns the total number of rows in the merged parts.
    fn install_merged_parts<I>(
        &mut self,
        merged_parts: I,
        merged_file_ids: &HashSet<FileId>,
        merge_encoded: bool,
    ) -> usize
    where
        I: IntoIterator<Item = EncodedBulkPart>,
    {
        let mut total_output_rows = 0;

        for encoded_part in merged_parts {
            total_output_rows += encoded_part.metadata().num_rows;
            self.encoded_parts.push(EncodedPartWrapper {
                part: encoded_part,
                file_id: FileId::random(),
                merging: false,
            });
        }

        if merge_encoded {
            self.encoded_parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        } else {
            self.parts
                .retain(|wrapper| !merged_file_ids.contains(&wrapper.file_id));
        }

        total_output_rows
    }

    /// Resets merging flag for parts with the given file ids.
    /// Used when merging fails or is cancelled.
    fn reset_merging_flags(&mut self, file_ids: &HashSet<FileId>, merge_encoded: bool) {
        if merge_encoded {
            for wrapper in &mut self.encoded_parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        } else {
            for wrapper in &mut self.parts {
                if file_ids.contains(&wrapper.file_id) {
                    wrapper.merging = false;
                }
            }
        }
    }
}

/// RAII guard for managing merging flags.
/// Automatically resets merging flags when dropped if the merge operation wasn't successful.
struct MergingFlagsGuard<'a> {
    bulk_parts: &'a RwLock<BulkParts>,
    file_ids: &'a HashSet<FileId>,
    merge_encoded: bool,
    success: bool,
}

impl<'a> MergingFlagsGuard<'a> {
    /// Creates a new guard for the given file ids.
    fn new(
        bulk_parts: &'a RwLock<BulkParts>,
        file_ids: &'a HashSet<FileId>,
        merge_encoded: bool,
    ) -> Self {
        Self {
            bulk_parts,
            file_ids,
            merge_encoded,
            success: false,
        }
    }

    /// Marks the merge operation as successful.
    /// When this is called, the guard will not reset the flags on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl<'a> Drop for MergingFlagsGuard<'a> {
    fn drop(&mut self) {
        if !self.success
            && let Ok(mut parts) = self.bulk_parts.write()
        {
            parts.reset_merging_flags(self.file_ids, self.merge_encoded);
        }
    }
}
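
The guard leans on drop order to undo the `merging` marks on every early return or error path; calling `mark_success` is the only way to keep them set. A self-contained sketch of the same pattern over a plain set of ids (hypothetical types, std only):

use std::collections::HashSet;
use std::sync::RwLock;

struct FlagsGuard<'a> {
    flags: &'a RwLock<HashSet<u64>>,
    ids: Vec<u64>,
    success: bool,
}

impl<'a> FlagsGuard<'a> {
    fn new(flags: &'a RwLock<HashSet<u64>>, ids: Vec<u64>) -> Self {
        // Mark every id as "merging" up front.
        flags.write().unwrap().extend(ids.iter().copied());
        Self { flags, ids, success: false }
    }

    /// After this call the guard no longer rolls the marks back on drop.
    fn mark_success(&mut self) {
        self.success = true;
    }
}

impl Drop for FlagsGuard<'_> {
    fn drop(&mut self) {
        if !self.success {
            // Failure or early return: roll the "merging" marks back.
            if let Ok(mut set) = self.flags.write() {
                for id in &self.ids {
                    set.remove(id);
                }
            }
        }
    }
}

fn main() {
    let flags = RwLock::new(HashSet::new());

    // Failed merge: no mark_success, so the marks are rolled back on drop.
    {
        let _guard = FlagsGuard::new(&flags, vec![1, 2]);
        assert_eq!(flags.read().unwrap().len(), 2);
    }
    assert!(flags.read().unwrap().is_empty());

    // Successful merge: the marks stay set.
    let mut guard = FlagsGuard::new(&flags, vec![3]);
    guard.mark_success();
    drop(guard);
    assert!(flags.read().unwrap().contains(&3));
}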

/// Memtable that ingests and scans parts directly.
@@ -78,6 +246,14 @@ pub struct BulkMemtable {
    /// Cached flat SST arrow schema for memtable compaction.
    #[allow(dead_code)]
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Dispatcher for scheduling compaction tasks
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}

impl std::fmt::Debug for BulkMemtable {
@@ -115,8 +291,8 @@ impl Memtable for BulkMemtable {
        let local_metrics = WriteMetrics {
            key_bytes: 0,
            value_bytes: fragment.estimated_size(),
            min_ts: fragment.min_ts,
            max_ts: fragment.max_ts,
            min_ts: fragment.min_timestamp,
            max_ts: fragment.max_timestamp,
            num_rows: fragment.num_rows(),
            max_sequence: fragment.sequence,
        };
@@ -126,6 +302,7 @@ impl Memtable for BulkMemtable {
            bulk_parts.parts.push(BulkPartWrapper {
                part: fragment,
                file_id: FileId::random(),
                merging: false,
            });

            // Since this operation should be fast, we do it in parts lock scope.
@@ -135,6 +312,10 @@ impl Memtable for BulkMemtable {
            self.update_stats(local_metrics);
        }

        if self.should_compact() {
            self.schedule_compact();
        }

        Ok(())
    }

@@ -157,6 +338,7 @@ impl Memtable for BulkMemtable {
        let mut ranges = BTreeMap::new();
        let mut range_id = 0;

        // TODO(yingwen): Filter ranges by sequence.
        let context = Arc::new(BulkIterContext::new(
            self.metadata.clone(),
            &projection,
@@ -286,8 +468,46 @@ impl Memtable for BulkMemtable {
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(metadata.region_id, id))),
            compact_dispatcher: self.compact_dispatcher.clone(),
            append_mode: self.append_mode,
            merge_mode: self.merge_mode,
        })
    }

    fn compact(&self, for_flush: bool) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        if for_flush {
            return Ok(());
        }

        // Try to merge regular parts first
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then try to merge encoded parts
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

impl BulkMemtable {
@@ -296,12 +516,16 @@ impl BulkMemtable {
        id: MemtableId,
        metadata: RegionMetadataRef,
        write_buffer_manager: Option<WriteBufferManagerRef>,
        compact_dispatcher: Option<Arc<CompactDispatcher>>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        let flat_arrow_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let region_id = metadata.region_id;
        Self {
            id,
            parts: Arc::new(RwLock::new(BulkParts::default())),
@@ -312,6 +536,10 @@ impl BulkMemtable {
            max_sequence: AtomicU64::new(0),
            num_rows: AtomicUsize::new(0),
            flat_arrow_schema,
            compactor: Arc::new(Mutex::new(MemtableCompactor::new(region_id, id))),
            compact_dispatcher,
            append_mode,
            merge_mode,
        }
    }

@@ -340,6 +568,33 @@ impl BulkMemtable {
            .map(|part_wrapper| part_wrapper.part.estimated_series_count())
            .sum()
    }

    /// Returns whether the memtable should be compacted.
    fn should_compact(&self) -> bool {
        let parts = self.parts.read().unwrap();
        parts.should_merge_bulk_parts() || parts.should_merge_encoded_parts()
    }

    /// Schedules a compaction task using the CompactDispatcher.
    fn schedule_compact(&self) {
        if let Some(dispatcher) = &self.compact_dispatcher {
            let task = MemCompactTask {
                metadata: self.metadata.clone(),
                parts: self.parts.clone(),
                flat_arrow_schema: self.flat_arrow_schema.clone(),
                compactor: self.compactor.clone(),
                append_mode: self.append_mode,
                merge_mode: self.merge_mode,
            };

            dispatcher.dispatch_compact(task);
        } else {
            // Uses synchronous compaction if no dispatcher is available.
            if let Err(e) = self.compact(false) {
                common_telemetry::error!(e; "Failed to compact table");
            }
        }
    }
}

/// Iterator builder for bulk range
@@ -373,6 +628,10 @@ impl IterBuilder for BulkRangeIterBuilder {

        Ok(Box::new(iter))
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        None
    }
}

/// Iterator builder for encoded bulk range
@@ -407,6 +666,13 @@ impl IterBuilder for EncodedBulkRangeIterBuilder {
            Ok(Box::new(std::iter::empty()))
        }
    }

    fn encoded_range(&self) -> Option<EncodedRange> {
        Some(EncodedRange {
            data: self.part.data().clone(),
            sst_info: self.part.to_sst_info(self.file_id),
        })
    }
}

struct BulkPartWrapper {
@@ -414,6 +680,8 @@ struct BulkPartWrapper {
    /// The unique file id for this part in memtable.
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

struct EncodedPartWrapper {
@@ -421,21 +689,405 @@ struct EncodedPartWrapper {
    /// The unique file id for this part in memtable.
    #[allow(dead_code)]
    file_id: FileId,
    /// Whether this part is currently being merged.
    merging: bool,
}

/// Enum to wrap different types of parts for unified merging.
#[derive(Clone)]
enum PartToMerge {
    /// Raw bulk part.
    Bulk { part: BulkPart, file_id: FileId },
    /// Encoded bulk part.
    Encoded {
        part: EncodedBulkPart,
        file_id: FileId,
    },
}

impl PartToMerge {
    /// Gets the file ID of this part.
    fn file_id(&self) -> FileId {
        match self {
            PartToMerge::Bulk { file_id, .. } => *file_id,
            PartToMerge::Encoded { file_id, .. } => *file_id,
        }
    }

    /// Gets the minimum timestamp of this part.
    fn min_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.min_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().min_timestamp,
        }
    }

    /// Gets the maximum timestamp of this part.
    fn max_timestamp(&self) -> i64 {
        match self {
            PartToMerge::Bulk { part, .. } => part.max_timestamp,
            PartToMerge::Encoded { part, .. } => part.metadata().max_timestamp,
        }
    }

    /// Gets the number of rows in this part.
    fn num_rows(&self) -> usize {
        match self {
            PartToMerge::Bulk { part, .. } => part.num_rows(),
            PartToMerge::Encoded { part, .. } => part.metadata().num_rows,
        }
    }

    /// Creates a record batch iterator for this part.
    fn create_iterator(
        self,
        context: Arc<BulkIterContext>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        match self {
            PartToMerge::Bulk { part, .. } => {
                let iter = BulkPartRecordBatchIter::new(
                    part.batch, context, None, // No sequence filter for merging
                );
                Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
            }
            PartToMerge::Encoded { part, .. } => part.read(context, None),
        }
    }
}

struct MemtableCompactor {
    region_id: RegionId,
    memtable_id: MemtableId,
}

impl MemtableCompactor {
    /// Creates a new MemtableCompactor.
    fn new(region_id: RegionId, memtable_id: MemtableId) -> Self {
        Self {
            region_id,
            memtable_id,
        }
    }

    /// Merges bulk parts and then encodes the result to an [EncodedBulkPart].
    fn merge_bulk_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged parts and mark them as being merged
        let parts_to_merge = bulk_parts.write().unwrap().collect_bulk_parts_to_merge();
        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, false);

        // Sorts parts by row count (ascending) to merge parts with similar row counts.
        let mut sorted_parts = parts_to_merge;
        sorted_parts.sort_unstable_by_key(|part| part.num_rows());

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = sorted_parts
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();
        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts.
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, false)
        };

        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} bulk parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }
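
Sorting by row count and chunking by 16 bounds both the fan-in of each merge and the number of parallel tasks. A minimal sketch of that grouping, assuming the rayon crate and a stand-in merge function:

use rayon::prelude::*;

// Stand-in for merging one group of parts; returns the group's row count.
fn merge_group(group: Vec<usize>) -> usize {
    group.into_iter().sum()
}

fn main() {
    // Pretend these are per-part row counts.
    let mut parts: Vec<usize> = (0..40usize).map(|i| (i * 37) % 100).collect();
    // Sort ascending so each chunk holds parts of similar size.
    parts.sort_unstable();
    // Chunk into merge groups of at most 16 parts each.
    let groups: Vec<Vec<usize>> = parts.chunks(16).map(|c| c.to_vec()).collect();
    assert_eq!(groups.len(), 3); // 16 + 16 + 8

    // Merge the groups concurrently, one rayon task per group.
    let merged: Vec<usize> = groups.into_par_iter().map(merge_group).collect();
    let total: usize = merged.iter().sum();
    assert_eq!(total, (0..40usize).map(|i| (i * 37) % 100).sum::<usize>());
}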

    /// Merges encoded parts and then encodes the result to an [EncodedBulkPart].
    fn merge_encoded_parts(
        &mut self,
        arrow_schema: &SchemaRef,
        bulk_parts: &RwLock<BulkParts>,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<()> {
        let start = Instant::now();

        // Collects unmerged encoded parts within size threshold and mark them as being merged.
        let parts_to_merge = {
            let mut parts = bulk_parts.write().unwrap();
            parts.collect_encoded_parts_to_merge()
        };

        if parts_to_merge.is_empty() {
            return Ok(());
        }

        let merged_file_ids: HashSet<FileId> =
            parts_to_merge.iter().map(|part| part.file_id()).collect();
        let mut guard = MergingFlagsGuard::new(bulk_parts, &merged_file_ids, true);

        if parts_to_merge.len() == 1 {
            // Only 1 part, don't have to merge - the guard will automatically reset the flag
            return Ok(());
        }

        // Groups parts into chunks for concurrent processing.
        let part_groups: Vec<Vec<PartToMerge>> = parts_to_merge
            .chunks(16)
            .map(|chunk| chunk.to_vec())
            .collect();

        let total_groups = part_groups.len();
        let total_parts_to_merge: usize = part_groups.iter().map(|group| group.len()).sum();

        let merged_parts = part_groups
            .into_par_iter()
            .map(|group| Self::merge_parts_group(group, arrow_schema, metadata, dedup, merge_mode))
            .collect::<Result<Vec<Option<EncodedBulkPart>>>>()?;

        // Installs merged parts using iterator and get total output rows
        let total_output_rows = {
            let mut parts = bulk_parts.write().unwrap();
            parts.install_merged_parts(merged_parts.into_iter().flatten(), &merged_file_ids, true)
        };

        // Marks the operation as successful to prevent flag reset
        guard.mark_success();

        common_telemetry::debug!(
            "BulkMemtable {} {} concurrent compact {} groups, {} encoded parts, {} rows, cost: {:?}",
            self.region_id,
            self.memtable_id,
            total_groups,
            total_parts_to_merge,
            total_output_rows,
            start.elapsed()
        );

        Ok(())
    }

    /// Merges a group of parts into a single encoded part.
    fn merge_parts_group(
        parts_to_merge: Vec<PartToMerge>,
        arrow_schema: &SchemaRef,
        metadata: &RegionMetadataRef,
        dedup: bool,
        merge_mode: MergeMode,
    ) -> Result<Option<EncodedBulkPart>> {
        if parts_to_merge.is_empty() {
            return Ok(None);
        }

        // Calculates timestamp bounds for merged data
        let min_timestamp = parts_to_merge
            .iter()
            .map(|p| p.min_timestamp())
            .min()
            .unwrap_or(i64::MAX);
        let max_timestamp = parts_to_merge
            .iter()
            .map(|p| p.max_timestamp())
            .max()
            .unwrap_or(i64::MIN);

        let context = Arc::new(BulkIterContext::new(
            metadata.clone(),
            &None, // No column projection for merging
            None,  // No predicate for merging
        ));

        // Creates iterators for all parts to merge.
        let iterators: Vec<BoxedRecordBatchIterator> = parts_to_merge
            .into_iter()
            .filter_map(|part| part.create_iterator(context.clone()).ok().flatten())
            .collect();

        if iterators.is_empty() {
            return Ok(None);
        }

        let merged_iter =
            FlatMergeIterator::new(arrow_schema.clone(), iterators, DEFAULT_READ_BATCH_SIZE)?;

        let boxed_iter: BoxedRecordBatchIterator = if dedup {
            // Applies deduplication based on merge mode
            match merge_mode {
                MergeMode::LastRow => {
                    let dedup_iter = FlatDedupIterator::new(merged_iter, FlatLastRow::new(false));
                    Box::new(dedup_iter)
                }
                MergeMode::LastNonNull => {
                    // Calculates field column start: total columns - fixed columns - field columns
                    // Field column count = total metadata columns - time index column - primary key columns
                    let field_column_count =
                        metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
                    let total_columns = arrow_schema.fields().len();
                    let field_column_start =
                        total_columns - FIXED_POS_COLUMN_NUM - field_column_count;

                    let dedup_iter = FlatDedupIterator::new(
                        merged_iter,
                        FlatLastNonNull::new(field_column_start, false),
                    );
                    Box::new(dedup_iter)
                }
            }
        } else {
            Box::new(merged_iter)
        };

        // Encodes the merged iterator
        let encoder = BulkPartEncoder::new(metadata.clone(), DEFAULT_ROW_GROUP_SIZE)?;
        let mut metrics = BulkPartEncodeMetrics::default();
        let encoded_part = encoder.encode_record_batch_iter(
            boxed_iter,
            arrow_schema.clone(),
            min_timestamp,
            max_timestamp,
            &mut metrics,
        )?;

        common_telemetry::trace!("merge_parts_group metrics: {:?}", metrics);

        Ok(encoded_part)
    }
}
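
The two dedup modes in merge_parts_group differ in how duplicate rows (same key and timestamp) are combined: LastRow keeps the newest row wholesale, while LastNonNull fills each field from older rows when the newest value is null. A toy illustration of the difference with hypothetical two-field rows:

// Not the real FlatDedupIterator; just the per-row combining rules.
#[derive(Clone, Copy, Debug, PartialEq)]
struct Row {
    a: Option<i64>,
    b: Option<i64>,
}

fn last_row(_older: Row, newer: Row) -> Row {
    newer
}

fn last_non_null(older: Row, newer: Row) -> Row {
    Row {
        a: newer.a.or(older.a),
        b: newer.b.or(older.b),
    }
}

fn main() {
    let older = Row { a: Some(1), b: Some(2) };
    let newer = Row { a: None, b: Some(3) };
    assert_eq!(last_row(older, newer), Row { a: None, b: Some(3) });
    assert_eq!(last_non_null(older, newer), Row { a: Some(1), b: Some(3) });
}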

/// A memtable compact task to run in background.
struct MemCompactTask {
    metadata: RegionMetadataRef,
    parts: Arc<RwLock<BulkParts>>,

    /// Cached flat SST arrow schema
    flat_arrow_schema: SchemaRef,
    /// Compactor for merging bulk parts
    compactor: Arc<Mutex<MemtableCompactor>>,
    /// Whether the append mode is enabled
    append_mode: bool,
    /// Mode to handle duplicate rows while merging
    merge_mode: MergeMode,
}

impl MemCompactTask {
    fn compact(&self) -> Result<()> {
        let mut compactor = self.compactor.lock().unwrap();

        // Tries to merge regular parts first
        let should_merge = self.parts.read().unwrap().should_merge_bulk_parts();
        if should_merge {
            compactor.merge_bulk_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        // Then tries to merge encoded parts
        let should_merge = self.parts.read().unwrap().should_merge_encoded_parts();
        if should_merge {
            compactor.merge_encoded_parts(
                &self.flat_arrow_schema,
                &self.parts,
                &self.metadata,
                !self.append_mode,
                self.merge_mode,
            )?;
        }

        Ok(())
    }
}

/// Scheduler to run compact tasks in background.
#[derive(Debug)]
pub struct CompactDispatcher {
    semaphore: Arc<Semaphore>,
}

impl CompactDispatcher {
    /// Creates a new dispatcher with the given number of max concurrent tasks.
    pub fn new(permits: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(permits)),
        }
    }

    /// Dispatches a compact task to run in background.
    fn dispatch_compact(&self, task: MemCompactTask) {
        let semaphore = self.semaphore.clone();
        common_runtime::spawn_global(async move {
            let Ok(_permit) = semaphore.acquire().await else {
                return;
            };

            common_runtime::spawn_blocking_global(move || {
                if let Err(e) = task.compact() {
                    common_telemetry::error!(e; "Failed to compact memtable, region: {}", task.metadata.region_id);
                }
            });
        });
    }
}
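
The dispatcher is essentially a counting semaphore in front of the runtime: each dispatched task waits for a permit before doing its blocking merge work. A simplified, self-contained variant using tokio directly; unlike the commit, which goes through common_runtime and releases the permit once the blocking task is spawned, this sketch holds the permit for the task's whole duration:

use std::sync::Arc;
use tokio::sync::Semaphore;

// One dispatched compaction: take a permit, then run the heavy work on
// the blocking thread pool.
async fn dispatch(semaphore: Arc<Semaphore>, id: u32) {
    let _permit = semaphore.acquire_owned().await.expect("semaphore closed");
    tokio::task::spawn_blocking(move || {
        println!("compact task {id} running");
    })
    .await
    .expect("blocking task panicked");
}

#[tokio::main]
async fn main() {
    let semaphore = Arc::new(Semaphore::new(2)); // at most 2 tasks at once
    let handles: Vec<_> = (0..4)
        .map(|id| tokio::spawn(dispatch(semaphore.clone(), id)))
        .collect();
    for handle in handles {
        handle.await.unwrap();
    }
}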

/// Builder to build a [BulkMemtable].
#[derive(Debug, Default)]
pub struct BulkMemtableBuilder {
    write_buffer_manager: Option<WriteBufferManagerRef>,
    compact_dispatcher: Option<Arc<CompactDispatcher>>,
    append_mode: bool,
    merge_mode: MergeMode,
}

impl BulkMemtableBuilder {
    /// Creates a new builder with specific `write_buffer_manager`.
    pub fn new(write_buffer_manager: Option<WriteBufferManagerRef>) -> Self {
    pub fn new(
        write_buffer_manager: Option<WriteBufferManagerRef>,
        append_mode: bool,
        merge_mode: MergeMode,
    ) -> Self {
        Self {
            write_buffer_manager,
            compact_dispatcher: None,
            append_mode,
            merge_mode,
        }
    }

    /// Sets the compact dispatcher.
    pub fn with_compact_dispatcher(mut self, compact_dispatcher: Arc<CompactDispatcher>) -> Self {
        self.compact_dispatcher = Some(compact_dispatcher);
        self
    }
}

impl MemtableBuilder for BulkMemtableBuilder {
@@ -444,6 +1096,9 @@ impl MemtableBuilder for BulkMemtableBuilder {
            id,
            metadata.clone(),
            self.write_buffer_manager.clone(),
            self.compact_dispatcher.clone(),
            self.append_mode,
            self.merge_mode,
        ))
    }

@@ -497,7 +1152,8 @@ mod tests {
    #[test]
    fn test_bulk_memtable_write_read() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(999, metadata.clone(), None);
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        let test_data = vec![
            (
@@ -559,7 +1215,8 @@ mod tests {
    #[test]
    fn test_bulk_memtable_ranges_with_projection() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(111, metadata.clone(), None);
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "projection_test",
@@ -597,7 +1254,8 @@ mod tests {
    #[test]
    fn test_bulk_memtable_unsupported_operations() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(111, metadata.clone(), None);
        let memtable =
            BulkMemtable::new(111, metadata.clone(), None, None, false, MergeMode::LastRow);

        let key_values = build_key_values_with_ts_seq_values(
            &metadata,
@@ -619,7 +1277,8 @@ mod tests {
    #[test]
    fn test_bulk_memtable_freeze() {
        let metadata = metadata_for_test();
        let memtable = BulkMemtable::new(222, metadata.clone(), None);
        let memtable =
            BulkMemtable::new(222, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part = create_bulk_part_with_converter(
            "freeze_test",
@@ -640,7 +1299,8 @@ mod tests {
    #[test]
    fn test_bulk_memtable_fork() {
        let metadata = metadata_for_test();
        let original_memtable = BulkMemtable::new(333, metadata.clone(), None);
        let original_memtable =
            BulkMemtable::new(333, metadata.clone(), None, None, false, MergeMode::LastRow);

        let bulk_part =
            create_bulk_part_with_converter("fork_test", 15, vec![15000], vec![Some(150.0)], 1500)
@@ -657,4 +1317,120 @@ mod tests {
        assert!(!original_memtable.is_empty());
        assert_eq!(1, original_memtable.stats().num_rows);
    }

    #[test]
    fn test_bulk_memtable_ranges_multiple_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(777, metadata.clone(), None, None, false, MergeMode::LastRow);

        let parts_data = vec![
            (
                "part1",
                1u32,
                vec![1000i64, 1100i64],
                vec![Some(10.0), Some(11.0)],
                100u64,
            ),
            (
                "part2",
                2u32,
                vec![2000i64, 2100i64],
                vec![Some(20.0), Some(21.0)],
                200u64,
            ),
            ("part3", 3u32, vec![3000i64], vec![Some(30.0)], 300u64),
        ];

        for (k0, k1, timestamps, values, seq) in parts_data {
            let part = create_bulk_part_with_converter(k0, k1, timestamps, values, seq).unwrap();
            memtable.write_bulk(part).unwrap();
        }

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        assert_eq!(3, ranges.ranges.len());
        assert_eq!(5, ranges.stats.num_rows);
        assert_eq!(3, ranges.stats.num_ranges);

        for (range_id, range) in ranges.ranges.iter() {
            assert!(*range_id < 3);
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());
        }
    }

    #[test]
    fn test_bulk_memtable_ranges_with_sequence_filter() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(888, metadata.clone(), None, None, false, MergeMode::LastRow);

        let part = create_bulk_part_with_converter(
            "seq_test",
            1,
            vec![1000, 2000, 3000],
            vec![Some(10.0), Some(20.0), Some(30.0)],
            500,
        )
        .unwrap();

        memtable.write_bulk(part).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let sequence_filter = Some(400u64); // Filters out rows with sequence > 400
        let ranges = memtable
            .ranges(None, predicate_group, sequence_filter)
            .unwrap();

        assert_eq!(1, ranges.ranges.len());
        let range = ranges.ranges.get(&0).unwrap();

        let mut record_batch_iter = range.build_record_batch_iter(None).unwrap();
        assert!(record_batch_iter.next().is_none());
    }

    #[test]
    fn test_bulk_memtable_ranges_with_encoded_parts() {
        let metadata = metadata_for_test();
        let memtable =
            BulkMemtable::new(999, metadata.clone(), None, None, false, MergeMode::LastRow);

        // Adds enough bulk parts to trigger encoding
        for i in 0..10 {
            let part = create_bulk_part_with_converter(
                &format!("key_{}", i),
                i,
                vec![1000 + i as i64 * 100],
                vec![Some(i as f64 * 10.0)],
                100 + i as u64,
            )
            .unwrap();
            memtable.write_bulk(part).unwrap();
        }

        memtable.compact(false).unwrap();

        let predicate_group = PredicateGroup::new(&metadata, &[]);
        let ranges = memtable.ranges(None, predicate_group, None).unwrap();

        // Should have ranges for both bulk parts and encoded parts
        assert_eq!(3, ranges.ranges.len());
        assert_eq!(10, ranges.stats.num_rows);

        for (_range_id, range) in ranges.ranges.iter() {
            assert!(range.num_rows() > 0);
            assert!(range.is_record_batch());

            let record_batch_iter = range.build_record_batch_iter(None).unwrap();
            let mut total_rows = 0;
            for batch_result in record_batch_iter {
                let batch = batch_result.unwrap();
                total_rows += batch.num_rows();
                assert!(batch.num_rows() > 0);
            }
            assert_eq!(total_rows, range.num_rows());
        }
    }
}

@@ -16,6 +16,7 @@

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, value_to_grpc_value};
use api::v1::bulk_wal_entry::Body;
@@ -23,6 +24,7 @@ use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType, bulk_wal_entry};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datatypes::arrow;
use datatypes::arrow::array::{
@@ -55,24 +57,28 @@ use table::predicate::Predicate;

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, DataTypeMismatchSnafu, EncodeMemtableSnafu,
    EncodeSnafu, NewRecordBatchSnafu, Result,
    EncodeSnafu, InvalidMetadataSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::BoxedRecordBatchIterator;
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::sst::file::FileId;
use crate::sst::index::IndexOutput;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
use crate::sst::to_sst_arrow_schema;

const INIT_DICT_VALUE_CAPACITY: usize = 8;

/// A raw bulk part in the memtable.
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_ts: i64,
    pub min_ts: i64,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
@@ -91,8 +97,8 @@ impl TryFrom<BulkWalEntry> for BulkPart {
            .context(error::ConvertBulkWalEntrySnafu)?;
        Ok(Self {
            batch,
            max_ts: value.max_ts,
            min_ts: value.min_ts,
            max_timestamp: value.max_ts,
            min_timestamp: value.min_ts,
            sequence: value.sequence,
            timestamp_index: value.timestamp_index as usize,
            raw_data: Some(ipc),
@@ -109,8 +115,8 @@ impl TryFrom<&BulkPart> for BulkWalEntry {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
@@ -130,8 +136,8 @@ impl TryFrom<&BulkPart> for BulkWalEntry {
            })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
@@ -145,12 +151,7 @@ impl TryFrom<&BulkPart> for BulkWalEntry {

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        self.batch
            .columns()
            .iter()
            // If can not get slice memory size, assume 0 here.
            .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
            .sum()
        record_batch_estimated_size(&self.batch)
    }

    /// Returns the estimated series count in this BulkPart.
@@ -230,6 +231,16 @@ impl BulkPart {
    }
}

/// More accurate estimation of the size of a record batch.
pub(crate) fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        // If can not get slice memory size, assume 0 here.
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}
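
The helper sums the sliced data buffer sizes of every column, so a batch that views a slice of a larger buffer is charged only for the rows it references. A small usage sketch, assuming the arrow crate:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::record_batch::RecordBatch;

// Same shape as the helper above; columns whose size cannot be
// computed count as 0.
fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

fn main() {
    let ts: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
    let batch = RecordBatch::try_from_iter(vec![("ts", ts)]).unwrap();
    // Three i64 values occupy at least 24 bytes of data buffers.
    assert!(record_batch_estimated_size(&batch) >= 24);
}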

/// Primary key column builder for handling strings specially.
enum PrimaryKeyColumnBuilder {
    /// String dictionary builder for string types.
@@ -435,8 +446,8 @@ impl BulkPartConverter {

        Ok(BulkPart {
            batch,
            max_ts: self.max_ts,
            min_ts: self.min_ts,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
@@ -517,6 +528,39 @@ impl EncodedBulkPart {
        &self.metadata
    }

    /// Returns the size of the encoded data in bytes
    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    /// Returns the encoded data.
    pub(crate) fn data(&self) -> &Bytes {
        &self.data
    }

    /// Converts this `EncodedBulkPart` to `SstInfo`.
    ///
    /// # Arguments
    /// * `file_id` - The SST file ID to assign to this part
    ///
    /// # Returns
    /// Returns a `SstInfo` instance with information derived from this bulk part's metadata
    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
        }
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
@@ -555,6 +599,21 @@ pub struct BulkPartMeta {
    pub region_metadata: RegionMetadataRef,
}

/// Metrics for encoding a part.
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Cost of iterating over the data.
    pub iter_cost: Duration,
    /// Cost of writing the data.
    pub write_cost: Duration,
    /// Size of data before encoding.
    pub raw_size: usize,
    /// Size of data after encoding.
    pub encoded_size: usize,
    /// Number of rows in part.
    pub num_rows: usize,
}

pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    row_group_size: usize,
@@ -562,22 +621,91 @@ pub struct BulkPartEncoder {
}

impl BulkPartEncoder {
    pub(crate) fn new(metadata: RegionMetadataRef, row_group_size: usize) -> BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        row_group_size: usize,
    ) -> Result<BulkPartEncoder> {
        // TODO(yingwen): Skip arrow schema if needed.
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        // TODO(yingwen): Do we need compression?
        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {

        Ok(Self {
            metadata,
            row_group_size,
            writer_props,
        }
        })
    }
}

impl BulkPartEncoder {
    /// Encodes [BoxedRecordBatchIterator] into [EncodedBulkPart] with min/max timestamps.
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;

        // Process each batch from the iterator
        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();
        iter_start = Instant::now();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
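
The encoder streams batches into an in-memory parquet buffer and derives the part metadata from the writer's footer. A stripped-down sketch of that write path, assuming the arrow and parquet crates and omitting the region-metadata key-value entry and the metrics bookkeeping:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::record_batch::RecordBatch;
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ts: ArrayRef = Arc::new(Int64Array::from(vec![1_i64, 2, 3]));
    let batch = RecordBatch::try_from_iter(vec![("ts", ts)])?;

    // Row-group size bounds how many rows each parquet row group holds.
    let props = WriterProperties::builder()
        .set_max_row_group_size(1024)
        .build();

    // Encode into an in-memory buffer instead of a file on disk.
    let mut buf = Vec::with_capacity(4096);
    let mut writer = ArrowWriter::try_new(&mut buf, batch.schema(), Some(props))?;
    writer.write(&batch)?;
    let file_metadata = writer.close()?; // finalizes the parquet footer

    assert_eq!(file_metadata.num_rows, 3);
    assert!(!buf.is_empty());
    Ok(())
}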

    /// Encodes bulk part to a [EncodedBulkPart], returns the encoded data.
    fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
@@ -602,8 +730,8 @@ impl BulkPartEncoder {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_ts,
                min_timestamp: part.min_ts,
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
@@ -1208,7 +1336,7 @@ mod tests {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024);
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

@@ -1287,7 +1415,7 @@ mod tests {
            converter.append_key_values(&kv).unwrap();
        }
        let part = converter.convert().unwrap();
        let encoder = BulkPartEncoder::new(metadata, 1024);
        let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
        encoder.encode_part(&part).unwrap().unwrap()
    }

@@ -1417,8 +1545,8 @@ mod tests {
        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

@@ -1535,8 +1663,8 @@ mod tests {
        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 0);
        assert_eq!(bulk_part.min_ts, i64::MAX);
        assert_eq!(bulk_part.max_ts, i64::MIN);
        assert_eq!(bulk_part.min_timestamp, i64::MAX);
        assert_eq!(bulk_part.max_timestamp, i64::MIN);
        assert_eq!(bulk_part.sequence, SequenceNumber::MIN);

        // Validate primary key columns are present in schema even for empty batch
@@ -1597,8 +1725,8 @@ mod tests {
        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

@@ -1801,8 +1929,8 @@ mod tests {
        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_ts, 1000);
        assert_eq!(bulk_part.max_ts, 2000);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

@@ -205,8 +205,8 @@ impl Memtable for SimpleBulkMemtable {
        self.update_stats(WriteMetrics {
            key_bytes: 0,
            value_bytes: part.estimated_size(),
            min_ts: part.min_ts,
            max_ts: part.max_ts,
            min_ts: part.min_timestamp,
            max_ts: part.max_timestamp,
            num_rows: part.num_rows(),
            max_sequence: sequence,
        });
@@ -717,8 +717,8 @@ mod tests {
        let part = BulkPart {
            batch: rb,
            sequence: 1,
            min_ts: 1,
            max_ts: 2,
            min_timestamp: 1,
            max_timestamp: 2,
            timestamp_index: 0,
            raw_data: None,
        };
@@ -883,8 +883,8 @@ mod tests {
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(0, i32::MAX, &region_meta),
                max_ts: 0,
                min_ts: 0,
                max_timestamp: 0,
                min_timestamp: 0,
                sequence: 0,
                timestamp_index: 1,
                raw_data: None,
@@ -895,8 +895,8 @@ mod tests {
        memtable
            .write_bulk(BulkPart {
                batch: rb_with_large_string(1, 3, &region_meta),
                max_ts: 1,
                min_ts: 1,
                max_timestamp: 1,
                min_timestamp: 1,
                sequence: 1,
                timestamp_index: 1,
                raw_data: None,

@@ -189,8 +189,8 @@ pub fn filter_record_batch(part: &BulkPart, min: i64, max: i64) -> Result<Option
        .context(error::NewRecordBatchSnafu)?;
    Ok(Some(BulkPart {
        batch,
        max_ts,
        min_ts,
        max_timestamp: max_ts,
        min_timestamp: min_ts,
        sequence: part.sequence,
        timestamp_index: part.timestamp_index,
        raw_data: None,
@@ -305,8 +305,8 @@ impl TimePartitions {
        let (matching_parts, missing_parts) = self.find_partitions_by_time_range(
            part.timestamps(),
            &parts,
            time_type.create_timestamp(part.min_ts),
            time_type.create_timestamp(part.max_ts),
            time_type.create_timestamp(part.min_timestamp),
            time_type.create_timestamp(part.max_timestamp),
        )?;

        if matching_parts.len() == 1 && missing_parts.is_empty() {
@@ -1195,8 +1195,8 @@ mod tests {
        let min_ts = ts.iter().min().copied().unwrap();
        BulkPart {
            batch,
            max_ts,
            min_ts,
            max_timestamp: max_ts,
            min_timestamp: min_ts,
            sequence,
            timestamp_index: 0,
            raw_data: None,
@@ -1308,8 +1308,8 @@ mod tests {

        let part = BulkPart {
            batch,
            max_ts: 8000,
            min_ts: 1000,
            max_timestamp: 8000,
            min_timestamp: 1000,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
@@ -1319,16 +1319,16 @@ mod tests {
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_ts, 1000);
        assert_eq!(filtered.max_ts, 1000);
        assert_eq!(filtered.min_timestamp, 1000);
        assert_eq!(filtered.max_timestamp, 1000);

        // Test splitting with range [3000, 6000)
        let result = filter_record_batch(&part, 3000, 6000).unwrap();
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 1);
        assert_eq!(filtered.min_ts, 5000);
        assert_eq!(filtered.max_ts, 5000);
        assert_eq!(filtered.min_timestamp, 5000);
        assert_eq!(filtered.max_timestamp, 5000);

        // Test splitting with range that includes no points
        let result = filter_record_batch(&part, 3000, 4000).unwrap();
@@ -1339,7 +1339,7 @@ mod tests {
        assert!(result.is_some());
        let filtered = result.unwrap();
        assert_eq!(filtered.num_rows(), 5);
        assert_eq!(filtered.min_ts, 1000);
        assert_eq!(filtered.max_ts, 8000);
        assert_eq!(filtered.min_timestamp, 1000);
        assert_eq!(filtered.max_timestamp, 8000);
    }
}

@@ -260,8 +260,8 @@ impl Memtable for TimeSeriesMemtable {
        }

        metrics.max_sequence = part.sequence;
        metrics.max_ts = part.max_ts;
        metrics.min_ts = part.min_ts;
        metrics.max_ts = part.max_timestamp;
        metrics.min_ts = part.min_timestamp;
        metrics.num_rows = part.num_rows();
        self.update_stats(metrics);
        Ok(())

@@ -80,8 +80,8 @@ impl<S: LogStore> RegionWorkerLoop<S> {

        let part = BulkPart {
            batch,
            max_ts,
            min_ts,
            max_timestamp: max_ts,
            min_timestamp: min_ts,
            sequence: 0,
            timestamp_index: ts_index,
            raw_data: Some(request.raw_data),