diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs index 2f9bdff7b0..2b54fc3d87 100644 --- a/src/mito2/src/read.rs +++ b/src/mito2/src/read.rs @@ -26,7 +26,7 @@ pub(crate) mod scan_util; pub(crate) mod seq_scan; pub(crate) mod unordered_scan; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use std::time::Duration; @@ -40,7 +40,7 @@ use datatypes::arrow::compute::SortOptions; use datatypes::arrow::row::{RowConverter, SortField}; use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector}; use datatypes::types::TimestampType; -use datatypes::value::ValueRef; +use datatypes::value::{Value, ValueRef}; use datatypes::vectors::{ BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector, @@ -58,7 +58,7 @@ use crate::error::{ use crate::memtable::BoxedBatchIterator; use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED}; use crate::read::prune::PruneReader; -use crate::row_converter::CompositeValues; +use crate::row_converter::{CompositeValues, PrimaryKeyCodec}; /// Storage internal representation of a batch of rows for a primary key (time series). /// @@ -82,6 +82,8 @@ pub struct Batch { op_types: Arc, /// Fields organized in columnar format. fields: Vec, + /// Cache for field index lookup. + fields_idx: Option>, } impl Batch { @@ -229,6 +231,7 @@ impl Batch { sequences: Arc::new(self.sequences.get_slice(offset, length)), op_types: Arc::new(self.op_types.get_slice(offset, length)), fields, + fields_idx: self.fields_idx.clone(), } } @@ -588,6 +591,47 @@ impl Batch { other.first_sequence() )) } + + /// Returns the value of the column in the primary key. + /// + /// Lazily decodes the primary key and caches the result. + pub fn pk_col_value( + &mut self, + codec: &dyn PrimaryKeyCodec, + col_idx_in_pk: usize, + column_id: ColumnId, + ) -> Result> { + if self.pk_values.is_none() { + self.pk_values = Some(codec.decode(&self.primary_key)?); + } + + let pk_values = self.pk_values.as_ref().unwrap(); + Ok(match pk_values { + CompositeValues::Dense(values) => values.get(col_idx_in_pk).map(|(_, v)| v), + CompositeValues::Sparse(values) => values.get(&column_id), + }) + } + + /// Returns values of the field in the batch. + /// + /// Lazily caches the field index. + pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> { + if self.fields_idx.is_none() { + self.fields_idx = Some( + self.fields + .iter() + .enumerate() + .map(|(i, c)| (c.column_id, i)) + .collect(), + ); + } + + self.fields_idx + .as_ref() + .unwrap() + .get(&column_id) + .map(|&idx| &self.fields[idx]) + } } /// A struct to check the batch is monotonic. @@ -876,6 +920,7 @@ impl BatchBuilder { sequences, op_types, fields: self.fields, + fields_idx: None, }) } } @@ -1019,8 +1064,12 @@ impl ScannerMetrics { #[cfg(test)] mod tests { + use store_api::codec::PrimaryKeyEncoding; + use store_api::storage::consts::ReservedColumnId; + use super::*; use crate::error::Error; + use crate::row_converter::{self, build_primary_key_codec_with_fields}; use crate::test_util::new_batch_builder; fn new_batch( @@ -1392,4 +1441,88 @@ mod tests { ); assert_eq!(expect, batch); } + + #[test] + fn test_get_value() { + let encodings = [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse]; + + for encoding in encodings { + let codec = build_primary_key_codec_with_fields( + encoding, + [ + ( + ReservedColumnId::table_id(), + row_converter::SortField::new(ConcreteDataType::uint32_datatype()), + ), + ( + ReservedColumnId::tsid(), + row_converter::SortField::new(ConcreteDataType::uint64_datatype()), + ), + ( + 100, + row_converter::SortField::new(ConcreteDataType::string_datatype()), + ), + ( + 200, + row_converter::SortField::new(ConcreteDataType::string_datatype()), + ), + ] + .into_iter(), + ); + + let values = [ + Value::UInt32(1000), + Value::UInt64(2000), + Value::String("abcdefgh".into()), + Value::String("zyxwvu".into()), + ]; + let mut buf = vec![]; + codec + .encode_values( + &[ + (ReservedColumnId::table_id(), values[0].clone()), + (ReservedColumnId::tsid(), values[1].clone()), + (100, values[2].clone()), + (200, values[3].clone()), + ], + &mut buf, + ) + .unwrap(); + + let field_col_id = 2; + let mut batch = new_batch_builder( + &buf, + &[1, 2, 3], + &[1, 1, 1], + &[OpType::Put, OpType::Put, OpType::Put], + field_col_id, + &[42, 43, 44], + ) + .build() + .unwrap(); + + let v = batch + .pk_col_value(&*codec, 0, ReservedColumnId::table_id()) + .unwrap() + .unwrap(); + assert_eq!(values[0], *v); + + let v = batch + .pk_col_value(&*codec, 1, ReservedColumnId::tsid()) + .unwrap() + .unwrap(); + assert_eq!(values[1], *v); + + let v = batch.pk_col_value(&*codec, 2, 100).unwrap().unwrap(); + assert_eq!(values[2], *v); + + let v = batch.pk_col_value(&*codec, 3, 200).unwrap().unwrap(); + assert_eq!(values[3], *v); + + let v = batch.field_col_value(field_col_id).unwrap(); + assert_eq!(v.data.get(0), Value::UInt64(42)); + assert_eq!(v.data.get(1), Value::UInt64(43)); + assert_eq!(v.data.get(2), Value::UInt64(44)); + } + } } diff --git a/src/mito2/src/row_converter.rs b/src/mito2/src/row_converter.rs index e9fd1a5539..1cafc3e7ae 100644 --- a/src/mito2/src/row_converter.rs +++ b/src/mito2/src/row_converter.rs @@ -55,6 +55,7 @@ pub trait PrimaryKeyFilter: Send + Sync { fn matches(&mut self, pk: &[u8]) -> bool; } +/// Composite values decoded from primary key bytes. #[derive(Debug, Clone, PartialEq, Eq)] pub enum CompositeValues { Dense(Vec<(ColumnId, Value)>), diff --git a/src/mito2/src/sst/index/bloom_filter/creator.rs b/src/mito2/src/sst/index/bloom_filter/creator.rs index 9dde4d851f..da79677b31 100644 --- a/src/mito2/src/sst/index/bloom_filter/creator.rs +++ b/src/mito2/src/sst/index/bloom_filter/creator.rs @@ -16,7 +16,7 @@ use std::collections::HashMap; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; use datatypes::schema::SkippingIndexType; use index::bloom_filter::creator::BloomFilterCreator; use puffin::puffin_manager::{PuffinWriter, PutOptions}; @@ -30,7 +30,7 @@ use crate::error::{ PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result, }; use crate::read::Batch; -use crate::row_converter::{CompositeValues, SortField}; +use crate::row_converter::SortField; use crate::sst::file::FileId; use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE; use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; @@ -195,68 +195,63 @@ impl BloomFilterIndexer { let n = batch.num_rows(); guard.inc_row_count(n); - // TODO(weny, zhenchi): lazy decode - if batch.pk_values().is_none() { - let values = self.codec.decode(batch.primary_key())?; - batch.set_pk_values(values); - } + for (col_id, creator) in &mut self.creators { + match self.codec.pk_col_info(*col_id) { + // tags + Some(col_info) => { + let pk_idx = col_info.idx; + let field = &col_info.field; + let elems = batch + .pk_col_value(self.codec.decoder(), pk_idx, *col_id)? + .filter(|v| !v.is_null()) + .map(|v| { + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value( + v.as_value_ref(), + field, + &mut buf, + )?; + Ok(buf) + }) + .transpose()?; + creator + .push_n_row_elems(n, elems) + .await + .context(PushBloomFilterValueSnafu)?; + } + // fields + None => { + let Some(values) = batch.field_col_value(*col_id) else { + debug!( + "Column {} not found in the batch during building bloom filter index", + col_id + ); + continue; + }; + let sort_field = SortField::new(values.data.data_type()); + for i in 0..n { + let value = values.data.get_ref(i); + let elems = (!value.is_null()) + .then(|| { + let mut buf = vec![]; + IndexValueCodec::encode_nonnull_value( + value, + &sort_field, + &mut buf, + )?; + Ok(buf) + }) + .transpose()?; - // Safety: the primary key is decoded - let values = batch.pk_values().unwrap(); - // Tags - for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() { - let Some(creator) = self.creators.get_mut(col_id) else { - continue; - }; - - let value = match &values { - CompositeValues::Dense(vec) => { - let value = &vec[idx].1; - if value.is_null() { - None - } else { - Some(value) + creator + .push_row_elems(elems) + .await + .context(PushBloomFilterValueSnafu)?; } } - CompositeValues::Sparse(sparse_values) => sparse_values.get(col_id), - }; - - let elems = value - .map(|v| { - let mut buf = vec![]; - IndexValueCodec::encode_nonnull_value(v.as_value_ref(), field, &mut buf)?; - Ok(buf) - }) - .transpose()?; - creator - .push_n_row_elems(n, elems) - .await - .context(PushBloomFilterValueSnafu)?; - } - - // Fields - for field in batch.fields() { - let Some(creator) = self.creators.get_mut(&field.column_id) else { - continue; - }; - - let sort_field = SortField::new(field.data.data_type()); - for i in 0..n { - let value = field.data.get_ref(i); - let elems = (!value.is_null()) - .then(|| { - let mut buf = vec![]; - IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)?; - Ok(buf) - }) - .transpose()?; - - creator - .push_row_elems(elems) - .await - .context(PushBloomFilterValueSnafu)?; } } + Ok(()) } diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs index 5d08cc7b29..c4ea9941aa 100644 --- a/src/mito2/src/sst/index/codec.rs +++ b/src/mito2/src/sst/index/codec.rs @@ -24,9 +24,7 @@ use store_api::metadata::ColumnMetadata; use store_api::storage::ColumnId; use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result}; -use crate::row_converter::{ - build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec, SortField, -}; +use crate::row_converter::{build_primary_key_codec_with_fields, PrimaryKeyCodec, SortField}; /// Encodes index values according to their data types for sorting and storage use. pub struct IndexValueCodec; @@ -64,13 +62,21 @@ impl IndexValueCodec { } } +pub struct PkColInfo { + pub idx: usize, + pub field: SortField, +} + +impl PkColInfo { + pub fn new(idx: usize, field: SortField) -> Self { + Self { idx, field } + } +} + /// Decodes primary key values into their corresponding column ids, data types and values. pub struct IndexValuesCodec { - /// Tuples containing column id and its corresponding index_name (result of `to_string` on ColumnId), - /// to minimize redundant `to_string` calls. - column_ids: HashMap, - /// The data types of tag columns. - fields: Vec<(ColumnId, SortField)>, + /// Column ids -> column info mapping. + columns_mapping: HashMap, /// The decoder for the primary key. decoder: Arc, } @@ -81,42 +87,31 @@ impl IndexValuesCodec { primary_key_encoding: PrimaryKeyEncoding, tag_columns: impl Iterator, ) -> Self { - let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns - .map(|column| { - ( - (column.column_id, column.column_id.to_string()), - ( - column.column_id, - SortField::new(column.column_schema.data_type.clone()), - ), - ) - }) - .unzip(); + let (columns_mapping, fields): (HashMap, Vec<(ColumnId, SortField)>) = + tag_columns + .enumerate() + .map(|(idx, column)| { + let col_id = column.column_id; + let field = SortField::new(column.column_schema.data_type.clone()); + let pk_col_info = PkColInfo::new(idx, field.clone()); + ((col_id, pk_col_info), (col_id, field)) + }) + .unzip(); - let column_ids = column_ids.into_iter().collect(); - let decoder = - build_primary_key_codec_with_fields(primary_key_encoding, fields.clone().into_iter()); + let decoder = build_primary_key_codec_with_fields(primary_key_encoding, fields.into_iter()); Self { - column_ids, - fields, + columns_mapping, decoder, } } - /// Returns the column ids of the index. - pub fn column_ids(&self) -> &HashMap { - &self.column_ids + pub fn pk_col_info(&self, column_id: ColumnId) -> Option<&PkColInfo> { + self.columns_mapping.get(&column_id) } - /// Returns the fields of the index. - pub fn fields(&self) -> &[(ColumnId, SortField)] { - &self.fields - } - - /// Decodes a primary key into its corresponding column ids, data types and values. - pub fn decode(&self, primary_key: &[u8]) -> Result { - self.decoder.decode(primary_key) + pub fn decoder(&self) -> &dyn PrimaryKeyCodec { + self.decoder.as_ref() } } @@ -185,7 +180,7 @@ mod tests { let codec = IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter()); - let values = codec.decode(&primary_key).unwrap().into_dense(); + let values = codec.decoder().decode(&primary_key).unwrap().into_dense(); assert_eq!(values.len(), 2); assert_eq!(values[0], Value::Null); diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 71edc56e07..8bb664405a 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -17,7 +17,7 @@ use std::num::NonZeroUsize; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use common_telemetry::warn; +use common_telemetry::{debug, warn}; use index::inverted_index::create::sort::external_sort::ExternalSorter; use index::inverted_index::create::sort_create::SortIndexCreator; use index::inverted_index::create::InvertedIndexCreator; @@ -34,7 +34,7 @@ use crate::error::{ PushIndexValueSnafu, Result, }; use crate::read::Batch; -use crate::row_converter::{CompositeValues, SortField}; +use crate::row_converter::SortField; use crate::sst::file::FileId; use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; use crate::sst::index::intermediate::{ @@ -71,8 +71,8 @@ pub struct InvertedIndexer { /// The memory usage of the index creator. memory_usage: Arc, - /// Ids of indexed columns. - indexed_column_ids: HashSet, + /// Ids of indexed columns and their names (`to_string` of the column id). + indexed_column_ids: Vec<(ColumnId, String)>, } impl InvertedIndexer { @@ -105,6 +105,13 @@ impl InvertedIndexer { metadata.primary_key_encoding, metadata.primary_key_columns(), ); + let indexed_column_ids = indexed_column_ids + .into_iter() + .map(|col_id| { + let col_id_str = col_id.to_string(); + (col_id, col_id_str) + }) + .collect(); Self { codec, index_creator, @@ -183,73 +190,61 @@ impl InvertedIndexer { let n = batch.num_rows(); guard.inc_row_count(n); - // TODO(weny, zhenchi): lazy decode - if batch.pk_values().is_none() { - let values = self.codec.decode(batch.primary_key())?; - batch.set_pk_values(values); - } + for (col_id, col_id_str) in &self.indexed_column_ids { + match self.codec.pk_col_info(*col_id) { + // pk + Some(col_info) => { + let pk_idx = col_info.idx; + let field = &col_info.field; + let value = batch + .pk_col_value(self.codec.decoder(), pk_idx, *col_id)? + .filter(|v| !v.is_null()) + .map(|v| { + self.value_buf.clear(); + IndexValueCodec::encode_nonnull_value( + v.as_value_ref(), + field, + &mut self.value_buf, + )?; + Ok(self.value_buf.as_slice()) + }) + .transpose()?; - // Safety: the primary key is decoded - let values = batch.pk_values().unwrap(); - for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() { - if !self.indexed_column_ids.contains(col_id) { - continue; - } - - let value = match &values { - CompositeValues::Dense(vec) => { - let value = &vec[idx].1; - if value.is_null() { - None - } else { - Some(value) - } + self.index_creator + .push_with_name_n(col_id_str, value, n) + .await + .context(PushIndexValueSnafu)?; } - CompositeValues::Sparse(sparse_values) => sparse_values.get(col_id), - }; - - if let Some(value) = value.as_ref() { - self.value_buf.clear(); - IndexValueCodec::encode_nonnull_value( - value.as_value_ref(), - field, - &mut self.value_buf, - )?; - } - - // Safety: the column id is guaranteed to be in the map - let col_id_str = self.codec.column_ids().get(col_id).unwrap(); - - // non-null value -> Some(encoded_bytes), null value -> None - let value = value.is_some().then_some(self.value_buf.as_slice()); - self.index_creator - .push_with_name_n(col_id_str, value, n) - .await - .context(PushIndexValueSnafu)?; - } - - for field in batch.fields() { - if !self.indexed_column_ids.contains(&field.column_id) { - continue; - } - - let sort_field = SortField::new(field.data.data_type()); - let col_id_str = field.column_id.to_string(); - for i in 0..n { - self.value_buf.clear(); - let value = field.data.get_ref(i); - - if value.is_null() { - self.index_creator - .push_with_name(&col_id_str, None) - .await - .context(PushIndexValueSnafu)?; - } else { - IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut self.value_buf)?; - self.index_creator - .push_with_name(&col_id_str, Some(&self.value_buf)) - .await - .context(PushIndexValueSnafu)?; + // fields + None => { + let Some(values) = batch.field_col_value(*col_id) else { + debug!( + "Column {} not found in the batch during building inverted index", + col_id + ); + continue; + }; + let sort_field = SortField::new(values.data.data_type()); + for i in 0..n { + self.value_buf.clear(); + let value = values.data.get_ref(i); + if value.is_null() { + self.index_creator + .push_with_name(col_id_str, None) + .await + .context(PushIndexValueSnafu)?; + } else { + IndexValueCodec::encode_nonnull_value( + value, + &sort_field, + &mut self.value_buf, + )?; + self.index_creator + .push_with_name(col_id_str, Some(&self.value_buf)) + .await + .context(PushIndexValueSnafu)?; + } + } } } } @@ -313,7 +308,7 @@ impl InvertedIndexer { } pub fn column_ids(&self) -> impl Iterator + '_ { - self.indexed_column_ids.iter().copied() + self.indexed_column_ids.iter().map(|(col_id, _)| *col_id) } pub fn memory_usage(&self) -> usize {