feat: remove memtable request (#2307)

* refactor: remove scan request from memtable API

* docs: Update comment

---------

Co-authored-by: Yingwen <realevenyag@gmail.com>
This commit is contained in:
Lei, HUANG
2023-09-03 13:24:29 +08:00
committed by Ruihang Xia
parent 271f80daad
commit 43fdff3639
4 changed files with 17 additions and 19 deletions

View File

@@ -20,7 +20,7 @@ use std::sync::Arc;
use common_query::Output;
use common_telemetry::{error, info};
use snafu::ResultExt;
use store_api::storage::{RegionId, ScanRequest};
use store_api::storage::RegionId;
use tokio::sync::{mpsc, oneshot};
use crate::access_layer::AccessLayerRef;
@@ -190,7 +190,7 @@ impl RegionFlushTask {
}
let file_id = FileId::random();
let iter = mem.iter(ScanRequest::default());
let iter = mem.iter(None, &[]);
let source = Source::Iter(iter);
let mut writer = self
.access_layer

View File

@@ -23,10 +23,11 @@ use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
use std::sync::Arc;
use common_query::logical_plan::Expr;
use common_time::Timestamp;
use metrics::{decrement_gauge, increment_gauge};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ScanRequest;
use store_api::storage::ColumnId;
use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
@@ -63,8 +64,10 @@ pub trait Memtable: Send + Sync + fmt::Debug {
/// Write key values into the memtable.
fn write(&self, kvs: &KeyValues) -> Result<()>;
/// Scans the memtable for `req`.
fn iter(&self, req: ScanRequest) -> BoxedBatchIterator;
/// Scans the memtable.
/// `projection` selects columns to read, `None` means reading all columns.
/// `filters` are the predicates to be pushed down to memtable.
fn iter(&self, projection: Option<&[ColumnId]>, filters: &[Expr]) -> BoxedBatchIterator;
/// Returns true if the memtable is empty.
fn is_empty(&self) -> bool;
@@ -110,7 +113,7 @@ impl Memtable for EmptyMemtable {
Ok(())
}
fn iter(&self, _req: ScanRequest) -> BoxedBatchIterator {
fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator {
Box::new(std::iter::empty())
}

View File

@@ -19,6 +19,7 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
use std::sync::{Arc, RwLock};
use api::v1::OpType;
use common_query::logical_plan::Expr;
use datatypes::arrow;
use datatypes::arrow::array::ArrayRef;
use datatypes::data_type::DataType;
@@ -29,7 +30,7 @@ use datatypes::vectors::{
};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, ScanRequest};
use store_api::storage::ColumnId;
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::memtable::{
@@ -178,12 +179,9 @@ impl Memtable for TimeSeriesMemtable {
Ok(())
}
fn iter(&self, req: ScanRequest) -> BoxedBatchIterator {
let projection = if let Some(projection) = &req.projection {
projection
.iter()
.map(|idx| self.region_metadata.column_metadatas[*idx].column_id)
.collect()
fn iter(&self, projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator {
let projection = if let Some(projection) = projection {
projection.iter().copied().collect()
} else {
self.region_metadata
.field_columns()
@@ -883,7 +881,7 @@ mod tests {
.map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
.collect::<HashSet<_>>();
let iter = memtable.iter(ScanRequest::default());
let iter = memtable.iter(None, &[]);
let read = iter
.flat_map(|batch| {
batch
@@ -909,10 +907,7 @@ mod tests {
let memtable = TimeSeriesMemtable::new(schema, 42);
memtable.write(&kvs).unwrap();
let iter = memtable.iter(ScanRequest {
projection: Some(vec![3]), // k0, k1, ts, v0, v1, only take v0
..Default::default()
});
let iter = memtable.iter(Some(&[3]), &[]);
let mut v0_all = vec![];

View File

@@ -107,7 +107,7 @@ impl SeqScan {
// Scans all memtables and SSTs. Builds a merge reader to merge results.
let mut builder = MergeReaderBuilder::new();
for mem in &self.memtables {
let iter = mem.iter(self.request.clone());
let iter = mem.iter(Some(self.mapper.column_ids()), &self.request.filters);
builder.push_batch_iter(iter);
}
for file in &self.files {