feat: use simple filter to prune memtable (#3269)

* switch on clippy warnings

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* feat: use simple filter to prune memtable

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* remove deadcode

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* refine util function

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
This commit is contained in:
Ruihang Xia
2024-02-04 19:35:55 +08:00
committed by GitHub
parent 902570abf6
commit 51feec2579
11 changed files with 115 additions and 162 deletions

View File

@@ -28,7 +28,6 @@ use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use common_error::ext::{BoxedError, ErrorExt};
use common_error::status_code::StatusCode;
use common_query::Output;
use common_recordbatch::SendableRecordBatchStream;
use mito2::engine::MitoEngine;
use store_api::metadata::RegionMetadataRef;
@@ -133,7 +132,7 @@ impl RegionEngine for MetricEngine {
RegionRequest::Flush(_) => todo!(),
RegionRequest::Compact(_) => todo!(),
RegionRequest::Truncate(_) => todo!(),
/// It always Ok(0), all data is latest.
// It always Ok(0), all data is the latest.
RegionRequest::Catchup(_) => Ok(0),
};

View File

@@ -14,25 +14,15 @@
//! Close a metric region
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{
DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY,
};
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest,
};
use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest};
use store_api::storage::RegionId;
use super::MetricEngineInner;
use crate::error::{
CloseMitoRegionSnafu, Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu,
PhysicalRegionNotFoundSnafu, Result,
};
use crate::error::{CloseMitoRegionSnafu, LogicalRegionNotFoundSnafu, Result};
use crate::metrics::PHYSICAL_REGION_COUNT;
use crate::{metadata_region, utils};
use crate::utils;
impl MetricEngineInner {
pub async fn close_region(

View File

@@ -14,25 +14,17 @@
//! Drop a metric region
use mito2::engine::MITO_ENGINE_NAME;
use object_store::util::join_dir;
use snafu::{OptionExt, ResultExt};
use store_api::metric_engine_consts::{
DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY,
};
use snafu::ResultExt;
use store_api::region_engine::RegionEngine;
use store_api::region_request::{
AffectedRows, RegionDropRequest, RegionOpenRequest, RegionRequest,
};
use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest};
use store_api::storage::RegionId;
use super::MetricEngineInner;
use crate::error::{
CloseMitoRegionSnafu, Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu,
PhysicalRegionBusySnafu, PhysicalRegionNotFoundSnafu, Result,
CloseMitoRegionSnafu, LogicalRegionNotFoundSnafu, PhysicalRegionBusySnafu, Result,
};
use crate::metrics::PHYSICAL_REGION_COUNT;
use crate::{metadata_region, utils};
use crate::utils;
impl MetricEngineInner {
pub async fn drop_region(

View File

@@ -26,7 +26,7 @@ use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::RegionId;
use super::MetricEngineInner;
use crate::error::{Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu, Result};
use crate::error::{OpenMitoRegionSnafu, Result};
use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
use crate::utils;

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::hash::{BuildHasher, Hash, Hasher};
use std::hash::Hash;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType};
@@ -29,7 +29,7 @@ use crate::error::{
ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result,
};
use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED};
use crate::utils::{to_data_region_id, to_metadata_region_id};
use crate::utils::to_data_region_id;
// A random number
const TSID_HASH_SEED: u32 = 846793005;
@@ -84,6 +84,7 @@ impl MetricEngineInner {
.await?;
// write to data region
// TODO: retrieve table name
self.modify_rows(logical_region_id.table_id(), &mut request.rows)?;
self.data_region.write_data(data_region_id, request).await
@@ -101,7 +102,6 @@ impl MetricEngineInner {
request: &RegionPutRequest,
) -> Result<()> {
// check if the region exists
let metadata_region_id = to_metadata_region_id(physical_region_id);
let data_region_id = to_data_region_id(physical_region_id);
let state = self.state.read().unwrap();
if !state.is_logical_region_exist(logical_region_id) {

View File

@@ -20,10 +20,9 @@ use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{error, info, tracing};
use datafusion::logical_expr;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef};
use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME;
use store_api::region_engine::RegionEngine;
use store_api::storage::consts::ReservedColumnId;
use store_api::storage::{RegionId, ScanRequest};
use crate::engine::MetricEngineInner;
@@ -259,7 +258,6 @@ mod test {
use store_api::region_request::RegionRequest;
use super::*;
use crate::engine::alter;
use crate::test_util::{
alter_logical_region_add_tag_columns, create_logical_region_request, TestEnv,
};
@@ -271,7 +269,6 @@ mod test {
let logical_region_id = env.default_logical_region_id();
let physical_region_id = env.default_physical_region_id();
let data_region_id = utils::to_data_region_id(physical_region_id);
// create another logical region
let logical_region_id2 = RegionId::new(1112345678, 999);
@@ -291,7 +288,7 @@ mod test {
.unwrap();
// check explicit projection
let mut scan_req = ScanRequest {
let scan_req = ScanRequest {
projection: Some(vec![0, 1, 2, 3, 4, 5, 6]),
filters: vec![],
..Default::default()
@@ -314,7 +311,7 @@ mod test {
);
// check default projection
let mut scan_req = ScanRequest::default();
let scan_req = ScanRequest::default();
let scan_req = env
.metric()
.inner

View File

@@ -14,11 +14,7 @@
//! Implementation of retrieving logical region's region metadata.
use std::collections::{HashMap, HashSet};
use api::v1::SemanticType;
use store_api::metadata::{ColumnMetadata, RegionMetadata};
use store_api::storage::consts::ReservedColumnId;
use store_api::metadata::ColumnMetadata;
use store_api::storage::RegionId;
use crate::engine::MetricEngineInner;

View File

@@ -53,7 +53,6 @@
#![feature(let_chains)]
mod data_region;
#[allow(unused)]
pub mod engine;
pub mod error;
mod metadata_region;

View File

@@ -229,13 +229,11 @@ impl MetadataRegion {
format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
}
#[allow(dead_code)]
pub fn parse_region_key(key: &str) -> Option<&str> {
key.strip_prefix(REGION_PREFIX)
}
/// Parse column key to (logical_region_id, column_name)
#[allow(dead_code)]
pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
let mut iter = stripped.split('_');
@@ -271,7 +269,6 @@ impl MetadataRegion {
// simulate to `KvBackend`
//
// methods in this block assume the given region id is transformed.
#[allow(unused_variables)]
impl MetadataRegion {
/// Put if not exist, return if this put operation is successful (error other
/// than "key already exist" will be wrapped in [Err]).

View File

@@ -20,17 +20,14 @@ use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use api::v1::OpType;
use common_telemetry::{debug, error, trace};
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use datafusion::physical_plan::PhysicalExpr;
use datafusion_common::ScalarValue;
use datafusion_expr::ColumnarValue;
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::arrow::array::ArrayRef;
use datatypes::data_type::{ConcreteDataType, DataType};
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef};
use datatypes::value::ValueRef;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder,
};
@@ -39,10 +36,7 @@ use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;
use crate::error::{
ComputeArrowSnafu, ConvertVectorSnafu, NewRecordBatchSnafu, PrimaryKeyLengthMismatchSnafu,
Result,
};
use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result};
use crate::flush::WriteBufferManagerRef;
use crate::memtable::{
AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId,
@@ -305,48 +299,39 @@ impl SeriesSet {
/// Iterates all series in [SeriesSet].
fn iter_series(&self, projection: HashSet<ColumnId>, predicate: Option<Predicate>) -> Iter {
let (primary_key_builders, primary_key_schema) =
primary_key_builders(&self.region_metadata, 1);
let primary_key_schema = primary_key_schema(&self.region_metadata);
let primary_key_datatypes = self
.region_metadata
.primary_key_columns()
.map(|pk| pk.column_schema.data_type.clone())
.collect();
let physical_exprs: Vec<_> = predicate
.and_then(|p| p.to_physical_exprs(&primary_key_schema).ok())
.unwrap_or_default();
Iter {
metadata: self.region_metadata.clone(),
series: self.series.clone(),
Iter::new(
self.region_metadata.clone(),
self.series.clone(),
projection,
last_key: None,
predicate: physical_exprs,
pk_schema: primary_key_schema,
primary_key_builders,
codec: self.codec.clone(),
metrics: Metrics::default(),
}
predicate,
primary_key_schema,
primary_key_datatypes,
self.codec.clone(),
)
}
}
/// Creates primary key array builders and arrow's schema for primary keys of given region schema.
fn primary_key_builders(
region_metadata: &RegionMetadataRef,
num_pk_rows: usize,
) -> (Vec<Box<dyn MutableVector>>, arrow::datatypes::SchemaRef) {
let (builders, fields): (_, Vec<_>) = region_metadata
/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys
/// of given region schema
fn primary_key_schema(region_metadata: &RegionMetadataRef) -> arrow::datatypes::SchemaRef {
let fields = region_metadata
.primary_key_columns()
.map(|pk| {
(
pk.column_schema
.data_type
.create_mutable_vector(num_pk_rows),
arrow::datatypes::Field::new(
pk.column_schema.name.clone(),
pk.column_schema.data_type.as_arrow_type(),
pk.column_schema.is_nullable(),
),
arrow::datatypes::Field::new(
pk.column_schema.name.clone(),
pk.column_schema.data_type.as_arrow_type(),
pk.column_schema.is_nullable(),
)
})
.unzip();
(builders, Arc::new(arrow::datatypes::Schema::new(fields)))
.collect::<Vec<_>>();
Arc::new(arrow::datatypes::Schema::new(fields))
}
/// Metrics for reading the memtable.
@@ -369,13 +354,45 @@ struct Iter {
series: Arc<SeriesRwLockMap>,
projection: HashSet<ColumnId>,
last_key: Option<Vec<u8>>,
predicate: Vec<Arc<dyn PhysicalExpr>>,
predicate: Vec<SimpleFilterEvaluator>,
pk_schema: arrow::datatypes::SchemaRef,
primary_key_builders: Vec<Box<dyn MutableVector>>,
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<McmpRowCodec>,
metrics: Metrics,
}
impl Iter {
pub(crate) fn new(
metadata: RegionMetadataRef,
series: Arc<SeriesRwLockMap>,
projection: HashSet<ColumnId>,
predicate: Option<Predicate>,
pk_schema: arrow::datatypes::SchemaRef,
pk_datatypes: Vec<ConcreteDataType>,
codec: Arc<McmpRowCodec>,
) -> Self {
let simple_filters = predicate
.map(|p| {
p.exprs()
.iter()
.filter_map(|f| SimpleFilterEvaluator::try_new(f.df_expr()))
.collect::<Vec<_>>()
})
.unwrap_or_default();
Self {
metadata,
series,
projection,
last_key: None,
predicate: simple_filters,
pk_schema,
pk_datatypes,
codec,
metrics: Metrics::default(),
}
}
}
impl Drop for Iter {
fn drop(&mut self) {
debug!(
@@ -415,7 +432,7 @@ impl Iterator for Iter {
&self.codec,
primary_key.as_slice(),
&mut series,
&mut self.primary_key_builders,
&self.pk_datatypes,
self.pk_schema.clone(),
&self.predicate,
)
@@ -446,88 +463,48 @@ fn prune_primary_key(
codec: &Arc<McmpRowCodec>,
pk: &[u8],
series: &mut Series,
builders: &mut [Box<dyn MutableVector>],
datatypes: &[ConcreteDataType],
pk_schema: arrow::datatypes::SchemaRef,
predicate: &[Arc<dyn PhysicalExpr>],
predicates: &[SimpleFilterEvaluator],
) -> bool {
// no primary key, we simply return true.
if pk_schema.fields().is_empty() {
return true;
}
if let Some(rb) = series.pk_cache.as_ref() {
prune_inner(predicate, rb).unwrap_or(true)
// retrieve primary key values from cache or decode from bytes.
let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() {
pk_values
} else {
let rb = match pk_to_record_batch(codec, pk, builders, pk_schema) {
Ok(rb) => rb,
Err(e) => {
error!(e; "Failed to build record batch from primary keys");
return true;
}
};
let res = prune_inner(predicate, &rb).unwrap_or(true);
series.update_pk_cache(rb);
res
}
}
let pk_values = codec.decode(pk);
if let Err(e) = pk_values {
error!(e; "Failed to decode primary key");
return true;
}
series.update_pk_cache(pk_values.unwrap());
series.pk_cache.as_ref().unwrap()
};
fn prune_inner(predicates: &[Arc<dyn PhysicalExpr>], primary_key: &RecordBatch) -> Result<bool> {
for expr in predicates {
// evaluate every filter against primary key
let Ok(eva) = expr.evaluate(primary_key) else {
// evaluate predicates against primary key values
let mut result = true;
for predicate in predicates {
// ignore predicates that are not referencing primary key columns
let Ok(index) = pk_schema.index_of(predicate.column_name()) else {
continue;
};
let result = match eva {
ColumnarValue::Array(array) => {
let predicate_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
predicate_array
.into_iter()
.map(|x| x.unwrap_or(true))
.next()
.unwrap_or(true)
}
// result was a column
ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true),
_ => {
unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key);
}
};
trace!(
"Evaluate primary key {:?} against filter: {:?}, result: {:?}",
primary_key,
expr,
result
);
if !result {
return Ok(false);
}
// Safety: arrow schema and datatypes are constructed from the same source.
let scalar_value = pk_values[index]
.try_to_scalar_value(&datatypes[index])
.unwrap();
result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true);
}
Ok(true)
}
fn pk_to_record_batch(
codec: &Arc<McmpRowCodec>,
bytes: &[u8],
builders: &mut [Box<dyn MutableVector>],
pk_schema: arrow::datatypes::SchemaRef,
) -> Result<RecordBatch> {
let pk_values = codec.decode(bytes).unwrap();
let arrays = builders
.iter_mut()
.zip(pk_values.iter())
.map(|(builder, pk_value)| {
builder.push_value_ref(pk_value.as_value_ref());
builder.to_vector().to_arrow_array()
})
.collect();
RecordBatch::try_new(pk_schema, arrays).context(NewRecordBatchSnafu)
result
}
/// A `Series` holds a list of field values of some given primary key.
struct Series {
pk_cache: Option<RecordBatch>,
pk_cache: Option<Vec<Value>>,
active: ValueBuilder,
frozen: Vec<Values>,
}
@@ -546,8 +523,8 @@ impl Series {
self.active.push(ts, sequence, op_type as u8, values);
}
fn update_pk_cache(&mut self, pk_batch: RecordBatch) {
self.pk_cache = Some(pk_batch);
fn update_pk_cache(&mut self, pk_values: Vec<Value>) {
self.pk_cache = Some(pk_values);
}
/// Freezes the active part and push it to `frozen`.

View File

@@ -60,7 +60,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
return;
}
let mut region_ctxs = self.prepare_region_write_ctx(write_requests);
// Prepare write context.
let mut region_ctxs = {
let _timer = WRITE_STAGE_ELAPSED
.with_label_values(&["prepare_ctx"])
.start_timer();
self.prepare_region_write_ctx(write_requests)
};
// Write to WAL.
{