Compare commits

...

4 Commits

Author SHA1 Message Date
trinity-1686a
2b686ffa54 fix columnar tests 2023-11-10 11:46:58 +01:00
trinity-1686a
04b3a27a0a increment sstable version number 2023-11-10 11:38:58 +01:00
trinity-1686a
710cf1efa6 implement multilayer sstable writer 2023-11-10 11:09:50 +01:00
trinity-1686a
8103790c16 define and implement reading multi layer index sstable 2023-11-09 15:42:00 +01:00
10 changed files with 561 additions and 233 deletions

View File

@@ -26,7 +26,7 @@ fn test_dataframe_writer_str() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 87);
assert_eq!(cols[0].num_bytes(), 99);
}
#[test]
@@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() {
assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 87);
assert_eq!(cols[0].num_bytes(), 99);
}
#[test]

View File

@@ -1,5 +1,5 @@
use std::convert::TryInto;
use std::ops::{Deref, Range};
use std::ops::Deref;
use std::sync::Arc;
use std::{fmt, io};
@@ -37,7 +37,7 @@ impl OwnedBytes {
/// creates a fileslice that is just a view over a slice of the data.
#[must_use]
#[inline]
pub fn slice(&self, range: Range<usize>) -> Self {
pub fn slice(&self, range: impl std::slice::SliceIndex<[u8], Output = [u8]>) -> Self {
OwnedBytes {
data: &self.data[range],
box_stable_deref: self.box_stable_deref.clone(),

View File

@@ -131,7 +131,7 @@ mod tests {
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 93);
assert_eq!(file.len(), 105);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let column = fast_field_readers
.u64("field")
@@ -181,7 +181,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 121);
assert_eq!(file.len(), 133);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
.u64("field")
@@ -214,7 +214,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 94);
assert_eq!(file.len(), 106);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let fast_field_reader = fast_field_readers
.u64("field")
@@ -246,7 +246,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 4489);
assert_eq!(file.len(), 4501);
{
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers
@@ -279,7 +279,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 265);
assert_eq!(file.len(), 277);
{
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
@@ -773,7 +773,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 102);
assert_eq!(file.len(), 114);
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = fast_field_readers.bool("field_bool").unwrap();
assert_eq!(bool_col.first(0), Some(true));
@@ -805,7 +805,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 114);
assert_eq!(file.len(), 126);
let readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = readers.bool("field_bool").unwrap();
for i in 0..25 {
@@ -830,7 +830,7 @@ mod tests {
write.terminate().unwrap();
}
let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 104);
assert_eq!(file.len(), 116);
let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
let col = fastfield_readers.bool("field_bool").unwrap();
assert_eq!(col.first(0), None);

View File

@@ -89,15 +89,21 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw
### SSTFooter
```
+-------+-------+-----+-------------+---------+---------+
| Block | Block | ... | IndexOffset | NumTerm | Version |
+-------+-------+-----+-------------+---------+---------+
|----( # of blocks)---|
+-------+-------+-----+------------------+------------+-------------+---------+---------+
| Block | Block | ... | FirstLayerOffset | LayerCount | IndexOffset | NumTerm | Version |
+-------+-------+-----+------------------+------------+-------------+---------+---------+
|----(# of blocks)----|
```
- Block(SSTBlock): uses IndexValue for its Values format
- FirstLayerOffset(u64): Offset between the start of the footer and the start of the top level index
- LayerCount(u32): Number of layers of index (min 1) ## TODO do we want to use 0 as a marker for no layers? It makes small sstables 12 bytes more compact (the 0u32 would alias with the "end of sstable marker")
- IndexOffset(u64): Offset to the start of the SSTFooter
- NumTerm(u64): number of terms in the sstable
- Version(u32): Currently equal to 2
- Version(u32): Currently equal to 3
Blocks referencing the main table and block referencing the index itself are encoded the same way and
are not directly differentiated. Offsets in blocks referencing the index are relative to the start of
the footer, blocks referencing the main table are relative to the start of that table.
### IndexValue
```

View File

@@ -20,6 +20,7 @@ where W: io::Write
// Only here to avoid allocations.
stateless_buffer: Vec<u8>,
block_len: usize,
compress: bool,
}
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
@@ -34,6 +35,18 @@ where
value_writer: TValueWriter::default(),
stateless_buffer: Vec::new(),
block_len: BLOCK_LEN,
compress: true,
}
}
pub fn new_no_compression(wrt: W) -> Self {
DeltaWriter {
block: Vec::with_capacity(BLOCK_LEN * 2),
write: CountingWriter::wrap(BufWriter::new(wrt)),
value_writer: TValueWriter::default(),
stateless_buffer: Vec::new(),
block_len: BLOCK_LEN,
compress: false,
}
}
@@ -53,7 +66,7 @@ where
let block_len = buffer.len() + self.block.len();
if block_len > 2048 {
if block_len > 2048 && self.compress {
buffer.extend_from_slice(&self.block);
self.block.clear();

View File

@@ -128,52 +128,86 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
key_range: impl RangeBounds<[u8]>,
limit: Option<u64>,
) -> FileSlice {
let first_block_id = match key_range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => {
let Some(first_block_id) = self.sstable_index.locate_with_key(key) else {
return FileSlice::empty();
};
Some(first_block_id)
}
// we don't perform great when limit is set to a large value, and sometime we use u64::MAX
// as a marker for no limit, so we'd better capture that.
// (not great means we decode up to the whole bottom layer index, which can take dozens of
// ms on a 100m term dictionary)
let limit = limit.filter(|limit| *limit != u64::MAX);
// TODO replace unwraps with proper error handling
let start_key = match key_range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => key,
Bound::Unbounded => &[],
};
let end_key = match key_range.end_bound() {
Bound::Included(key) | Bound::Excluded(key) => Some(key),
Bound::Unbounded => None,
};
let last_block_id = match key_range.end_bound() {
Bound::Included(key) | Bound::Excluded(key) => self.sstable_index.locate_with_key(key),
Bound::Unbounded => None,
};
let start_bound = if let Some(first_block_id) = first_block_id {
let Some(block_addr) = self.sstable_index.get_block(first_block_id) else {
let bounds = if let Some(limit) = limit {
let mut sstable_iterator = self.sstable_index.iterate_from_key(start_key).unwrap();
let Some(start_block) = sstable_iterator.value() else {
// range_start is after end of table
return FileSlice::empty();
};
Bound::Included(block_addr.byte_range.start)
} else {
Bound::Unbounded
};
if let Some(end_key) = end_key {
if sstable_iterator.key().unwrap() >= end_key {
// the start and end keys are in the same block, return just that block
return self.sstable_slice.slice(start_block.byte_range.clone());
}
}
let start_bound = start_block.byte_range.start;
let last_block_id = if let Some(limit) = limit {
let second_block_id = first_block_id.map(|id| id + 1).unwrap_or(0);
if let Some(block_addr) = self.sstable_index.get_block(second_block_id) {
let ordinal_limit = block_addr.first_ordinal + limit;
let last_block_limit = self.sstable_index.locate_with_ord(ordinal_limit);
if let Some(last_block_id) = last_block_id {
Some(last_block_id.min(last_block_limit))
sstable_iterator.advance().unwrap();
let Some(second_block) = sstable_iterator.value() else {
// we reached the end of the sstable, return everything from start_bound
return self.sstable_slice.slice(start_bound..);
};
let mut end_bound = second_block.byte_range.end;
if let Some(end_key) = end_key {
if sstable_iterator.key().unwrap() >= end_key {
return self.sstable_slice.slice(start_bound..end_bound);
}
}
let target_ord = second_block.first_ordinal + limit;
while sstable_iterator.advance().unwrap() {
let block = sstable_iterator.value().unwrap();
if block.first_ordinal >= target_ord {
break;
}
end_bound = block.byte_range.end;
if let Some(end_key) = end_key {
if sstable_iterator.key().unwrap() >= end_key {
break;
}
}
}
let start_bound = Bound::Included(start_bound);
let end_bound = Bound::Excluded(end_bound);
(start_bound, end_bound)
} else {
let Some(start_block) = self.sstable_index.get_block_with_key(start_key).unwrap()
else {
// range_start is after end of table
return FileSlice::empty();
};
let start_bound = Bound::Included(start_block.byte_range.start);
let end_bound = if let Some(end_key) = end_key {
if let Some(end_block) = self.sstable_index.get_block_with_key(end_key).unwrap() {
Bound::Excluded(end_block.byte_range.end)
} else {
Some(last_block_limit)
Bound::Unbounded
}
} else {
last_block_id
}
} else {
last_block_id
Bound::Unbounded
};
(start_bound, end_bound)
};
let end_bound = last_block_id
.and_then(|block_id| self.sstable_index.get_block(block_id))
.map(|block_addr| Bound::Excluded(block_addr.byte_range.end))
.unwrap_or(Bound::Unbounded);
self.sstable_slice.slice((start_bound, end_bound))
self.sstable_slice.slice(bounds)
}
/// Opens a `TermDictionary`.
@@ -181,29 +215,57 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
// let layer_count = u32::deserialize(&mut footer_len_bytes)?;
let index_offset = u64::deserialize(&mut footer_len_bytes)?;
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let version = u32::deserialize(&mut footer_len_bytes)?;
if version != crate::SSTABLE_VERSION {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Unsuported sstable version, expected {version}, found {}",
crate::SSTABLE_VERSION,
),
));
}
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = SSTableIndex::load(sstable_index_bytes)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
Ok(Dictionary {
sstable_slice,
sstable_index,
num_terms,
phantom_data: PhantomData,
})
match version {
2 => {
// previous format, kept for backward compatibility
let sstable_index_bytes = index_slice.read_bytes()?;
// on the old format, the 1st layer necessarily start immediately, and there is
// only a single layer
let sstable_index =
SSTableIndex::load(sstable_index_bytes, 1, 0).map_err(|_| {
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?;
Ok(Dictionary {
sstable_slice,
sstable_index,
num_terms,
phantom_data: PhantomData,
})
}
3 => {
let sstable_index_bytes = index_slice.read_bytes()?;
let (sstable_index_bytes, mut v3_footer_bytes) = sstable_index_bytes.rsplit(12);
let first_layer_offset = v3_footer_bytes.read_u64();
let layer_count = v3_footer_bytes.read_u32();
let sstable_index = SSTableIndex::load(
sstable_index_bytes,
layer_count,
first_layer_offset as usize,
)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
Ok(Dictionary {
sstable_slice,
sstable_index,
num_terms,
phantom_data: PhantomData,
})
}
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Unsuported sstable version, expected {}, found {version}",
crate::SSTABLE_VERSION,
),
))
}
}
}
/// Creates a term dictionary from the supplied bytes.
@@ -227,68 +289,17 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
self.num_terms as usize
}
/// Decode a DeltaReader up to key, returning the number of terms traversed
///
/// If the key was not found, returns Ok(None).
/// After calling this function, it is possible to call `DeltaReader::value` to get the
/// associated value.
fn decode_up_to_key<K: AsRef<[u8]>>(
&self,
key: K,
sstable_delta_reader: &mut DeltaReader<TSSTable::ValueReader>,
) -> io::Result<Option<TermOrdinal>> {
let mut term_ord = 0;
let key_bytes = key.as_ref();
let mut ok_bytes = 0;
while sstable_delta_reader.advance()? {
let prefix_len = sstable_delta_reader.common_prefix_len();
let suffix = sstable_delta_reader.suffix();
match prefix_len.cmp(&ok_bytes) {
Ordering::Less => return Ok(None), // popped bytes already matched => too far
Ordering::Equal => (),
Ordering::Greater => {
// the ok prefix is less than current entry prefix => continue to next elem
term_ord += 1;
continue;
}
}
// we have ok_bytes byte of common prefix, check if this key adds more
for (key_byte, suffix_byte) in key_bytes[ok_bytes..].iter().zip(suffix) {
match suffix_byte.cmp(key_byte) {
Ordering::Less => break, // byte too small
Ordering::Equal => ok_bytes += 1, // new matching byte
Ordering::Greater => return Ok(None), // too far
}
}
if ok_bytes == key_bytes.len() {
if prefix_len + suffix.len() == ok_bytes {
return Ok(Some(term_ord));
} else {
// current key is a prefix of current element, not a match
return Ok(None);
}
}
term_ord += 1;
}
Ok(None)
}
/// Returns the ordinal associated with a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
let key_bytes = key.as_ref();
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else {
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes)? else {
return Ok(None);
};
let first_ordinal = block_addr.first_ordinal;
let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?;
self.decode_up_to_key(key_bytes, &mut sstable_delta_reader)
decode_up_to_key(key_bytes, &mut sstable_delta_reader)
.map(|opt| opt.map(|ord| ord + first_ordinal))
}
@@ -303,7 +314,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// the buffer may be modified.
pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
// find block in which the term would be
let block_addr = self.sstable_index.get_block_with_ord(ord);
let block_addr = self.sstable_index.get_block_with_ord(ord)?;
let first_ordinal = block_addr.first_ordinal;
// then search inside that block only
@@ -321,7 +332,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Returns the number of terms in the dictionary.
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TSSTable::Value>> {
// find block in which the term would be
let block_addr = self.sstable_index.get_block_with_ord(term_ord);
let block_addr = self.sstable_index.get_block_with_ord(term_ord)?;
let first_ordinal = block_addr.first_ordinal;
// then search inside that block only
@@ -336,7 +347,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Lookups the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref())? {
let sstable_reader = self.sstable_delta_reader_block(block_addr)?;
return self.do_get(key, sstable_reader);
}
@@ -345,7 +356,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Lookups the value corresponding to the key.
pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref())? {
let sstable_reader = self.sstable_delta_reader_block_async(block_addr).await?;
return self.do_get(key, sstable_reader);
}
@@ -357,7 +368,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
key: K,
mut reader: DeltaReader<TSSTable::ValueReader>,
) -> io::Result<Option<TSSTable::Value>> {
if let Some(_ord) = self.decode_up_to_key(key, &mut reader)? {
if let Some(_ord) = decode_up_to_key(key, &mut reader)? {
Ok(Some(reader.value().clone()))
} else {
Ok(None)
@@ -394,6 +405,56 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
}
}
/// Decode a DeltaReader up to key, returning the number of terms traversed
///
/// If the key was not found, returns Ok(None).
/// After calling this function, it is possible to call `DeltaReader::value` to get the
/// associated value.
pub(crate) fn decode_up_to_key<K: AsRef<[u8]>, TValueReader: crate::ValueReader>(
key: K,
sstable_delta_reader: &mut DeltaReader<TValueReader>,
) -> io::Result<Option<TermOrdinal>> {
let mut term_ord = 0;
let key_bytes = key.as_ref();
let mut ok_bytes = 0;
while sstable_delta_reader.advance()? {
let prefix_len = sstable_delta_reader.common_prefix_len();
let suffix = sstable_delta_reader.suffix();
match prefix_len.cmp(&ok_bytes) {
Ordering::Less => return Ok(None), // popped bytes already matched => too far
Ordering::Equal => (),
Ordering::Greater => {
// the ok prefix is less than current entry prefix => continue to next elem
term_ord += 1;
continue;
}
}
// we have ok_bytes byte of common prefix, check if this key adds more
for (key_byte, suffix_byte) in key_bytes[ok_bytes..].iter().zip(suffix) {
match suffix_byte.cmp(key_byte) {
Ordering::Less => break, // byte too small
Ordering::Equal => ok_bytes += 1, // new matching byte
Ordering::Greater => return Ok(None), // too far
}
}
if ok_bytes == key_bytes.len() {
if prefix_len + suffix.len() == ok_bytes {
return Ok(Some(term_ord));
} else {
// current key is a prefix of current element, not a match
return Ok(None);
}
}
term_ord += 1;
}
Ok(None)
}
#[cfg(test)]
mod tests {
use std::ops::Range;
@@ -459,8 +520,6 @@ mod tests {
let dictionary = Dictionary::<MonotonicU64SSTable>::open(slice).unwrap();
// if the last block is id 0, tests are meaningless
assert_ne!(dictionary.sstable_index.locate_with_ord(u64::MAX), 0);
assert_eq!(dictionary.num_terms(), 0x3ffff);
(dictionary, table)
}
@@ -469,7 +528,7 @@ mod tests {
fn test_ord_term_conversion() {
let (dic, slice) = make_test_sstable();
let block = dic.sstable_index.get_block_with_ord(100_000);
let block = dic.sstable_index.get_block_with_ord(100_000).unwrap();
slice.restrict(block.byte_range);
let mut res = Vec::new();
@@ -495,7 +554,11 @@ mod tests {
// end of a block
let ordinal = block.first_ordinal - 1;
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
let new_range = dic
.sstable_index
.get_block_with_ord(ordinal)
.unwrap()
.byte_range;
slice.restrict(new_range);
assert!(dic.ord_to_term(ordinal, &mut res).unwrap());
assert_eq!(res, format!("{ordinal:05X}").into_bytes());
@@ -505,7 +568,7 @@ mod tests {
// before first block
// 1st block must be loaded for key-related operations
let block = dic.sstable_index.get_block_with_ord(0);
let block = dic.sstable_index.get_block_with_ord(0).unwrap();
slice.restrict(block.byte_range);
assert!(dic.get(b"$$$").unwrap().is_none());
@@ -514,7 +577,11 @@ mod tests {
// after last block
// last block must be loaded for ord related operations
let ordinal = 0x40000 + 10;
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
let new_range = dic
.sstable_index
.get_block_with_ord(ordinal)
.unwrap()
.byte_range;
slice.restrict(new_range);
assert!(!dic.ord_to_term(ordinal, &mut res).unwrap());
assert!(dic.term_info_from_ord(ordinal).unwrap().is_none());
@@ -539,11 +606,13 @@ mod tests {
.sstable_index
.get_block_with_key(b"10000")
.unwrap()
.unwrap()
.byte_range;
let end = dic
.sstable_index
.get_block_with_key(b"18000")
.unwrap()
.unwrap()
.byte_range;
slice.restrict(start.start..end.end);

View File

@@ -1,6 +1,6 @@
use std::io::{self, Write};
use std::num::NonZeroU64;
use std::ops::Range;
use std::usize;
use merge::ValueMerger;
@@ -28,7 +28,13 @@ use crate::value::{RangeValueReader, RangeValueWriter};
pub type TermOrdinal = u64;
const DEFAULT_KEY_CAPACITY: usize = 50;
const SSTABLE_VERSION: u32 = 2;
const SSTABLE_VERSION: u32 = 3;
// TODO tune that value. Maybe it's too little?
#[cfg(not(test))]
const DEFAULT_MAX_ROOT_BLOCKS: NonZeroU64 = unsafe { NonZeroU64::new_unchecked(32) };
#[cfg(test)]
const DEFAULT_MAX_ROOT_BLOCKS: NonZeroU64 = unsafe { NonZeroU64::new_unchecked(1) };
/// Given two byte string returns the length of
/// the longest common prefix.
@@ -55,7 +61,7 @@ pub trait SSTable: Sized {
}
fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
Writer::new(wrt)
Writer::new(wrt, DEFAULT_MAX_ROOT_BLOCKS)
}
fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> {
@@ -178,6 +184,7 @@ where W: io::Write
delta_writer: DeltaWriter<W, TValueWriter>,
num_terms: u64,
first_ordinal_of_the_block: u64,
index_max_root_blocks: NonZeroU64,
}
impl<W, TValueWriter> Writer<W, TValueWriter>
@@ -190,17 +197,18 @@ where
/// TODO remove this function. (See Issue #1727)
#[doc(hidden)]
pub fn create(wrt: W) -> io::Result<Self> {
Ok(Self::new(wrt))
Ok(Self::new(wrt, DEFAULT_MAX_ROOT_BLOCKS))
}
/// Creates a new `TermDictionaryBuilder`.
pub fn new(wrt: W) -> Self {
pub fn new(wrt: W, index_max_root_blocks: NonZeroU64) -> Self {
Writer {
previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
num_terms: 0u64,
index_builder: SSTableIndexBuilder::default(),
delta_writer: DeltaWriter::new(wrt),
first_ordinal_of_the_block: 0u64,
index_max_root_blocks,
}
}
@@ -302,10 +310,15 @@ where
// add a final empty block as an end marker
wrt.write_all(&0u32.to_le_bytes())?;
let offset = wrt.written_bytes();
let index_offset = wrt.written_bytes();
self.index_builder.serialize(&mut wrt)?;
wrt.write_all(&offset.to_le_bytes())?;
let (layer_count, layer_offset): (u32, u64) = self
.index_builder
.serialize(&mut wrt, self.index_max_root_blocks)?;
wrt.write_all(&layer_offset.to_le_bytes())?;
wrt.write_all(&layer_count.to_le_bytes())?;
wrt.write_all(&index_offset.to_le_bytes())?;
wrt.write_all(&self.num_terms.to_le_bytes())?;
SSTABLE_VERSION.serialize(&mut wrt)?;
@@ -389,9 +402,11 @@ mod test {
0, // compression
1, 0, 12, 0, 32, 17, 20, // index block
0, 0, 0, 0, // no more index block
0, 0, 0, 0, 0, 0, 0, 0, // first layer offset
1, 0, 0, 0, // layer count
16, 0, 0, 0, 0, 0, 0, 0, // index start offset
3, 0, 0, 0, 0, 0, 0, 0, // num term
2, 0, 0, 0, // version
3, 0, 0, 0, // version
]
);
let buffer = OwnedBytes::new(buffer);

View File

@@ -5,77 +5,188 @@ use common::OwnedBytes;
use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
root_blocks: Vec<BlockMeta>,
layer_count: u32,
index_bytes: OwnedBytes,
}
impl Default for SSTableIndex {
fn default() -> Self {
SSTableIndex {
root_blocks: Vec::new(),
layer_count: 1,
index_bytes: OwnedBytes::empty(),
}
}
}
impl SSTableIndex {
/// Load an index from its binary representation
pub fn load(data: OwnedBytes) -> Result<SSTableIndex, SSTableDataCorruption> {
let mut reader = IndexSSTable::reader(data);
let mut blocks = Vec::new();
pub fn load(
data: OwnedBytes,
layer_count: u32,
first_layer_offset: usize,
) -> Result<SSTableIndex, SSTableDataCorruption> {
let (index_bytes, first_layer_slice) = data.split(first_layer_offset);
let mut reader = IndexSSTable::reader(first_layer_slice);
let mut root_blocks = Vec::new();
while reader.advance().map_err(|_| SSTableDataCorruption)? {
blocks.push(BlockMeta {
root_blocks.push(BlockMeta {
last_key_or_greater: reader.key().to_vec(),
block_addr: reader.value().clone(),
});
}
Ok(SSTableIndex { blocks })
}
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, block_id: usize) -> Option<BlockAddr> {
self.blocks
.get(block_id)
.map(|block_meta| block_meta.block_addr.clone())
}
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
let pos = self
.blocks
.binary_search_by_key(&key, |block| &block.last_key_or_greater);
match pos {
Ok(pos) => Some(pos),
Err(pos) => {
if pos < self.blocks.len() {
Some(pos)
} else {
// after end of last block: no block matches
None
}
}
}
Ok(SSTableIndex {
root_blocks,
layer_count,
index_bytes,
// index_bytes: OwnedBytes::empty(),
})
}
/// Get the [`BlockAddr`] of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
self.locate_with_key(key).and_then(|id| self.get_block(id))
}
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize {
let pos = self
.blocks
.binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
match pos {
Ok(pos) => pos,
// Err(0) can't happen as the sstable starts with ordinal zero
Err(pos) => pos - 1,
}
pub fn get_block_with_key(&self, key: &[u8]) -> io::Result<Option<BlockAddr>> {
self.iterate_from_key(key).map(|iter| iter.value().cloned())
}
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
// locate_with_ord always returns an index within range
self.get_block(self.locate_with_ord(ord)).unwrap()
pub fn get_block_with_ord(&self, ord: TermOrdinal) -> io::Result<BlockAddr> {
let pos = self
.root_blocks
.binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
let root_pos = match pos {
Ok(pos) => pos,
// Err(0) can't happen as the sstable starts with ordinal zero
Err(pos) => pos - 1,
};
if self.layer_count == 1 {
return Ok(self.root_blocks[root_pos].block_addr.clone());
}
let mut next_layer_block_addr = self.root_blocks[root_pos].block_addr.clone();
for _ in 1..self.layer_count {
let mut sstable_delta_reader = IndexSSTable::delta_reader(
self.index_bytes
.slice(next_layer_block_addr.byte_range.clone()),
);
while sstable_delta_reader.advance()? {
if sstable_delta_reader.value().first_ordinal > ord {
break;
}
next_layer_block_addr = sstable_delta_reader.value().clone();
}
}
Ok(next_layer_block_addr)
}
pub(crate) fn iterate_from_key(&self, key: &[u8]) -> io::Result<ReaderOrSlice<'_>> {
let root_pos = self
.root_blocks
.binary_search_by_key(&key, |block| &block.last_key_or_greater);
let root_pos = match root_pos {
Ok(pos) => pos,
Err(pos) => {
if pos < self.root_blocks.len() {
pos
} else {
// after end of last block: no block matches
return Ok(ReaderOrSlice::End);
}
}
};
let mut next_layer_block_addr = self.root_blocks[root_pos].block_addr.clone();
let mut last_delta_reader = None;
for _ in 1..self.layer_count {
// we don't enter this loop for 1 layer index
let mut sstable_delta_reader = IndexSSTable::delta_reader(
self.index_bytes.slice(next_layer_block_addr.byte_range),
);
crate::dictionary::decode_up_to_key(key, &mut sstable_delta_reader)?;
next_layer_block_addr = sstable_delta_reader.value().clone();
last_delta_reader = Some(sstable_delta_reader);
}
if let Some(delta_reader) = last_delta_reader {
// reconstruct the current key. We stopped either on the exact key, or just after
// either way, common_prefix_len is something that did not change between the
// last-key-before-target and the current pos, so those bytes must match the prefix of
// `key`. The next bytes can be obtained from the delta reader
let mut result_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
let common_prefix_len = delta_reader.common_prefix_len();
let suffix = delta_reader.suffix();
let new_len = delta_reader.common_prefix_len() + suffix.len();
result_key.resize(new_len, 0u8);
result_key[..common_prefix_len].copy_from_slice(&key[..common_prefix_len]);
result_key[common_prefix_len..].copy_from_slice(suffix);
let reader = crate::Reader {
key: result_key,
delta_reader,
};
Ok(ReaderOrSlice::Reader(reader))
} else {
// self.layer_count == 1, there is no lvl2 sstable to decode.
Ok(ReaderOrSlice::Iter(&self.root_blocks, root_pos))
}
}
}
pub(crate) enum ReaderOrSlice<'a> {
Reader(crate::Reader<crate::value::index::IndexValueReader>),
Iter(&'a [BlockMeta], usize),
End,
}
impl<'a> ReaderOrSlice<'a> {
pub fn advance(&mut self) -> Result<bool, SSTableDataCorruption> {
match self {
ReaderOrSlice::Reader(reader) => {
let res = reader.advance().map_err(|_| SSTableDataCorruption);
if !matches!(res, Ok(true)) {
*self = ReaderOrSlice::End;
}
res
}
ReaderOrSlice::Iter(slice, index) => {
*index += 1;
if *index < slice.len() {
Ok(true)
} else {
*self = ReaderOrSlice::End;
Ok(false)
}
}
ReaderOrSlice::End => Ok(false),
}
}
/// Get current key. Always Some(_) unless last call to advance returned something else than
/// Ok(true)
pub fn key(&self) -> Option<&[u8]> {
match self {
ReaderOrSlice::Reader(reader) => Some(reader.key()),
ReaderOrSlice::Iter(slice, index) => Some(&slice[*index].last_key_or_greater),
ReaderOrSlice::End => None,
}
}
/// Get current value. Always Some(_) unless last call to advance returned something else than
/// Ok(true)
pub fn value(&self) -> Option<&BlockAddr> {
match self {
ReaderOrSlice::Reader(reader) => Some(reader.value()),
ReaderOrSlice::Iter(slice, index) => Some(&slice[*index].block_addr),
ReaderOrSlice::End => None,
}
}
}
@@ -124,13 +235,13 @@ impl SSTableIndexBuilder {
/// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.index.blocks.last_mut() {
if let Some(last_block) = self.index.root_blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
}
}
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.index.blocks.push(BlockMeta {
self.index.root_blocks.push(BlockMeta {
last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr {
byte_range,
@@ -139,31 +250,90 @@ impl SSTableIndexBuilder {
})
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<()> {
// we can't use a plain writer as it would generate an index
let mut sstable_writer = IndexSSTable::delta_writer(wrt);
pub fn serialize<W: std::io::Write>(
&self,
wrt: W,
index_max_root_blocks: std::num::NonZeroU64,
) -> io::Result<(u32, u64)> {
let index_max_root_blocks = index_max_root_blocks.get();
// in tests, set a smaller block size to stress-test
#[cfg(test)]
sstable_writer.set_block_len(16);
let mut wrt = common::CountingWriter::wrap(wrt);
let mut next_layer = write_sstable_layer(&mut wrt, &self.index.root_blocks, 0)?;
let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
for block in self.index.blocks.iter() {
let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater);
let mut layer_count = 1;
let mut offset = 0;
while next_layer.len() as u64 > index_max_root_blocks {
offset = wrt.written_bytes();
layer_count += 1;
sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]);
sstable_writer.write_value(&block.block_addr);
sstable_writer.flush_block_if_required()?;
previous_key.clear();
previous_key.extend_from_slice(&block.last_key_or_greater);
next_layer = write_sstable_layer(&mut wrt, &next_layer, offset as usize)?;
}
sstable_writer.flush_block()?;
sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
Ok(())
Ok((layer_count, offset))
}
}
fn write_sstable_layer<W: std::io::Write>(
wrt: W,
layer_content: &[BlockMeta],
offset: usize,
) -> io::Result<Vec<BlockMeta>> {
// we can't use a plain writer as it would generate an index
// also disable compression, the index is small anyway, and it's the most costly part of
// opening that kind of sstable
let mut sstable_writer =
crate::DeltaWriter::<_, crate::value::index::IndexValueWriter>::new_no_compression(wrt);
// in tests, set a smaller block size to stress-test
#[cfg(test)]
sstable_writer.set_block_len(16);
let mut next_layer = Vec::new();
let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
let mut first_ordinal = None;
for block in layer_content.iter() {
if first_ordinal.is_none() {
first_ordinal = Some(block.block_addr.first_ordinal);
}
let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater);
sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]);
sstable_writer.write_value(&block.block_addr);
if let Some(range) = sstable_writer.flush_block_if_required()? {
let real_range = (range.start + offset)..(range.end + offset);
let block_meta = BlockMeta {
last_key_or_greater: block.last_key_or_greater.clone(),
block_addr: BlockAddr {
byte_range: real_range,
first_ordinal: first_ordinal.take().unwrap(),
},
};
next_layer.push(block_meta);
previous_key.clear();
} else {
previous_key.extend_from_slice(&block.last_key_or_greater);
previous_key.resize(block.last_key_or_greater.len(), 0u8);
previous_key[keep_len..].copy_from_slice(&block.last_key_or_greater[keep_len..]);
}
}
if let Some(range) = sstable_writer.flush_block()? {
if let Some(last_block) = layer_content.last() {
// not going here means an empty table (?!)
let real_range = (range.start + offset)..(range.end + offset);
let block_meta = BlockMeta {
last_key_or_greater: last_block.last_key_or_greater.clone(),
block_addr: BlockAddr {
byte_range: real_range,
first_ordinal: first_ordinal.take().unwrap(),
},
};
next_layer.push(block_meta);
}
}
sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
Ok(next_layer)
}
/// SSTable representing an index
///
/// `last_key_or_greater` is used as the key, the value contains the
@@ -194,28 +364,77 @@ mod tests {
sstable_builder.add_block(b"ccc", 30..40, 10u64);
sstable_builder.add_block(b"dddd", 40..50, 15u64);
let mut buffer: Vec<u8> = Vec::new();
sstable_builder.serialize(&mut buffer).unwrap();
sstable_builder
.serialize(&mut buffer, crate::DEFAULT_MAX_ROOT_BLOCKS)
.unwrap();
let buffer = OwnedBytes::new(buffer);
let sstable_index = SSTableIndex::load(buffer).unwrap();
let sstable_index = SSTableIndex::load(buffer, 1, 0).unwrap();
assert_eq!(
sstable_index.get_block_with_key(b"bbbde"),
sstable_index.get_block_with_key(b"bbbde").unwrap(),
Some(BlockAddr {
first_ordinal: 10u64,
byte_range: 30..40
})
);
assert_eq!(sstable_index.locate_with_key(b"aa").unwrap(), 0);
assert_eq!(sstable_index.locate_with_key(b"aaa").unwrap(), 0);
assert_eq!(sstable_index.locate_with_key(b"aab").unwrap(), 1);
assert_eq!(sstable_index.locate_with_key(b"ccc").unwrap(), 2);
assert!(sstable_index.locate_with_key(b"e").is_none());
assert_eq!(
sstable_index
.get_block_with_key(b"aa")
.unwrap()
.unwrap()
.first_ordinal,
0
);
assert_eq!(
sstable_index
.get_block_with_key(b"aaa")
.unwrap()
.unwrap()
.first_ordinal,
0
);
assert_eq!(
sstable_index
.get_block_with_key(b"aab")
.unwrap()
.unwrap()
.first_ordinal,
5
);
assert_eq!(
sstable_index
.get_block_with_key(b"ccc")
.unwrap()
.unwrap()
.first_ordinal,
10
);
assert!(sstable_index.get_block_with_key(b"e").unwrap().is_none());
assert_eq!(sstable_index.locate_with_ord(0), 0);
assert_eq!(sstable_index.locate_with_ord(1), 0);
assert_eq!(sstable_index.locate_with_ord(4), 0);
assert_eq!(sstable_index.locate_with_ord(5), 1);
assert_eq!(sstable_index.locate_with_ord(100), 3);
assert_eq!(
sstable_index.get_block_with_ord(0).unwrap().first_ordinal,
0
);
assert_eq!(
sstable_index.get_block_with_ord(1).unwrap().first_ordinal,
0
);
assert_eq!(
sstable_index.get_block_with_ord(4).unwrap().first_ordinal,
0
);
assert_eq!(
sstable_index.get_block_with_ord(5).unwrap().first_ordinal,
5
);
assert_eq!(
sstable_index.get_block_with_ord(6).unwrap().first_ordinal,
5
);
assert_eq!(
sstable_index.get_block_with_ord(100).unwrap().first_ordinal,
15
);
}
#[test]
@@ -226,10 +445,12 @@ mod tests {
sstable_builder.add_block(b"ccc", 30..40, 10u64);
sstable_builder.add_block(b"dddd", 40..50, 15u64);
let mut buffer: Vec<u8> = Vec::new();
sstable_builder.serialize(&mut buffer).unwrap();
sstable_builder
.serialize(&mut buffer, crate::DEFAULT_MAX_ROOT_BLOCKS)
.unwrap();
buffer[2] = 9u8;
let buffer = OwnedBytes::new(buffer);
let data_corruption_err = SSTableIndex::load(buffer).err().unwrap();
let data_corruption_err = SSTableIndex::load(buffer, 1, 0).err().unwrap();
assert!(matches!(data_corruption_err, SSTableDataCorruption));
}

View File

@@ -110,7 +110,7 @@ where
Bound::Included(key) | Bound::Excluded(key) => self
.term_dict
.sstable_index
.get_block_with_key(key)
.get_block_with_key(key)?
.map(|block| block.first_ordinal)
.unwrap_or(0),
Bound::Unbounded => 0,

View File

@@ -3,6 +3,10 @@ use std::io;
use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter};
use crate::{vint, BlockAddr};
// TODO define a LazyIndexValueReader?
// one which keeps state could be useful for ord_to_block fns,
// one which doesn't at all woud be perfect for term_to_block fns
// pending bench to asses real impact
#[derive(Default)]
pub(crate) struct IndexValueReader {
vals: Vec<BlockAddr>,