fix(mito): pruning for mito2 (#2525)

* fix: pruning for mito2

* chore: refactor projection parameters; add some tests; customize row group size for each flush task.

* chore: pass whole RegionFlushRequest

---------

Co-authored-by: Lei, HUANG <mrsatangel@gmail.com>
Author: Yingwen
Committed: 2023-10-08 11:45:15 +08:00 (committed by GitHub)
Parent: 0292445476
Commit: 0593c3bde3
18 changed files with 531 additions and 49 deletions

View File

@@ -35,6 +35,8 @@ mod open_test;
#[cfg(test)]
mod projection_test;
#[cfg(test)]
mod prune_test;
#[cfg(test)]
mod truncate_test;
use std::sync::Arc;

View File

@@ -44,7 +44,12 @@ async fn put_and_flush(
put_rows(engine, region_id, rows).await;
let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap()
else {
@@ -79,7 +84,12 @@ async fn delete_and_flush(
assert_eq!(row_cnt, rows_affected);
let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap()
else {

View File

@@ -67,7 +67,7 @@ async fn test_engine_drop_region() {
rows: build_rows_for_key("a", 0, 2, 0),
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id).await;
flush_region(&engine, region_id, None).await;
// drop the created region.
engine

View File

@@ -49,7 +49,7 @@ async fn test_manual_flush() {
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id).await;
flush_region(&engine, region_id, None).await;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
@@ -164,7 +164,7 @@ async fn test_write_stall() {
tokio::spawn(async move {
listener.wait().await;
flush_region(&engine_cloned, region_id).await;
flush_region(&engine_cloned, region_id, None).await;
});
// Triggers write stall.
@@ -212,7 +212,7 @@ async fn test_flush_empty() {
.await
.unwrap();
flush_region(&engine, region_id).await;
flush_region(&engine, region_id, None).await;
let request = ScanRequest::default();
let scanner = engine.scanner(region_id, request).unwrap();
@@ -247,7 +247,7 @@ async fn test_flush_reopen_region() {
};
put_rows(&engine, region_id, rows).await;
flush_region(&engine, region_id).await;
flush_region(&engine, region_id, None).await;
let check_region = || {
let region = engine.get_region(region_id).unwrap();
let version_data = region.version_control.current();

View File

@@ -0,0 +1,102 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use api::v1::Rows;
use common_query::logical_plan::DfExpr;
use common_query::prelude::Expr;
use common_recordbatch::RecordBatches;
use datafusion_common::ScalarValue;
use datafusion_expr::lit;
use store_api::region_engine::RegionEngine;
use store_api::region_request::RegionRequest;
use store_api::storage::{RegionId, ScanRequest};
use crate::config::MitoConfig;
use crate::test_util::{
build_rows, flush_region, put_rows, rows_schema, CreateRequestBuilder, TestEnv,
};
async fn check_prune_row_groups(expr: DfExpr, expected: &str) {
let mut env = TestEnv::new();
let engine = env.create_engine(MitoConfig::default()).await;
let region_id = RegionId::new(1, 1);
let request = CreateRequestBuilder::new().build();
let column_schemas = rows_schema(&request);
engine
.handle_request(region_id, RegionRequest::Create(request))
.await
.unwrap();
put_rows(
&engine,
region_id,
Rows {
schema: column_schemas.clone(),
rows: build_rows(0, 10),
},
)
.await;
flush_region(&engine, region_id, Some(5)).await;
let stream = engine
.handle_query(
region_id,
ScanRequest {
filters: vec![Expr::from(expr)],
..Default::default()
},
)
.await
.unwrap();
let batches = RecordBatches::try_collect(stream).await.unwrap();
assert_eq!(expected, batches.pretty_print().unwrap());
}
#[tokio::test]
async fn test_read_parquet_stats() {
common_telemetry::init_default_ut_logging();
check_prune_row_groups(
datafusion_expr::col("ts").gt(lit(ScalarValue::TimestampMillisecond(Some(4000), None))),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;
check_prune_row_groups(
datafusion_expr::col("tag_0").gt(lit(ScalarValue::Utf8(Some("4".to_string())))),
"\
+-------+---------+---------------------+
| tag_0 | field_0 | ts |
+-------+---------+---------------------+
| 5 | 5.0 | 1970-01-01T00:00:05 |
| 6 | 6.0 | 1970-01-01T00:00:06 |
| 7 | 7.0 | 1970-01-01T00:00:07 |
| 8 | 8.0 | 1970-01-01T00:00:08 |
| 9 | 9.0 | 1970-01-01T00:00:09 |
+-------+---------+---------------------+",
)
.await;
}
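Note on the test layout: the helper writes 10 rows and flushes with `row_group_size: Some(5)`, so the SST holds two row groups of five rows each. The `ts > 4s` case prunes the first group via the time-index statistics, while the `tag_0` case exercises the primary-key statistics path. As a hedged illustration (assuming `field_0` is a 64-bit float, as the printed output suggests), a filter on a plain field column would go through the field-statistics path in the same way:

```rust
// Illustrative only: another candidate case for `check_prune_row_groups`
// (expected output elided); this shape of filter exercises the field-statistics path.
let field_filter = datafusion_expr::col("field_0")
    .gt(lit(ScalarValue::Float64(Some(4.0))));
```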

View File

@@ -167,7 +167,12 @@ async fn test_engine_truncate_after_flush() {
// Flush the region.
engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
.unwrap();
@@ -304,7 +309,12 @@ async fn test_engine_truncate_during_flush() {
let flush_task = tokio::spawn(async move {
info!("do flush task!!!!");
engine_cloned
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest {
row_group_size: None,
}),
)
.await
});

View File

@@ -190,6 +190,7 @@ pub(crate) struct RegionFlushTask {
pub(crate) memtable_builder: MemtableBuilderRef,
pub(crate) file_purger: FilePurgerRef,
pub(crate) listener: WorkerListener,
pub(crate) row_group_size: Option<usize>,
}
impl RegionFlushTask {
@@ -272,7 +273,10 @@ impl RegionFlushTask {
/// Flushes memtables to level 0 SSTs.
async fn flush_memtables(&self, version: &VersionRef) -> Result<Vec<FileMeta>> {
// TODO(yingwen): Make it configurable.
let write_opts = WriteOptions::default();
let mut write_opts = WriteOptions::default();
if let Some(row_group_size) = self.row_group_size {
write_opts.row_group_size = row_group_size;
}
let memtables = version.memtables.immutables();
let mut file_metas = Vec::with_capacity(memtables.len());
@@ -689,6 +693,7 @@ mod tests {
memtable_builder: builder.memtable_builder(),
file_purger: builder.file_purger(),
listener: WorkerListener::default(),
row_group_size: None,
};
task.push_sender(OptionOutputTx::from(output_tx));
scheduler
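The override only applies when the flush request carries a row group size; otherwise the writer keeps its defaults. A minimal standalone sketch of the same pattern (the field name comes from the diff above, the helper name and import path are assumptions):

```rust
// Minimal sketch of the override pattern used in `flush_memtables` above.
// (`use crate::sst::parquet::WriteOptions;` is the assumed import path.)
fn write_options_for(row_group_size: Option<usize>) -> WriteOptions {
    let mut opts = WriteOptions::default();
    if let Some(size) = row_group_size {
        // A per-request override (e.g. from a manual flush) wins over the default size.
        opts.row_group_size = size;
    }
    opts
}
```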

View File

@@ -165,8 +165,9 @@ impl ScanRegion {
.collect();
debug!(
"Seq scan region {}, memtables: {}, ssts_to_read: {}, total_ssts: {}",
"Seq scan region {}, request: {:?}, memtables: {}, ssts_to_read: {}, total_ssts: {}",
self.version.metadata.region_id,
self.request,
memtables.len(),
files.len(),
total_ssts

View File

@@ -16,6 +16,7 @@
mod format;
pub mod reader;
mod stats;
pub mod writer;
use common_base::readable_size::ReadableSize;

View File

@@ -30,14 +30,18 @@ use std::collections::HashMap;
use std::sync::Arc;
use api::v1::SemanticType;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array};
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt16Array, UInt64Array};
use datatypes::arrow::datatypes::{
DataType, Field, FieldRef, Fields, Schema, SchemaRef, UInt16Type,
DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef, UInt16Type,
};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::{
OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};
@@ -47,6 +51,7 @@ use crate::error::{
ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
/// Number of columns that have fixed positions.
///
@@ -250,6 +255,66 @@ impl ReadFormat {
Ok(())
}
/// Returns min values of a specific column in the row groups.
pub(crate) fn min_values(
&self,
row_groups: &[RowGroupMetaData],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, true),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, true)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_values(row_groups, column, index, true)
}
}
}
/// Returns max values of a specific column in the row groups.
pub(crate) fn max_values(
&self,
row_groups: &[RowGroupMetaData],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
match column.semantic_type {
SemanticType::Tag => self.tag_values(row_groups, column, false),
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_values(row_groups, column, *index, false)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_values(row_groups, column, index, false)
}
}
}
/// Returns null counts of a specific column in the row groups.
pub(crate) fn null_counts(
&self,
row_groups: &[RowGroupMetaData],
column_id: ColumnId,
) -> Option<ArrayRef> {
let column = self.metadata.column_by_id(column_id)?;
match column.semantic_type {
SemanticType::Tag => None,
SemanticType::Field => {
let index = self.field_id_to_index.get(&column_id)?;
Self::column_null_counts(row_groups, *index)
}
SemanticType::Timestamp => {
let index = self.time_index_position();
Self::column_null_counts(row_groups, index)
}
}
}
/// Get fields from `record_batch`.
fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
record_batch
@@ -273,6 +338,148 @@ impl ReadFormat {
})
.collect()
}
/// Returns min/max values of a specific tag.
fn tag_values(
&self,
row_groups: &[RowGroupMetaData],
column: &ColumnMetadata,
is_min: bool,
) -> Option<ArrayRef> {
let is_first_tag = self
.metadata
.primary_key
.first()
.map(|id| *id == column.column_id)
.unwrap_or(false);
if !is_first_tag {
// Only the min-max of the first tag is available in the primary key.
return None;
}
let converter =
McmpRowCodec::new(vec![SortField::new(column.column_schema.data_type.clone())]);
let values = row_groups.iter().map(|meta| {
let stats = meta.column(self.primary_key_position()).statistics()?;
if !stats.has_min_max_set() {
return None;
}
match stats {
Statistics::Boolean(_) => None,
Statistics::Int32(_) => None,
Statistics::Int64(_) => None,
Statistics::Int96(_) => None,
Statistics::Float(_) => None,
Statistics::Double(_) => None,
Statistics::ByteArray(s) => {
let bytes = if is_min { s.min_bytes() } else { s.max_bytes() };
let mut values = converter.decode(bytes).ok()?;
values.pop()
}
Statistics::FixedLenByteArray(_) => None,
}
});
let mut builder = column
.column_schema
.data_type
.create_mutable_vector(row_groups.len());
for value_opt in values {
match value_opt {
// Safety: We use the same data type to create the converter.
Some(v) => builder.push_value_ref(v.as_value_ref()),
None => builder.push_null(),
}
}
let vector = builder.to_vector();
Some(vector.to_arrow_array())
}
/// Returns min/max values of a specific non-tag column.
fn column_values(
row_groups: &[RowGroupMetaData],
column: &ColumnMetadata,
column_index: usize,
is_min: bool,
) -> Option<ArrayRef> {
let null_scalar: ScalarValue = column
.column_schema
.data_type
.as_arrow_type()
.try_into()
.ok()?;
let scalar_values = row_groups
.iter()
.map(|meta| {
let stats = meta.column(column_index).statistics()?;
if !stats.has_min_max_set() {
return None;
}
match stats {
Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
*s.min()
} else {
*s.max()
}))),
Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
*s.min()
} else {
*s.max()
}))),
Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
*s.min()
} else {
*s.max()
}))),
Statistics::Int96(_) => None,
Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
*s.min()
} else {
*s.max()
}))),
Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
*s.min()
} else {
*s.max()
}))),
Statistics::ByteArray(s) => {
let bytes = if is_min { s.min_bytes() } else { s.max_bytes() };
let s = String::from_utf8(bytes.to_vec()).ok();
Some(ScalarValue::Utf8(s))
}
Statistics::FixedLenByteArray(_) => None,
}
})
.map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
.collect::<Vec<ScalarValue>>();
debug_assert_eq!(scalar_values.len(), row_groups.len());
ScalarValue::iter_to_array(scalar_values).ok()
}
/// Returns null counts of a specific non-tag column.
fn column_null_counts(
row_groups: &[RowGroupMetaData],
column_index: usize,
) -> Option<ArrayRef> {
let values = row_groups.iter().map(|meta| {
let col = meta.column(column_index);
let stat = col.statistics()?;
Some(stat.null_count())
});
Some(Arc::new(UInt64Array::from_iter(values)))
}
/// Field index of the primary key.
fn primary_key_position(&self) -> usize {
self.arrow_schema.fields.len() - 3
}
/// Field index of the time index.
fn time_index_position(&self) -> usize {
self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
}
}
/// Gets the arrow schema to store in parquet.
@@ -328,12 +535,16 @@ fn internal_fields() -> [FieldRef; 3] {
[
Arc::new(Field::new_dictionary(
PRIMARY_KEY_COLUMN_NAME,
DataType::UInt16,
DataType::Binary,
ArrowDataType::UInt16,
ArrowDataType::Binary,
false,
)),
Arc::new(Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false)),
Arc::new(Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false)),
Arc::new(Field::new(
SEQUENCE_COLUMN_NAME,
ArrowDataType::UInt64,
false,
)),
Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
]
}
@@ -408,20 +619,23 @@ mod tests {
fn build_test_arrow_schema() -> SchemaRef {
let fields = vec![
Field::new("field1", DataType::Int64, true),
Field::new("field0", DataType::Int64, true),
Field::new("field1", ArrowDataType::Int64, true),
Field::new("field0", ArrowDataType::Int64, true),
Field::new(
"ts",
DataType::Timestamp(TimeUnit::Millisecond, None),
ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new(
"__primary_key",
DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Binary)),
ArrowDataType::Dictionary(
Box::new(ArrowDataType::UInt16),
Box::new(ArrowDataType::Binary),
),
false,
),
Field::new("__sequence", DataType::UInt64, false),
Field::new("__op_type", DataType::UInt8, false),
Field::new("__sequence", ArrowDataType::UInt64, false),
Field::new("__op_type", ArrowDataType::UInt8, false),
];
Arc::new(Schema::new(fields))
}
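Only the first tag's min/max can be recovered because all tags are mcmp-encoded into the single `__primary_key` byte column. A condensed, hedged sketch of that decode step for one row group (the helper name and the `Value`/`ConcreteDataType` import paths are assumptions, not part of this diff):

```rust
use datatypes::prelude::ConcreteDataType;
use datatypes::value::Value;
use parquet::file::metadata::RowGroupMetaData;
use parquet::file::statistics::Statistics;

use crate::row_converter::{McmpRowCodec, RowCodec, SortField};

/// Recovers the first tag's min value from one row group's primary-key statistics.
fn first_tag_min(
    meta: &RowGroupMetaData,
    pk_column_index: usize,
    first_tag_type: ConcreteDataType,
) -> Option<Value> {
    let stats = meta.column(pk_column_index).statistics()?;
    if !stats.has_min_max_set() {
        return None;
    }
    // The primary key is stored as one mcmp-encoded byte array, so only the
    // leading (first) tag can be decoded back into a comparable value.
    let Statistics::ByteArray(s) = stats else {
        return None;
    };
    let codec = McmpRowCodec::new(vec![SortField::new(first_tag_type)]);
    let mut values = codec.decode(s.min_bytes()).ok()?;
    values.pop()
}
```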

View File

@@ -14,6 +14,7 @@
//! Parquet reader.
use std::collections::HashSet;
use std::ops::Range;
use std::sync::Arc;
@@ -44,6 +45,7 @@ use crate::error::{
use crate::read::{Batch, BatchReader};
use crate::sst::file::{FileHandle, FileId};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::stats::RowGroupPruningStats;
use crate::sst::parquet::PARQUET_METADATA_KEY;
/// Parquet SST reader builder.
@@ -151,18 +153,17 @@ impl ParquetReaderBuilder {
let key_value_meta = builder.metadata().file_metadata().key_value_metadata();
let region_meta = self.get_region_metadata(file_path, key_value_meta)?;
// Prune row groups by metadata.
if let Some(predicate) = &self.predicate {
// TODO(yingwen): Now we encode tags into the full primary key so we need some approach
// to implement pruning.
let pruned_row_groups = predicate
.prune_row_groups(builder.metadata().row_groups())
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
.collect::<Vec<_>>();
builder = builder.with_row_groups(pruned_row_groups);
}
let column_ids: HashSet<_> = self
.projection
.as_ref()
.map(|p| p.iter().cloned().collect())
.unwrap_or_else(|| {
region_meta
.column_metadatas
.iter()
.map(|c| c.column_id)
.collect()
});
let read_format = ReadFormat::new(Arc::new(region_meta));
// The arrow schema converted from the region meta should be the same as parquet's.
@@ -179,6 +180,22 @@ impl ParquetReaderBuilder {
}
);
// Prune row groups by metadata.
if let Some(predicate) = &self.predicate {
let stats = RowGroupPruningStats::new(
builder.metadata().row_groups(),
&read_format,
column_ids,
);
let pruned_row_groups = predicate
.prune_with_stats(&stats)
.into_iter()
.enumerate()
.filter_map(|(idx, valid)| if valid { Some(idx) } else { None })
.collect::<Vec<_>>();
builder = builder.with_row_groups(pruned_row_groups);
}
let parquet_schema_desc = builder.metadata().file_metadata().schema_descr();
if let Some(column_ids) = self.projection.as_ref() {
let indices = read_format.projection_indices(column_ids.iter().copied());
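The pruning result is one boolean per row group; only the surviving indices are handed to the parquet builder via `with_row_groups`. A free-standing sketch of that selection step, mirroring the `filter_map` above:

```rust
/// Turns the per-row-group pruning mask into the indices to actually read.
fn selected_row_groups(mask: Vec<bool>) -> Vec<usize> {
    mask.into_iter()
        .enumerate()
        .filter_map(|(idx, keep)| if keep { Some(idx) } else { None })
        .collect()
}
```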

View File

@@ -0,0 +1,83 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Statistics of parquet SSTs.
use std::collections::HashSet;
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::Column;
use datatypes::arrow::array::ArrayRef;
use parquet::file::metadata::RowGroupMetaData;
use store_api::storage::ColumnId;
use crate::sst::parquet::format::ReadFormat;
/// Statistics for pruning row groups.
pub(crate) struct RowGroupPruningStats<'a> {
/// Metadata of SST row groups.
row_groups: &'a [RowGroupMetaData],
/// Helper to read the SST.
read_format: &'a ReadFormat,
/// Projected column ids to read.
///
/// We need column ids to distinguish different columns with the same name.
/// e.g. a column that is dropped and then added back.
column_ids: HashSet<ColumnId>,
}
impl<'a> RowGroupPruningStats<'a> {
/// Creates statistics to prune the given `row_groups`.
pub(crate) fn new(
row_groups: &'a [RowGroupMetaData],
read_format: &'a ReadFormat,
column_ids: HashSet<ColumnId>,
) -> Self {
Self {
row_groups,
read_format,
column_ids,
}
}
/// Returns the column id for the given column name if we need to read it.
fn column_id_to_prune(&self, name: &str) -> Option<ColumnId> {
// Only use stats when the column to read has the same id as the column in the SST.
self.read_format
.metadata()
.column_by_name(name)
.and_then(|col| self.column_ids.get(&col.column_id).copied())
}
}
impl<'a> PruningStatistics for RowGroupPruningStats<'a> {
fn min_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
self.read_format.min_values(self.row_groups, column_id)
}
fn max_values(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
self.read_format.max_values(self.row_groups, column_id)
}
fn num_containers(&self) -> usize {
self.row_groups.len()
}
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let column_id = self.column_id_to_prune(&column.name)?;
self.read_format.null_counts(self.row_groups, column_id)
}
}
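A hedged usage sketch of the adapter, mirroring the call site in the parquet reader shown earlier in this diff (the surrounding variables are assumed to be in scope there):

```rust
// `row_groups`, `read_format`, `column_ids` and `predicate` come from the
// reader builder, as in `ParquetReaderBuilder` above.
let stats = RowGroupPruningStats::new(row_groups, &read_format, column_ids);
// `false` at index `i` means row group `i` can be skipped entirely.
let keep: Vec<bool> = predicate.prune_with_stats(&stats);
```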

View File

@@ -575,9 +575,12 @@ pub async fn delete_rows(engine: &MitoEngine, region_id: RegionId, rows: Rows) {
}
/// Flush a region manually.
pub async fn flush_region(engine: &MitoEngine, region_id: RegionId) {
pub async fn flush_region(engine: &MitoEngine, region_id: RegionId, row_group_size: Option<usize>) {
let Output::AffectedRows(rows) = engine
.handle_request(region_id, RegionRequest::Flush(RegionFlushRequest {}))
.handle_request(
region_id,
RegionRequest::Flush(RegionFlushRequest { row_group_size }),
)
.await
.unwrap()
else {
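Both call forms of the updated helper appear in the tests above; for quick reference:

```rust
// Flush with the engine's default row group size.
flush_region(&engine, region_id, None).await;
// Force small row groups (as prune_test does) so one SST yields several groups to prune.
flush_region(&engine, region_id, Some(5)).await;
```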

View File

@@ -534,8 +534,9 @@ impl<S: LogStore> RegionWorkerLoop<S> {
.await;
continue;
}
DdlRequest::Flush(_) => {
self.handle_flush_request(ddl.region_id, ddl.sender).await;
DdlRequest::Flush(req) => {
self.handle_flush_request(ddl.region_id, req, ddl.sender)
.await;
continue;
}
DdlRequest::Compact(_) => {

View File

@@ -80,7 +80,7 @@ impl<S> RegionWorkerLoop<S> {
info!("Flush region: {} before alteration", region_id);
// Try to submit a flush task.
let task = self.new_flush_task(&region, FlushReason::Alter);
let task = self.new_flush_task(&region, FlushReason::Alter, None);
if let Err(e) =
self.flush_scheduler
.schedule_flush(region.region_id, &region.version_control, task)

View File

@@ -17,6 +17,7 @@
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use store_api::logstore::LogStore;
use store_api::region_request::RegionFlushRequest;
use store_api::storage::RegionId;
use crate::error::{RegionTruncatedSnafu, Result};
@@ -31,13 +32,14 @@ impl<S> RegionWorkerLoop<S> {
pub(crate) async fn handle_flush_request(
&mut self,
region_id: RegionId,
request: RegionFlushRequest,
mut sender: OptionOutputTx,
) {
let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
return;
};
let mut task = self.new_flush_task(&region, FlushReason::Manual);
let mut task = self.new_flush_task(&region, FlushReason::Manual, request.row_group_size);
task.push_sender(sender);
if let Err(e) =
self.flush_scheduler
@@ -92,7 +94,7 @@ impl<S> RegionWorkerLoop<S> {
if region.last_flush_millis() < min_last_flush_time {
// If flush time of this region is earlier than `min_last_flush_time`, we can flush this region.
let task = self.new_flush_task(region, FlushReason::EngineFull);
let task = self.new_flush_task(region, FlushReason::EngineFull, None);
self.flush_scheduler.schedule_flush(
region.region_id,
&region.version_control,
@@ -105,7 +107,7 @@ impl<S> RegionWorkerLoop<S> {
// TODO(yingwen): Maybe flush more tables to reduce write buffer size.
if let Some(region) = max_mem_region {
if !self.flush_scheduler.is_flush_requested(region.region_id) {
let task = self.new_flush_task(region, FlushReason::EngineFull);
let task = self.new_flush_task(region, FlushReason::EngineFull, None);
self.flush_scheduler.schedule_flush(
region.region_id,
&region.version_control,
@@ -122,6 +124,7 @@ impl<S> RegionWorkerLoop<S> {
&self,
region: &MitoRegionRef,
reason: FlushReason,
row_group_size: Option<usize>,
) -> RegionFlushTask {
// TODO(yingwen): metrics for flush requested.
RegionFlushTask {
@@ -133,6 +136,7 @@ impl<S> RegionWorkerLoop<S> {
memtable_builder: self.memtable_builder.clone(),
file_purger: region.file_purger.clone(),
listener: self.listener.clone(),
row_group_size,
}
}
}

View File

@@ -110,7 +110,9 @@ impl RegionRequest {
)]),
region_request::Body::Flush(flush) => Ok(vec![(
flush.region_id.into(),
Self::Flush(RegionFlushRequest {}),
Self::Flush(RegionFlushRequest {
row_group_size: None,
}),
)]),
region_request::Body::Compact(compact) => Ok(vec![(
compact.region_id.into(),
@@ -415,8 +417,10 @@ impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
}
}
#[derive(Debug)]
pub struct RegionFlushRequest {}
#[derive(Debug, Default)]
pub struct RegionFlushRequest {
pub row_group_size: Option<usize>,
}
#[derive(Debug)]
pub struct RegionCompactRequest {}
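Because the request now derives `Default`, callers that do not care about the row group size stay terse; a short sketch of both construction styles:

```rust
// Explicit override, e.g. for tests that want several small row groups per SST.
let small_groups = RegionFlushRequest { row_group_size: Some(5) };
// `Default` keeps `row_group_size: None`, so the writer falls back to its default options.
let default_flush = RegionFlushRequest::default();
```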

View File

@@ -20,7 +20,7 @@ use common_time::range::TimestampRange;
use common_time::timestamp::TimeUnit;
use common_time::Timestamp;
use datafusion::parquet::file::metadata::RowGroupMetaData;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion_common::ToDFSchema;
use datafusion_expr::expr::InList;
use datafusion_expr::{Between, BinaryExpr, Operator};
@@ -37,7 +37,7 @@ mod stats;
#[derive(Clone)]
pub struct Predicate {
/// The schema of underlying storage.
/// The schema of the table that the expressions are applied to.
schema: SchemaRef,
/// Physical expressions of this predicate.
exprs: Vec<Arc<dyn PhysicalExpr>>,
@@ -118,6 +118,31 @@ impl Predicate {
}
res
}
/// Evaluates the predicate against the `stats`.
/// Returns a vector of booleans where `false` means the corresponding row group can be skipped.
pub fn prune_with_stats<S: PruningStatistics>(&self, stats: &S) -> Vec<bool> {
let mut res = vec![true; stats.num_containers()];
let arrow_schema = self.schema.arrow_schema();
for expr in &self.exprs {
match PruningPredicate::try_new(expr.clone(), arrow_schema.clone()) {
Ok(p) => match p.prune(stats) {
Ok(r) => {
for (curr_val, res) in r.into_iter().zip(res.iter_mut()) {
*res &= curr_val
}
}
Err(e) => {
warn!("Failed to prune row groups, error: {:?}", e);
}
},
Err(e) => {
error!("Failed to create predicate for expr, error: {:?}", e);
}
}
}
res
}
}
// tests for `TimeRangePredicateBuilder` locates in src/query/tests/time_range_filter_test.rs
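For isolation testing of `prune_with_stats`, any `PruningStatistics` implementor will do; a minimal constant-valued sketch (illustrative only, not part of this PR):

```rust
use datafusion::physical_optimizer::pruning::PruningStatistics;
use datafusion_common::Column;
use datatypes::arrow::array::{Array, ArrayRef};

/// Constant per-container statistics; every column shares the same min/max arrays.
struct ConstStats {
    mins: ArrayRef,
    maxs: ArrayRef,
}

impl PruningStatistics for ConstStats {
    fn min_values(&self, _column: &Column) -> Option<ArrayRef> {
        Some(self.mins.clone())
    }

    fn max_values(&self, _column: &Column) -> Option<ArrayRef> {
        Some(self.maxs.clone())
    }

    fn num_containers(&self) -> usize {
        self.mins.len()
    }

    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        // Statistics may be unavailable; `None` simply disables null-count pruning.
        None
    }
}

// An instance can then be handed to `Predicate::prune_with_stats(&stats)`.
```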