refactor: change traversal order during index construction (#5498)

* refactor: change traversal order during index construction

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>

* chain

Co-authored-by: jeremyhi <jiachun_feng@proton.me>

---------

Signed-off-by: Zhenchi <zhongzc_arch@outlook.com>
Co-authored-by: jeremyhi <jiachun_feng@proton.me>
This commit is contained in:
Zhenchi
2025-02-10 14:31:35 +08:00
committed by GitHub
parent 15f4b10065
commit c19ecd7ea2
5 changed files with 287 additions and 168 deletions

View File

@@ -26,7 +26,7 @@ pub(crate) mod scan_util;
pub(crate) mod seq_scan;
pub(crate) mod unordered_scan;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
@@ -40,7 +40,7 @@ use datatypes::arrow::compute::SortOptions;
use datatypes::arrow::row::{RowConverter, SortField};
use datatypes::prelude::{ConcreteDataType, DataType, ScalarVector};
use datatypes::types::TimestampType;
use datatypes::value::ValueRef;
use datatypes::value::{Value, ValueRef};
use datatypes::vectors::{
BooleanVector, Helper, TimestampMicrosecondVector, TimestampMillisecondVector,
TimestampNanosecondVector, TimestampSecondVector, UInt32Vector, UInt64Vector, UInt8Vector,
@@ -58,7 +58,7 @@ use crate::error::{
use crate::memtable::BoxedBatchIterator;
use crate::metrics::{READ_BATCHES_RETURN, READ_ROWS_RETURN, READ_STAGE_ELAPSED};
use crate::read::prune::PruneReader;
use crate::row_converter::CompositeValues;
use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
/// Storage internal representation of a batch of rows for a primary key (time series).
///
@@ -82,6 +82,8 @@ pub struct Batch {
op_types: Arc<UInt8Vector>,
/// Fields organized in columnar format.
fields: Vec<BatchColumn>,
/// Cache for field index lookup.
fields_idx: Option<HashMap<ColumnId, usize>>,
}
impl Batch {
@@ -229,6 +231,7 @@ impl Batch {
sequences: Arc::new(self.sequences.get_slice(offset, length)),
op_types: Arc::new(self.op_types.get_slice(offset, length)),
fields,
fields_idx: self.fields_idx.clone(),
}
}
@@ -588,6 +591,47 @@ impl Batch {
other.first_sequence()
))
}
/// Looks up the value of a single primary key column of this batch.
///
/// The primary key bytes are decoded with `codec` only when no decoded
/// values are cached yet; the decoded result is then kept in `pk_values`
/// so subsequent calls skip the decode step.
///
/// For dense keys the value is located by its position `col_idx_in_pk`,
/// for sparse keys it is located by `column_id`. Returns `Ok(None)` when
/// the decoded key holds no entry for the requested column.
///
/// # Errors
///
/// Propagates the error returned by `codec.decode`.
pub fn pk_col_value(
    &mut self,
    codec: &dyn PrimaryKeyCodec,
    col_idx_in_pk: usize,
    column_id: ColumnId,
) -> Result<Option<&Value>> {
    // Decode at most once per batch; a failed decode leaves the cache empty.
    if self.pk_values.is_none() {
        let decoded = codec.decode(&self.primary_key)?;
        self.pk_values = Some(decoded);
    }

    // The cache was populated right above if it was empty.
    let value = match self.pk_values.as_ref().unwrap() {
        CompositeValues::Dense(dense) => dense.get(col_idx_in_pk).map(|(_, value)| value),
        CompositeValues::Sparse(sparse) => sparse.get(&column_id),
    };
    Ok(value)
}
/// Looks up the field column with the given `column_id` in this batch.
///
/// On the first call a `column_id -> position` map over `fields` is built
/// and stored in `fields_idx`; later calls reuse that map.
/// Returns `None` when the batch has no field with that id.
pub fn field_col_value(&mut self, column_id: ColumnId) -> Option<&BatchColumn> {
    // Borrow the two struct fields separately so the index can be filled in
    // while the columns are only read.
    let fields = &self.fields;
    let positions = self.fields_idx.get_or_insert_with(|| {
        fields
            .iter()
            .enumerate()
            .map(|(pos, column)| (column.column_id, pos))
            .collect()
    });

    let pos = *positions.get(&column_id)?;
    Some(&fields[pos])
}
}
/// A struct to check the batch is monotonic.
@@ -876,6 +920,7 @@ impl BatchBuilder {
sequences,
op_types,
fields: self.fields,
fields_idx: None,
})
}
}
@@ -1019,8 +1064,12 @@ impl ScannerMetrics {
#[cfg(test)]
mod tests {
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::consts::ReservedColumnId;
use super::*;
use crate::error::Error;
use crate::row_converter::{self, build_primary_key_codec_with_fields};
use crate::test_util::new_batch_builder;
fn new_batch(
@@ -1392,4 +1441,88 @@ mod tests {
);
assert_eq!(expect, batch);
}
/// Checks `pk_col_value` and `field_col_value` against a batch whose primary
/// key is encoded with each of the dense and sparse encodings.
#[test]
fn test_get_value() {
    // Primary key columns, listed in key order.
    let table_id = ReservedColumnId::table_id();
    let tsid = ReservedColumnId::tsid();
    let tag_a = 100;
    let tag_b = 200;
    let pk_columns = [table_id, tsid, tag_a, tag_b];

    for encoding in [PrimaryKeyEncoding::Dense, PrimaryKeyEncoding::Sparse] {
        let sort_fields: Vec<_> = [
            (table_id, ConcreteDataType::uint32_datatype()),
            (tsid, ConcreteDataType::uint64_datatype()),
            (tag_a, ConcreteDataType::string_datatype()),
            (tag_b, ConcreteDataType::string_datatype()),
        ]
        .into_iter()
        .map(|(col_id, data_type)| (col_id, row_converter::SortField::new(data_type)))
        .collect();
        let codec = build_primary_key_codec_with_fields(encoding, sort_fields.into_iter());

        let values = [
            Value::UInt32(1000),
            Value::UInt64(2000),
            Value::String("abcdefgh".into()),
            Value::String("zyxwvu".into()),
        ];

        // Encode one primary key holding all four values.
        let mut buf = vec![];
        codec
            .encode_values(
                &[
                    (table_id, values[0].clone()),
                    (tsid, values[1].clone()),
                    (tag_a, values[2].clone()),
                    (tag_b, values[3].clone()),
                ],
                &mut buf,
            )
            .unwrap();

        let field_col_id = 2;
        let mut batch = new_batch_builder(
            &buf,
            &[1, 2, 3],
            &[1, 1, 1],
            &[OpType::Put, OpType::Put, OpType::Put],
            field_col_id,
            &[42, 43, 44],
        )
        .build()
        .unwrap();

        // Every primary key column must come back with the encoded value.
        for (idx, (col_id, expected)) in pk_columns.iter().zip(&values).enumerate() {
            let actual = batch.pk_col_value(&*codec, idx, *col_id).unwrap().unwrap();
            assert_eq!(expected, actual);
        }

        // The field column must expose the rows given to the builder.
        let column = batch.field_col_value(field_col_id).unwrap();
        for (row, expected) in [42u64, 43, 44].into_iter().enumerate() {
            assert_eq!(column.data.get(row), Value::UInt64(expected));
        }
    }
}
}

View File

@@ -55,6 +55,7 @@ pub trait PrimaryKeyFilter: Send + Sync {
fn matches(&mut self, pk: &[u8]) -> bool;
}
/// Composite values decoded from primary key bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CompositeValues {
Dense(Vec<(ColumnId, Value)>),

View File

@@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use datatypes::schema::SkippingIndexType;
use index::bloom_filter::creator::BloomFilterCreator;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
@@ -30,7 +30,7 @@ use crate::error::{
PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
};
use crate::read::Batch;
use crate::row_converter::{CompositeValues, SortField};
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
@@ -195,68 +195,63 @@ impl BloomFilterIndexer {
let n = batch.num_rows();
guard.inc_row_count(n);
// TODO(weny, zhenchi): lazy decode
if batch.pk_values().is_none() {
let values = self.codec.decode(batch.primary_key())?;
batch.set_pk_values(values);
}
for (col_id, creator) in &mut self.creators {
match self.codec.pk_col_info(*col_id) {
// tags
Some(col_info) => {
let pk_idx = col_info.idx;
let field = &col_info.field;
let elems = batch
.pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
.filter(|v| !v.is_null())
.map(|v| {
let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(
v.as_value_ref(),
field,
&mut buf,
)?;
Ok(buf)
})
.transpose()?;
creator
.push_n_row_elems(n, elems)
.await
.context(PushBloomFilterValueSnafu)?;
}
// fields
None => {
let Some(values) = batch.field_col_value(*col_id) else {
debug!(
"Column {} not found in the batch during building bloom filter index",
col_id
);
continue;
};
let sort_field = SortField::new(values.data.data_type());
for i in 0..n {
let value = values.data.get_ref(i);
let elems = (!value.is_null())
.then(|| {
let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(
value,
&sort_field,
&mut buf,
)?;
Ok(buf)
})
.transpose()?;
// Safety: the primary key is decoded
let values = batch.pk_values().unwrap();
// Tags
for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
let Some(creator) = self.creators.get_mut(col_id) else {
continue;
};
let value = match &values {
CompositeValues::Dense(vec) => {
let value = &vec[idx].1;
if value.is_null() {
None
} else {
Some(value)
creator
.push_row_elems(elems)
.await
.context(PushBloomFilterValueSnafu)?;
}
}
CompositeValues::Sparse(sparse_values) => sparse_values.get(col_id),
};
let elems = value
.map(|v| {
let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(v.as_value_ref(), field, &mut buf)?;
Ok(buf)
})
.transpose()?;
creator
.push_n_row_elems(n, elems)
.await
.context(PushBloomFilterValueSnafu)?;
}
// Fields
for field in batch.fields() {
let Some(creator) = self.creators.get_mut(&field.column_id) else {
continue;
};
let sort_field = SortField::new(field.data.data_type());
for i in 0..n {
let value = field.data.get_ref(i);
let elems = (!value.is_null())
.then(|| {
let mut buf = vec![];
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)?;
Ok(buf)
})
.transpose()?;
creator
.push_row_elems(elems)
.await
.context(PushBloomFilterValueSnafu)?;
}
}
Ok(())
}

View File

@@ -24,9 +24,7 @@ use store_api::metadata::ColumnMetadata;
use store_api::storage::ColumnId;
use crate::error::{FieldTypeMismatchSnafu, IndexEncodeNullSnafu, Result};
use crate::row_converter::{
build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec, SortField,
};
use crate::row_converter::{build_primary_key_codec_with_fields, PrimaryKeyCodec, SortField};
/// Encodes index values according to their data types for sorting and storage use.
pub struct IndexValueCodec;
@@ -64,13 +62,21 @@ impl IndexValueCodec {
}
}
/// Information about a primary key column that is needed to index its values.
pub struct PkColInfo {
/// Position of the column among the primary key columns.
pub idx: usize,
/// Sort field describing the data type used to encode the column's values.
pub field: SortField,
}
impl PkColInfo {
/// Creates a `PkColInfo` from the column position `idx` and its sort `field`.
pub fn new(idx: usize, field: SortField) -> Self {
Self { idx, field }
}
}
/// Decodes primary key values into their corresponding column ids, data types and values.
pub struct IndexValuesCodec {
/// Tuples containing column id and its corresponding index_name (result of `to_string` on ColumnId),
/// to minimize redundant `to_string` calls.
column_ids: HashMap<ColumnId, String>,
/// The data types of tag columns.
fields: Vec<(ColumnId, SortField)>,
/// Column ids -> column info mapping.
columns_mapping: HashMap<ColumnId, PkColInfo>,
/// The decoder for the primary key.
decoder: Arc<dyn PrimaryKeyCodec>,
}
@@ -81,42 +87,31 @@ impl IndexValuesCodec {
primary_key_encoding: PrimaryKeyEncoding,
tag_columns: impl Iterator<Item = &'a ColumnMetadata>,
) -> Self {
let (column_ids, fields): (Vec<_>, Vec<_>) = tag_columns
.map(|column| {
(
(column.column_id, column.column_id.to_string()),
(
column.column_id,
SortField::new(column.column_schema.data_type.clone()),
),
)
})
.unzip();
let (columns_mapping, fields): (HashMap<ColumnId, PkColInfo>, Vec<(ColumnId, SortField)>) =
tag_columns
.enumerate()
.map(|(idx, column)| {
let col_id = column.column_id;
let field = SortField::new(column.column_schema.data_type.clone());
let pk_col_info = PkColInfo::new(idx, field.clone());
((col_id, pk_col_info), (col_id, field))
})
.unzip();
let column_ids = column_ids.into_iter().collect();
let decoder =
build_primary_key_codec_with_fields(primary_key_encoding, fields.clone().into_iter());
let decoder = build_primary_key_codec_with_fields(primary_key_encoding, fields.into_iter());
Self {
column_ids,
fields,
columns_mapping,
decoder,
}
}
/// Returns the column ids of the index.
pub fn column_ids(&self) -> &HashMap<ColumnId, String> {
&self.column_ids
pub fn pk_col_info(&self, column_id: ColumnId) -> Option<&PkColInfo> {
self.columns_mapping.get(&column_id)
}
/// Returns the fields of the index.
pub fn fields(&self) -> &[(ColumnId, SortField)] {
&self.fields
}
/// Decodes a primary key into its corresponding column ids, data types and values.
pub fn decode(&self, primary_key: &[u8]) -> Result<CompositeValues> {
self.decoder.decode(primary_key)
pub fn decoder(&self) -> &dyn PrimaryKeyCodec {
self.decoder.as_ref()
}
}
@@ -185,7 +180,7 @@ mod tests {
let codec =
IndexValuesCodec::from_tag_columns(PrimaryKeyEncoding::Dense, tag_columns.iter());
let values = codec.decode(&primary_key).unwrap().into_dense();
let values = codec.decoder().decode(&primary_key).unwrap().into_dense();
assert_eq!(values.len(), 2);
assert_eq!(values[0], Value::Null);

View File

@@ -17,7 +17,7 @@ use std::num::NonZeroUsize;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;
use common_telemetry::warn;
use common_telemetry::{debug, warn};
use index::inverted_index::create::sort::external_sort::ExternalSorter;
use index::inverted_index::create::sort_create::SortIndexCreator;
use index::inverted_index::create::InvertedIndexCreator;
@@ -34,7 +34,7 @@ use crate::error::{
PushIndexValueSnafu, Result,
};
use crate::read::Batch;
use crate::row_converter::{CompositeValues, SortField};
use crate::row_converter::SortField;
use crate::sst::file::FileId;
use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec};
use crate::sst::index::intermediate::{
@@ -71,8 +71,8 @@ pub struct InvertedIndexer {
/// The memory usage of the index creator.
memory_usage: Arc<AtomicUsize>,
/// Ids of indexed columns.
indexed_column_ids: HashSet<ColumnId>,
/// Ids of indexed columns and their names (`to_string` of the column id).
indexed_column_ids: Vec<(ColumnId, String)>,
}
impl InvertedIndexer {
@@ -105,6 +105,13 @@ impl InvertedIndexer {
metadata.primary_key_encoding,
metadata.primary_key_columns(),
);
let indexed_column_ids = indexed_column_ids
.into_iter()
.map(|col_id| {
let col_id_str = col_id.to_string();
(col_id, col_id_str)
})
.collect();
Self {
codec,
index_creator,
@@ -183,73 +190,61 @@ impl InvertedIndexer {
let n = batch.num_rows();
guard.inc_row_count(n);
// TODO(weny, zhenchi): lazy decode
if batch.pk_values().is_none() {
let values = self.codec.decode(batch.primary_key())?;
batch.set_pk_values(values);
}
for (col_id, col_id_str) in &self.indexed_column_ids {
match self.codec.pk_col_info(*col_id) {
// pk
Some(col_info) => {
let pk_idx = col_info.idx;
let field = &col_info.field;
let value = batch
.pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
.filter(|v| !v.is_null())
.map(|v| {
self.value_buf.clear();
IndexValueCodec::encode_nonnull_value(
v.as_value_ref(),
field,
&mut self.value_buf,
)?;
Ok(self.value_buf.as_slice())
})
.transpose()?;
// Safety: the primary key is decoded
let values = batch.pk_values().unwrap();
for (idx, (col_id, field)) in self.codec.fields().iter().enumerate() {
if !self.indexed_column_ids.contains(col_id) {
continue;
}
let value = match &values {
CompositeValues::Dense(vec) => {
let value = &vec[idx].1;
if value.is_null() {
None
} else {
Some(value)
}
self.index_creator
.push_with_name_n(col_id_str, value, n)
.await
.context(PushIndexValueSnafu)?;
}
CompositeValues::Sparse(sparse_values) => sparse_values.get(col_id),
};
if let Some(value) = value.as_ref() {
self.value_buf.clear();
IndexValueCodec::encode_nonnull_value(
value.as_value_ref(),
field,
&mut self.value_buf,
)?;
}
// Safety: the column id is guaranteed to be in the map
let col_id_str = self.codec.column_ids().get(col_id).unwrap();
// non-null value -> Some(encoded_bytes), null value -> None
let value = value.is_some().then_some(self.value_buf.as_slice());
self.index_creator
.push_with_name_n(col_id_str, value, n)
.await
.context(PushIndexValueSnafu)?;
}
for field in batch.fields() {
if !self.indexed_column_ids.contains(&field.column_id) {
continue;
}
let sort_field = SortField::new(field.data.data_type());
let col_id_str = field.column_id.to_string();
for i in 0..n {
self.value_buf.clear();
let value = field.data.get_ref(i);
if value.is_null() {
self.index_creator
.push_with_name(&col_id_str, None)
.await
.context(PushIndexValueSnafu)?;
} else {
IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut self.value_buf)?;
self.index_creator
.push_with_name(&col_id_str, Some(&self.value_buf))
.await
.context(PushIndexValueSnafu)?;
// fields
None => {
let Some(values) = batch.field_col_value(*col_id) else {
debug!(
"Column {} not found in the batch during building inverted index",
col_id
);
continue;
};
let sort_field = SortField::new(values.data.data_type());
for i in 0..n {
self.value_buf.clear();
let value = values.data.get_ref(i);
if value.is_null() {
self.index_creator
.push_with_name(col_id_str, None)
.await
.context(PushIndexValueSnafu)?;
} else {
IndexValueCodec::encode_nonnull_value(
value,
&sort_field,
&mut self.value_buf,
)?;
self.index_creator
.push_with_name(col_id_str, Some(&self.value_buf))
.await
.context(PushIndexValueSnafu)?;
}
}
}
}
}
@@ -313,7 +308,7 @@ impl InvertedIndexer {
}
pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + '_ {
self.indexed_column_ids.iter().copied()
self.indexed_column_ids.iter().map(|(col_id, _)| *col_id)
}
pub fn memory_usage(&self) -> usize {