mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-05-26 09:50:40 +00:00
refactor: remove Memtable::iter (#7809)
* refactor: remove Memtable::iter Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> * fix: review comments Signed-off-by: Lei, HUANG <mrsatangel@gmail.com> --------- Signed-off-by: Lei, HUANG <mrsatangel@gmail.com>
This commit is contained in:
@@ -28,7 +28,7 @@ use mito2::memtable::bulk::part_reader::BulkPartBatchIter;
|
||||
use mito2::memtable::bulk::{BulkMemtable, BulkMemtableConfig};
|
||||
use mito2::memtable::partition_tree::{PartitionTreeConfig, PartitionTreeMemtable};
|
||||
use mito2::memtable::time_series::TimeSeriesMemtable;
|
||||
use mito2::memtable::{KeyValues, Memtable, RangesOptions};
|
||||
use mito2::memtable::{IterBuilder, KeyValues, Memtable, RangesOptions};
|
||||
use mito2::read::flat_merge::FlatMergeIterator;
|
||||
use mito2::read::scan_region::PredicateGroup;
|
||||
use mito2::region::options::MergeMode;
|
||||
@@ -105,7 +105,11 @@ fn full_scan(c: &mut Criterion) {
|
||||
}
|
||||
|
||||
b.iter(|| {
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
for batch in iter {
|
||||
let _batch = batch.unwrap();
|
||||
}
|
||||
@@ -145,7 +149,17 @@ fn filter_1_host(c: &mut Criterion) {
|
||||
let predicate = generator.random_host_filter();
|
||||
|
||||
b.iter(|| {
|
||||
let iter = memtable.iter(None, Some(predicate.clone()), None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(
|
||||
None,
|
||||
RangesOptions {
|
||||
predicate: PredicateGroup::new(&metadata, predicate.exprs()).unwrap(),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
for batch in iter {
|
||||
let _batch = batch.unwrap();
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ use criterion::{Criterion, criterion_group, criterion_main};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::schema::ColumnSchema;
|
||||
use mito2::memtable::simple_bulk_memtable::SimpleBulkMemtable;
|
||||
use mito2::memtable::{KeyValues, Memtable, MemtableRanges, RangesOptions};
|
||||
use mito2::memtable::{IterBuilder, KeyValues, Memtable, MemtableRanges, RangesOptions};
|
||||
use mito2::read;
|
||||
use mito2::read::Source;
|
||||
use mito2::read::dedup::DedupReader;
|
||||
@@ -156,7 +156,11 @@ async fn flush(mem: &SimpleBulkMemtable) {
|
||||
}
|
||||
|
||||
async fn flush_original(mem: &SimpleBulkMemtable) {
|
||||
let iter = mem.iter(None, None, None).unwrap();
|
||||
let iter = mem
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
for b in iter {
|
||||
black_box(b.unwrap());
|
||||
}
|
||||
|
||||
@@ -28,6 +28,7 @@ use mito_codec::key_values::KeyValue;
|
||||
pub use mito_codec::key_values::KeyValues;
|
||||
use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use snafu::ensure;
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceNumber, SequenceRange};
|
||||
|
||||
@@ -231,10 +232,17 @@ impl MemtableRanges {
|
||||
|
||||
impl IterBuilder for MemtableRanges {
|
||||
fn build(&self, _metrics: Option<MemScanMetrics>) -> Result<BoxedBatchIterator> {
|
||||
UnsupportedOperationSnafu {
|
||||
err_msg: "MemtableRanges does not support build iterator",
|
||||
}
|
||||
.fail()
|
||||
ensure!(
|
||||
self.ranges.len() == 1,
|
||||
UnsupportedOperationSnafu {
|
||||
err_msg: format!(
|
||||
"Building an iterator from MemtableRanges expects 1 range, but got {}",
|
||||
self.ranges.len()
|
||||
),
|
||||
}
|
||||
);
|
||||
|
||||
self.ranges.values().next().unwrap().build_iter()
|
||||
}
|
||||
|
||||
fn is_record_batch(&self) -> bool {
|
||||
@@ -256,20 +264,6 @@ pub trait Memtable: Send + Sync + fmt::Debug {
|
||||
/// Writes an encoded batch of into memtable.
|
||||
fn write_bulk(&self, part: crate::memtable::bulk::part::BulkPart) -> Result<()>;
|
||||
|
||||
/// Scans the memtable.
|
||||
/// `projection` selects columns to read, `None` means reading all columns.
|
||||
/// `filters` are the predicates to be pushed down to memtable.
|
||||
///
|
||||
/// # Note
|
||||
/// This method should only be used for tests.
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<table::predicate::Predicate>,
|
||||
sequence: Option<SequenceRange>,
|
||||
) -> Result<BoxedBatchIterator>;
|
||||
|
||||
/// Returns the ranges in the memtable.
|
||||
///
|
||||
/// The returned map contains the range id and the range after applying the predicate.
|
||||
|
||||
@@ -462,16 +462,6 @@ impl Memtable for BulkMemtable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<table::predicate::Predicate>,
|
||||
_sequence: Option<SequenceRange>,
|
||||
) -> Result<crate::memtable::BoxedBatchIterator> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
|
||||
@@ -177,16 +177,6 @@ impl Memtable for PartitionTreeMemtable {
|
||||
.fail()
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
predicate: Option<Predicate>,
|
||||
sequence: Option<SequenceRange>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
self.tree.read(projection, predicate, sequence, None)
|
||||
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
@@ -396,8 +386,6 @@ mod tests {
|
||||
use api::v1::{Mutation, OpType, Rows, SemanticType};
|
||||
use common_query::prelude::{greptime_timestamp, greptime_value};
|
||||
use common_time::Timestamp;
|
||||
use datafusion_common::Column;
|
||||
use datafusion_expr::{BinaryExpr, Expr, Literal, Operator};
|
||||
use datatypes::data_type::ConcreteDataType;
|
||||
use datatypes::prelude::Vector;
|
||||
use datatypes::scalars::ScalarVector;
|
||||
@@ -548,7 +536,10 @@ mod tests {
|
||||
let expect = (0..100).collect::<Vec<_>>();
|
||||
let kvs = memtable_util::build_key_values(&metadata, "hello".to_string(), 10, &expect, 1);
|
||||
memtable.write(&kvs).unwrap();
|
||||
let iter = memtable.iter(Some(&[3]), None, None).unwrap();
|
||||
let ranges = memtable
|
||||
.ranges(Some(&[3]), RangesOptions::default())
|
||||
.unwrap();
|
||||
let iter = ranges.build(None).unwrap();
|
||||
|
||||
let mut v0_all = vec![];
|
||||
for res in iter {
|
||||
@@ -625,41 +616,6 @@ mod tests {
|
||||
assert_eq!(expect, read);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_memtable_filter() {
|
||||
let metadata = Arc::new(memtable_util::metadata_with_primary_key(vec![0, 1], false));
|
||||
// Try to build a memtable via the builder.
|
||||
let memtable = PartitionTreeMemtableBuilder::new(
|
||||
PartitionTreeConfig {
|
||||
index_max_keys_per_shard: 40,
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
)
|
||||
.build(1, &metadata);
|
||||
|
||||
for i in 0..100 {
|
||||
let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
|
||||
let kvs =
|
||||
memtable_util::build_key_values(&metadata, "hello".to_string(), i, ×tamps, 1);
|
||||
memtable.write(&kvs).unwrap();
|
||||
}
|
||||
|
||||
for i in 0..100 {
|
||||
let timestamps: Vec<_> = (0..10).map(|v| i as i64 * 1000 + v).collect();
|
||||
let expr = Expr::BinaryExpr(BinaryExpr {
|
||||
left: Box::new(Expr::Column(Column::from_name("k1"))),
|
||||
op: Operator::Eq,
|
||||
right: Box::new((i as u32).lit()),
|
||||
});
|
||||
let iter = memtable
|
||||
.iter(None, Some(Predicate::new(vec![expr])), None)
|
||||
.unwrap();
|
||||
let read = collect_iter_timestamps(iter);
|
||||
assert_eq!(timestamps, read);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_deserialize_config() {
|
||||
let config = PartitionTreeConfig {
|
||||
@@ -811,7 +767,11 @@ mod tests {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut reader = new_memtable.iter(None, None, None).unwrap();
|
||||
let mut reader = new_memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = reader.next().unwrap().unwrap();
|
||||
let pk = codec.decode(batch.primary_key()).unwrap().into_dense();
|
||||
if let Value::String(s) = &pk[2] {
|
||||
@@ -916,7 +876,14 @@ mod tests {
|
||||
.unwrap();
|
||||
memtable.freeze().unwrap();
|
||||
assert_eq!(
|
||||
collect_kvs(memtable.iter(None, None, None).unwrap(), &metadata),
|
||||
collect_kvs(
|
||||
memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap(),
|
||||
&metadata
|
||||
),
|
||||
('a'..'h').map(|c| (c.to_string(), c.to_string())).collect()
|
||||
);
|
||||
let forked = memtable.fork(2, &metadata);
|
||||
@@ -925,7 +892,14 @@ mod tests {
|
||||
forked.write(&key_values(&metadata, keys.iter())).unwrap();
|
||||
forked.freeze().unwrap();
|
||||
assert_eq!(
|
||||
collect_kvs(forked.iter(None, None, None).unwrap(), &metadata),
|
||||
collect_kvs(
|
||||
forked
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap(),
|
||||
&metadata
|
||||
),
|
||||
keys.iter()
|
||||
.map(|c| (c.to_string(), c.to_string()))
|
||||
.collect()
|
||||
@@ -936,7 +910,14 @@ mod tests {
|
||||
let keys = ["g", "e", "a", "f", "b", "c", "h"];
|
||||
forked2.write(&key_values(&metadata, keys.iter())).unwrap();
|
||||
|
||||
let kvs = collect_kvs(forked2.iter(None, None, None).unwrap(), &metadata);
|
||||
let kvs = collect_kvs(
|
||||
forked2
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap(),
|
||||
&metadata,
|
||||
);
|
||||
let expected = keys
|
||||
.iter()
|
||||
.map(|c| (c.to_string(), c.to_string()))
|
||||
|
||||
@@ -213,22 +213,6 @@ impl Memtable for SimpleBulkMemtable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
_predicate: Option<table::predicate::Predicate>,
|
||||
sequence: Option<store_api::storage::SequenceRange>,
|
||||
) -> error::Result<BoxedBatchIterator> {
|
||||
let iter = self.create_iter(projection, sequence)?.build(None)?;
|
||||
if self.merge_mode == MergeMode::LastNonNull {
|
||||
let iter = LastNonNullIter::new(iter);
|
||||
Ok(Box::new(iter))
|
||||
} else {
|
||||
Ok(Box::new(iter))
|
||||
}
|
||||
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
@@ -526,7 +510,11 @@ mod tests {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(2, batch.num_rows());
|
||||
assert_eq!(2, batch.fields().len());
|
||||
@@ -551,7 +539,11 @@ mod tests {
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(1, batch.num_rows());
|
||||
assert_eq!(2, batch.fields().len());
|
||||
@@ -565,7 +557,11 @@ mod tests {
|
||||
|
||||
// Only project column 2 (f1)
|
||||
let projection = vec![2];
|
||||
let mut iter = memtable.iter(Some(&projection), None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(Some(&projection), RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
|
||||
assert_eq!(1, batch.num_rows());
|
||||
@@ -592,7 +588,11 @@ mod tests {
|
||||
OpType::Put,
|
||||
))
|
||||
.unwrap();
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
|
||||
assert_eq!(1, batch.num_rows()); // deduped to 1 row
|
||||
@@ -611,7 +611,11 @@ mod tests {
|
||||
let kv = kvs.iter().next().unwrap();
|
||||
memtable.write_one(kv).unwrap();
|
||||
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(1, batch.num_rows());
|
||||
}
|
||||
@@ -745,7 +749,11 @@ mod tests {
|
||||
};
|
||||
memtable.write_bulk(part).unwrap();
|
||||
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(2, batch.num_rows());
|
||||
|
||||
@@ -764,7 +772,11 @@ mod tests {
|
||||
OpType::Put,
|
||||
);
|
||||
memtable.write(&kvs).unwrap();
|
||||
let mut iter = memtable.iter(None, None, None).unwrap();
|
||||
let mut iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(3, batch.num_rows());
|
||||
assert_eq!(
|
||||
@@ -854,7 +866,15 @@ mod tests {
|
||||
|
||||
// Filter with sequence 0 should only return first write
|
||||
let mut iter = memtable
|
||||
.iter(None, None, Some(SequenceRange::LtEq { max: 0 }))
|
||||
.ranges(
|
||||
None,
|
||||
RangesOptions {
|
||||
sequence: Some(SequenceRange::LtEq { max: 0 }),
|
||||
..Default::default()
|
||||
},
|
||||
)
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let batch = iter.next().unwrap().unwrap();
|
||||
assert_eq!(1, batch.num_rows());
|
||||
|
||||
@@ -12,98 +12,12 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::collections::HashSet;
|
||||
use std::time::Instant;
|
||||
|
||||
use store_api::metadata::RegionMetadataRef;
|
||||
use store_api::storage::{ColumnId, SequenceRange};
|
||||
|
||||
use crate::error;
|
||||
use crate::memtable::simple_bulk_memtable::{Iter, SimpleBulkMemtable};
|
||||
use crate::memtable::time_series::Values;
|
||||
use crate::memtable::{BoxedBatchIterator, IterBuilder, MemScanMetrics};
|
||||
use crate::read::dedup::LastNonNullIter;
|
||||
use crate::region::options::MergeMode;
|
||||
use crate::memtable::simple_bulk_memtable::SimpleBulkMemtable;
|
||||
|
||||
impl SimpleBulkMemtable {
|
||||
pub fn region_metadata(&self) -> RegionMetadataRef {
|
||||
self.region_metadata.clone()
|
||||
}
|
||||
|
||||
pub(crate) fn create_iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
sequence: Option<SequenceRange>,
|
||||
) -> error::Result<BatchIterBuilderDeprecated> {
|
||||
let mut series = self.series.write().unwrap();
|
||||
|
||||
let values = if series.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(series.compact(&self.region_metadata)?.clone())
|
||||
};
|
||||
let projection = self.build_projection(projection);
|
||||
Ok(BatchIterBuilderDeprecated {
|
||||
region_metadata: self.region_metadata.clone(),
|
||||
values,
|
||||
projection,
|
||||
dedup: self.dedup,
|
||||
sequence,
|
||||
merge_mode: self.merge_mode,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct BatchIterBuilderDeprecated {
|
||||
region_metadata: RegionMetadataRef,
|
||||
values: Option<Values>,
|
||||
projection: HashSet<ColumnId>,
|
||||
sequence: Option<SequenceRange>,
|
||||
dedup: bool,
|
||||
merge_mode: MergeMode,
|
||||
}
|
||||
|
||||
impl IterBuilder for BatchIterBuilderDeprecated {
|
||||
fn build(&self, metrics: Option<MemScanMetrics>) -> error::Result<BoxedBatchIterator> {
|
||||
let start_time = Instant::now();
|
||||
let Some(values) = self.values.clone() else {
|
||||
return Ok(Box::new(Iter { batch: None }));
|
||||
};
|
||||
|
||||
let maybe_batch = values
|
||||
.to_batch(
|
||||
&[],
|
||||
&self.region_metadata,
|
||||
&self.projection,
|
||||
self.sequence,
|
||||
self.dedup,
|
||||
self.merge_mode,
|
||||
)
|
||||
.map(Some)
|
||||
.transpose();
|
||||
|
||||
// Collect metrics from the batch
|
||||
if let Some(metrics) = metrics {
|
||||
let (num_rows, num_batches) = match &maybe_batch {
|
||||
Some(Ok(batch)) => (batch.num_rows(), 1),
|
||||
_ => (0, 0),
|
||||
};
|
||||
let inner = crate::memtable::MemScanMetricsData {
|
||||
total_series: 1,
|
||||
num_rows,
|
||||
num_batches,
|
||||
scan_cost: start_time.elapsed(),
|
||||
};
|
||||
metrics.merge_inner(&inner);
|
||||
}
|
||||
|
||||
let iter = Iter { batch: maybe_batch };
|
||||
|
||||
if self.merge_mode == MergeMode::LastNonNull {
|
||||
Ok(Box::new(LastNonNullIter::new(iter)))
|
||||
} else {
|
||||
Ok(Box::new(iter))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -827,6 +827,7 @@ mod tests {
|
||||
use super::*;
|
||||
use crate::memtable::partition_tree::PartitionTreeMemtableBuilder;
|
||||
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
|
||||
use crate::memtable::{IterBuilder, RangesOptions};
|
||||
use crate::test_util::memtable_util::{self, collect_iter_timestamps};
|
||||
|
||||
#[test]
|
||||
@@ -852,7 +853,11 @@ mod tests {
|
||||
partitions.list_memtables(&mut memtables);
|
||||
assert_eq!(0, memtables[0].id());
|
||||
|
||||
let iter = memtables[0].iter(None, None, None).unwrap();
|
||||
let iter = memtables[0]
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 3000, 5000, 6000, 7000], ×tamps[..]);
|
||||
}
|
||||
@@ -890,7 +895,11 @@ mod tests {
|
||||
|
||||
let mut memtables = Vec::new();
|
||||
partitions.list_memtables(&mut memtables);
|
||||
let iter = memtables[0].iter(None, None, None).unwrap();
|
||||
let iter = memtables[0]
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[0, 2000, 3000, 4000, 5000, 7000], ×tamps[..]);
|
||||
let parts = partitions.list_partitions();
|
||||
@@ -943,7 +952,12 @@ mod tests {
|
||||
let partitions = new_multi_partitions(&metadata);
|
||||
|
||||
let parts = partitions.list_partitions();
|
||||
let iter = parts[0].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[0]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(0, parts[0].memtable.id());
|
||||
assert_eq!(
|
||||
@@ -955,7 +969,12 @@ mod tests {
|
||||
parts[0].time_range.max_timestamp
|
||||
);
|
||||
assert_eq!(&[0, 2000, 3000, 4000], ×tamps[..]);
|
||||
let iter = parts[1].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[1]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
assert_eq!(1, parts[1].memtable.id());
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[5000, 7000], ×tamps[..]);
|
||||
@@ -1273,7 +1292,12 @@ mod tests {
|
||||
|
||||
let parts = partitions.list_partitions();
|
||||
assert_eq!(1, parts.len());
|
||||
let iter = parts[0].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[0]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 2000, 3000], ×tamps[..]);
|
||||
|
||||
@@ -1284,11 +1308,21 @@ mod tests {
|
||||
let parts = partitions.list_partitions();
|
||||
assert_eq!(2, parts.len());
|
||||
// Check first partition [0, 5000)
|
||||
let iter = parts[0].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[0]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 2000, 3000, 4000], ×tamps[..]);
|
||||
// Check second partition [5000, 10000)
|
||||
let iter = parts[1].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[1]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[5000, 6000], ×tamps[..]);
|
||||
|
||||
@@ -1301,7 +1335,12 @@ mod tests {
|
||||
assert_eq!(3, parts.len());
|
||||
|
||||
// Check new partition [10000, 15000)
|
||||
let iter = parts[2].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[2]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[11000, 12000], ×tamps[..]);
|
||||
|
||||
@@ -1314,7 +1353,12 @@ mod tests {
|
||||
|
||||
let parts = partitions.list_partitions();
|
||||
assert_eq!(1, parts.len());
|
||||
let iter = parts[0].memtable.iter(None, None, None).unwrap();
|
||||
let iter = parts[0]
|
||||
.memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let timestamps = collect_iter_timestamps(iter);
|
||||
assert_eq!(&[1000, 5000, 9000], ×tamps[..]);
|
||||
}
|
||||
|
||||
@@ -267,39 +267,6 @@ impl Memtable for TimeSeriesMemtable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
filters: Option<Predicate>,
|
||||
sequence: Option<SequenceRange>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
let projection = if let Some(projection) = projection {
|
||||
projection.iter().copied().collect()
|
||||
} else {
|
||||
self.region_metadata
|
||||
.field_columns()
|
||||
.map(|c| c.column_id)
|
||||
.collect()
|
||||
};
|
||||
|
||||
let iter = self.series_set.iter_series(
|
||||
projection,
|
||||
filters,
|
||||
self.dedup,
|
||||
self.merge_mode,
|
||||
sequence,
|
||||
None,
|
||||
)?;
|
||||
|
||||
if self.merge_mode == MergeMode::LastNonNull {
|
||||
let iter = LastNonNullIter::new(iter);
|
||||
Ok(Box::new(iter))
|
||||
} else {
|
||||
Ok(Box::new(iter))
|
||||
}
|
||||
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
projection: Option<&[ColumnId]>,
|
||||
@@ -1798,7 +1765,9 @@ mod tests {
|
||||
*expected_ts.entry(ts).or_default() += if dedup { 1 } else { 2 };
|
||||
}
|
||||
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let ranges = memtable.ranges(None, RangesOptions::default()).unwrap();
|
||||
let range = ranges.ranges.into_values().next().unwrap();
|
||||
let iter = range.build_iter().unwrap();
|
||||
let mut read = HashMap::new();
|
||||
|
||||
for ts in iter
|
||||
@@ -1838,7 +1807,11 @@ mod tests {
|
||||
let memtable = TimeSeriesMemtable::new(schema, 42, None, true, MergeMode::LastRow);
|
||||
memtable.write(&kvs).unwrap();
|
||||
|
||||
let iter = memtable.iter(Some(&[3]), None, None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(Some(&[3]), RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
|
||||
let mut v0_all = vec![];
|
||||
|
||||
@@ -1917,7 +1890,11 @@ mod tests {
|
||||
barrier.wait();
|
||||
|
||||
for _ in 0..10 {
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
for batch_result in iter {
|
||||
let _ = batch_result.unwrap();
|
||||
}
|
||||
@@ -1936,7 +1913,11 @@ mod tests {
|
||||
handle.join().unwrap();
|
||||
}
|
||||
|
||||
let iter = memtable.iter(None, None, None).unwrap();
|
||||
let iter = memtable
|
||||
.ranges(None, RangesOptions::default())
|
||||
.unwrap()
|
||||
.build(None)
|
||||
.unwrap();
|
||||
let mut series_count = 0;
|
||||
let mut row_count = 0;
|
||||
|
||||
|
||||
@@ -83,16 +83,6 @@ impl Memtable for EmptyMemtable {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "test"))]
|
||||
fn iter(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
_filters: Option<Predicate>,
|
||||
_sequence: Option<SequenceRange>,
|
||||
) -> Result<BoxedBatchIterator> {
|
||||
Ok(Box::new(std::iter::empty()))
|
||||
}
|
||||
|
||||
fn ranges(
|
||||
&self,
|
||||
_projection: Option<&[ColumnId]>,
|
||||
|
||||
Reference in New Issue
Block a user