diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index bd81f17d66..7cf5dc4e26 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -28,7 +28,6 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use common_error::ext::{BoxedError, ErrorExt}; use common_error::status_code::StatusCode; -use common_query::Output; use common_recordbatch::SendableRecordBatchStream; use mito2::engine::MitoEngine; use store_api::metadata::RegionMetadataRef; @@ -133,7 +132,7 @@ impl RegionEngine for MetricEngine { RegionRequest::Flush(_) => todo!(), RegionRequest::Compact(_) => todo!(), RegionRequest::Truncate(_) => todo!(), - /// It always Ok(0), all data is latest. + // It always Ok(0), all data is the latest. RegionRequest::Catchup(_) => Ok(0), }; diff --git a/src/metric-engine/src/engine/close.rs b/src/metric-engine/src/engine/close.rs index c708453edb..ad89fce36a 100644 --- a/src/metric-engine/src/engine/close.rs +++ b/src/metric-engine/src/engine/close.rs @@ -14,25 +14,15 @@ //! Close a metric region -use mito2::engine::MITO_ENGINE_NAME; -use object_store::util::join_dir; -use snafu::{OptionExt, ResultExt}; -use store_api::metric_engine_consts::{ - DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY, -}; +use snafu::ResultExt; use store_api::region_engine::RegionEngine; -use store_api::region_request::{ - AffectedRows, RegionCloseRequest, RegionOpenRequest, RegionRequest, -}; +use store_api::region_request::{AffectedRows, RegionCloseRequest, RegionRequest}; use store_api::storage::RegionId; use super::MetricEngineInner; -use crate::error::{ - CloseMitoRegionSnafu, Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu, - PhysicalRegionNotFoundSnafu, Result, -}; +use crate::error::{CloseMitoRegionSnafu, LogicalRegionNotFoundSnafu, Result}; use crate::metrics::PHYSICAL_REGION_COUNT; -use crate::{metadata_region, utils}; +use crate::utils; impl MetricEngineInner { pub async fn close_region( diff --git a/src/metric-engine/src/engine/drop.rs b/src/metric-engine/src/engine/drop.rs index 9399844720..03c13d2109 100644 --- a/src/metric-engine/src/engine/drop.rs +++ b/src/metric-engine/src/engine/drop.rs @@ -14,25 +14,17 @@ //! Drop a metric region -use mito2::engine::MITO_ENGINE_NAME; -use object_store::util::join_dir; -use snafu::{OptionExt, ResultExt}; -use store_api::metric_engine_consts::{ - DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR, PHYSICAL_TABLE_METADATA_KEY, -}; +use snafu::ResultExt; use store_api::region_engine::RegionEngine; -use store_api::region_request::{ - AffectedRows, RegionDropRequest, RegionOpenRequest, RegionRequest, -}; +use store_api::region_request::{AffectedRows, RegionDropRequest, RegionRequest}; use store_api::storage::RegionId; use super::MetricEngineInner; use crate::error::{ - CloseMitoRegionSnafu, Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu, - PhysicalRegionBusySnafu, PhysicalRegionNotFoundSnafu, Result, + CloseMitoRegionSnafu, LogicalRegionNotFoundSnafu, PhysicalRegionBusySnafu, Result, }; use crate::metrics::PHYSICAL_REGION_COUNT; -use crate::{metadata_region, utils}; +use crate::utils; impl MetricEngineInner { pub async fn drop_region( diff --git a/src/metric-engine/src/engine/open.rs b/src/metric-engine/src/engine/open.rs index fda8807a7b..df41d1cf12 100644 --- a/src/metric-engine/src/engine/open.rs +++ b/src/metric-engine/src/engine/open.rs @@ -26,7 +26,7 @@ use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest}; use store_api::storage::RegionId; use super::MetricEngineInner; -use crate::error::{Error, LogicalRegionNotFoundSnafu, OpenMitoRegionSnafu, Result}; +use crate::error::{OpenMitoRegionSnafu, Result}; use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT}; use crate::utils; diff --git a/src/metric-engine/src/engine/put.rs b/src/metric-engine/src/engine/put.rs index 6e9a32e18c..5ce2bf227b 100644 --- a/src/metric-engine/src/engine/put.rs +++ b/src/metric-engine/src/engine/put.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::hash::{BuildHasher, Hash, Hasher}; +use std::hash::Hash; use api::v1::value::ValueData; use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType}; @@ -29,7 +29,7 @@ use crate::error::{ ColumnNotFoundSnafu, ForbiddenPhysicalAlterSnafu, LogicalRegionNotFoundSnafu, Result, }; use crate::metrics::{FORBIDDEN_OPERATION_COUNT, MITO_OPERATION_ELAPSED}; -use crate::utils::{to_data_region_id, to_metadata_region_id}; +use crate::utils::to_data_region_id; // A random number const TSID_HASH_SEED: u32 = 846793005; @@ -84,6 +84,7 @@ impl MetricEngineInner { .await?; // write to data region + // TODO: retrieve table name self.modify_rows(logical_region_id.table_id(), &mut request.rows)?; self.data_region.write_data(data_region_id, request).await @@ -101,7 +102,6 @@ impl MetricEngineInner { request: &RegionPutRequest, ) -> Result<()> { // check if the region exists - let metadata_region_id = to_metadata_region_id(physical_region_id); let data_region_id = to_data_region_id(physical_region_id); let state = self.state.read().unwrap(); if !state.is_logical_region_exist(logical_region_id) { diff --git a/src/metric-engine/src/engine/read.rs b/src/metric-engine/src/engine/read.rs index cba9fbd343..154a9a61b0 100644 --- a/src/metric-engine/src/engine/read.rs +++ b/src/metric-engine/src/engine/read.rs @@ -20,10 +20,9 @@ use common_recordbatch::SendableRecordBatchStream; use common_telemetry::{error, info, tracing}; use datafusion::logical_expr; use snafu::{OptionExt, ResultExt}; -use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef}; +use store_api::metadata::{RegionMetadataBuilder, RegionMetadataRef}; use store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME; use store_api::region_engine::RegionEngine; -use store_api::storage::consts::ReservedColumnId; use store_api::storage::{RegionId, ScanRequest}; use crate::engine::MetricEngineInner; @@ -259,7 +258,6 @@ mod test { use store_api::region_request::RegionRequest; use super::*; - use crate::engine::alter; use crate::test_util::{ alter_logical_region_add_tag_columns, create_logical_region_request, TestEnv, }; @@ -271,7 +269,6 @@ mod test { let logical_region_id = env.default_logical_region_id(); let physical_region_id = env.default_physical_region_id(); - let data_region_id = utils::to_data_region_id(physical_region_id); // create another logical region let logical_region_id2 = RegionId::new(1112345678, 999); @@ -291,7 +288,7 @@ mod test { .unwrap(); // check explicit projection - let mut scan_req = ScanRequest { + let scan_req = ScanRequest { projection: Some(vec![0, 1, 2, 3, 4, 5, 6]), filters: vec![], ..Default::default() @@ -314,7 +311,7 @@ mod test { ); // check default projection - let mut scan_req = ScanRequest::default(); + let scan_req = ScanRequest::default(); let scan_req = env .metric() .inner diff --git a/src/metric-engine/src/engine/region_metadata.rs b/src/metric-engine/src/engine/region_metadata.rs index 5a3979f99c..150d1a324c 100644 --- a/src/metric-engine/src/engine/region_metadata.rs +++ b/src/metric-engine/src/engine/region_metadata.rs @@ -14,11 +14,7 @@ //! Implementation of retrieving logical region's region metadata. -use std::collections::{HashMap, HashSet}; - -use api::v1::SemanticType; -use store_api::metadata::{ColumnMetadata, RegionMetadata}; -use store_api::storage::consts::ReservedColumnId; +use store_api::metadata::ColumnMetadata; use store_api::storage::RegionId; use crate::engine::MetricEngineInner; diff --git a/src/metric-engine/src/lib.rs b/src/metric-engine/src/lib.rs index 9c2e269b73..3fe4640da4 100644 --- a/src/metric-engine/src/lib.rs +++ b/src/metric-engine/src/lib.rs @@ -53,7 +53,6 @@ #![feature(let_chains)] mod data_region; -#[allow(unused)] pub mod engine; pub mod error; mod metadata_region; diff --git a/src/metric-engine/src/metadata_region.rs b/src/metric-engine/src/metadata_region.rs index 97b73d7a9b..918f7f1023 100644 --- a/src/metric-engine/src/metadata_region.rs +++ b/src/metric-engine/src/metadata_region.rs @@ -229,13 +229,11 @@ impl MetadataRegion { format!("{COLUMN_PREFIX}{}_", region_id.as_u64()) } - #[allow(dead_code)] pub fn parse_region_key(key: &str) -> Option<&str> { key.strip_prefix(REGION_PREFIX) } /// Parse column key to (logical_region_id, column_name) - #[allow(dead_code)] pub fn parse_column_key(key: &str) -> Result> { if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) { let mut iter = stripped.split('_'); @@ -271,7 +269,6 @@ impl MetadataRegion { // simulate to `KvBackend` // // methods in this block assume the given region id is transformed. -#[allow(unused_variables)] impl MetadataRegion { /// Put if not exist, return if this put operation is successful (error other /// than "key already exist" will be wrapped in [Err]). diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs index afebe927fe..3c86d5cd3b 100644 --- a/src/mito2/src/memtable/time_series.rs +++ b/src/mito2/src/memtable/time_series.rs @@ -20,17 +20,14 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use api::v1::OpType; -use common_telemetry::{debug, error, trace}; +use common_recordbatch::filter::SimpleFilterEvaluator; +use common_telemetry::{debug, error}; use common_time::Timestamp; -use datafusion::physical_plan::PhysicalExpr; -use datafusion_common::ScalarValue; -use datafusion_expr::ColumnarValue; use datatypes::arrow; -use datatypes::arrow::array::{ArrayRef, BooleanArray}; -use datatypes::arrow::record_batch::RecordBatch; +use datatypes::arrow::array::ArrayRef; use datatypes::data_type::{ConcreteDataType, DataType}; use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector, VectorRef}; -use datatypes::value::ValueRef; +use datatypes::value::{Value, ValueRef}; use datatypes::vectors::{ Helper, UInt64Vector, UInt64VectorBuilder, UInt8Vector, UInt8VectorBuilder, }; @@ -39,10 +36,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::error::{ - ComputeArrowSnafu, ConvertVectorSnafu, NewRecordBatchSnafu, PrimaryKeyLengthMismatchSnafu, - Result, -}; +use crate::error::{ComputeArrowSnafu, ConvertVectorSnafu, PrimaryKeyLengthMismatchSnafu, Result}; use crate::flush::WriteBufferManagerRef; use crate::memtable::{ AllocTracker, BoxedBatchIterator, KeyValues, Memtable, MemtableBuilder, MemtableId, @@ -305,48 +299,39 @@ impl SeriesSet { /// Iterates all series in [SeriesSet]. fn iter_series(&self, projection: HashSet, predicate: Option) -> Iter { - let (primary_key_builders, primary_key_schema) = - primary_key_builders(&self.region_metadata, 1); + let primary_key_schema = primary_key_schema(&self.region_metadata); + let primary_key_datatypes = self + .region_metadata + .primary_key_columns() + .map(|pk| pk.column_schema.data_type.clone()) + .collect(); - let physical_exprs: Vec<_> = predicate - .and_then(|p| p.to_physical_exprs(&primary_key_schema).ok()) - .unwrap_or_default(); - - Iter { - metadata: self.region_metadata.clone(), - series: self.series.clone(), + Iter::new( + self.region_metadata.clone(), + self.series.clone(), projection, - last_key: None, - predicate: physical_exprs, - pk_schema: primary_key_schema, - primary_key_builders, - codec: self.codec.clone(), - metrics: Metrics::default(), - } + predicate, + primary_key_schema, + primary_key_datatypes, + self.codec.clone(), + ) } } -/// Creates primary key array builders and arrow's schema for primary keys of given region schema. -fn primary_key_builders( - region_metadata: &RegionMetadataRef, - num_pk_rows: usize, -) -> (Vec>, arrow::datatypes::SchemaRef) { - let (builders, fields): (_, Vec<_>) = region_metadata +/// Creates an arrow [SchemaRef](arrow::datatypes::SchemaRef) that only contains primary keys +/// of given region schema +fn primary_key_schema(region_metadata: &RegionMetadataRef) -> arrow::datatypes::SchemaRef { + let fields = region_metadata .primary_key_columns() .map(|pk| { - ( - pk.column_schema - .data_type - .create_mutable_vector(num_pk_rows), - arrow::datatypes::Field::new( - pk.column_schema.name.clone(), - pk.column_schema.data_type.as_arrow_type(), - pk.column_schema.is_nullable(), - ), + arrow::datatypes::Field::new( + pk.column_schema.name.clone(), + pk.column_schema.data_type.as_arrow_type(), + pk.column_schema.is_nullable(), ) }) - .unzip(); - (builders, Arc::new(arrow::datatypes::Schema::new(fields))) + .collect::>(); + Arc::new(arrow::datatypes::Schema::new(fields)) } /// Metrics for reading the memtable. @@ -369,13 +354,45 @@ struct Iter { series: Arc, projection: HashSet, last_key: Option>, - predicate: Vec>, + predicate: Vec, pk_schema: arrow::datatypes::SchemaRef, - primary_key_builders: Vec>, + pk_datatypes: Vec, codec: Arc, metrics: Metrics, } +impl Iter { + pub(crate) fn new( + metadata: RegionMetadataRef, + series: Arc, + projection: HashSet, + predicate: Option, + pk_schema: arrow::datatypes::SchemaRef, + pk_datatypes: Vec, + codec: Arc, + ) -> Self { + let simple_filters = predicate + .map(|p| { + p.exprs() + .iter() + .filter_map(|f| SimpleFilterEvaluator::try_new(f.df_expr())) + .collect::>() + }) + .unwrap_or_default(); + Self { + metadata, + series, + projection, + last_key: None, + predicate: simple_filters, + pk_schema, + pk_datatypes, + codec, + metrics: Metrics::default(), + } + } +} + impl Drop for Iter { fn drop(&mut self) { debug!( @@ -415,7 +432,7 @@ impl Iterator for Iter { &self.codec, primary_key.as_slice(), &mut series, - &mut self.primary_key_builders, + &self.pk_datatypes, self.pk_schema.clone(), &self.predicate, ) @@ -446,88 +463,48 @@ fn prune_primary_key( codec: &Arc, pk: &[u8], series: &mut Series, - builders: &mut [Box], + datatypes: &[ConcreteDataType], pk_schema: arrow::datatypes::SchemaRef, - predicate: &[Arc], + predicates: &[SimpleFilterEvaluator], ) -> bool { // no primary key, we simply return true. if pk_schema.fields().is_empty() { return true; } - if let Some(rb) = series.pk_cache.as_ref() { - prune_inner(predicate, rb).unwrap_or(true) + // retrieve primary key values from cache or decode from bytes. + let pk_values = if let Some(pk_values) = series.pk_cache.as_ref() { + pk_values } else { - let rb = match pk_to_record_batch(codec, pk, builders, pk_schema) { - Ok(rb) => rb, - Err(e) => { - error!(e; "Failed to build record batch from primary keys"); - return true; - } - }; - let res = prune_inner(predicate, &rb).unwrap_or(true); - series.update_pk_cache(rb); - res - } -} + let pk_values = codec.decode(pk); + if let Err(e) = pk_values { + error!(e; "Failed to decode primary key"); + return true; + } + series.update_pk_cache(pk_values.unwrap()); + series.pk_cache.as_ref().unwrap() + }; -fn prune_inner(predicates: &[Arc], primary_key: &RecordBatch) -> Result { - for expr in predicates { - // evaluate every filter against primary key - let Ok(eva) = expr.evaluate(primary_key) else { + // evaluate predicates against primary key values + let mut result = true; + for predicate in predicates { + // ignore predicates that are not referencing primary key columns + let Ok(index) = pk_schema.index_of(predicate.column_name()) else { continue; }; - let result = match eva { - ColumnarValue::Array(array) => { - let predicate_array = array.as_any().downcast_ref::().unwrap(); - predicate_array - .into_iter() - .map(|x| x.unwrap_or(true)) - .next() - .unwrap_or(true) - } - // result was a column - ColumnarValue::Scalar(ScalarValue::Boolean(v)) => v.unwrap_or(true), - _ => { - unreachable!("Unexpected primary key record batch evaluation result: {:?}, primary key: {:?}", eva, primary_key); - } - }; - trace!( - "Evaluate primary key {:?} against filter: {:?}, result: {:?}", - primary_key, - expr, - result - ); - if !result { - return Ok(false); - } + // Safety: arrow schema and datatypes are constructed from the same source. + let scalar_value = pk_values[index] + .try_to_scalar_value(&datatypes[index]) + .unwrap(); + result &= predicate.evaluate_scalar(&scalar_value).unwrap_or(true); } - Ok(true) -} -fn pk_to_record_batch( - codec: &Arc, - bytes: &[u8], - builders: &mut [Box], - pk_schema: arrow::datatypes::SchemaRef, -) -> Result { - let pk_values = codec.decode(bytes).unwrap(); - - let arrays = builders - .iter_mut() - .zip(pk_values.iter()) - .map(|(builder, pk_value)| { - builder.push_value_ref(pk_value.as_value_ref()); - builder.to_vector().to_arrow_array() - }) - .collect(); - - RecordBatch::try_new(pk_schema, arrays).context(NewRecordBatchSnafu) + result } /// A `Series` holds a list of field values of some given primary key. struct Series { - pk_cache: Option, + pk_cache: Option>, active: ValueBuilder, frozen: Vec, } @@ -546,8 +523,8 @@ impl Series { self.active.push(ts, sequence, op_type as u8, values); } - fn update_pk_cache(&mut self, pk_batch: RecordBatch) { - self.pk_cache = Some(pk_batch); + fn update_pk_cache(&mut self, pk_values: Vec) { + self.pk_cache = Some(pk_values); } /// Freezes the active part and push it to `frozen`. diff --git a/src/mito2/src/worker/handle_write.rs b/src/mito2/src/worker/handle_write.rs index e10012d574..495af6cb05 100644 --- a/src/mito2/src/worker/handle_write.rs +++ b/src/mito2/src/worker/handle_write.rs @@ -60,7 +60,13 @@ impl RegionWorkerLoop { return; } - let mut region_ctxs = self.prepare_region_write_ctx(write_requests); + // Prepare write context. + let mut region_ctxs = { + let _timer = WRITE_STAGE_ELAPSED + .with_label_values(&["prepare_ctx"]) + .start_timer(); + self.prepare_region_write_ctx(write_requests) + }; // Write to WAL. {