mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-07 05:42:57 +00:00
fix: prune batches from memtable by time range (#4913)
* feat: add an iter to prune by time range * feat: filter rows from mem range
This commit is contained in:
@@ -259,3 +259,56 @@ async fn test_prune_memtable_complex_expr() {
|
||||
+-------+---------+---------------------+";
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_mem_range_prune() {
|
||||
let mut env = TestEnv::new();
|
||||
let engine = env.create_engine(MitoConfig::default()).await;
|
||||
|
||||
let region_id = RegionId::new(1, 1);
|
||||
let request = CreateRequestBuilder::new().build();
|
||||
|
||||
let column_schemas = rows_schema(&request);
|
||||
|
||||
engine
|
||||
.handle_request(region_id, RegionRequest::Create(request))
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows(5, 8),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
// Starts scan and gets the memtable time range.
|
||||
let stream = engine
|
||||
.scan_to_stream(region_id, ScanRequest::default())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
put_rows(
|
||||
&engine,
|
||||
region_id,
|
||||
Rows {
|
||||
schema: column_schemas.clone(),
|
||||
rows: build_rows(10, 12),
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
||||
let batches = RecordBatches::try_collect(stream).await.unwrap();
|
||||
let expected = "\
|
||||
+-------+---------+---------------------+
|
||||
| tag_0 | field_0 | ts |
|
||||
+-------+---------+---------------------+
|
||||
| 5 | 5.0 | 1970-01-01T00:00:05 |
|
||||
| 6 | 6.0 | 1970-01-01T00:00:06 |
|
||||
| 7 | 7.0 | 1970-01-01T00:00:07 |
|
||||
+-------+---------+---------------------+";
|
||||
assert_eq!(expected, batches.pretty_print().unwrap());
|
||||
}
|
||||
|
||||
@@ -34,8 +34,10 @@ pub use crate::memtable::key_values::KeyValues;
|
||||
use crate::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtableBuilder};
|
||||
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
|
||||
use crate::metrics::WRITE_BUFFER_BYTES;
|
||||
use crate::read::prune::PruneTimeIterator;
|
||||
use crate::read::Batch;
|
||||
use crate::region::options::{MemtableOptions, MergeMode};
|
||||
use crate::sst::file::FileTimeRange;
|
||||
|
||||
pub mod bulk;
|
||||
pub mod key_values;
|
||||
@@ -355,8 +357,10 @@ impl MemtableRange {
|
||||
}
|
||||
|
||||
/// Builds an iterator to read the range.
|
||||
pub fn build_iter(&self) -> Result<BoxedBatchIterator> {
|
||||
self.context.builder.build()
|
||||
/// Filters the result by the specific time range.
|
||||
pub fn build_iter(&self, time_range: FileTimeRange) -> Result<BoxedBatchIterator> {
|
||||
let iter = self.context.builder.build()?;
|
||||
Ok(Box::new(PruneTimeIterator::new(iter, time_range)))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,9 +12,15 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use common_time::Timestamp;
|
||||
use datatypes::scalars::ScalarVectorBuilder;
|
||||
use datatypes::vectors::BooleanVectorBuilder;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::memtable::BoxedBatchIterator;
|
||||
use crate::read::last_row::RowGroupLastRowCachedReader;
|
||||
use crate::read::{Batch, BatchReader};
|
||||
use crate::sst::file::FileTimeRange;
|
||||
use crate::sst::parquet::file_range::FileRangeContextRef;
|
||||
use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
|
||||
|
||||
@@ -112,3 +118,214 @@ impl PruneReader {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An iterator that prunes batches by time range.
|
||||
pub(crate) struct PruneTimeIterator {
|
||||
iter: BoxedBatchIterator,
|
||||
time_range: FileTimeRange,
|
||||
}
|
||||
|
||||
impl PruneTimeIterator {
|
||||
/// Creates a new `PruneTimeIterator` with the given iterator and time range.
|
||||
pub(crate) fn new(iter: BoxedBatchIterator, time_range: FileTimeRange) -> Self {
|
||||
Self { iter, time_range }
|
||||
}
|
||||
|
||||
/// Prune batch by time range.
|
||||
fn prune(&self, mut batch: Batch) -> Result<Batch> {
|
||||
if batch.is_empty() {
|
||||
return Ok(batch);
|
||||
}
|
||||
|
||||
// fast path, the batch is within the time range.
|
||||
// Note that the time range is inclusive.
|
||||
if self.time_range.0 <= batch.first_timestamp().unwrap()
|
||||
&& batch.last_timestamp().unwrap() <= self.time_range.1
|
||||
{
|
||||
return Ok(batch);
|
||||
}
|
||||
|
||||
// slow path, prune the batch by time range.
|
||||
// Note that the timestamp precision may be different from the time range.
|
||||
// Safety: We know this is the timestamp type.
|
||||
let unit = batch
|
||||
.timestamps()
|
||||
.data_type()
|
||||
.as_timestamp()
|
||||
.unwrap()
|
||||
.unit();
|
||||
let mut filter_builder = BooleanVectorBuilder::with_capacity(batch.timestamps().len());
|
||||
let timestamps = batch.timestamps_native().unwrap();
|
||||
for ts in timestamps {
|
||||
let ts = Timestamp::new(*ts, unit);
|
||||
if self.time_range.0 <= ts && ts <= self.time_range.1 {
|
||||
filter_builder.push(Some(true));
|
||||
} else {
|
||||
filter_builder.push(Some(false));
|
||||
}
|
||||
}
|
||||
let filter = filter_builder.finish();
|
||||
|
||||
batch.filter(&filter)?;
|
||||
Ok(batch)
|
||||
}
|
||||
|
||||
// Prune and return the next non-empty batch.
|
||||
fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
|
||||
while let Some(batch) = self.iter.next() {
|
||||
let batch = batch?;
|
||||
let pruned_batch = self.prune(batch)?;
|
||||
if !pruned_batch.is_empty() {
|
||||
return Ok(Some(pruned_batch));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
impl Iterator for PruneTimeIterator {
|
||||
type Item = Result<Batch>;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.next_non_empty_batch().transpose()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use api::v1::OpType;
|
||||
|
||||
use super::*;
|
||||
use crate::test_util::new_batch;
|
||||
|
||||
#[test]
|
||||
fn test_prune_time_iter_empty() {
|
||||
let input = [];
|
||||
let iter = input.into_iter().map(Ok);
|
||||
let iter = PruneTimeIterator::new(
|
||||
Box::new(iter),
|
||||
(
|
||||
Timestamp::new_millisecond(0),
|
||||
Timestamp::new_millisecond(1000),
|
||||
),
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert!(actual.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prune_time_iter_filter() {
|
||||
let input = [
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[10, 11],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[110, 111],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[15, 16],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[115, 116],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[17, 18],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[117, 118],
|
||||
),
|
||||
];
|
||||
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let iter = PruneTimeIterator::new(
|
||||
Box::new(iter),
|
||||
(
|
||||
Timestamp::new_millisecond(10),
|
||||
Timestamp::new_millisecond(15),
|
||||
),
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert_eq!(
|
||||
actual,
|
||||
[
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[10, 11],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[110, 111],
|
||||
),
|
||||
new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
|
||||
]
|
||||
);
|
||||
|
||||
let iter = input.clone().into_iter().map(Ok);
|
||||
let iter = PruneTimeIterator::new(
|
||||
Box::new(iter),
|
||||
(
|
||||
Timestamp::new_millisecond(11),
|
||||
Timestamp::new_millisecond(20),
|
||||
),
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert_eq!(
|
||||
actual,
|
||||
[
|
||||
new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111],),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[15, 16],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[115, 116],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[17, 18],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[117, 118],
|
||||
),
|
||||
]
|
||||
);
|
||||
|
||||
let iter = input.into_iter().map(Ok);
|
||||
let iter = PruneTimeIterator::new(
|
||||
Box::new(iter),
|
||||
(
|
||||
Timestamp::new_millisecond(10),
|
||||
Timestamp::new_millisecond(18),
|
||||
),
|
||||
);
|
||||
let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
|
||||
assert_eq!(
|
||||
actual,
|
||||
[
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[10, 11],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[110, 111],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[15, 16],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[115, 116],
|
||||
),
|
||||
new_batch(
|
||||
b"k1",
|
||||
&[17, 18],
|
||||
&[20, 20],
|
||||
&[OpType::Put, OpType::Put],
|
||||
&[117, 118],
|
||||
),
|
||||
]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ use crate::error::Result;
|
||||
use crate::read::range::RowGroupIndex;
|
||||
use crate::read::scan_region::StreamContext;
|
||||
use crate::read::{Batch, ScannerMetrics, Source};
|
||||
use crate::sst::file::FileTimeRange;
|
||||
use crate::sst::parquet::reader::ReaderMetrics;
|
||||
|
||||
struct PartitionMetricsInner {
|
||||
@@ -128,13 +129,14 @@ pub(crate) fn scan_mem_ranges(
|
||||
stream_ctx: Arc<StreamContext>,
|
||||
part_metrics: PartitionMetrics,
|
||||
index: RowGroupIndex,
|
||||
time_range: FileTimeRange,
|
||||
) -> impl Stream<Item = Result<Batch>> {
|
||||
try_stream! {
|
||||
let ranges = stream_ctx.build_mem_ranges(index);
|
||||
part_metrics.inc_num_mem_ranges(ranges.len());
|
||||
for range in ranges {
|
||||
let build_reader_start = Instant::now();
|
||||
let iter = range.build_iter()?;
|
||||
let iter = range.build_iter(time_range)?;
|
||||
part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
|
||||
|
||||
let mut source = Source::Iter(iter);
|
||||
|
||||
@@ -355,7 +355,12 @@ fn build_sources(
|
||||
sources.reserve(range_meta.row_group_indices.len());
|
||||
for index in &range_meta.row_group_indices {
|
||||
let stream = if stream_ctx.is_mem_range_index(*index) {
|
||||
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
|
||||
let stream = scan_mem_ranges(
|
||||
stream_ctx.clone(),
|
||||
part_metrics.clone(),
|
||||
*index,
|
||||
range_meta.time_range,
|
||||
);
|
||||
Box::pin(stream) as _
|
||||
} else {
|
||||
let read_type = if compaction {
|
||||
|
||||
@@ -89,7 +89,7 @@ impl UnorderedScan {
|
||||
let range_meta = &stream_ctx.ranges[part_range_id];
|
||||
for index in &range_meta.row_group_indices {
|
||||
if stream_ctx.is_mem_range_index(*index) {
|
||||
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
|
||||
let stream = scan_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index, range_meta.time_range);
|
||||
for await batch in stream {
|
||||
yield batch;
|
||||
}
|
||||
|
||||
@@ -124,16 +124,6 @@ impl MemtableBuilder for EmptyMemtableBuilder {
|
||||
}
|
||||
}
|
||||
|
||||
/// Empty iterator builder.
|
||||
#[derive(Default)]
|
||||
pub(crate) struct EmptyIterBuilder {}
|
||||
|
||||
impl IterBuilder for EmptyIterBuilder {
|
||||
fn build(&self) -> Result<BoxedBatchIterator> {
|
||||
Ok(Box::new(std::iter::empty()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a region metadata to test memtable with default pk.
|
||||
///
|
||||
/// The schema is `k0, k1, ts, v0, v1` and pk is `k0, k1`.
|
||||
|
||||
Reference in New Issue
Block a user