feat: memtable filter push down (#2539)

* feat: memtable support filter pushdown to prune primary keys

* fix: switch to next time series when pk not selected

* fix: allow predicate evaluation failure

* fix: some clippy warnings

* fix: panic when no primary key in schema

* feat: cache decoded record batch for primary key

* refactor: use arcswap instead of rwlock

* fix: format toml
Author: Lei, HUANG
Date: 2023-10-10 12:03:10 +08:00 (committed by GitHub)
Parent: d4577e7372
Commit: 8bdef9a348
8 changed files with 257 additions and 40 deletions

Cargo.lock (generated)

@@ -5500,6 +5500,7 @@ dependencies = [
  "anymap",
  "api",
  "aquamarine",
+ "arc-swap",
  "async-channel",
  "async-compat",
  "async-stream",


@@ -12,6 +12,7 @@ test = ["common-test-util"]
 anymap = "1.0.0-beta.2"
 api.workspace = true
 aquamarine.workspace = true
+arc-swap = "1.6"
 async-channel = "1.9"
 async-compat = "0.2"
 async-stream.workspace = true


@@ -307,7 +307,7 @@ impl RegionFlushTask {
         }
 
         let file_id = FileId::random();
-        let iter = mem.iter(None, &[]);
+        let iter = mem.iter(None, None);
         let source = Source::Iter(iter);
         let mut writer = self
             .access_layer


@@ -23,11 +23,11 @@ use std::fmt;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::Arc;
 
-use common_query::logical_plan::Expr;
 use common_time::Timestamp;
 use metrics::{decrement_gauge, increment_gauge};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
+use table::predicate::Predicate;
 
 use crate::error::Result;
 use crate::flush::WriteBufferManagerRef;
@@ -73,7 +73,11 @@ pub trait Memtable: Send + Sync + fmt::Debug {
     /// Scans the memtable.
     /// `projection` selects columns to read, `None` means reading all columns.
     /// `filters` are the predicates to be pushed down to memtable.
-    fn iter(&self, projection: Option<&[ColumnId]>, filters: &[Expr]) -> BoxedBatchIterator;
+    fn iter(
+        &self,
+        projection: Option<&[ColumnId]>,
+        predicate: Option<Predicate>,
+    ) -> BoxedBatchIterator;
 
     /// Returns true if the memtable is empty.
     fn is_empty(&self) -> bool;
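
The trait change above is the crux of the API shift: `iter` drops the raw `&[Expr]` slice (which no implementation consumed) in favor of an optional `Predicate` the memtable can evaluate to skip whole series. For orientation, a self-contained toy with the same shape — every type below is an illustrative stand-in, not mito2's real definition (compare the real `EmptyMemtable` change further down):

```rust
// Hedged sketch: stand-in types mirroring the new trait signature.
struct Predicate;            // stands in for table::predicate::Predicate
type ColumnId = u32;         // stands in for store_api::storage::ColumnId
type Batch = Vec<u8>;        // stands in for mito2's Batch
type BoxedBatchIterator = Box<dyn Iterator<Item = Batch> + Send>;

trait Memtable {
    /// `projection` selects columns; `predicate` may prune entire time series.
    fn iter(
        &self,
        projection: Option<&[ColumnId]>,
        predicate: Option<Predicate>,
    ) -> BoxedBatchIterator;
}

struct EmptyMemtable;

impl Memtable for EmptyMemtable {
    fn iter(&self, _p: Option<&[ColumnId]>, _pred: Option<Predicate>) -> BoxedBatchIterator {
        Box::new(std::iter::empty())
    }
}

fn main() {
    // Callers that previously passed `&[]` now pass `None`.
    assert_eq!(EmptyMemtable.iter(None, None).count(), 0);
}
```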


@@ -19,9 +19,11 @@ use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};
 use std::sync::{Arc, RwLock};
 
 use api::v1::OpType;
-use common_query::logical_plan::Expr;
+use arc_swap::ArcSwapOption;
 use common_telemetry::debug;
+use datatypes::arrow;
 use datatypes::arrow::array::ArrayRef;
+use datatypes::arrow::record_batch::RecordBatch;
 use datatypes::data_type::DataType;
 use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
 use datatypes::value::ValueRef;
@@ -31,8 +33,12 @@ use datatypes::vectors::{
 use snafu::{ensure, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
+use table::predicate::Predicate;
 
-use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
+use crate::error::{
+    ComputeArrowSnafu, ConvertVectorSnafu, NewRecordBatchSnafu, PrimaryKeyLengthMismatchSnafu,
+    Result,
+};
 use crate::flush::WriteBufferManagerRef;
 use crate::memtable::{
     AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
@@ -76,7 +82,7 @@ impl MemtableBuilder for TimeSeriesMemtableBuilder {
 pub struct TimeSeriesMemtable {
     id: MemtableId,
     region_metadata: RegionMetadataRef,
-    row_codec: McmpRowCodec,
+    row_codec: Arc<McmpRowCodec>,
     series_set: SeriesSet,
     alloc_tracker: AllocTracker,
     max_timestamp: AtomicI64,
@@ -89,13 +95,13 @@ impl TimeSeriesMemtable {
         id: MemtableId,
         write_buffer_manager: Option<WriteBufferManagerRef>,
     ) -> Self {
-        let row_codec = McmpRowCodec::new(
+        let row_codec = Arc::new(McmpRowCodec::new(
             region_metadata
                 .primary_key_columns()
                 .map(|c| SortField::new(c.column_schema.data_type.clone()))
                 .collect(),
-        );
-        let series_set = SeriesSet::new(region_metadata.clone());
+        ));
+        let series_set = SeriesSet::new(region_metadata.clone(), row_codec.clone());
         Self {
             id,
             region_metadata,
@@ -177,7 +183,7 @@ impl Memtable for TimeSeriesMemtable {
                 actual: kv.num_primary_keys()
             }
         );
-        let primary_key_encoded = self.row_codec.encode(kv.primary_keys())?;
+        let primary_key_encoded = PrimaryKey::new(self.row_codec.encode(kv.primary_keys())?);
         let fields = kv.fields().collect::<Vec<_>>();
 
         allocated += fields.len() * std::mem::size_of::<ValueRef>();
@@ -200,7 +206,11 @@ impl Memtable for TimeSeriesMemtable {
         Ok(())
     }
 
-    fn iter(&self, projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator {
+    fn iter(
+        &self,
+        projection: Option<&[ColumnId]>,
+        predicate: Option<Predicate>,
+    ) -> BoxedBatchIterator {
         let projection = if let Some(projection) = projection {
             projection.iter().copied().collect()
         } else {
@@ -210,7 +220,7 @@ impl Memtable for TimeSeriesMemtable {
                 .collect()
         };
 
-        Box::new(self.series_set.iter_series(projection))
+        Box::new(self.series_set.iter_series(projection, predicate))
     }
 
     fn is_empty(&self) -> bool {
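
`TimeSeriesMemtable::iter` itself only threads the predicate through to `SeriesSet::iter_series`; projection handling is untouched, with `None` still expanding to every column id in the region metadata. A std-only sketch of that defaulting rule (`resolve_projection` is a hypothetical helper name):

```rust
use std::collections::HashSet;

// Hypothetical helper mirroring the projection defaulting above.
fn resolve_projection(projection: Option<&[u32]>, all_column_ids: &[u32]) -> HashSet<u32> {
    match projection {
        Some(ids) => ids.iter().copied().collect(),
        None => all_column_ids.iter().copied().collect(), // None means all columns
    }
}

fn main() {
    assert_eq!(resolve_projection(Some(&[3]), &[1, 2, 3]).len(), 1);
    assert_eq!(resolve_projection(None, &[1, 2, 3]).len(), 3);
}
```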
@@ -248,18 +258,76 @@ impl Memtable for TimeSeriesMemtable {
     }
 }
 
-type SeriesRwLockMap = RwLock<BTreeMap<Vec<u8>, Arc<RwLock<Series>>>>;
+struct PrimaryKey {
+    bytes: Vec<u8>,
+    record_batch: ArcSwapOption<RecordBatch>,
+}
+
+impl Clone for PrimaryKey {
+    fn clone(&self) -> Self {
+        Self {
+            bytes: self.bytes.clone(),
+            record_batch: Default::default(),
+        }
+    }
+}
+
+impl PrimaryKey {
+    fn new(bytes: Vec<u8>) -> Self {
+        Self {
+            bytes,
+            record_batch: ArcSwapOption::empty(),
+        }
+    }
+
+    fn get_or_update_record_batch_with<F: FnMut() -> Result<RecordBatch>>(
+        &self,
+        mut f: F,
+    ) -> Result<Arc<RecordBatch>> {
+        if let Some(rb) = self.record_batch.load_full() {
+            return Ok(rb);
+        }
+
+        let batch = Arc::new(f()?);
+        self.record_batch.store(Some(batch.clone()));
+        Ok(batch)
+    }
+}
+
+impl Eq for PrimaryKey {}
+
+impl PartialEq<Self> for PrimaryKey {
+    fn eq(&self, other: &Self) -> bool {
+        self.bytes.eq(&other.bytes)
+    }
+}
+
+impl PartialOrd<Self> for PrimaryKey {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for PrimaryKey {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.bytes.cmp(&other.bytes)
+    }
+}
+
+type SeriesRwLockMap = RwLock<BTreeMap<PrimaryKey, Arc<RwLock<Series>>>>;
 
 struct SeriesSet {
     region_metadata: RegionMetadataRef,
     series: Arc<SeriesRwLockMap>,
+    codec: Arc<McmpRowCodec>,
 }
 
 impl SeriesSet {
-    fn new(region_metadata: RegionMetadataRef) -> Self {
+    fn new(region_metadata: RegionMetadataRef, codec: Arc<McmpRowCodec>) -> Self {
         Self {
             region_metadata,
             series: Default::default(),
+            codec,
         }
     }
 }
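
The `record_batch` slot is why the commit pulls in arc-swap (see the Cargo changes above): readers consult the cached decoded batch with a single atomic load instead of taking a lock, and `Clone` deliberately resets the cache because only `bytes` defines a key's identity. A minimal sketch of the same get-or-init pattern, assuming `arc-swap = "1.6"` and a `String` payload standing in for the decoded `RecordBatch`:

```rust
use std::sync::Arc;

use arc_swap::ArcSwapOption;

// Lock-free "decode once, reuse afterwards" cache, as in PrimaryKey above.
struct Cached {
    slot: ArcSwapOption<String>,
}

impl Cached {
    fn get_or_init<F: FnMut() -> String>(&self, mut f: F) -> Arc<String> {
        if let Some(v) = self.slot.load_full() {
            return v; // fast path: an atomic load plus an Arc clone
        }
        let v = Arc::new(f());
        // Two racing callers may both compute; the last store wins and both
        // results are equivalent, which is benign for a deterministic decode.
        self.slot.store(Some(v.clone()));
        v
    }
}

fn main() {
    let c = Cached { slot: ArcSwapOption::empty() };
    assert_eq!(*c.get_or_init(|| "decoded".to_string()), "decoded");
    // The second call hits the cache; the closure is never invoked.
    assert_eq!(*c.get_or_init(|| unreachable!()), "decoded");
}
```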
@@ -267,7 +335,7 @@ impl SeriesSet {
 impl SeriesSet {
     /// Returns the series for given primary key, or create a new series if not already exist,
     /// along with the allocated memory footprint for primary keys.
-    fn get_or_add_series(&self, primary_key: Vec<u8>) -> (Arc<RwLock<Series>>, usize) {
+    fn get_or_add_series(&self, primary_key: PrimaryKey) -> (Arc<RwLock<Series>>, usize) {
         if let Some(series) = self.series.read().unwrap().get(&primary_key) {
             return (series.clone(), 0);
         };
@@ -275,7 +343,7 @@ impl SeriesSet {
         let mut indices = self.series.write().unwrap();
         match indices.entry(primary_key) {
             Entry::Vacant(v) => {
-                let key_len = v.key().len();
+                let key_len = v.key().bytes.len();
                 v.insert(s.clone());
                 (s, key_len)
             }
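
`get_or_add_series` keeps its optimistic fast path; only the key type changed. The underlying idiom — try a read lock first, then fall back to a write lock with `entry()` so two racing inserters converge on a single `Series` — looks like this in isolation (std-only sketch, `i32` standing in for `Series`):

```rust
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};

// Std-only sketch of the read-then-write upsert; i32 stands in for Series.
fn get_or_add(
    map: &RwLock<BTreeMap<Vec<u8>, Arc<RwLock<i32>>>>,
    key: Vec<u8>,
) -> (Arc<RwLock<i32>>, usize) {
    // Fast path: most writes touch an existing series, so try a read lock first.
    if let Some(series) = map.read().unwrap().get(&key) {
        return (series.clone(), 0);
    }
    let fresh = Arc::new(RwLock::new(0));
    match map.write().unwrap().entry(key) {
        Entry::Vacant(v) => {
            let key_len = v.key().len(); // bytes newly retained by the map
            v.insert(fresh.clone());
            (fresh, key_len)
        }
        // Lost the race between the two locks: reuse the winner's series.
        Entry::Occupied(o) => (o.get().clone(), 0),
    }
}

fn main() {
    let map = RwLock::new(BTreeMap::new());
    assert_eq!(get_or_add(&map, b"pk-0".to_vec()).1, 4);
    assert_eq!(get_or_add(&map, b"pk-0".to_vec()).1, 0); // no new allocation
}
```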
@@ -285,21 +353,55 @@ impl SeriesSet {
     }
 
     /// Iterates all series in [SeriesSet].
-    fn iter_series(&self, projection: HashSet<ColumnId>) -> Iter {
+    fn iter_series(&self, projection: HashSet<ColumnId>, predicate: Option<Predicate>) -> Iter {
+        let (primary_key_builders, primary_key_schema) =
+            primary_key_builders(&self.region_metadata, 1);
+
         Iter {
             metadata: self.region_metadata.clone(),
             series: self.series.clone(),
             projection,
             last_key: None,
+            predicate,
+            pk_schema: primary_key_schema,
+            primary_key_builders,
+            codec: self.codec.clone(),
         }
     }
 }
 
+/// Creates primary key array builders and arrow's schema for primary keys of given region schema.
+fn primary_key_builders(
+    region_metadata: &RegionMetadataRef,
+    num_pk_rows: usize,
+) -> (Vec<Box<dyn MutableVector>>, arrow::datatypes::SchemaRef) {
+    let (builders, fields): (_, Vec<_>) = region_metadata
+        .primary_key_columns()
+        .map(|pk| {
+            (
+                pk.column_schema
+                    .data_type
+                    .create_mutable_vector(num_pk_rows),
+                arrow::datatypes::Field::new(
+                    pk.column_schema.name.clone(),
+                    pk.column_schema.data_type.as_arrow_type(),
+                    pk.column_schema.is_nullable(),
+                ),
+            )
+        })
+        .unzip();
+    (builders, Arc::new(arrow::datatypes::Schema::new(fields)))
+}
+
 struct Iter {
     metadata: RegionMetadataRef,
     series: Arc<SeriesRwLockMap>,
     projection: HashSet<ColumnId>,
-    last_key: Option<Vec<u8>>,
+    last_key: Option<PrimaryKey>,
+    predicate: Option<Predicate>,
+    pk_schema: arrow::datatypes::SchemaRef,
+    primary_key_builders: Vec<Box<dyn MutableVector>>,
+    codec: Arc<McmpRowCodec>,
 }
 
 impl Iterator for Iter {
@@ -307,23 +409,84 @@ impl Iterator for Iter {
     fn next(&mut self) -> Option<Self::Item> {
         let map = self.series.read().unwrap();
-        let mut range = match &self.last_key {
-            None => map.range::<Vec<u8>, _>(..),
+        let range = match &self.last_key {
+            None => map.range::<PrimaryKey, _>(..),
             Some(last_key) => {
-                map.range::<Vec<u8>, _>((Bound::Excluded(last_key), Bound::Unbounded))
+                map.range::<PrimaryKey, _>((Bound::Excluded(last_key), Bound::Unbounded))
             }
         };
 
-        if let Some((primary_key, series)) = range.next() {
+        // TODO(hl): maybe yield more than one time series to amortize range overhead.
+        for (primary_key, series) in range {
+            if let Some(predicate) = &self.predicate {
+                if !prune_primary_key(
+                    &self.codec,
+                    primary_key,
+                    &mut self.primary_key_builders,
+                    &self.pk_schema,
+                    predicate,
+                ) {
+                    // read next series
+                    continue;
+                }
+            }
+
             self.last_key = Some(primary_key.clone());
             let values = series.write().unwrap().compact(&self.metadata);
-            Some(values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)))
-        } else {
-            None
+            return Some(
+                values.and_then(|v| v.to_batch(primary_key, &self.metadata, &self.projection)),
+            );
         }
+        None
     }
 }
+
+fn prune_primary_key(
+    codec: &Arc<McmpRowCodec>,
+    pk: &PrimaryKey,
+    builders: &mut Vec<Box<dyn MutableVector>>,
+    pk_schema: &arrow::datatypes::SchemaRef,
+    predicate: &Predicate,
+) -> bool {
+    // no primary key, we simply return true.
+    if pk_schema.fields().is_empty() {
+        return true;
+    }
+
+    let Ok(pk_record_batch) = pk.get_or_update_record_batch_with(move || {
+        pk_to_record_batch(codec, &pk.bytes, builders, pk_schema)
+    }) else {
+        return true;
+    };
+
+    let result = predicate.prune_primary_key(&pk_record_batch);
+    debug!(
+        "Prune primary key: {:?}, res: {:?}",
+        pk_record_batch, result
+    );
+    result.unwrap_or(true)
+}
+
+fn pk_to_record_batch(
+    codec: &Arc<McmpRowCodec>,
+    bytes: &[u8],
+    builders: &mut Vec<Box<dyn MutableVector>>,
+    pk_schema: &arrow::datatypes::SchemaRef,
+) -> Result<RecordBatch> {
+    let pk_values = codec.decode(bytes).unwrap();
+    assert_eq!(builders.len(), pk_values.len());
+
+    let arrays = builders
+        .iter_mut()
+        .zip(pk_values.iter())
+        .map(|(builder, pk_value)| {
+            builder.push_value_ref(pk_value.as_value_ref());
+            builder.to_vector().to_arrow_array()
+        })
+        .collect();
+
+    RecordBatch::try_new(pk_schema.clone(), arrays).context(NewRecordBatchSnafu)
+}
+
+/// A `Series` holds a list of field values of some given primary key.
 struct Series {
     active: ValueBuilder,
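
`next` re-acquires the map's read lock on every call and resumes strictly after `last_key` via `Bound::Excluded`, so the lock is never held across yields and pruned series are skipped without being remembered. The resumable-cursor idiom on its own (std-only sketch):

```rust
use std::collections::BTreeMap;
use std::ops::Bound;

// Std-only sketch of the resumable cursor used by Iter::next.
fn next_after(map: &BTreeMap<Vec<u8>, i32>, last_key: Option<&Vec<u8>>) -> Option<(Vec<u8>, i32)> {
    let mut range = match last_key {
        None => map.range::<Vec<u8>, _>(..),
        // Excluded: resume strictly after the key already yielded.
        Some(k) => map.range::<Vec<u8>, _>((Bound::Excluded(k), Bound::Unbounded)),
    };
    range.next().map(|(k, v)| (k.clone(), *v))
}

fn main() {
    let map = BTreeMap::from([(b"a".to_vec(), 1), (b"b".to_vec(), 2)]);
    let (first_key, v1) = next_after(&map, None).unwrap();
    let (_, v2) = next_after(&map, Some(&first_key)).unwrap();
    assert_eq!((v1, v2), (1, 2));
}
```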
@@ -461,12 +624,12 @@ impl Values {
     /// keeps only the latest row for the same timestamp.
     pub fn to_batch(
         &self,
-        primary_key: &[u8],
+        primary_key: &PrimaryKey,
         metadata: &RegionMetadataRef,
         projection: &HashSet<ColumnId>,
     ) -> Result<Batch> {
         let builder = BatchBuilder::with_required_columns(
-            primary_key.to_vec(),
+            primary_key.bytes.clone(),
             self.timestamp.clone(),
             self.sequence.clone(),
             self.op_type.clone(),
@@ -699,7 +862,11 @@ mod tests {
         };
 
         let batch = values
-            .to_batch(b"test", &schema, &[0, 1, 2, 3, 4].into_iter().collect())
+            .to_batch(
+                &PrimaryKey::new(b"test".to_vec()),
+                &schema,
+                &[0, 1, 2, 3, 4].into_iter().collect(),
+            )
             .unwrap();
         check_value(
             &batch,
@@ -784,7 +951,13 @@ mod tests {
     #[test]
     fn test_series_set_concurrency() {
         let schema = schema_for_test();
-        let set = Arc::new(SeriesSet::new(schema.clone()));
+        let row_codec = Arc::new(McmpRowCodec::new(
+            schema
+                .primary_key_columns()
+                .map(|c| SortField::new(c.column_schema.data_type.clone()))
+                .collect(),
+        ));
+        let set = Arc::new(SeriesSet::new(schema.clone(), row_codec));
 
         let concurrency = 32;
         let pk_num = concurrency * 2;
@@ -795,7 +968,7 @@ mod tests {
                 for j in i * 100..(i + 1) * 100 {
                     let pk = j % pk_num;
                     let primary_key = format!("pk-{}", pk).as_bytes().to_vec();
-                    let (series, _) = set.get_or_add_series(primary_key);
+                    let (series, _) = set.get_or_add_series(PrimaryKey::new(primary_key));
                     let mut guard = series.write().unwrap();
                     guard.push(
                         ts_value_ref(j as i64),
@@ -818,7 +991,7 @@ mod tests {
 
         for i in 0..pk_num {
             let pk = format!("pk-{}", i).as_bytes().to_vec();
-            let (series, _) = set.get_or_add_series(pk);
+            let (series, _) = set.get_or_add_series(PrimaryKey::new(pk));
            let mut guard = series.write().unwrap();
             let values = guard.compact(&schema).unwrap();
             timestamps.extend(values.sequence.iter_data().map(|v| v.unwrap() as i64));
@@ -866,7 +1039,7 @@ mod tests {
             .map(|kv| kv.timestamp().as_timestamp().unwrap().unwrap().value())
             .collect::<HashSet<_>>();
 
-        let iter = memtable.iter(None, &[]);
+        let iter = memtable.iter(None, None);
         let read = iter
             .flat_map(|batch| {
                 batch
@@ -892,7 +1065,7 @@ mod tests {
         let memtable = TimeSeriesMemtable::new(schema, 42, None);
         memtable.write(&kvs).unwrap();
 
-        let iter = memtable.iter(Some(&[3]), &[]);
+        let iter = memtable.iter(Some(&[3]), None);
         let mut v0_all = vec![];


@@ -135,8 +135,7 @@ impl SeqScan {
         // Scans all memtables and SSTs. Builds a merge reader to merge results.
         let mut builder = MergeReaderBuilder::new();
         for mem in &self.memtables {
-            // TODO(hl): pass filters once memtable supports filter pushdown.
-            let iter = mem.iter(Some(self.mapper.column_ids()), &[]);
+            let iter = mem.iter(Some(self.mapper.column_ids()), self.predicate.clone());
             builder.push_batch_iter(iter);
         }
         for file in &self.files {
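
With the TODO resolved, `SeqScan` hands each memtable its own clone of the scan predicate. A std-only sketch of that fan-out, assuming the predicate clones as a cheap handle over shared expression state (the stand-in below models that with an `Arc`):

```rust
use std::sync::Arc;

// Stand-in predicate: cloning bumps an Arc instead of copying expressions.
#[derive(Clone)]
struct Predicate {
    exprs: Arc<Vec<String>>,
}

fn main() {
    let predicate = Some(Predicate {
        exprs: Arc::new(vec!["host = 'web-01'".to_string()]),
    });
    for mem in ["mem-0", "mem-1"] {
        // Each source receives its own handle; the exprs remain shared.
        let p = predicate.clone();
        println!("scan {mem} with {} filter(s)", p.map_or(0, |p| p.exprs.len()));
    }
}
```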


@@ -17,9 +17,9 @@
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::Arc;
 
-use common_query::logical_plan::Expr;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
+use table::predicate::Predicate;
 
 use crate::error::Result;
 use crate::memtable::{
@@ -50,7 +50,11 @@ impl Memtable for EmptyMemtable {
         Ok(())
     }
 
-    fn iter(&self, _projection: Option<&[ColumnId]>, _filters: &[Expr]) -> BoxedBatchIterator {
+    fn iter(
+        &self,
+        _projection: Option<&[ColumnId]>,
+        _filters: Option<Predicate>,
+    ) -> BoxedBatchIterator {
         Box::new(std::iter::empty())
     }


@@ -15,17 +15,19 @@
 use std::sync::Arc;
 
 use common_query::logical_plan::{DfExpr, Expr};
-use common_telemetry::{error, warn};
+use common_telemetry::{debug, error, warn};
 use common_time::range::TimestampRange;
 use common_time::timestamp::TimeUnit;
 use common_time::Timestamp;
+use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::parquet::file::metadata::RowGroupMetaData;
 use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
-use datafusion_common::ToDFSchema;
+use datafusion_common::{ScalarValue, ToDFSchema};
 use datafusion_expr::expr::InList;
-use datafusion_expr::{Between, BinaryExpr, Operator};
+use datafusion_expr::{Between, BinaryExpr, ColumnarValue, Operator};
 use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+use datatypes::arrow::array::BooleanArray;
 use datatypes::schema::SchemaRef;
 use datatypes::value::scalar_value_to_timestamp;
 use snafu::ResultExt;
@@ -119,6 +121,39 @@ impl Predicate {
         res
     }
 
+    /// Prunes primary keys
+    pub fn prune_primary_key(&self, primary_key: &RecordBatch) -> error::Result<bool> {
+        for expr in &self.exprs {
+            // evaluate every filter against primary key
+            let Ok(eva) = expr.evaluate(primary_key) else {
+                continue;
+            };
+            let result = match eva {
+                ColumnarValue::Array(array) => {
+                    let predicate_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+                    predicate_array
+                        .into_iter()
+                        .map(|x| x.unwrap_or(true))
+                        .next()
+                        .unwrap_or(true)
+                }
+                // result was a column
+                ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
+                _ => {
+                    unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key);
+                }
+            };
+            debug!(
+                "Evaluate primary key {:?} against filter: {:?}, result: {:?}",
+                primary_key, expr, result
+            );
+            if !result {
+                return Ok(false);
+            }
+        }
+        Ok(true)
+    }
+
     /// Evaluates the predicate against the `stats`.
     /// Returns a vector of boolean values, among which `false` means the row group can be skipped.
     pub fn prune_with_stats<S: PruningStatistics>(&self, stats: &S) -> Vec<bool> {
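
`prune_primary_key` works because `self.exprs` here are DataFusion physical expressions: evaluated against a single-row `RecordBatch` they yield a one-element boolean mask or a boolean scalar, and anything else (or any evaluation error) errs on the side of keeping the series. A standalone sketch of that evaluation path, using the same crates this file imports; it assumes the DataFusion generation in use here (four-argument `create_physical_expr`), and the column name and literal are made up:

```rust
use std::sync::Arc;

use datafusion::arrow::array::{BooleanArray, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::{col, lit, ColumnarValue};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::execution_props::ExecutionProps;

fn main() -> datafusion_common::Result<()> {
    // One-row batch standing in for a decoded primary key: host = "web-01".
    let schema = Schema::new(vec![Field::new("host", DataType::Utf8, false)]);
    let batch = RecordBatch::try_new(
        Arc::new(schema.clone()),
        vec![Arc::new(StringArray::from(vec!["web-01"]))],
    )?;

    // Lower the logical filter `host = 'web-01'` to a physical expression.
    let expr = col("host").eq(lit("web-01"));
    let physical = create_physical_expr(
        &expr,
        &schema.clone().to_dfschema()?,
        &schema,
        &ExecutionProps::new(),
    )?;

    // One row in, one boolean out; `true` means the series must be read.
    let keep = match physical.evaluate(&batch)? {
        ColumnarValue::Array(array) => array
            .as_any()
            .downcast_ref::<BooleanArray>()
            .map_or(true, |mask| mask.value(0)),
        ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
        _ => true, // non-boolean result: keep, mirroring the conservative default
    };
    assert!(keep);
    Ok(())
}
```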