mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 22:02:56 +00:00
feat: do not keep MemtableRefs in ScanInput (#5184)
This commit is contained in:
@@ -110,6 +110,15 @@ impl MemtableStats {
|
||||
|
||||
pub type BoxedBatchIterator = Box<dyn Iterator<Item = Result<Batch>> + Send>;
|
||||
|
||||
/// Ranges in a memtable.
|
||||
#[derive(Default)]
|
||||
pub struct MemtableRanges {
|
||||
/// Range IDs and ranges.
|
||||
pub ranges: BTreeMap<usize, MemtableRange>,
|
||||
/// Statistics of the memtable at the query time.
|
||||
pub stats: MemtableStats,
|
||||
}
|
||||
|
||||
/// In memory write buffer.
|
||||
pub trait Memtable: Send + Sync + fmt::Debug {
|
||||
/// Returns the id of this memtable.
|
||||
@@ -139,7 +148,7 @@ pub trait Memtable: Send + Sync + fmt::Debug {
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
) -> BTreeMap<usize, MemtableRange>;
|
||||
) -> MemtableRanges;
|
||||
|
||||
/// Returns true if the memtable is empty.
|
||||
fn is_empty(&self) -> bool;
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
|
||||
//! Memtable implementation for bulk load
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
@@ -25,7 +24,7 @@ use crate::error::Result;
|
||||
use crate::memtable::bulk::part::BulkPart;
|
||||
use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::{
|
||||
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRange, MemtableRef, MemtableStats,
|
||||
BoxedBatchIterator, KeyValues, Memtable, MemtableId, MemtableRanges, MemtableRef, MemtableStats,
|
||||
};
|
||||
|
||||
#[allow(unused)]
|
||||
@@ -68,7 +67,7 @@ impl Memtable for BulkMemtable {
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<Predicate>,
|
||||
) -> BTreeMap<usize, MemtableRange> {
|
||||
) -> MemtableRanges {
|
||||
todo!()
|
||||
}
|
||||
|
||||
|
||||
@@ -23,7 +23,6 @@ mod shard;
|
||||
mod shard_builder;
|
||||
mod tree;
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
@@ -41,7 +40,7 @@ use crate::memtable::partition_tree::tree::PartitionTree;
|
||||
use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::{
|
||||
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
|
||||
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
|
||||
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
|
||||
};
|
||||
use crate::region::options::MergeMode;
|
||||
|
||||
@@ -176,7 +175,7 @@ impl Memtable for PartitionTreeMemtable {
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
) -> BTreeMap<usize, MemtableRange> {
|
||||
) -> MemtableRanges {
|
||||
let projection = projection.map(|ids| ids.to_vec());
|
||||
let builder = Box::new(PartitionTreeIterBuilder {
|
||||
tree: self.tree.clone(),
|
||||
@@ -185,7 +184,10 @@ impl Memtable for PartitionTreeMemtable {
|
||||
});
|
||||
let context = Arc::new(MemtableRangeContext::new(self.id, builder));
|
||||
|
||||
[(0, MemtableRange::new(context))].into()
|
||||
MemtableRanges {
|
||||
ranges: [(0, MemtableRange::new(context))].into(),
|
||||
stats: self.stats(),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
|
||||
@@ -45,7 +45,7 @@ use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::stats::WriteMetrics;
|
||||
use crate::memtable::{
|
||||
AllocTracker, BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder,
|
||||
MemtableId, MemtableRange, MemtableRangeContext, MemtableRef, MemtableStats,
|
||||
MemtableId, MemtableRange, MemtableRangeContext, MemtableRanges, MemtableRef, MemtableStats,
|
||||
};
|
||||
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
|
||||
use crate::read::dedup::LastNonNullIter;
|
||||
@@ -250,7 +250,7 @@ impl Memtable for TimeSeriesMemtable {
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
) -> BTreeMap<usize, MemtableRange> {
|
||||
) -> MemtableRanges {
|
||||
let projection = if let Some(projection) = projection {
|
||||
projection.iter().copied().collect()
|
||||
} else {
|
||||
@@ -268,7 +268,10 @@ impl Memtable for TimeSeriesMemtable {
|
||||
});
|
||||
let context = Arc::new(MemtableRangeContext::new(self.id, builder));
|
||||
|
||||
[(0, MemtableRange::new(context))].into()
|
||||
MemtableRanges {
|
||||
ranges: [(0, MemtableRange::new(context))].into(),
|
||||
stats: self.stats(),
|
||||
}
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
|
||||
@@ -24,7 +24,7 @@ use store_api::region_engine::PartitionRange;
|
||||
|
||||
use crate::cache::CacheManager;
|
||||
use crate::error::Result;
|
||||
use crate::memtable::{MemtableRange, MemtableRef};
|
||||
use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats};
|
||||
use crate::read::scan_region::ScanInput;
|
||||
use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
|
||||
use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
|
||||
@@ -175,7 +175,7 @@ impl RangeMeta {
|
||||
}
|
||||
}
|
||||
|
||||
fn push_unordered_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
|
||||
fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
|
||||
// For append mode, we can parallelize reading memtables.
|
||||
for (memtable_index, memtable) in memtables.iter().enumerate() {
|
||||
let stats = memtable.stats();
|
||||
@@ -270,7 +270,7 @@ impl RangeMeta {
|
||||
}
|
||||
}
|
||||
|
||||
fn push_seq_mem_ranges(memtables: &[MemtableRef], ranges: &mut Vec<RangeMeta>) {
|
||||
fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
|
||||
// For non append-only mode, each range only contains one memtable by default.
|
||||
for (i, memtable) in memtables.iter().enumerate() {
|
||||
let stats = memtable.stats();
|
||||
@@ -421,29 +421,38 @@ impl FileRangeBuilder {
|
||||
/// Builder to create mem ranges.
|
||||
pub(crate) struct MemRangeBuilder {
|
||||
/// Ranges of a memtable.
|
||||
row_groups: BTreeMap<usize, MemtableRange>,
|
||||
ranges: MemtableRanges,
|
||||
}
|
||||
|
||||
impl MemRangeBuilder {
|
||||
/// Builds a mem range builder from row groups.
|
||||
pub(crate) fn new(row_groups: BTreeMap<usize, MemtableRange>) -> Self {
|
||||
Self { row_groups }
|
||||
pub(crate) fn new(ranges: MemtableRanges) -> Self {
|
||||
Self { ranges }
|
||||
}
|
||||
|
||||
/// Builds mem ranges to read in the memtable.
|
||||
/// Negative `row_group_index` indicates all row groups.
|
||||
fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[MemtableRange; 2]>) {
|
||||
pub(crate) fn build_ranges(
|
||||
&self,
|
||||
row_group_index: i64,
|
||||
ranges: &mut SmallVec<[MemtableRange; 2]>,
|
||||
) {
|
||||
if row_group_index >= 0 {
|
||||
let row_group_index = row_group_index as usize;
|
||||
// Scans one row group.
|
||||
let Some(range) = self.row_groups.get(&row_group_index) else {
|
||||
let Some(range) = self.ranges.ranges.get(&row_group_index) else {
|
||||
return;
|
||||
};
|
||||
ranges.push(range.clone());
|
||||
} else {
|
||||
ranges.extend(self.row_groups.values().cloned());
|
||||
ranges.extend(self.ranges.ranges.values().cloned());
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the statistics of the memtable.
|
||||
pub(crate) fn stats(&self) -> &MemtableStats {
|
||||
&self.ranges.stats
|
||||
}
|
||||
}
|
||||
|
||||
/// List to manage the builders to create file ranges.
|
||||
@@ -451,18 +460,15 @@ impl MemRangeBuilder {
|
||||
/// the list to different streams in the same partition.
|
||||
pub(crate) struct RangeBuilderList {
|
||||
num_memtables: usize,
|
||||
mem_builders: Mutex<Vec<Option<MemRangeBuilder>>>,
|
||||
file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
|
||||
}
|
||||
|
||||
impl RangeBuilderList {
|
||||
/// Creates a new [RangeBuilderList] with the given number of memtables and files.
|
||||
pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
|
||||
let mem_builders = (0..num_memtables).map(|_| None).collect();
|
||||
let file_builders = (0..num_files).map(|_| None).collect();
|
||||
Self {
|
||||
num_memtables,
|
||||
mem_builders: Mutex::new(mem_builders),
|
||||
file_builders: Mutex::new(file_builders),
|
||||
}
|
||||
}
|
||||
@@ -488,26 +494,6 @@ impl RangeBuilderList {
|
||||
Ok(ranges)
|
||||
}
|
||||
|
||||
/// Builds mem ranges to read the row group at `index`.
|
||||
pub(crate) fn build_mem_ranges(
|
||||
&self,
|
||||
input: &ScanInput,
|
||||
index: RowGroupIndex,
|
||||
) -> SmallVec<[MemtableRange; 2]> {
|
||||
let mut ranges = SmallVec::new();
|
||||
let mut mem_builders = self.mem_builders.lock().unwrap();
|
||||
match &mut mem_builders[index.index] {
|
||||
Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
|
||||
None => {
|
||||
let builder = input.prune_memtable(index.index);
|
||||
builder.build_ranges(index.row_group_index, &mut ranges);
|
||||
mem_builders[index.index] = Some(builder);
|
||||
}
|
||||
}
|
||||
|
||||
ranges
|
||||
}
|
||||
|
||||
fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
|
||||
let file_builders = self.file_builders.lock().unwrap();
|
||||
file_builders[index].clone()
|
||||
|
||||
@@ -24,6 +24,7 @@ use common_recordbatch::SendableRecordBatchStream;
|
||||
use common_telemetry::{debug, error, tracing, warn};
|
||||
use common_time::range::TimestampRange;
|
||||
use datafusion_expr::utils::expr_to_columns;
|
||||
use smallvec::SmallVec;
|
||||
use store_api::region_engine::{PartitionRange, RegionScannerRef};
|
||||
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
|
||||
use table::predicate::{build_time_range_predicate, Predicate};
|
||||
@@ -35,7 +36,7 @@ use crate::cache::file_cache::FileCacheRef;
|
||||
use crate::cache::CacheManagerRef;
|
||||
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
|
||||
use crate::error::Result;
|
||||
use crate::memtable::MemtableRef;
|
||||
use crate::memtable::MemtableRange;
|
||||
use crate::metrics::READ_SST_COUNT;
|
||||
use crate::read::compat::{self, CompatBatch};
|
||||
use crate::read::projection::ProjectionMapper;
|
||||
@@ -328,6 +329,14 @@ impl ScanRegion {
|
||||
Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
|
||||
None => ProjectionMapper::all(&self.version.metadata)?,
|
||||
};
|
||||
// Get memtable ranges to scan.
|
||||
let memtables = memtables
|
||||
.into_iter()
|
||||
.map(|mem| {
|
||||
let ranges = mem.ranges(Some(mapper.column_ids()), Some(predicate.clone()));
|
||||
MemRangeBuilder::new(ranges)
|
||||
})
|
||||
.collect();
|
||||
|
||||
let input = ScanInput::new(self.access_layer, mapper)
|
||||
.with_time_range(Some(time_range))
|
||||
@@ -484,8 +493,8 @@ pub(crate) struct ScanInput {
|
||||
time_range: Option<TimestampRange>,
|
||||
/// Predicate to push down.
|
||||
pub(crate) predicate: Option<Predicate>,
|
||||
/// Memtables to scan.
|
||||
pub(crate) memtables: Vec<MemtableRef>,
|
||||
/// Memtable range builders for memtables in the time range.
|
||||
pub(crate) memtables: Vec<MemRangeBuilder>,
|
||||
/// Handles to SST files to scan.
|
||||
pub(crate) files: Vec<FileHandle>,
|
||||
/// Cache.
|
||||
@@ -547,9 +556,9 @@ impl ScanInput {
|
||||
self
|
||||
}
|
||||
|
||||
/// Sets memtables to read.
|
||||
/// Sets memtable range builders.
|
||||
#[must_use]
|
||||
pub(crate) fn with_memtables(mut self, memtables: Vec<MemtableRef>) -> Self {
|
||||
pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
|
||||
self.memtables = memtables;
|
||||
self
|
||||
}
|
||||
@@ -667,11 +676,12 @@ impl ScanInput {
|
||||
Ok(sources)
|
||||
}
|
||||
|
||||
/// Prunes a memtable to scan and returns the builder to build readers.
|
||||
pub(crate) fn prune_memtable(&self, mem_index: usize) -> MemRangeBuilder {
|
||||
let memtable = &self.memtables[mem_index];
|
||||
let row_groups = memtable.ranges(Some(self.mapper.column_ids()), self.predicate.clone());
|
||||
MemRangeBuilder::new(row_groups)
|
||||
/// Builds memtable ranges to scan by `index`.
|
||||
pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
|
||||
let memtable = &self.memtables[index.index];
|
||||
let mut ranges = SmallVec::new();
|
||||
memtable.build_ranges(index.row_group_index, &mut ranges);
|
||||
ranges
|
||||
}
|
||||
|
||||
/// Prunes a file to scan and returns the builder to build readers.
|
||||
|
||||
@@ -137,10 +137,9 @@ pub(crate) fn scan_mem_ranges(
|
||||
part_metrics: PartitionMetrics,
|
||||
index: RowGroupIndex,
|
||||
time_range: FileTimeRange,
|
||||
range_builder_list: Arc<RangeBuilderList>,
|
||||
) -> impl Stream<Item = Result<Batch>> {
|
||||
try_stream! {
|
||||
let ranges = range_builder_list.build_mem_ranges(&stream_ctx.input, index);
|
||||
let ranges = stream_ctx.input.build_mem_ranges(index);
|
||||
part_metrics.inc_num_mem_ranges(ranges.len());
|
||||
for range in ranges {
|
||||
let build_reader_start = Instant::now();
|
||||
|
||||
@@ -403,7 +403,6 @@ fn build_sources(
|
||||
part_metrics.clone(),
|
||||
*index,
|
||||
range_meta.time_range,
|
||||
range_builder_list.clone(),
|
||||
);
|
||||
Box::pin(stream) as _
|
||||
} else {
|
||||
|
||||
@@ -97,7 +97,6 @@ impl UnorderedScan {
|
||||
part_metrics.clone(),
|
||||
*index,
|
||||
range_meta.time_range,
|
||||
range_builder_list.clone(),
|
||||
);
|
||||
for await batch in stream {
|
||||
yield batch;
|
||||
|
||||
@@ -35,7 +35,7 @@ use crate::memtable::key_values::KeyValue;
|
||||
use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer};
|
||||
use crate::memtable::{
|
||||
BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange,
|
||||
MemtableRef, MemtableStats,
|
||||
MemtableRanges, MemtableRef, MemtableStats,
|
||||
};
|
||||
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
|
||||
|
||||
@@ -93,8 +93,8 @@ impl Memtable for EmptyMemtable {
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<Predicate>,
|
||||
) -> BTreeMap<usize, MemtableRange> {
|
||||
BTreeMap::new()
|
||||
) -> MemtableRanges {
|
||||
MemtableRanges::default()
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
|
||||
Reference in New Issue
Block a user