From d7a8053cc255174fff0920d496419886c436660b Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Fri, 20 Jan 2023 18:17:46 +0900 Subject: [PATCH] Introduced a select cursor. --- .../src/column_index/optional_index/mod.rs | 110 ++++++++++++------ .../src/column_index/optional_index/set.rs | 21 ++-- .../optional_index/set_block/dense.rs | 51 ++++---- .../optional_index/set_block/sparse.rs | 34 +++--- .../optional_index/set_block/tests.rs | 32 +++-- .../src/column_index/optional_index/tests.rs | 26 ++--- 6 files changed, 153 insertions(+), 121 deletions(-) diff --git a/columnar/src/column_index/optional_index/mod.rs b/columnar/src/column_index/optional_index/mod.rs index 615e2a837..5a45335c4 100644 --- a/columnar/src/column_index/optional_index/mod.rs +++ b/columnar/src/column_index/optional_index/mod.rs @@ -5,8 +5,8 @@ use std::sync::Arc; mod set; mod set_block; -use common::{BinarySerializable, GroupByIteratorExtended, OwnedBytes, VInt}; -pub use set::{Set, SetCodec}; +use common::{BinarySerializable, OwnedBytes, VInt}; +pub use set::{Set, SetCodec, SelectCursor}; use set_block::{ DenseBlock, DenseBlockCodec, SparseBlock, SparseBlockCodec, DENSE_BLOCK_NUM_BYTES, }; @@ -115,7 +115,59 @@ fn row_addr_from_row_id(row_id: RowId) -> RowAddr { } } +enum BlockSelectCursor<'a> { + Dense( as Set>::SelectCursor<'a>), + Sparse( as Set>::SelectCursor<'a>), +} + +impl<'a> BlockSelectCursor<'a> { + fn select(&mut self, rank: u16) -> u16 { + match self { + BlockSelectCursor::Dense(dense_select_cursor) => dense_select_cursor.select(rank), + BlockSelectCursor::Sparse(sparse_select_cursor) => sparse_select_cursor.select(rank), + } + } + +} +pub struct OptionalIndexSelectCursor<'a> { + current_block_cursor: BlockSelectCursor<'a>, + current_block_id: u16, + // The current block is guaranteed to contain ranks < end_rank. + current_block_end_rank: RowId, + optional_index: &'a OptionalIndex, + block_doc_idx_start: RowId, + num_null_rows_before_block: RowId, +} + +impl<'a> OptionalIndexSelectCursor<'a> { + fn search_and_load_block(&mut self, rank: RowId) { + if rank < self.current_block_end_rank { + // we are already in the right block + return; + } + self.current_block_id = self.optional_index.find_block(rank, self.current_block_id); + self.current_block_end_rank = self.optional_index.block_metas.get(self.current_block_id as usize + 1).map(|block_meta| block_meta.non_null_rows_before_block).unwrap_or(u32::MAX); + self.block_doc_idx_start = (self.current_block_id as u32) * ELEMENTS_PER_BLOCK; + let block_meta = self.optional_index.block_metas[self.current_block_id as usize]; + self.num_null_rows_before_block = block_meta.non_null_rows_before_block; + let block: Block<'_> = self.optional_index.block(block_meta); + self.current_block_cursor = match block { + Block::Dense(dense_block) => BlockSelectCursor::Dense(dense_block.select_cursor()), + Block::Sparse(sparse_block) => BlockSelectCursor::Sparse(sparse_block.select_cursor()), + }; + } +} + +impl<'a> SelectCursor for OptionalIndexSelectCursor<'a> { + fn select(&mut self, rank: RowId) -> RowId { + self.search_and_load_block(rank); + let index_in_block = (rank - self.num_null_rows_before_block) as u16; + self.current_block_cursor.select(index_in_block) as RowId + self.block_doc_idx_start + } +} + impl Set for OptionalIndex { + type SelectCursor<'b> = OptionalIndexSelectCursor<'b> where Self: 'b; // Check if value at position is not null. #[inline] fn contains(&self, row_id: RowId) -> bool { @@ -148,7 +200,7 @@ impl Set for OptionalIndex { #[inline] fn select(&self, rank: RowId) -> RowId { let block_pos = self.find_block(rank, 0); - let block_doc_idx_start = block_pos * ELEMENTS_PER_BLOCK; + let block_doc_idx_start = (block_pos as u32) * ELEMENTS_PER_BLOCK; let block_meta = self.block_metas[block_pos as usize]; let block: Block<'_> = self.block(block_meta); let index_in_block = (rank - block_meta.non_null_rows_before_block) as u16; @@ -159,39 +211,27 @@ impl Set for OptionalIndex { block_doc_idx_start + in_block_rank as u32 } - fn select_batch(&self, ranks: &[u32], output_idxs: &mut [u32]) { - let mut block_pos = 0u32; - let mut start = 0; - let group_by_it = ranks.iter().copied().group_by(move |codec_idx| { - block_pos = self.find_block(*codec_idx, block_pos); - block_pos - }); - for (block_pos, block_iter) in group_by_it { - let block_doc_idx_start = block_pos * ELEMENTS_PER_BLOCK; - let block_meta = self.block_metas[block_pos as usize]; - let block: Block<'_> = self.block(block_meta); - let offset = block_meta.non_null_rows_before_block; - let indexes_in_block_iter = - block_iter.map(move |codec_idx| (codec_idx - offset) as u16); - match block { - Block::Dense(dense_block) => { - for in_offset in dense_block.select_iter(indexes_in_block_iter) { - output_idxs[start] = in_offset as u32 + block_doc_idx_start; - start += 1; - } - } - Block::Sparse(sparse_block) => { - for in_offset in sparse_block.select_iter(indexes_in_block_iter) { - output_idxs[start] = in_offset as u32 + block_doc_idx_start; - start += 1; - } - } - }; + fn select_cursor<'b>(&'b self) -> OptionalIndexSelectCursor<'b> { + OptionalIndexSelectCursor { + current_block_cursor: BlockSelectCursor::Sparse(SparseBlockCodec::open(b"").select_cursor()), + current_block_id: 0u16, + current_block_end_rank: 0u32, //< this is sufficient to force the first load + optional_index: self, + block_doc_idx_start: 0u32, + num_null_rows_before_block: 0u32, } } } impl OptionalIndex { + + pub fn select_batch(&self, ranks: &mut [RowId]) { + let mut select_cursor = self.select_cursor(); + for rank in ranks.iter_mut() { + *rank = select_cursor.select(*rank); + } + } + #[inline] fn block<'a>(&'a self, block_meta: BlockMeta) -> Block<'a> { let BlockMeta { @@ -214,14 +254,14 @@ impl OptionalIndex { } #[inline] - fn find_block(&self, dense_idx: u32, start_block_pos: u32) -> u32 { - for block_pos in start_block_pos..self.block_metas.len() as u32 { + fn find_block(&self, dense_idx: u32, start_block_pos: u16) -> u16 { + for block_pos in start_block_pos..self.block_metas.len() as u16 { let offset = self.block_metas[block_pos as usize].non_null_rows_before_block; if offset > dense_idx { - return block_pos - 1; + return block_pos - 1u16; } } - self.block_metas.len() as u32 - 1u32 + self.block_metas.len() as u16 - 1u16 } // TODO Add a good API for the codec_idx to original_idx translation. diff --git a/columnar/src/column_index/optional_index/set.rs b/columnar/src/column_index/optional_index/set.rs index f447b60d9..0db2a502d 100644 --- a/columnar/src/column_index/optional_index/set.rs +++ b/columnar/src/column_index/optional_index/set.rs @@ -13,7 +13,19 @@ pub trait SetCodec { fn open<'a>(data: &'a [u8]) -> Self::Reader<'a>; } + +/// Stateful object that makes it possible to compute several select in a row, +/// provided the rank passed as argument are increasing. +pub trait SelectCursor { + // May panic if rank is greater than the number of elements in the Set, + // or if rank is < than value provided in the previous call. + fn select(&mut self, rank: T) -> T; +} + pub trait Set { + type SelectCursor<'b>: SelectCursor where Self: 'b; + + /// Returns true if the elements is contained in the Set fn contains(&self, el: T) -> bool; @@ -28,11 +40,6 @@ pub trait Set { /// May panic if rank is greater than the number of elements in the Set. fn select(&self, rank: T) -> T; - /// Batch version of select. - /// `ranks` is assumed to be sorted. - /// - /// # Panics - /// - /// May panic if rank is greater than the number of elements in the Set. - fn select_batch(&self, ranks: &[T], outputs: &mut [T]); + /// Creates a brand new select cursor. + fn select_cursor<'b>(&'b self,) -> Self::SelectCursor<'b>; } diff --git a/columnar/src/column_index/optional_index/set_block/dense.rs b/columnar/src/column_index/optional_index/set_block/dense.rs index b01ea1f07..813c2cfb9 100644 --- a/columnar/src/column_index/optional_index/set_block/dense.rs +++ b/columnar/src/column_index/optional_index/set_block/dense.rs @@ -3,7 +3,7 @@ use std::io::{self, Write}; use common::BinarySerializable; -use crate::column_index::optional_index::{Set, SetCodec, ELEMENTS_PER_BLOCK}; +use crate::column_index::optional_index::{Set, SetCodec, SelectCursor, ELEMENTS_PER_BLOCK}; #[inline(always)] fn get_bit_at(input: u64, n: u16) -> bool { @@ -105,7 +105,24 @@ impl DenseMiniBlock { #[derive(Copy, Clone)] pub struct DenseBlock<'a>(&'a [u8]); +pub struct DenseBlockSelectCursor<'a> { + block_id: u16, + dense_block: DenseBlock<'a>, +} + +impl<'a> SelectCursor for DenseBlockSelectCursor<'a> { + #[inline] + fn select(&mut self, rank: u16) -> u16 { + self.block_id = self.dense_block.find_miniblock_containing_rank(rank, self.block_id).unwrap(); + let index_block = self.dense_block.mini_block(self.block_id); + let in_block_rank = rank - index_block.rank; + self.block_id * ELEMENTS_PER_MINI_BLOCK + select_u64(index_block.bitvec, in_block_rank) + } +} + impl<'a> Set for DenseBlock<'a> { + type SelectCursor<'b> = DenseBlockSelectCursor<'a> where Self: 'b; + #[inline(always)] fn contains(&self, el: u16) -> bool { let mini_block_id = el / ELEMENTS_PER_MINI_BLOCK; @@ -136,37 +153,15 @@ impl<'a> Set for DenseBlock<'a> { block_id * ELEMENTS_PER_MINI_BLOCK + select_u64(index_block.bitvec, in_block_rank) } - fn select_batch(&self, ranks: &[u16], outputs: &mut [u16]) { - let orig_ids = self.select_iter(ranks.iter().copied()); - for (output, original_id) in outputs.iter_mut().zip(orig_ids) { - *output = original_id; + #[inline(always)] + fn select_cursor<'b>(&'b self,) -> Self::SelectCursor<'b> { + DenseBlockSelectCursor { + block_id: 0, + dense_block: *self, } } } -impl<'a> DenseBlock<'a> { - /// Iterator verison of select. - /// - /// # Panics - /// Panics if one of the rank is higher than the number of elements in the set. - pub fn select_iter<'b>( - &self, - rank_it: impl Iterator + 'b, - ) -> impl Iterator + 'b - where - Self: 'b, - { - let mut block_id = 0u16; - let me = *self; - rank_it.map(move |rank| { - block_id = me.find_miniblock_containing_rank(rank, block_id).unwrap(); - let index_block = me.mini_block(block_id); - let in_block_rank = rank - index_block.rank; - block_id * ELEMENTS_PER_MINI_BLOCK + select_u64(index_block.bitvec, in_block_rank) - }) - } -} - impl<'a> DenseBlock<'a> { #[inline] fn mini_block(&self, mini_block_id: u16) -> DenseMiniBlock { diff --git a/columnar/src/column_index/optional_index/set_block/sparse.rs b/columnar/src/column_index/optional_index/set_block/sparse.rs index 486dc70a5..460064395 100644 --- a/columnar/src/column_index/optional_index/set_block/sparse.rs +++ b/columnar/src/column_index/optional_index/set_block/sparse.rs @@ -1,4 +1,4 @@ -use crate::column_index::optional_index::{Set, SetCodec}; +use crate::column_index::optional_index::{Set, SetCodec, SelectCursor}; pub struct SparseBlockCodec; @@ -24,7 +24,17 @@ impl SetCodec for SparseBlockCodec { #[derive(Copy, Clone)] pub struct SparseBlock<'a>(&'a [u8]); +impl<'a> SelectCursor for SparseBlock<'a> { + #[inline] + fn select(&mut self, rank: u16) -> u16 { + as Set>::select(self, rank) + } +} + impl<'a> Set for SparseBlock<'a> { + + type SelectCursor<'b> = Self where Self: 'b; + #[inline(always)] fn contains(&self, el: u16) -> bool { self.binary_search(el).is_ok() @@ -41,12 +51,11 @@ impl<'a> Set for SparseBlock<'a> { u16::from_le_bytes(self.0[offset..offset + 2].try_into().unwrap()) } - fn select_batch(&self, ranks: &[u16], outputs: &mut [u16]) { - let orig_ids = self.select_iter(ranks.iter().copied()); - for (output, original_id) in outputs.iter_mut().zip(orig_ids) { - *output = original_id; - } + #[inline(always)] + fn select_cursor<'b>(&'b self,) -> Self::SelectCursor<'b> { + *self } + } #[inline(always)] @@ -96,17 +105,4 @@ impl<'a> SparseBlock<'a> { } Err(left) } - - pub fn select_iter<'b>( - &self, - iter: impl Iterator + 'b, - ) -> impl Iterator + 'b - where - Self: 'b, - { - iter.map(|codec_id| { - let offset = codec_id as usize * 2; - u16::from_le_bytes(self.0[offset..offset + 2].try_into().unwrap()) - }) - } } diff --git a/columnar/src/column_index/optional_index/set_block/tests.rs b/columnar/src/column_index/optional_index/set_block/tests.rs index 55e36b73a..1138483c3 100644 --- a/columnar/src/column_index/optional_index/set_block/tests.rs +++ b/columnar/src/column_index/optional_index/set_block/tests.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use crate::column_index::optional_index::set_block::dense::DENSE_BLOCK_NUM_BYTES; use crate::column_index::optional_index::set_block::{DenseBlockCodec, SparseBlockCodec}; -use crate::column_index::optional_index::{Set, SetCodec}; +use crate::column_index::optional_index::{Set, SetCodec, SelectCursor}; fn test_set_helper>(vals: &[u16]) -> usize { let mut buffer = Vec::new(); @@ -74,12 +74,10 @@ fn test_simple_translate_codec_codec_idx_to_original_idx_dense() { .unwrap(); let tested_set = DenseBlockCodec::open(buffer.as_slice()); assert!(tested_set.contains(1)); - assert_eq!( - &tested_set - .select_iter([0, 1, 2, 5].iter().copied()) - .collect::>(), - &[1, 3, 17, 30_001] - ); + let mut select_cursor = tested_set.select_cursor(); + assert_eq!(select_cursor.select(0), 1); + assert_eq!(select_cursor.select(1), 3); + assert_eq!(select_cursor.select(2), 17); } #[test] @@ -88,12 +86,10 @@ fn test_simple_translate_codec_idx_to_original_idx_sparse() { SparseBlockCodec::serialize([1, 3, 17].iter().copied(), &mut buffer).unwrap(); let tested_set = SparseBlockCodec::open(buffer.as_slice()); assert!(tested_set.contains(1)); - assert_eq!( - &tested_set - .select_iter([0, 1, 2].iter().copied()) - .collect::>(), - &[1, 3, 17] - ); + let mut select_cursor = tested_set.select_cursor(); + assert_eq!(SelectCursor::select(&mut select_cursor, 0), 1); + assert_eq!(SelectCursor::select(&mut select_cursor, 1), 3); + assert_eq!(SelectCursor::select(&mut select_cursor, 2), 17); } #[test] @@ -102,10 +98,8 @@ fn test_simple_translate_codec_idx_to_original_idx_dense() { DenseBlockCodec::serialize(0u16..150u16, &mut buffer).unwrap(); let tested_set = DenseBlockCodec::open(buffer.as_slice()); assert!(tested_set.contains(1)); - let rg = 0u16..150u16; - let els: Vec = rg.clone().collect(); - assert_eq!( - &tested_set.select_iter(rg.clone()).collect::>(), - &els - ); + let mut select_cursor = tested_set.select_cursor(); + for i in 0..150 { + assert_eq!(i, select_cursor.select(i)); + } } diff --git a/columnar/src/column_index/optional_index/tests.rs b/columnar/src/column_index/optional_index/tests.rs index e34700b4d..e7330f5ee 100644 --- a/columnar/src/column_index/optional_index/tests.rs +++ b/columnar/src/column_index/optional_index/tests.rs @@ -41,9 +41,10 @@ fn test_with_random_sets_simple() { let null_index = open_optional_index(OwnedBytes::new(out)).unwrap(); let ranks: Vec = (65_472u32..65_473u32).collect(); let els: Vec = ranks.iter().copied().map(|rank| rank + 10).collect(); - let mut output = vec![0u32; ranks.len()]; - null_index.select_batch(&ranks[..], &mut output[..]); - assert_eq!(&output, &els); + let mut select_cursor = null_index.select_cursor(); + for (rank, el) in ranks.iter().copied().zip(els.iter().copied()) { + assert_eq!(select_cursor.select(rank), el); + } } #[test] @@ -91,11 +92,10 @@ fn test_null_index(data: &[bool]) { .filter(|(_pos, val)| **val) .map(|(pos, _val)| pos as u32) .collect(); - let ids: Vec = (0..orig_idx_with_value.len() as u32).collect(); - let mut output = vec![0u32; ids.len()]; - null_index.select_batch(&ids[..], &mut output); - // assert_eq!(&output[0..100], &orig_idx_with_value[0..100]); - assert_eq!(output, orig_idx_with_value); + let mut select_iter = null_index.select_cursor(); + for i in 0..orig_idx_with_value.len() { + assert_eq!(select_iter.select(i as u32), orig_idx_with_value[i]); + } let step_size = (orig_idx_with_value.len() / 100).max(1); for (dense_idx, orig_idx) in orig_idx_with_value.iter().enumerate().step_by(step_size) { @@ -115,9 +115,9 @@ fn test_optional_index_test_translation() { let iter = &[true, false, true, false]; serialize_optional_index(&&iter[..], &mut out).unwrap(); let null_index = open_optional_index(OwnedBytes::new(out)).unwrap(); - let mut output = vec![0u32; 2]; - null_index.select_batch(&[0, 1], &mut output); - assert_eq!(output, &[0, 2]); + let mut select_cursor = null_index.select_cursor(); + assert_eq!(select_cursor.select(0), 0); + assert_eq!(select_cursor.select(1), 2); } #[test] @@ -175,7 +175,6 @@ mod bench { .map(|_| rng.gen_bool(fill_ratio)) .collect(); serialize_optional_index(&&vals[..], &mut out).unwrap(); - let codec = open_optional_index(OwnedBytes::new(out)).unwrap(); codec } @@ -311,7 +310,8 @@ mod bench { }; let mut output = vec![0u32; idxs.len()]; bench.iter(|| { - codec.select_batch(&idxs[..], &mut output); + output.copy_from_slice(&idxs[..]); + codec.select_batch(&mut output); }); }