From 205e8a0a92237cd7a6872aa652429a24927daa01 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 12 Apr 2023 09:43:01 +0200 Subject: [PATCH] encode dictionary type in fst footer (#1968) * encode additional footer for dictionary kind in fst --- columnar/src/tests.rs | 4 +- common/src/dictionary_footer.rs | 63 ---------- common/src/lib.rs | 2 - src/fastfield/mod.rs | 16 +-- src/termdict/fst_termdict/merger.rs | 3 +- src/termdict/fst_termdict/termdict.rs | 16 ++- src/termdict/mod.rs | 159 +++++++++++++++++++++++++- src/termdict/sstable_termdict/mod.rs | 2 + sstable/README.md | 7 +- sstable/src/dictionary.rs | 18 ++- sstable/src/lib.rs | 10 +- 11 files changed, 206 insertions(+), 94 deletions(-) delete mode 100644 common/src/dictionary_footer.rs diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index af7052caf..117ebc663 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -26,7 +26,7 @@ fn test_dataframe_writer_str() { assert_eq!(columnar.num_columns(), 1); let cols: Vec = columnar.read_columns("my_string").unwrap(); assert_eq!(cols.len(), 1); - assert_eq!(cols[0].num_bytes(), 89); + assert_eq!(cols[0].num_bytes(), 85); } #[test] @@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() { assert_eq!(columnar.num_columns(), 1); let cols: Vec = columnar.read_columns("my_string").unwrap(); assert_eq!(cols.len(), 1); - assert_eq!(cols[0].num_bytes(), 89); + assert_eq!(cols[0].num_bytes(), 85); } #[test] diff --git a/common/src/dictionary_footer.rs b/common/src/dictionary_footer.rs deleted file mode 100644 index 2e3f83f1f..000000000 --- a/common/src/dictionary_footer.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::io::{self, Read, Write}; - -use crate::BinarySerializable; - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -#[repr(u32)] -pub enum DictionaryKind { - Fst = 1, - SSTable = 2, -} - -#[derive(Debug, Clone, PartialEq)] -pub struct DictionaryFooter { - pub kind: DictionaryKind, - pub version: u32, -} - -impl DictionaryFooter { - pub fn 
verify_equal(&self, other: &DictionaryFooter) -> io::Result<()> { - if self.kind != other.kind { - return Err(io::Error::new( - io::ErrorKind::Other, - format!( - "Invalid dictionary type, expected {:?}, found {:?}", - self.kind, other.kind - ), - )); - } - if self.version != other.version { - return Err(io::Error::new( - io::ErrorKind::Other, - format!( - "Unsuported dictionary version, expected {}, found {}", - self.version, other.version - ), - )); - } - Ok(()) - } -} - -impl BinarySerializable for DictionaryFooter { - fn serialize(&self, writer: &mut W) -> io::Result<()> { - self.version.serialize(writer)?; - (self.kind as u32).serialize(writer) - } - fn deserialize(reader: &mut R) -> io::Result { - let version = u32::deserialize(reader)?; - let kind = u32::deserialize(reader)?; - let kind = match kind { - 1 => DictionaryKind::Fst, - 2 => DictionaryKind::SSTable, - _ => { - return Err(io::Error::new( - io::ErrorKind::Other, - format!("invalid dictionary kind: {kind}"), - )) - } - }; - - Ok(DictionaryFooter { kind, version }) - } -} diff --git a/common/src/lib.rs b/common/src/lib.rs index e63a6b6c3..68a29ed4d 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -7,7 +7,6 @@ pub use byteorder::LittleEndian as Endianness; mod bitset; mod byte_count; mod datetime; -mod dictionary_footer; pub mod file_slice; mod group_by; mod serialize; @@ -16,7 +15,6 @@ mod writer; pub use bitset::*; pub use byte_count::ByteCount; pub use datetime::{DatePrecision, DateTime}; -pub use dictionary_footer::*; pub use group_by::GroupByIteratorExtended; pub use ownedbytes::{OwnedBytes, StableDeref}; pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize}; diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 238a89df1..582048bdf 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -130,7 +130,7 @@ mod tests { } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 95); + assert_eq!(file.len(), 91); let fast_field_readers = 
FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let column = fast_field_readers .u64("field") @@ -180,7 +180,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 123); + assert_eq!(file.len(), 119); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let col = fast_field_readers .u64("field") @@ -213,7 +213,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 96); + assert_eq!(file.len(), 92); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let fast_field_reader = fast_field_readers .u64("field") @@ -245,7 +245,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 4491); + assert_eq!(file.len(), 4487); { let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let col = fast_field_readers @@ -278,7 +278,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 267); + assert_eq!(file.len(), 263); { let fast_field_readers = FastFieldReaders::open(file, schema).unwrap(); @@ -772,7 +772,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 104); + assert_eq!(file.len(), 100); let fast_field_readers = FastFieldReaders::open(file, schema).unwrap(); let bool_col = fast_field_readers.bool("field_bool").unwrap(); assert_eq!(bool_col.first(0), Some(true)); @@ -804,7 +804,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 116); + assert_eq!(file.len(), 112); let readers = FastFieldReaders::open(file, schema).unwrap(); let bool_col = readers.bool("field_bool").unwrap(); for i in 0..25 { @@ -829,7 +829,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - 
assert_eq!(file.len(), 106); + assert_eq!(file.len(), 102); let fastfield_readers = FastFieldReaders::open(file, schema).unwrap(); let col = fastfield_readers.bool("field_bool").unwrap(); assert_eq!(col.first(0), None); diff --git a/src/termdict/fst_termdict/merger.rs b/src/termdict/fst_termdict/merger.rs index 83361a908..b41748bfb 100644 --- a/src/termdict/fst_termdict/merger.rs +++ b/src/termdict/fst_termdict/merger.rs @@ -2,8 +2,9 @@ use tantivy_fst::map::{OpBuilder, Union}; use tantivy_fst::raw::IndexedValue; use tantivy_fst::Streamer; +use super::termdict::TermDictionary; use crate::postings::TermInfo; -use crate::termdict::{TermDictionary, TermOrdinal, TermStreamer}; +use crate::termdict::{TermOrdinal, TermStreamer}; /// Given a list of sorted term streams, /// returns an iterator over sorted unique terms. diff --git a/src/termdict/fst_termdict/termdict.rs b/src/termdict/fst_termdict/termdict.rs index 01fb9a918..a2553b4d3 100644 --- a/src/termdict/fst_termdict/termdict.rs +++ b/src/termdict/fst_termdict/termdict.rs @@ -15,6 +15,8 @@ fn convert_fst_error(e: tantivy_fst::Error) -> io::Error { io::Error::new(io::ErrorKind::Other, e) } +const FST_VERSION: u32 = 1; + /// Builder for the new term dictionary. /// /// Inserting must be done in the order of the `keys`. @@ -80,6 +82,7 @@ where W: Write .serialize(&mut counting_writer)?; let footer_size = counting_writer.written_bytes(); footer_size.serialize(&mut counting_writer)?; + FST_VERSION.serialize(&mut counting_writer)?; } Ok(file) } @@ -118,9 +121,20 @@ pub struct TermDictionary { impl TermDictionary { /// Opens a `TermDictionary`. 
pub fn open(file: FileSlice) -> io::Result { - let (main_slice, footer_len_slice) = file.split_from_end(8); + let (main_slice, footer_len_slice) = file.split_from_end(12); let mut footer_len_bytes = footer_len_slice.read_bytes()?; let footer_size = u64::deserialize(&mut footer_len_bytes)?; + let version = u32::deserialize(&mut footer_len_bytes)?; + if version != FST_VERSION { + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Unsuported fst version, expected {}, found {}", + FST_VERSION, version, + ), + )); + } + let (fst_file_slice, values_file_slice) = main_slice.split_from_end(footer_size as usize); let fst_index = open_fst_index(fst_file_slice)?; let term_info_store = TermInfoStore::open(values_file_slice)?; diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 0b2763dc9..8fb903a6b 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -35,4 +35,161 @@ mod tests; /// Position of the term in the sorted list of terms. pub type TermOrdinal = u64; -pub use self::termdict::{TermDictionary, TermDictionaryBuilder, TermMerger, TermStreamer}; +use std::io; + +use common::file_slice::FileSlice; +use common::BinarySerializable; +use tantivy_fst::Automaton; + +use self::termdict::{ + TermDictionary as InnerTermDict, TermDictionaryBuilder as InnerTermDictBuilder, + TermStreamerBuilder, +}; +pub use self::termdict::{TermMerger, TermStreamer}; +use crate::postings::TermInfo; + +#[repr(u32)] +#[allow(dead_code)] +enum DictionaryType { + Fst = 1, + SSTable = 2, +} + +#[cfg(not(feature = "quickwit"))] +const CURRENT_TYPE: DictionaryType = DictionaryType::Fst; + +#[cfg(feature = "quickwit")] +const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable; + +// TODO in the future this should become an enum of supported dictionaries +/// A TermDictionary wrapping either an FST based dictionary or a SSTable based one. +pub struct TermDictionary(InnerTermDict); + +impl TermDictionary { + /// Opens a `TermDictionary`.
+ pub fn open(file: FileSlice) -> io::Result { + let (main_slice, dict_type) = file.split_from_end(4); + let mut dict_type = dict_type.read_bytes()?; + let dict_type = u32::deserialize(&mut dict_type)?; + + if dict_type != CURRENT_TYPE as u32 { + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Unsuported dictionary type, expected {}, found {}", + CURRENT_TYPE as u32, dict_type, + ), + )); + } + + InnerTermDict::open(main_slice).map(TermDictionary) + } + + /// Creates an empty term dictionary which contains no terms. + pub fn empty() -> Self { + TermDictionary(InnerTermDict::empty()) + } + + /// Returns the number of terms in the dictionary. + /// Term ordinals range from 0 to `num_terms() - 1`. + pub fn num_terms(&self) -> usize { + self.0.num_terms() + } + + /// Returns the ordinal associated with a given term. + pub fn term_ord>(&self, key: K) -> io::Result> { + self.0.term_ord(key) + } + + /// Stores the term associated with a given term ordinal in + /// a `bytes` buffer. + /// + /// Term ordinals are defined as the position of the term in + /// the sorted list of terms. + /// + /// Returns true if and only if the term has been found. + /// + /// Regardless of whether the term is found or not, + /// the buffer may be modified. + pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec) -> io::Result { + self.0.ord_to_term(ord, bytes) + } + + // this isn't used, and has different prototype in Fst and SSTable + // Returns the number of terms in the dictionary. + // pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> TermInfo { + // self.0.term_info_from_ord(term_ord) + // } + + /// Lookups the value corresponding to the key. + pub fn get>(&self, key: K) -> io::Result> { + self.0.get(key) + } + + /// Returns a range builder, to stream all of the terms + /// within an interval. + pub fn range(&self) -> TermStreamerBuilder<'_> { + self.0.range() + } + + /// A stream of all the sorted terms. 
+ pub fn stream(&self) -> io::Result> { + self.0.stream() + } + + /// Returns a search builder, to stream all of the terms + /// within the Automaton + pub fn search<'a, A: Automaton + 'a>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A> + where A::State: Clone { + self.0.search(automaton) + } + + #[cfg(feature = "quickwit")] + /// Lookups the value corresponding to the key. + pub async fn get_async>(&self, key: K) -> io::Result> { + self.0.get_async(key).await + } +} + +/// A TermDictionaryBuilder wrapping either an FST or a SSTable dictionary builder. +pub struct TermDictionaryBuilder(InnerTermDictBuilder); + +impl TermDictionaryBuilder { + /// Creates a new `TermDictionaryBuilder` + pub fn create(w: W) -> io::Result { + InnerTermDictBuilder::create(w).map(TermDictionaryBuilder) + } + + /// Inserts a `(key, value)` pair in the term dictionary. + /// + /// *Keys have to be inserted in order.* + pub fn insert>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> { + self.0.insert(key_ref, value) + } + + /// # Warning + /// Horribly dangerous internal API + /// + /// If used, it must be used by systematically alternating calls + /// to insert_key and insert_value. + /// + /// Prefer using `.insert(key, value)` + pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { + self.0.insert_key(key) + } + + /// # Warning + /// + /// Horribly dangerous internal API. See `.insert_key(...)`. + pub fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { + self.0.insert_value(term_info) + } + + /// Finalize writing the builder, and returns the underlying + /// `Write` object. 
+ pub fn finish(self) -> io::Result { + let mut writer = self.0.finish()?; + (CURRENT_TYPE as u32).serialize(&mut writer)?; + Ok(writer) + } +} diff --git a/src/termdict/sstable_termdict/mod.rs b/src/termdict/sstable_termdict/mod.rs index ce329c916..621b85b35 100644 --- a/src/termdict/sstable_termdict/mod.rs +++ b/src/termdict/sstable_termdict/mod.rs @@ -30,6 +30,8 @@ pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, /// SSTable used to store TermInfo objects. pub struct TermSSTable; +pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>; + impl SSTable for TermSSTable { type Value = TermInfo; type ValueReader = TermInfoValueReader; diff --git a/sstable/README.md b/sstable/README.md index 9fdc5b609..83d3c8139 100644 --- a/sstable/README.md +++ b/sstable/README.md @@ -87,16 +87,15 @@ Note: there is no ambiguity between both representation as Add is always guarant ### SSTFooter ``` -+-------+-------+-----+-------------+---------+---------+------+ -| Block | Block | ... | IndexOffset | NumTerm | Version | Type | -+-------+-------+-----+-------------+---------+---------+------+ ++-------+-------+-----+-------------+---------+---------+ +| Block | Block | ... 
| IndexOffset | NumTerm | Version | ++-------+-------+-----+-------------+---------+---------+ |----( # of blocks)---| ``` - Block(SSTBlock): uses IndexValue for its Values format - IndexOffset(u64): Offset to the start of the SSTFooter - NumTerm(u64): number of terms in the sstable - Version(u32): Currently defined to 0x00\_00\_00\_01 -- Type(u32): Defined to 0x00\_00\_00\_02 ### IndexValue ``` diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index f81262d2b..3d1c80d60 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -5,7 +5,7 @@ use std::ops::{Bound, RangeBounds}; use std::sync::Arc; use common::file_slice::FileSlice; -use common::{BinarySerializable, DictionaryFooter, OwnedBytes}; +use common::{BinarySerializable, OwnedBytes}; use tantivy_fst::automaton::AlwaysMatch; use tantivy_fst::Automaton; @@ -178,14 +178,22 @@ impl Dictionary { /// Opens a `TermDictionary`. pub fn open(term_dictionary_file: FileSlice) -> io::Result { - let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(24); + let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20); let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?; let index_offset = u64::deserialize(&mut footer_len_bytes)?; let num_terms = u64::deserialize(&mut footer_len_bytes)?; - - let footer = DictionaryFooter::deserialize(&mut footer_len_bytes)?; - crate::FOOTER.verify_equal(&footer)?; + let version = u32::deserialize(&mut footer_len_bytes)?; + if version != crate::SSTABLE_VERSION { + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Unsuported sstable version, expected {}, found {}", + crate::SSTABLE_VERSION, + version, + ), + )); + } let (sstable_slice, index_slice) = main_slice.split(index_offset as usize); let sstable_index_bytes = index_slice.read_bytes()?; diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index 9f7783755..00a88793c 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -17,7 +17,7 @@
pub use dictionary::Dictionary; pub use streamer::{Streamer, StreamerBuilder}; mod block_reader; -use common::{BinarySerializable, DictionaryFooter, DictionaryKind}; +use common::BinarySerializable; pub use self::block_reader::BlockReader; pub use self::delta::{DeltaReader, DeltaWriter}; @@ -28,10 +28,7 @@ use crate::value::{RangeValueReader, RangeValueWriter}; pub type TermOrdinal = u64; const DEFAULT_KEY_CAPACITY: usize = 50; -const FOOTER: DictionaryFooter = DictionaryFooter { - kind: DictionaryKind::SSTable, - version: 1, -}; +const SSTABLE_VERSION: u32 = 1; /// Given two byte string returns the length of /// the longest common prefix. @@ -311,7 +308,7 @@ where wrt.write_all(&offset.to_le_bytes())?; wrt.write_all(&self.num_terms.to_le_bytes())?; - FOOTER.serialize(&mut wrt)?; + SSTABLE_VERSION.serialize(&mut wrt)?; let wrt = wrt.finish(); Ok(wrt.into_inner()?) @@ -398,7 +395,6 @@ mod test { 15, 0, 0, 0, 0, 0, 0, 0, // index start offset 3, 0, 0, 0, 0, 0, 0, 0, // num_term 1, 0, 0, 0, // version - 2, 0, 0, 0, // dictionary kind. sstable = 2 ] ); let mut sstable_reader = VoidSSTable::reader(&buffer[..]);