Isolating sstable and stacker in independant crates. (#1718)

Both crate will be used in the new (optional + dynamic) fastfield work.
This commit is contained in:
Paul Masurel
2022-12-13 11:44:17 +09:00
committed by GitHub
parent 5d4535de83
commit 136a8f4124
24 changed files with 133 additions and 103 deletions

View File

@@ -53,10 +53,11 @@ lru = "0.7.5"
fastdivide = "0.4.0"
itertools = "0.10.3"
measure_time = "0.8.2"
ciborium = { version = "0.2", optional = true}
async-trait = "0.1.53"
arc-swap = "1.5.0"
sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optional = true }
stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" }
tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" }
tantivy-bitpacker = { version= "0.3", path="./bitpacker" }
common = { version= "0.4", path = "./common/", package = "tantivy-common" }
@@ -104,10 +105,10 @@ zstd-compression = ["zstd"]
failpoints = ["fail/failpoints"]
unstable = [] # useful for benches.
quickwit = ["ciborium"]
quickwit = ["sstable"]
[workspace]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes"]
members = ["query-grammar", "bitpacker", "common", "fastfield_codecs", "ownedbytes", "stacker", "sstable"]
# Following the "fail" crate best practises, we isolate
# tests that define specific behavior in fail check points

View File

@@ -1,11 +1,11 @@
use crate::postings::stacker::{MemoryArena, TermHashMap};
use stacker::{ArenaHashMap, MemoryArena};
/// IndexingContext contains all of the transient memory arenas
/// required for building the inverted index.
pub(crate) struct IndexingContext {
/// The term index is an adhoc hashmap,
/// itself backed by a dedicated memory arena.
pub term_index: TermHashMap,
pub term_index: ArenaHashMap,
/// Arena is a memory arena that stores posting lists / term frequencies / positions.
pub arena: MemoryArena,
}
@@ -13,9 +13,9 @@ pub(crate) struct IndexingContext {
impl IndexingContext {
/// Create a new IndexingContext given the size of the term hash map.
pub(crate) fn new(table_size: usize) -> IndexingContext {
let term_index = TermHashMap::new(table_size);
let term_index = ArenaHashMap::new(table_size);
IndexingContext {
arena: MemoryArena::new(),
arena: MemoryArena::default(),
term_index,
}
}

View File

@@ -1,10 +1,11 @@
use std::io;
use stacker::Addr;
use crate::fastfield::MultiValuedFastFieldWriter;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::postings_writer::SpecializedPostingsWriter;
use crate::postings::recorder::{BufferLender, DocIdRecorder, Recorder};
use crate::postings::stacker::Addr;
use crate::postings::{
FieldSerializer, IndexingContext, IndexingPosition, PostingsWriter, UnorderedTermId,
};

View File

@@ -15,9 +15,10 @@ mod recorder;
mod segment_postings;
mod serializer;
mod skip;
mod stacker;
mod term_info;
pub(crate) use stacker::compute_table_size;
pub use self::block_segment_postings::BlockSegmentPostings;
pub(crate) use self::indexing_context::IndexingContext;
pub(crate) use self::per_field_postings_writer::PerFieldPostingsWriter;
@@ -26,10 +27,9 @@ pub(crate) use self::postings_writer::{serialize_postings, IndexingPosition, Pos
pub use self::segment_postings::SegmentPostings;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
pub(crate) use self::skip::{BlockInfo, SkipReader};
pub(crate) use self::stacker::compute_table_size;
pub use self::term_info::TermInfo;
pub(crate) type UnorderedTermId = u64;
pub(crate) type UnorderedTermId = stacker::UnorderedId;
#[allow(clippy::enum_variant_names)]
#[derive(Debug, PartialEq, Clone, Copy, Eq)]

View File

@@ -4,8 +4,8 @@ use std::marker::PhantomData;
use std::ops::Range;
use rustc_hash::FxHashMap;
use stacker::Addr;
use super::stacker::Addr;
use crate::fastfield::MultiValuedFastFieldWriter;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::doc_id_mapping::DocIdMapping;
@@ -59,7 +59,11 @@ pub(crate) fn serialize_postings(
) -> crate::Result<HashMap<Field, FxHashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(ctx.term_index.len());
term_offsets.extend(ctx.term_index.iter());
term_offsets.extend(
ctx.term_index
.iter()
.map(|(bytes, addr, unordered_id)| (Term::wrap(bytes), addr, unordered_id)),
);
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let mut unordered_term_mappings: HashMap<Field, FxHashMap<UnorderedTermId, TermOrdinal>> =
HashMap::new();

View File

@@ -1,6 +1,6 @@
use common::read_u32_vint;
use stacker::{ExpUnrolledLinkedList, MemoryArena};
use super::stacker::{ExpUnrolledLinkedList, MemoryArena};
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::FieldSerializer;
use crate::DocId;
@@ -91,7 +91,7 @@ pub struct DocIdRecorder {
impl Default for DocIdRecorder {
fn default() -> Self {
DocIdRecorder {
stack: ExpUnrolledLinkedList::new(),
stack: ExpUnrolledLinkedList::default(),
current_doc: u32::MAX,
}
}
@@ -144,7 +144,7 @@ impl Recorder for DocIdRecorder {
}
/// Recorder encoding document ids, and term frequencies
#[derive(Clone, Copy)]
#[derive(Clone, Copy, Default)]
pub struct TermFrequencyRecorder {
stack: ExpUnrolledLinkedList,
current_doc: DocId,
@@ -152,17 +152,6 @@ pub struct TermFrequencyRecorder {
term_doc_freq: u32,
}
impl Default for TermFrequencyRecorder {
fn default() -> Self {
TermFrequencyRecorder {
stack: ExpUnrolledLinkedList::new(),
current_doc: 0,
current_tf: 0u32,
term_doc_freq: 0u32,
}
}
}
impl Recorder for TermFrequencyRecorder {
fn current_doc(&self) -> DocId {
self.current_doc
@@ -229,7 +218,7 @@ pub struct TfAndPositionRecorder {
impl Default for TfAndPositionRecorder {
fn default() -> Self {
TfAndPositionRecorder {
stack: ExpUnrolledLinkedList::new(),
stack: ExpUnrolledLinkedList::default(),
current_doc: u32::MAX,
term_doc_freq: 0u32,
}

View File

@@ -1,7 +0,0 @@
mod expull;
mod memory_arena;
mod term_hashmap;
pub(crate) use self::expull::ExpUnrolledLinkedList;
pub(crate) use self::memory_arena::{Addr, MemoryArena};
pub(crate) use self::term_hashmap::{compute_table_size, TermHashMap};

View File

@@ -1,17 +1,16 @@
use std::io;
mod merger;
mod sstable;
mod streamer;
mod termdict;
use std::iter::ExactSizeIterator;
use common::VInt;
use sstable::value::{ValueReader, ValueWriter};
use sstable::{BlockReader, SSTable};
pub use self::merger::TermMerger;
use self::sstable::value::{ValueReader, ValueWriter};
use self::sstable::{BlockReader, SSTable};
pub use self::streamer::{TermStreamer, TermStreamerBuilder};
pub use self::termdict::{TermDictionary, TermDictionaryBuilder};
use crate::postings::TermInfo;

View File

@@ -87,7 +87,7 @@ where
{
automaton: A,
states: Vec<A::State>,
delta_reader: super::sstable::DeltaReader<'a, TermInfoReader>,
delta_reader: sstable::DeltaReader<'a, TermInfoReader>,
key: Vec<u8>,
term_ord: Option<TermOrdinal>,
lower_bound: Bound<Vec<u8>>,

View File

@@ -3,15 +3,12 @@ use std::sync::Arc;
use common::BinarySerializable;
use once_cell::sync::Lazy;
use sstable::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, Writer};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;
use crate::directory::{FileSlice, OwnedBytes};
use crate::postings::TermInfo;
use crate::termdict::sstable_termdict::sstable::sstable_index::BlockAddr;
use crate::termdict::sstable_termdict::sstable::{
DeltaReader, Reader, SSTable, SSTableIndex, Writer,
};
use crate::termdict::sstable_termdict::{
TermInfoReader, TermInfoWriter, TermSSTable, TermStreamer, TermStreamerBuilder,
};
@@ -132,7 +129,8 @@ impl TermDictionary {
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice())?;
let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice())
.map_err(|_| crate::error::DataCorruption::comment_only("SSTable corruption"))?;
Ok(TermDictionary {
sstable_slice,
sstable_index,

13
sstable/Cargo.toml Normal file
View File

@@ -0,0 +1,13 @@
[package]
name = "tantivy-sstable"
version = "0.1.0"
edition = "2021"
[dependencies]
common = {path="../common", package="tantivy-common"}
ciborium = "0.2"
byteorder = "1"
serde = "1"
[dev-dependencies]
proptest = "1"

View File

@@ -7,21 +7,19 @@ mod delta;
pub mod merge;
pub mod value;
pub(crate) mod sstable_index;
pub(crate) use self::sstable_index::{SSTableIndex, SSTableIndexBuilder};
mod sstable_index;
pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
pub(crate) mod vint;
mod block_reader;
pub use self::block_reader::BlockReader;
pub use self::delta::DeltaReader;
use self::delta::DeltaWriter;
pub use self::delta::{DeltaReader, DeltaWriter};
pub use self::merge::VoidMerge;
use self::value::{U64MonotonicReader, U64MonotonicWriter, ValueReader, ValueWriter};
const DEFAULT_KEY_CAPACITY: usize = 50;
pub(crate) fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
left.iter()
.cloned()
.zip(right.iter().cloned())
@@ -29,6 +27,9 @@ pub(crate) fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
.count()
}
#[derive(Debug, Copy, Clone)]
pub struct SSTableDataCorruption;
pub trait SSTable: Sized {
type Value;
type Reader: ValueReader<Value = Self::Value>;

View File

@@ -4,7 +4,7 @@ use std::collections::BinaryHeap;
use std::io;
use super::{SingleValueMerger, ValueMerger};
use crate::termdict::sstable_termdict::sstable::{Reader, SSTable, Writer};
use crate::{Reader, SSTable, Writer};
struct HeapItem<B: AsRef<[u8]>>(B);

View File

@@ -3,8 +3,7 @@ use std::ops::Range;
use serde::{Deserialize, Serialize};
use crate::error::DataCorruption;
use crate::termdict::sstable_termdict::sstable::common_prefix_len;
use crate::{common_prefix_len, SSTableDataCorruption};
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct SSTableIndex {
@@ -12,9 +11,8 @@ pub struct SSTableIndex {
}
impl SSTableIndex {
pub(crate) fn load(data: &[u8]) -> Result<SSTableIndex, DataCorruption> {
ciborium::de::from_reader(data)
.map_err(|_| DataCorruption::comment_only("SSTable index is corrupted"))
pub fn load(data: &[u8]) -> Result<SSTableIndex, SSTableDataCorruption> {
ciborium::de::from_reader(data).map_err(|_| SSTableDataCorruption)
}
pub fn search(&self, key: &[u8]) -> Option<BlockAddr> {
@@ -94,6 +92,7 @@ impl SSTableIndexBuilder {
#[cfg(test)]
mod tests {
use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
use crate::SSTableDataCorruption;
#[test]
fn test_sstable_index() {
@@ -125,10 +124,7 @@ mod tests {
sstable_builder.serialize(&mut buffer).unwrap();
buffer[1] = 9u8;
let data_corruption_err = SSTableIndex::load(&buffer[..]).err().unwrap();
assert_eq!(
format!("{data_corruption_err:?}"),
"Data corruption: SSTable index is corrupted."
);
assert!(matches!(data_corruption_err, SSTableDataCorruption));
}
#[track_caller]

9
stacker/Cargo.toml Normal file
View File

@@ -0,0 +1,9 @@
[package]
name = "tantivy-stacker"
version = "0.1.0"
edition = "2021"
[dependencies]
murmurhash32 = "0.2"
byteorder = "1"
common = { version = "0.4", path = "../common/", package = "tantivy-common" }

View File

@@ -4,14 +4,13 @@ use byteorder::{ByteOrder, NativeEndian};
use murmurhash32::murmurhash2;
use super::{Addr, MemoryArena};
use crate::postings::stacker::memory_arena::store;
use crate::postings::UnorderedTermId;
use crate::Term;
use crate::memory_arena::store;
use crate::UnorderedId;
/// Returns the actual memory size in bytes
/// required to create a table with a given capacity.
/// required to create a table of size
pub(crate) fn compute_table_size(capacity: usize) -> usize {
pub fn compute_table_size(capacity: usize) -> usize {
capacity * mem::size_of::<KeyValue>()
}
@@ -22,7 +21,7 @@ pub(crate) fn compute_table_size(capacity: usize) -> usize {
struct KeyValue {
key_value_addr: Addr,
hash: u32,
unordered_term_id: UnorderedTermId,
unordered_id: UnorderedId,
}
impl Default for KeyValue {
@@ -30,7 +29,7 @@ impl Default for KeyValue {
KeyValue {
key_value_addr: Addr::null_pointer(),
hash: 0u32,
unordered_term_id: UnorderedTermId::default(),
unordered_id: UnorderedId::default(),
}
}
}
@@ -41,15 +40,16 @@ impl KeyValue {
}
}
/// Customized `HashMap` with string keys
/// Customized `HashMap` with `&[u8]` keys
///
/// This `HashMap` takes String as keys. Keys are
/// stored in a user defined memory arena.
/// Its main particularity is that rather than storing its
/// keys in the heap, keys are stored in a memory arena
/// inline with the values.
///
/// The quirky API has the benefit of avoiding
/// the computation of the hash of the key twice,
/// or copying the key as long as there is no insert.
pub struct TermHashMap {
pub struct ArenaHashMap {
table: Box<[KeyValue]>,
memory_arena: MemoryArena,
mask: usize,
@@ -64,6 +64,7 @@ struct QuadraticProbing {
}
impl QuadraticProbing {
#[inline]
fn compute(hash: usize, mask: usize) -> QuadraticProbing {
QuadraticProbing { hash, i: 0, mask }
}
@@ -76,18 +77,18 @@ impl QuadraticProbing {
}
pub struct Iter<'a> {
hashmap: &'a TermHashMap,
hashmap: &'a ArenaHashMap,
inner: slice::Iter<'a, usize>,
}
impl<'a> Iterator for Iter<'a> {
type Item = (Term<&'a [u8]>, Addr, UnorderedTermId);
type Item = (&'a [u8], Addr, UnorderedId);
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().cloned().map(move |bucket: usize| {
let kv = self.hashmap.table[bucket];
let (key, offset): (&'a [u8], Addr) = self.hashmap.get_key_value(kv.key_value_addr);
(Term::wrap(key), offset, kv.unordered_term_id)
(key, offset, kv.unordered_id)
})
}
}
@@ -102,15 +103,15 @@ fn compute_previous_power_of_two(n: usize) -> usize {
1 << msb
}
impl TermHashMap {
pub(crate) fn new(table_size: usize) -> TermHashMap {
impl ArenaHashMap {
pub fn new(table_size: usize) -> ArenaHashMap {
assert!(table_size > 0);
let table_size_power_of_2 = compute_previous_power_of_two(table_size);
let memory_arena = MemoryArena::new();
let memory_arena = MemoryArena::default();
let table: Vec<KeyValue> = iter::repeat(KeyValue::default())
.take(table_size_power_of_2)
.collect();
TermHashMap {
ArenaHashMap {
table: table.into_boxed_slice(),
memory_arena,
mask: table_size_power_of_2 - 1,
@@ -119,18 +120,22 @@ impl TermHashMap {
}
}
#[inline]
pub fn read<Item: Copy + 'static>(&self, addr: Addr) -> Item {
self.memory_arena.read(addr)
}
#[inline]
fn probe(&self, hash: u32) -> QuadraticProbing {
QuadraticProbing::compute(hash as usize, self.mask)
}
#[inline]
pub fn mem_usage(&self) -> usize {
self.table.len() * mem::size_of::<KeyValue>()
}
#[inline]
fn is_saturated(&self) -> bool {
self.table.len() < self.occupied.len() * 3
}
@@ -153,22 +158,30 @@ impl TermHashMap {
}
}
fn set_bucket(&mut self, hash: u32, key_value_addr: Addr, bucket: usize) -> UnorderedTermId {
#[inline]
fn set_bucket(&mut self, hash: u32, key_value_addr: Addr, bucket: usize) -> UnorderedId {
self.occupied.push(bucket);
let unordered_term_id = self.len as UnorderedTermId;
let unordered_id = self.len as UnorderedId;
self.len += 1;
self.table[bucket] = KeyValue {
key_value_addr,
hash,
unordered_term_id,
unordered_id,
};
unordered_term_id
unordered_id
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn len(&self) -> usize {
self.len
}
#[inline]
pub fn iter(&self) -> Iter<'_> {
Iter {
inner: self.occupied.iter(),
@@ -210,7 +223,7 @@ impl TermHashMap {
&mut self,
key: &[u8],
mut updater: TMutator,
) -> UnorderedTermId
) -> UnorderedId
where
V: Copy + 'static,
TMutator: FnMut(Option<V>) -> V,
@@ -241,7 +254,7 @@ impl TermHashMap {
let v = self.memory_arena.read(val_addr);
let new_v = updater(Some(v));
self.memory_arena.write_at(val_addr, new_v);
return kv.unordered_term_id;
return kv.unordered_id;
}
}
}
@@ -253,11 +266,11 @@ mod tests {
use std::collections::HashMap;
use super::{compute_previous_power_of_two, TermHashMap};
use super::{compute_previous_power_of_two, ArenaHashMap};
#[test]
fn test_hash_map() {
let mut hash_map: TermHashMap = TermHashMap::new(1 << 18);
let mut hash_map: ArenaHashMap = ArenaHashMap::new(1 << 18);
hash_map.mutate_or_create(b"abc", |opt_val: Option<u32>| {
assert_eq!(opt_val, None);
3u32

View File

@@ -2,8 +2,8 @@ use std::mem;
use common::serialize_vint_u32;
use super::{Addr, MemoryArena};
use crate::postings::stacker::memory_arena::{load, store};
use crate::memory_arena::{load, store};
use crate::{Addr, MemoryArena};
const MAX_BLOCK_LEN: u32 = 1u32 << 15;
const FIRST_BLOCK: usize = 16;
@@ -120,15 +120,17 @@ impl<'a> ExpUnrolledLinkedListWriter<'a> {
}
}
impl ExpUnrolledLinkedList {
pub fn new() -> ExpUnrolledLinkedList {
impl Default for ExpUnrolledLinkedList {
fn default() -> ExpUnrolledLinkedList {
ExpUnrolledLinkedList {
len: 0u32,
tail: Addr::null_pointer(),
inlined_data: [0u8; INLINED_BLOCK_LEN as usize],
}
}
}
impl ExpUnrolledLinkedList {
#[inline]
pub fn writer<'a>(&'a mut self, arena: &'a mut MemoryArena) -> ExpUnrolledLinkedListWriter<'a> {
ExpUnrolledLinkedListWriter { eull: self, arena }
@@ -169,8 +171,8 @@ mod tests {
#[test]
fn test_eull() {
let mut arena = MemoryArena::new();
let mut stack = ExpUnrolledLinkedList::new();
let mut arena = MemoryArena::default();
let mut stack = ExpUnrolledLinkedList::default();
stack.writer(&mut arena).extend_from_slice(&[1u8]);
stack.writer(&mut arena).extend_from_slice(&[2u8]);
stack.writer(&mut arena).extend_from_slice(&[3u8, 4u8]);
@@ -184,8 +186,8 @@ mod tests {
#[test]
fn test_eull_long() {
let mut arena = MemoryArena::new();
let mut eull = ExpUnrolledLinkedList::new();
let mut arena = MemoryArena::default();
let mut eull = ExpUnrolledLinkedList::default();
let data: Vec<u32> = (0..100).collect();
for &el in &data {
eull.writer(&mut arena).write_u32_vint(el);
@@ -202,9 +204,9 @@ mod tests {
#[test]
fn test_eull_interlaced() {
let mut eull = MemoryArena::new();
let mut stack = ExpUnrolledLinkedList::new();
let mut stack2 = ExpUnrolledLinkedList::new();
let mut eull = MemoryArena::default();
let mut stack = ExpUnrolledLinkedList::default();
let mut stack2 = ExpUnrolledLinkedList::default();
let mut vec1: Vec<u8> = vec![];
let mut vec2: Vec<u8> = vec![];
@@ -306,9 +308,9 @@ mod bench {
#[bench]
fn bench_push_stack(bench: &mut Bencher) {
bench.iter(|| {
let mut arena = MemoryArena::new();
let mut arena = MemoryArena::default();
let mut stacks: Vec<ExpUnrolledLinkedList> =
iter::repeat_with(ExpUnrolledLinkedList::new)
iter::repeat_with(ExpUnrolledLinkedList::default)
.take(NUM_STACK)
.collect();
for s in 0..NUM_STACK {

10
stacker/src/lib.rs Normal file
View File

@@ -0,0 +1,10 @@
mod arena_hashmap;
mod expull;
mod memory_arena;
pub use self::arena_hashmap::{compute_table_size, ArenaHashMap};
pub use self::expull::ExpUnrolledLinkedList;
pub use self::memory_arena::{Addr, MemoryArena};
/// When adding an element in a `ArenaHashMap`, we get a unique id associated to the given key.
pub type UnorderedId = u64;

View File

@@ -86,15 +86,16 @@ pub struct MemoryArena {
pages: Vec<Page>,
}
impl MemoryArena {
/// Creates a new memory arena.
pub fn new() -> MemoryArena {
impl Default for MemoryArena {
fn default() -> MemoryArena {
let first_page = Page::new(0);
MemoryArena {
pages: vec![first_page],
}
}
}
impl MemoryArena {
fn add_page(&mut self) -> &mut Page {
let new_page_id = self.pages.len();
self.pages.push(Page::new(new_page_id));
@@ -197,7 +198,7 @@ mod tests {
#[test]
fn test_arena_allocate_slice() {
let mut arena = MemoryArena::new();
let mut arena = MemoryArena::default();
let a = b"hello";
let b = b"happy tax payer";
@@ -220,7 +221,7 @@ mod tests {
#[test]
fn test_store_object() {
let mut arena = MemoryArena::new();
let mut arena = MemoryArena::default();
let a = MyTest {
a: 143,
b: 21,