new sstable format (#1943)

* document a new sstable format

* add support for changing target block size

* use new format for sstable index

* handle sstable version errror

* use very small blocks for proptests

* add a footer structure
This commit is contained in:
trinity-1686a
2023-03-21 15:03:52 +01:00
committed by GitHub
parent 8f7f1d6be4
commit e5e50603a8
12 changed files with 391 additions and 48 deletions

View File

@@ -17,7 +17,7 @@ fn test_dataframe_writer_str() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 158);
assert_eq!(cols[0].num_bytes(), 88);
}
#[test]
@@ -31,7 +31,7 @@ fn test_dataframe_writer_bytes() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 158);
assert_eq!(cols[0].num_bytes(), 88);
}
#[test]

View File

@@ -0,0 +1,63 @@
use std::io::{self, Read, Write};
use crate::BinarySerializable;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u32)]
pub enum DictionaryKind {
Fst = 1,
SSTable = 2,
}
#[derive(Debug, Clone, PartialEq)]
pub struct DictionaryFooter {
pub kind: DictionaryKind,
pub version: u32,
}
impl DictionaryFooter {
pub fn verify_equal(&self, other: &DictionaryFooter) -> io::Result<()> {
if self.kind != other.kind {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Invalid dictionary type, expected {:?}, found {:?}",
self.kind, other.kind
),
));
}
if self.version != other.version {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Unsuported dictionary version, expected {}, found {}",
self.version, other.version
),
));
}
Ok(())
}
}
impl BinarySerializable for DictionaryFooter {
fn serialize<W: Write + ?Sized>(&self, writer: &mut W) -> io::Result<()> {
self.version.serialize(writer)?;
(self.kind as u32).serialize(writer)
}
fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
let version = u32::deserialize(reader)?;
let kind = u32::deserialize(reader)?;
let kind = match kind {
1 => DictionaryKind::Fst,
2 => DictionaryKind::SSTable,
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!("invalid dictionary kind: {kind}"),
))
}
};
Ok(DictionaryFooter { kind, version })
}
}

View File

@@ -7,6 +7,7 @@ pub use byteorder::LittleEndian as Endianness;
mod bitset;
mod byte_count;
mod datetime;
mod dictionary_footer;
pub mod file_slice;
mod group_by;
mod serialize;
@@ -15,6 +16,7 @@ mod writer;
pub use bitset::*;
pub use byte_count::ByteCount;
pub use datetime::{DatePrecision, DateTime};
pub use dictionary_footer::*;
pub use group_by::GroupByIteratorExtended;
pub use ownedbytes::{OwnedBytes, StableDeref};
pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize};

View File

@@ -130,7 +130,7 @@ mod tests {
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 161);
assert_eq!(file.len(), 94);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let column = fast_field_readers
.u64("field")
@@ -180,7 +180,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 189);
assert_eq!(file.len(), 122);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
.u64("field")
@@ -213,7 +213,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 162);
assert_eq!(file.len(), 95);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let fast_field_reader = fast_field_readers
.u64("field")
@@ -245,7 +245,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 4557);
assert_eq!(file.len(), 4490);
{
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
@@ -278,7 +278,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 333_usize);
assert_eq!(file.len(), 266);
{
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
@@ -772,7 +772,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 175);
assert_eq!(file.len(), 103);
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = fast_field_readers.bool("field_bool").unwrap();
assert_eq!(bool_col.first(0), Some(true));
@@ -804,7 +804,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 187);
assert_eq!(file.len(), 115);
let readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = readers.bool("field_bool").unwrap();
for i in 0..25 {
@@ -829,7 +829,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 177);
assert_eq!(file.len(), 105);
let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
let col = fastfield_readers.bool("field_bool").unwrap();
assert_eq!(col.first(0), None);

View File

@@ -6,8 +6,6 @@ license = "MIT"
[dependencies]
common = {path="../common", package="tantivy-common"}
ciborium = "0.2"
serde = "1"
tantivy-fst = "0.4"
[dev-dependencies]

View File

@@ -26,3 +26,94 @@ possible.
- it allows incremental encoding of the keys
- the front compression is leveraged to optimize
the intersection with an automaton
# On disk format
Overview of the SSTable format. Unless noted otherwise, numbers are little-endian.
### SSTable
```
+-------+-------+-----+--------+
| Block | Block | ... | Footer |
+-------+-------+-----+--------+
|----( # of blocks)---|
```
- Block(`SSTBlock`): list of independent block, terminated by a single empty block.
- Footer(`SSTFooter`)
### SSTBlock
```
+----------+--------+-------+-------+-----+
| BlockLen | Values | Delta | Delta | ... |
+----------+--------+-------+-------+-----+
|----( # of deltas)---|
```
- BlockLen(u32): length of the block
- Values: an application defined format storing a sequence of value, capable of determining it own length
- Delta
### Delta
```
+---------+--------+
| KeepAdd | Suffix |
+---------+--------+
```
- KeepAdd
- Suffix: KeepAdd.add bytes of key suffix
### KeepAdd
KeepAdd can be represented in two different representation, a very compact 1byte one which is enough for most usage, and a longer variable-len one when required
When keep < 16 and add < 16
```
+-----+------+
| Add | Keep |
+-----+------+
```
- Add(u4): number of bytes to push
- Keep(u4): number of bytes to pop
Otherwise:
```
+------+------+-----+
| 0x01 | Keep | Add |
+------+------+-----+
```
- Add(VInt): number of bytes to push
- Keep(VInt): number of bytes to pop
Note: there is no ambiguity between both representation as Add is always guarantee to be non-zero, except for the very first key of an SSTable, where Keep is guaranteed to be zero.
### SSTFooter
```
+-------+-------+-----+-------------+---------+---------+------+
| Block | Block | ... | IndexOffset | NumTerm | Version | Type |
+-------+-------+-----+-------------+---------+---------+------+
|----( # of blocks)---|
```
- Block(SSTBlock): uses IndexValue for its Values format
- IndexOffset(u64): Offset to the start of the SSTFooter
- NumTerm(u64): number of terms in the sstable
- Version(u32): Currently defined to 0x00\_00\_00\_01
- Type(u32): Defined to 0x00\_00\_00\_02
### IndexValue
```
+------------+-------+-------+-----+
| EntryCount | Entry | Entry | ... |
+------------+-------+-------+-----+
|---( # of entries)---|
```
- EntryCount(VInt): number of entries
- Entry (IndexEntry)
### Entry
```
+----------+--------------+
| BlockLen | FirstOrdinal |
+----------+--------------+
```
- BlockLen(VInt): length of the block
- FirstOrdinal(VInt): ordinal of the first element in the given block

View File

@@ -18,6 +18,7 @@ where W: io::Write
value_writer: TValueWriter,
// Only here to avoid allocations.
stateless_buffer: Vec<u8>,
block_len: usize,
}
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
@@ -31,15 +32,14 @@ where
write: CountingWriter::wrap(BufWriter::new(wrt)),
value_writer: TValueWriter::default(),
stateless_buffer: Vec::new(),
block_len: BLOCK_LEN,
}
}
}
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
where
W: io::Write,
TValueWriter: value::ValueWriter,
{
pub fn set_block_len(&mut self, block_len: usize) {
self.block_len = block_len
}
pub fn flush_block(&mut self) -> io::Result<Option<Range<usize>>> {
if self.block.is_empty() {
return Ok(None);
@@ -82,7 +82,7 @@ where
}
pub fn flush_block_if_required(&mut self) -> io::Result<Option<Range<usize>>> {
if self.block.len() > BLOCK_LEN {
if self.block.len() > self.block_len {
return self.flush_block();
}
Ok(None)

View File

@@ -5,7 +5,7 @@ use std::ops::{Bound, RangeBounds};
use std::sync::Arc;
use common::file_slice::FileSlice;
use common::{BinarySerializable, OwnedBytes};
use common::{BinarySerializable, DictionaryFooter, OwnedBytes};
use tantivy_fst::automaton::AlwaysMatch;
use tantivy_fst::Automaton;
@@ -110,7 +110,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// only block for up to `limit` matching terms.
///
/// It works by identifying
/// - `first_block`: the block containing the start boudary key
/// - `first_block`: the block containing the start boundary key
/// - `last_block`: the block containing the end boundary key.
///
/// And then returning the range that spans over all blocks between.
@@ -178,10 +178,15 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Opens a `TermDictionary`.
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16);
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(24);
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
let index_offset = u64::deserialize(&mut footer_len_bytes)?;
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let footer = DictionaryFooter::deserialize(&mut footer_len_bytes)?;
crate::FOOTER.verify_equal(&footer)?;
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice())
@@ -231,7 +236,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let suffix = sstable_delta_reader.suffix();
match prefix_len.cmp(&ok_bytes) {
Ordering::Less => return Ok(None), // poped bytes already matched => too far
Ordering::Less => return Ok(None), // popped bytes already matched => too far
Ordering::Equal => (),
Ordering::Greater => {
// the ok prefix is less than current entry prefix => continue to next elem

View File

@@ -17,6 +17,8 @@ pub use dictionary::Dictionary;
pub use streamer::{Streamer, StreamerBuilder};
mod block_reader;
use common::{BinarySerializable, DictionaryFooter, DictionaryKind};
pub use self::block_reader::BlockReader;
pub use self::delta::{DeltaReader, DeltaWriter};
pub use self::merge::VoidMerge;
@@ -26,6 +28,10 @@ use crate::value::{RangeValueReader, RangeValueWriter};
pub type TermOrdinal = u64;
const DEFAULT_KEY_CAPACITY: usize = 50;
const FOOTER: DictionaryFooter = DictionaryFooter {
kind: DictionaryKind::SSTable,
version: 1,
};
/// Given two byte string returns the length of
/// the longest common prefix.
@@ -201,6 +207,14 @@ where
}
}
/// Set the target block length.
///
/// The delta part of a block will generally be slightly larger than the requested `block_len`,
/// however this does not account for the length of the Value part of the table.
pub fn set_block_len(&mut self, block_len: usize) {
self.delta_writer.set_block_len(block_len)
}
/// Returns the last inserted key.
/// If no key has been inserted yet, or the block was just
/// flushed, this function returns "".
@@ -288,6 +302,7 @@ where
self.first_ordinal_of_the_block = self.num_terms;
}
let mut wrt = self.delta_writer.finish();
// add a final empty block as an end marker
wrt.write_all(&0u32.to_le_bytes())?;
let offset = wrt.written_bytes();
@@ -295,6 +310,9 @@ where
self.index_builder.serialize(&mut wrt)?;
wrt.write_all(&offset.to_le_bytes())?;
wrt.write_all(&self.num_terms.to_le_bytes())?;
FOOTER.serialize(&mut wrt)?;
let wrt = wrt.finish();
Ok(wrt.into_inner()?)
}
@@ -371,19 +389,25 @@ mod test {
assert_eq!(
&buffer,
&[
// block len
7u8, 0u8, 0u8, 0u8, // keep 0 push 1 | ""
16u8, 17u8, // keep 1 push 2 | 18 19
33u8, 18u8, 19u8, // keep 1 push 1 | 20
17u8, 20u8, 0u8, 0u8, 0u8, 0u8, // no more blocks
// block
7u8, 0u8, 0u8, 0u8, // block len
16u8, 17u8, // keep 0 push 1 | 17
33u8, 18u8, 19u8, // keep 1 push 2 | 18 19
17u8, 20u8, // keep 1 push 1 | 20
// end of block
0u8, 0u8, 0u8, 0u8, // no more blocks
// index
161, 102, 98, 108, 111, 99, 107, 115, 129, 162, 115, 108, 97, 115, 116, 95, 107,
101, 121, 95, 111, 114, 95, 103, 114, 101, 97, 116, 101, 114, 130, 17, 20, 106, 98,
108, 111, 99, 107, 95, 97, 100, 100, 114, 162, 106, 98, 121, 116, 101, 95, 114, 97,
110, 103, 101, 162, 101, 115, 116, 97, 114, 116, 0, 99, 101, 110, 100, 11, 109,
102, 105, 114, 115, 116, 95, 111, 114, 100, 105, 110, 97, 108, 0, 15, 0, 0, 0, 0,
0, 0, 0, // offset for the index
3u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8, 0u8 // num terms
6u8, 0u8, 0u8, 0u8, // block len
1, // num blocks
11, // len of 1st block
0, // first ord of 1st block
32, 17, 20, // keep 0 push 2 | 17 20
// end of block
0, 0, 0, 0, // no more blocks
15, 0, 0, 0, 0, 0, 0, 0, // index start offset
3, 0, 0, 0, 0, 0, 0, 0, // num_term
1, 0, 0, 0, // version
2, 0, 0, 0, // dictionary kind. sstable = 2
]
);
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
@@ -501,8 +525,8 @@ mod test {
fn test_proptest_sstable_ranges(words in prop::collection::btree_set("[a-c]{0,6}", 1..100),
(lower_bound, upper_bound) in bounds_strategy(),
) {
// TODO tweak block size.
let mut builder = Dictionary::<VoidSSTable>::builder(Vec::new()).unwrap();
builder.set_block_len(16);
for word in &words {
builder.insert(word.as_bytes(), &()).unwrap();
}

View File

@@ -1,11 +1,9 @@
use std::io;
use std::io::{self, Write};
use std::ops::Range;
use serde::{Deserialize, Serialize};
use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
#[derive(Default, Debug, Clone)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
}
@@ -13,7 +11,17 @@ pub struct SSTableIndex {
impl SSTableIndex {
/// Load an index from its binary representation
pub fn load(data: &[u8]) -> Result<SSTableIndex, SSTableDataCorruption> {
ciborium::de::from_reader(data).map_err(|_| SSTableDataCorruption)
let mut reader = IndexSSTable::reader(data);
let mut blocks = Vec::new();
while reader.advance().map_err(|_| SSTableDataCorruption)? {
blocks.push(BlockMeta {
last_key_or_greater: reader.key().to_vec(),
block_addr: reader.value().clone(),
});
}
Ok(SSTableIndex { blocks })
}
/// Get the [`BlockAddr`] of the requested block.
@@ -23,7 +31,7 @@ impl SSTableIndex {
.map(|block_meta| block_meta.block_addr.clone())
}
/// Get the block id of the block that woudl contain `key`.
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
@@ -69,13 +77,13 @@ impl SSTableIndex {
}
}
#[derive(Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct BlockAddr {
pub byte_range: Range<usize>,
pub first_ordinal: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub(crate) struct BlockMeta {
/// Any byte string that is lexicographically greater or equal to
/// the last key in the block,
@@ -130,11 +138,44 @@ impl SSTableIndexBuilder {
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<()> {
ciborium::ser::into_writer(&self.index, wrt)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))
// we can't use a plain writer as it would generate an index
let mut sstable_writer = IndexSSTable::delta_writer(wrt);
// in tests, set a smaller block size to stress-test
#[cfg(test)]
sstable_writer.set_block_len(16);
let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
for block in self.index.blocks.iter() {
let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater);
sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]);
sstable_writer.write_value(&block.block_addr);
previous_key.clear();
previous_key.extend_from_slice(&block.last_key_or_greater);
}
sstable_writer.flush_block()?;
sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
Ok(())
}
}
/// SSTable representing an index
///
/// `last_key_or_greater` is used as the key, the value contains the
/// length and first ordinal of each block. The start offset is implicitly
/// obtained from lengths.
struct IndexSSTable;
impl SSTable for IndexSSTable {
type Value = BlockAddr;
type ValueReader = crate::value::index::IndexValueReader;
type ValueWriter = crate::value::index::IndexValueWriter;
}
#[cfg(test)]
mod tests {
use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
@@ -143,7 +184,7 @@ mod tests {
#[test]
fn test_sstable_index() {
let mut sstable_builder = SSTableIndexBuilder::default();
sstable_builder.add_block(b"aaa", 10..20, 0u64);
sstable_builder.add_block(b"aaa", 0..20, 0u64);
sstable_builder.add_block(b"bbbbbbb", 20..30, 5u64);
sstable_builder.add_block(b"ccc", 30..40, 10u64);
sstable_builder.add_block(b"dddd", 40..50, 15u64);

118
sstable/src/value/index.rs Normal file
View File

@@ -0,0 +1,118 @@
use std::io;
use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter};
use crate::{vint, BlockAddr};
#[derive(Default)]
pub(crate) struct IndexValueReader {
vals: Vec<BlockAddr>,
}
impl ValueReader for IndexValueReader {
type Value = BlockAddr;
#[inline(always)]
fn value(&self, idx: usize) -> &Self::Value {
&self.vals[idx]
}
fn load(&mut self, mut data: &[u8]) -> io::Result<usize> {
let original_num_bytes = data.len();
let num_vals = deserialize_vint_u64(&mut data) as usize;
self.vals.clear();
let mut first_ordinal = 0u64;
let mut prev_start = 0usize;
for _ in 0..num_vals {
let len = deserialize_vint_u64(&mut data);
let delta_ordinal = deserialize_vint_u64(&mut data);
first_ordinal += delta_ordinal;
let end = prev_start + len as usize;
self.vals.push(BlockAddr {
byte_range: prev_start..end,
first_ordinal,
});
prev_start = end;
}
Ok(original_num_bytes - data.len())
}
}
#[derive(Default)]
pub(crate) struct IndexValueWriter {
vals: Vec<BlockAddr>,
}
impl ValueWriter for IndexValueWriter {
type Value = BlockAddr;
fn write(&mut self, val: &Self::Value) {
self.vals.push(val.clone());
}
fn serialize_block(&self, output: &mut Vec<u8>) {
let mut prev_ord = 0u64;
vint::serialize_into_vec(self.vals.len() as u64, output);
// TODO use array_windows when it gets stabilized
for elem in self.vals.windows(2) {
let [current, next] = elem else {
unreachable!("windows should always return exactly 2 elements");
};
let len = next.byte_range.start - current.byte_range.start;
vint::serialize_into_vec(len as u64, output);
let delta = current.first_ordinal - prev_ord;
vint::serialize_into_vec(delta, output);
prev_ord = current.first_ordinal;
}
if let Some(last) = self.vals.last() {
let len = last.byte_range.end - last.byte_range.start;
vint::serialize_into_vec(len as u64, output);
let delta = last.first_ordinal - prev_ord;
vint::serialize_into_vec(delta, output);
}
}
fn clear(&mut self) {
self.vals.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_index_reader_writer() {
crate::value::tests::test_value_reader_writer::<_, IndexValueReader, IndexValueWriter>(&[]);
crate::value::tests::test_value_reader_writer::<_, IndexValueReader, IndexValueWriter>(&[
BlockAddr {
byte_range: 0..10,
first_ordinal: 0,
},
]);
crate::value::tests::test_value_reader_writer::<_, IndexValueReader, IndexValueWriter>(&[
BlockAddr {
byte_range: 0..10,
first_ordinal: 0,
},
BlockAddr {
byte_range: 10..20,
first_ordinal: 5,
},
]);
crate::value::tests::test_value_reader_writer::<_, IndexValueReader, IndexValueWriter>(&[
BlockAddr {
byte_range: 0..10,
first_ordinal: 0,
},
BlockAddr {
byte_range: 10..20,
first_ordinal: 5,
},
BlockAddr {
byte_range: 20..30,
first_ordinal: 10,
},
]);
}
}

View File

@@ -1,3 +1,4 @@
pub(crate) mod index;
mod range;
mod u64_monotonic;
mod void;