encode dictionary type in fst footer (#1968)

* encode additional footer for dictionary kind in fst
This commit is contained in:
trinity-1686a
2023-04-12 09:43:01 +02:00
committed by GitHub
parent 4b01cc4c49
commit 205e8a0a92
11 changed files with 206 additions and 94 deletions

View File

@@ -26,7 +26,7 @@ fn test_dataframe_writer_str() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 89);
assert_eq!(cols[0].num_bytes(), 85);
}
#[test]
@@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 89);
assert_eq!(cols[0].num_bytes(), 85);
}
#[test]

View File

@@ -1,63 +0,0 @@
use std::io::{self, Read, Write};
use crate::BinarySerializable;
/// Identifies which dictionary implementation a serialized dictionary belongs to.
///
/// The explicit discriminants are the `u32` codes stored in the footer: the
/// footer serializer writes `kind as u32` and the deserializer maps 1 and 2
/// back to these variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum DictionaryKind {
/// FST based dictionary (code 1).
Fst = 1,
/// SSTable based dictionary (code 2).
SSTable = 2,
}
/// Footer describing a serialized dictionary: its kind and its format version.
#[derive(Debug, Clone, PartialEq)]
pub struct DictionaryFooter {
/// Dictionary implementation the data was written with.
pub kind: DictionaryKind,
/// Format version number; `verify_equal` requires an exact match.
pub version: u32,
}
impl DictionaryFooter {
    /// Checks that `other` matches this footer.
    ///
    /// `self` is treated as the expected footer and `other` as the one that was
    /// found (typically read from a file); the error messages are worded
    /// accordingly.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` of kind `Other` if the dictionary kinds differ.
    /// If the kinds match but the versions differ, an `io::Error` of kind
    /// `Other` describing the version mismatch is returned instead.
    pub fn verify_equal(&self, other: &DictionaryFooter) -> io::Result<()> {
        // The kind is checked first: a version number is only meaningful
        // within a given dictionary kind.
        if self.kind != other.kind {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "Invalid dictionary type, expected {:?}, found {:?}",
                    self.kind, other.kind
                ),
            ));
        }
        if self.version != other.version {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "Unsupported dictionary version, expected {}, found {}",
                    self.version, other.version
                ),
            ));
        }
        Ok(())
    }
}
// Serialized form: the version as a `u32`, followed by the kind's numeric
// code as a `u32`.
impl BinarySerializable for DictionaryFooter {
    /// Writes the version, then the numeric code of the dictionary kind.
    fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
        let kind_code = self.kind as u32;
        self.version.serialize(writer)?;
        kind_code.serialize(writer)
    }

    /// Reads a footer in the order `serialize` wrote it.
    ///
    /// Fails if the reader fails, or if the stored kind code does not
    /// correspond to a `DictionaryKind` variant.
    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
        let version = u32::deserialize(reader)?;
        let kind_code = u32::deserialize(reader)?;
        let kind = if kind_code == DictionaryKind::Fst as u32 {
            DictionaryKind::Fst
        } else if kind_code == DictionaryKind::SSTable as u32 {
            DictionaryKind::SSTable
        } else {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!("invalid dictionary kind: {kind_code}"),
            ));
        };
        Ok(DictionaryFooter { kind, version })
    }
}

View File

@@ -7,7 +7,6 @@ pub use byteorder::LittleEndian as Endianness;
mod bitset;
mod byte_count;
mod datetime;
mod dictionary_footer;
pub mod file_slice;
mod group_by;
mod serialize;
@@ -16,7 +15,6 @@ mod writer;
pub use bitset::*;
pub use byte_count::ByteCount;
pub use datetime::{DatePrecision, DateTime};
pub use dictionary_footer::*;
pub use group_by::GroupByIteratorExtended;
pub use ownedbytes::{OwnedBytes, StableDeref};
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};

View File

@@ -130,7 +130,7 @@ mod tests {
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 95);
assert_eq!(file.len(), 91);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let column = fast_field_readers
.u64("field")
@@ -180,7 +180,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 123);
assert_eq!(file.len(), 119);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
.u64("field")
@@ -213,7 +213,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 96);
assert_eq!(file.len(), 92);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let fast_field_reader = fast_field_readers
.u64("field")
@@ -245,7 +245,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 4491);
assert_eq!(file.len(), 4487);
{
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
@@ -278,7 +278,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 267);
assert_eq!(file.len(), 263);
{
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
@@ -772,7 +772,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 104);
assert_eq!(file.len(), 100);
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = fast_field_readers.bool("field_bool").unwrap();
assert_eq!(bool_col.first(0), Some(true));
@@ -804,7 +804,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 116);
assert_eq!(file.len(), 112);
let readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = readers.bool("field_bool").unwrap();
for i in 0..25 {
@@ -829,7 +829,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 106);
assert_eq!(file.len(), 102);
let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
let col = fastfield_readers.bool("field_bool").unwrap();
assert_eq!(col.first(0), None);

View File

@@ -2,8 +2,9 @@ use tantivy_fst::map::{OpBuilder, Union};
use tantivy_fst::raw::IndexedValue;
use tantivy_fst::Streamer;
use super::termdict::TermDictionary;
use crate::postings::TermInfo;
use crate::termdict::{TermDictionary, TermOrdinal, TermStreamer};
use crate::termdict::{TermOrdinal, TermStreamer};
/// Given a list of sorted term streams,
/// returns an iterator over sorted unique terms.

View File

@@ -15,6 +15,8 @@ fn convert_fst_error(e: tantivy_fst::Error) -> io::Error {
io::Error::new(io::ErrorKind::Other, e)
}
/// Version of the FST term dictionary format: the builder appends it as a
/// `u32` after the footer size, and `TermDictionary::open` rejects any other value.
const FST_VERSION: u32 = 1;
/// Builder for the new term dictionary.
///
/// Inserting must be done in the order of the `keys`.
@@ -80,6 +82,7 @@ where W: Write
.serialize(&mut counting_writer)?;
let footer_size = counting_writer.written_bytes();
footer_size.serialize(&mut counting_writer)?;
FST_VERSION.serialize(&mut counting_writer)?;
}
Ok(file)
}
@@ -118,9 +121,20 @@ pub struct TermDictionary {
impl TermDictionary {
/// Opens a `TermDictionary`.
pub fn open(file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = file.split_from_end(8);
let (main_slice, footer_len_slice) = file.split_from_end(12);
let mut footer_len_bytes = footer_len_slice.read_bytes()?;
let footer_size = u64::deserialize(&mut footer_len_bytes)?;
let version = u32::deserialize(&mut footer_len_bytes)?;
// `FST_VERSION` is the version this code expects; `version` is the one that
// was read from the file footer.
if version != FST_VERSION {
    return Err(io::Error::new(
        io::ErrorKind::Other,
        format!(
            "Unsupported fst version, expected {}, found {}",
            FST_VERSION, version,
        ),
    ));
}
let (fst_file_slice, values_file_slice) = main_slice.split_from_end(footer_size as usize);
let fst_index = open_fst_index(fst_file_slice)?;
let term_info_store = TermInfoStore::open(values_file_slice)?;

View File

@@ -35,4 +35,161 @@ mod tests;
/// Position of the term in the sorted list of terms.
pub type TermOrdinal = u64;
pub use self::termdict::{TermDictionary, TermDictionaryBuilder, TermMerger, TermStreamer};
use std::io;
use common::file_slice::FileSlice;
use common::BinarySerializable;
use tantivy_fst::Automaton;
use self::termdict::{
TermDictionary as InnerTermDict, TermDictionaryBuilder as InnerTermDictBuilder,
TermStreamerBuilder,
};
pub use self::termdict::{TermMerger, TermStreamer};
use crate::postings::TermInfo;
/// Numeric code identifying the dictionary implementation.
///
/// The code of `CURRENT_TYPE` is written as a trailing `u32` by
/// `TermDictionaryBuilder::finish` and checked by `TermDictionary::open`.
#[repr(u32)]
// Only the variant selected through the `quickwit` feature is referenced in a
// given build, so the other one would otherwise be reported as dead code.
#[allow(dead_code)]
enum DictionaryType {
Fst = 1,
SSTable = 2,
}
// Dictionary type written and accepted by this build: `Fst` by default,
// `SSTable` when the `quickwit` feature is enabled.
#[cfg(not(feature = "quickwit"))]
const CURRENT_TYPE: DictionaryType = DictionaryType::Fst;
#[cfg(feature = "quickwit")]
const CURRENT_TYPE: DictionaryType = DictionaryType::SSTable;
// TODO in the future this should become an enum of supported dictionaries
/// A TermDictionary wrapping either an FST based dictionary or a SSTable based one.
pub struct TermDictionary(InnerTermDict);

impl TermDictionary {
    /// Opens a `TermDictionary`.
    ///
    /// The last 4 bytes of `file` are read as a `u32` dictionary type; the
    /// bytes before them are handed to the inner dictionary implementation.
    ///
    /// # Errors
    ///
    /// Returns an error if the trailing bytes cannot be read, if the stored
    /// dictionary type is not the one this build supports, or if the inner
    /// dictionary fails to open.
    pub fn open(file: FileSlice) -> io::Result<Self> {
        let (main_slice, dict_type) = file.split_from_end(4);
        let mut dict_type = dict_type.read_bytes()?;
        let dict_type = u32::deserialize(&mut dict_type)?;
        if dict_type != CURRENT_TYPE as u32 {
            return Err(io::Error::new(
                io::ErrorKind::Other,
                format!(
                    "Unsupported dictionary type, expected {}, found {}",
                    CURRENT_TYPE as u32, dict_type,
                ),
            ));
        }
        InnerTermDict::open(main_slice).map(TermDictionary)
    }

    /// Creates an empty term dictionary which contains no terms.
    pub fn empty() -> Self {
        TermDictionary(InnerTermDict::empty())
    }

    /// Returns the number of terms in the dictionary.
    /// Term ordinals range from 0 to `num_terms() - 1`.
    pub fn num_terms(&self) -> usize {
        self.0.num_terms()
    }

    /// Returns the ordinal associated with a given term.
    pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
        self.0.term_ord(key)
    }

    /// Stores the term associated with a given term ordinal in
    /// a `bytes` buffer.
    ///
    /// Term ordinals are defined as the position of the term in
    /// the sorted list of terms.
    ///
    /// Returns true if and only if the term has been found.
    ///
    /// Regardless of whether the term is found or not,
    /// the buffer may be modified.
    pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
        self.0.ord_to_term(ord, bytes)
    }

    // NOTE: `term_info_from_ord` is deliberately not exposed by this wrapper:
    // it isn't used, and it has a different prototype in the Fst and SSTable
    // implementations.

    /// Looks up the value corresponding to the key.
    pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
        self.0.get(key)
    }

    /// Returns a range builder, to stream all of the terms
    /// within an interval.
    pub fn range(&self) -> TermStreamerBuilder<'_> {
        self.0.range()
    }

    /// A stream of all the sorted terms.
    pub fn stream(&self) -> io::Result<TermStreamer<'_>> {
        self.0.stream()
    }

    /// Returns a search builder, to stream all of the terms
    /// within the Automaton
    pub fn search<'a, A: Automaton + 'a>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A>
    where A::State: Clone {
        self.0.search(automaton)
    }

    /// Looks up the value corresponding to the key, asynchronously.
    ///
    /// Only available when the `quickwit` feature is enabled.
    #[cfg(feature = "quickwit")]
    pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermInfo>> {
        self.0.get_async(key).await
    }
}
/// Builder counterpart of `TermDictionary`: wraps either the FST or the
/// SSTable dictionary builder.
pub struct TermDictionaryBuilder<W: io::Write>(InnerTermDictBuilder<W>);

impl<W: io::Write> TermDictionaryBuilder<W> {
    /// Creates a new `TermDictionaryBuilder` on top of the writer `w`.
    pub fn create(w: W) -> io::Result<Self> {
        let inner = InnerTermDictBuilder::create(w)?;
        Ok(Self(inner))
    }

    /// Inserts a `(key, value)` pair in the term dictionary.
    ///
    /// *Keys have to be inserted in order.*
    pub fn insert<K: AsRef<[u8]>>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> {
        let Self(inner) = self;
        inner.insert(key_ref, value)
    }

    /// # Warning
    /// Horribly dangerous internal API
    ///
    /// Calls to `insert_key` and `insert_value` must strictly alternate
    /// whenever this method is used.
    ///
    /// Prefer using `.insert(key, value)`
    pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> {
        let Self(inner) = self;
        inner.insert_key(key)
    }

    /// # Warning
    ///
    /// Horribly dangerous internal API. See `.insert_key(...)`.
    pub fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> {
        let Self(inner) = self;
        inner.insert_value(term_info)
    }

    /// Finalizes the inner builder, appends the dictionary type code as a
    /// `u32`, and returns the underlying `Write` object.
    pub fn finish(self) -> io::Result<W> {
        let Self(inner) = self;
        let type_code = CURRENT_TYPE as u32;
        let mut wrt = inner.finish()?;
        type_code.serialize(&mut wrt)?;
        Ok(wrt)
    }
}

View File

@@ -30,6 +30,8 @@ pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable,
/// SSTable used to store TermInfo objects.
pub struct TermSSTable;
/// Alias of `sstable::StreamerBuilder` specialised to `TermSSTable`; the
/// automaton parameter defaults to `AlwaysMatch`.
pub type TermStreamerBuilder<'a, A = AlwaysMatch> = sstable::StreamerBuilder<'a, TermSSTable, A>;
impl SSTable for TermSSTable {
type Value = TermInfo;
type ValueReader = TermInfoValueReader;

View File

@@ -87,16 +87,15 @@ Note: there is no ambiguity between both representation as Add is always guarant
### SSTFooter
```
+-------+-------+-----+-------------+---------+---------+------+
| Block | Block | ... | IndexOffset | NumTerm | Version | Type |
+-------+-------+-----+-------------+---------+---------+------+
+-------+-------+-----+-------------+---------+---------+
| Block | Block | ... | IndexOffset | NumTerm | Version |
+-------+-------+-----+-------------+---------+---------+
|----( # of blocks)---|
```
- Block(SSTBlock): uses IndexValue for its Values format
- IndexOffset(u64): Offset to the start of the SSTFooter
- NumTerm(u64): number of terms in the sstable
- Version(u32): Currently defined to 0x00\_00\_00\_01
- Type(u32): Defined to 0x00\_00\_00\_02
### IndexValue
```

View File

@@ -5,7 +5,7 @@ use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use common::file_slice::FileSlice;
use common::{BinarySerializable, DictionaryFooter, OwnedBytes};
use common::{BinarySerializable, OwnedBytes};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;
@@ -178,14 +178,22 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Opens a `TermDictionary`.
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(24);
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
let index_offset = u64::deserialize(&mut footer_len_bytes)?;
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let footer = DictionaryFooter::deserialize(&mut footer_len_bytes)?;
crate::FOOTER.verify_equal(&footer)?;
let version = u32::deserialize(&mut footer_len_bytes)?;
// `crate::SSTABLE_VERSION` is the version this code expects; `version` is the
// one that was read from the file footer.
if version != crate::SSTABLE_VERSION {
    return Err(io::Error::new(
        io::ErrorKind::Other,
        format!(
            "Unsupported sstable version, expected {}, found {}",
            crate::SSTABLE_VERSION,
            version,
        ),
    ));
}
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;

View File

@@ -17,7 +17,7 @@ pub use dictionary::Dictionary;
pub use streamer::{Streamer, StreamerBuilder};
mod block_reader;
use common::{BinarySerializable, DictionaryFooter, DictionaryKind};
use common::BinarySerializable;
pub use self::block_reader::BlockReader;
pub use self::delta::{DeltaReader, DeltaWriter};
@@ -28,10 +28,7 @@ use crate::value::{RangeValueReader, RangeValueWriter};
pub type TermOrdinal = u64;
const DEFAULT_KEY_CAPACITY: usize = 50;
const FOOTER: DictionaryFooter = DictionaryFooter {
kind: DictionaryKind::SSTable,
version: 1,
};
/// Version of the sstable format: the writer appends it as a `u32` after the
/// number of terms, and `Dictionary::open` rejects any other value.
const SSTABLE_VERSION: u32 = 1;
/// Given two byte string returns the length of
/// the longest common prefix.
@@ -311,7 +308,7 @@ where
wrt.write_all(&offset.to_le_bytes())?;
wrt.write_all(&self.num_terms.to_le_bytes())?;
FOOTER.serialize(&mut wrt)?;
SSTABLE_VERSION.serialize(&mut wrt)?;
let wrt = wrt.finish();
Ok(wrt.into_inner()?)
@@ -398,7 +395,6 @@ mod test {
15, 0, 0, 0, 0, 0, 0, 0, // index start offset
3, 0, 0, 0, 0, 0, 0, 0, // num_term
1, 0, 0, 0, // version
2, 0, 0, 0, // dictionary kind. sstable = 2
]
);
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);