define and implement reading multi layer index sstable

This commit is contained in:
trinity-1686a
2023-11-09 15:42:00 +01:00
parent 7a0064db1f
commit 8103790c16
6 changed files with 439 additions and 192 deletions

View File

@@ -1,5 +1,5 @@
use std::convert::TryInto;
use std::ops::{Deref, Range};
use std::ops::Deref;
use std::sync::Arc;
use std::{fmt, io};
@@ -37,7 +37,7 @@ impl OwnedBytes {
/// creates a fileslice that is just a view over a slice of the data.
#[must_use]
#[inline]
pub fn slice(&self, range: Range<usize>) -> Self {
pub fn slice(&self, range: impl std::slice::SliceIndex<[u8], Output = [u8]>) -> Self {
OwnedBytes {
data: &self.data[range],
box_stable_deref: self.box_stable_deref.clone(),

View File

@@ -89,15 +89,31 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw
### SSTFooter
```
+-------+-------+-----+-------------+---------+---------+
| Block | Block | ... | IndexOffset | NumTerm | Version |
+-------+-------+-----+-------------+---------+---------+
|----( # of blocks)---|
+-------+-------+-----+------------------+------------+-------------+---------+---------+
| Block | Block | ... | FirstLayerOffset | LayerCount | IndexOffset | NumTerm | Version |
+-------+-------+-----+------------------+------------+-------------+---------+---------+
|----(# of blocks)----|---(optional? cf LayerCount)---|
```
- Block(SSTBlock): uses IndexValue for its Values format
- FirstLayerOffset(u64): Offset between the start of the footer and the start of the top level index
- LayerCount(u32): Number of layers of index (min 1) ## TODO do we want to use 0 as a marker for no layers? It makes small sstables 12 bytes more compact (the 0u32 would alias with the "end of sstable marker")
- IndexOffset(u64): Offset to the start of the SSTFooter
- NumTerm(u64): number of terms in the sstable
- Version(u32): Currently equal to 2
- Version(u32): Currently equal to 3
Blocks referencing the main table and block referencing the index itself are encoded the same way and
are not directly differentiated. Offsets in blocks referencing the index are relative to the start of
the footer, blocks referencing the main table are relative to the start of that table.
#### TODO(trinity) open questions:
the changes are small enough that it's easy to support both v2 and v3 at once (LayerCount alias with a
4 byte empty block we aways add at the end of an sstable. If LayerCount is zero, we are in v2, and must
not read FirstLayerOffset, if we are in v3, LayerCount is non zero and we read FirstLayerOffset.
If we keep that version number to 2, the format is then also forward compatible: an old version would decode
IndexOffset and after, and find enough information to decode the bottom layer sstable, and would stop at
the empty block added end of that sstable. The non-bottom layers would be loaded to memory, but never actually
processed.
Do we want to support that usage (put back version to v2), or prevent it and use v3?
### IndexValue
```

View File

@@ -128,62 +128,92 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
key_range: impl RangeBounds<[u8]>,
limit: Option<u64>,
) -> FileSlice {
let first_block_id = match key_range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => {
let Some(first_block_id) = self.sstable_index.locate_with_key(key) else {
return FileSlice::empty();
};
Some(first_block_id)
}
// TODO replace unwraps with proper error handling
let start_key = match key_range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => key,
Bound::Unbounded => &[],
};
let end_key = match key_range.end_bound() {
Bound::Included(key) | Bound::Excluded(key) => Some(key),
Bound::Unbounded => None,
};
let last_block_id = match key_range.end_bound() {
Bound::Included(key) | Bound::Excluded(key) => self.sstable_index.locate_with_key(key),
Bound::Unbounded => None,
};
let start_bound = if let Some(first_block_id) = first_block_id {
let Some(block_addr) = self.sstable_index.get_block(first_block_id) else {
let bounds = if let Some(limit) = limit {
let mut sstable_iterator = self.sstable_index.iterate_from_key(start_key).unwrap();
let Some(start_block) = sstable_iterator.value() else {
// range_start is after end of table
return FileSlice::empty();
};
Bound::Included(block_addr.byte_range.start)
} else {
Bound::Unbounded
};
if let Some(end_key) = end_key {
if sstable_iterator.key().unwrap() >= end_key {
// the start and end keys are in the same block, return just that block
return self.sstable_slice.slice(start_block.byte_range.clone());
}
}
let start_bound = start_block.byte_range.start;
let last_block_id = if let Some(limit) = limit {
let second_block_id = first_block_id.map(|id| id + 1).unwrap_or(0);
if let Some(block_addr) = self.sstable_index.get_block(second_block_id) {
let ordinal_limit = block_addr.first_ordinal + limit;
let last_block_limit = self.sstable_index.locate_with_ord(ordinal_limit);
if let Some(last_block_id) = last_block_id {
Some(last_block_id.min(last_block_limit))
sstable_iterator.advance().unwrap();
let Some(second_block) = sstable_iterator.value() else {
// we reached the end of the sstable, return everything from start_bound
return self.sstable_slice.slice(start_bound..);
};
let mut end_bound = second_block.byte_range.end;
if let Some(end_key) = end_key {
if sstable_iterator.key().unwrap() >= end_key {
return self.sstable_slice.slice(start_bound..end_bound);
}
}
let target_ord = second_block.first_ordinal + limit;
while sstable_iterator.advance().unwrap() {
let block = sstable_iterator.value().unwrap();
if block.first_ordinal >= target_ord {
break;
}
end_bound = block.byte_range.end;
if let Some(end_key) = end_key {
if sstable_iterator.key().unwrap() >= end_key {
break;
}
}
}
let start_bound = Bound::Included(start_bound);
let end_bound = Bound::Excluded(end_bound);
(start_bound, end_bound)
} else {
let Some(start_block) = self.sstable_index.get_block_with_key(start_key).unwrap()
else {
// range_start is after end of table
return FileSlice::empty();
};
let start_bound = Bound::Included(start_block.byte_range.start);
let end_bound = if let Some(end_key) = end_key {
if let Some(end_block) = self.sstable_index.get_block_with_key(end_key).unwrap() {
Bound::Excluded(end_block.byte_range.end)
} else {
Some(last_block_limit)
Bound::Unbounded
}
} else {
last_block_id
}
} else {
last_block_id
Bound::Unbounded
};
(start_bound, end_bound)
};
let end_bound = last_block_id
.and_then(|block_id| self.sstable_index.get_block(block_id))
.map(|block_addr| Bound::Excluded(block_addr.byte_range.end))
.unwrap_or(Bound::Unbounded);
self.sstable_slice.slice((start_bound, end_bound))
self.sstable_slice.slice(bounds)
}
/// Opens a `TermDictionary`.
pub fn open(term_dictionary_file: FileSlice) -> io::Result<Self> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(24);
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
let layer_count = u32::deserialize(&mut footer_len_bytes)?;
let index_offset = u64::deserialize(&mut footer_len_bytes)?;
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let version = u32::deserialize(&mut footer_len_bytes)?;
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
if version != crate::SSTABLE_VERSION {
return Err(io::Error::new(
io::ErrorKind::Other,
@@ -193,17 +223,37 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
),
));
}
if layer_count == 0 {
// previous format, kept for backward compatibility
let sstable_index_bytes = index_slice.read_bytes()?;
// on the old format, the 1st layer necessarily start immediately, and there is
// only a single layer
let sstable_index = SSTableIndex::load(sstable_index_bytes, 1, 0)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
Ok(Dictionary {
sstable_slice,
sstable_index,
num_terms,
phantom_data: PhantomData,
})
} else {
let sstable_index_bytes = index_slice.read_bytes()?;
let (sstable_index_bytes, mut v3_footer_bytes) = sstable_index_bytes.rsplit(8);
let first_layer_offset = v3_footer_bytes.read_u64();
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?;
let sstable_index = SSTableIndex::load(sstable_index_bytes)
let sstable_index = SSTableIndex::load(
sstable_index_bytes,
layer_count,
first_layer_offset as usize,
)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
Ok(Dictionary {
sstable_slice,
sstable_index,
num_terms,
phantom_data: PhantomData,
})
Ok(Dictionary {
sstable_slice,
sstable_index,
num_terms,
phantom_data: PhantomData,
})
}
}
/// Creates a term dictionary from the supplied bytes.
@@ -227,68 +277,17 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
self.num_terms as usize
}
/// Decode a DeltaReader up to key, returning the number of terms traversed
///
/// If the key was not found, returns Ok(None).
/// After calling this function, it is possible to call `DeltaReader::value` to get the
/// associated value.
fn decode_up_to_key<K: AsRef<[u8]>>(
&self,
key: K,
sstable_delta_reader: &mut DeltaReader<TSSTable::ValueReader>,
) -> io::Result<Option<TermOrdinal>> {
let mut term_ord = 0;
let key_bytes = key.as_ref();
let mut ok_bytes = 0;
while sstable_delta_reader.advance()? {
let prefix_len = sstable_delta_reader.common_prefix_len();
let suffix = sstable_delta_reader.suffix();
match prefix_len.cmp(&ok_bytes) {
Ordering::Less => return Ok(None), // popped bytes already matched => too far
Ordering::Equal => (),
Ordering::Greater => {
// the ok prefix is less than current entry prefix => continue to next elem
term_ord += 1;
continue;
}
}
// we have ok_bytes byte of common prefix, check if this key adds more
for (key_byte, suffix_byte) in key_bytes[ok_bytes..].iter().zip(suffix) {
match suffix_byte.cmp(key_byte) {
Ordering::Less => break, // byte too small
Ordering::Equal => ok_bytes += 1, // new matching byte
Ordering::Greater => return Ok(None), // too far
}
}
if ok_bytes == key_bytes.len() {
if prefix_len + suffix.len() == ok_bytes {
return Ok(Some(term_ord));
} else {
// current key is a prefix of current element, not a match
return Ok(None);
}
}
term_ord += 1;
}
Ok(None)
}
/// Returns the ordinal associated with a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
let key_bytes = key.as_ref();
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else {
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes)? else {
return Ok(None);
};
let first_ordinal = block_addr.first_ordinal;
let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?;
self.decode_up_to_key(key_bytes, &mut sstable_delta_reader)
decode_up_to_key(key_bytes, &mut sstable_delta_reader)
.map(|opt| opt.map(|ord| ord + first_ordinal))
}
@@ -303,7 +302,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// the buffer may be modified.
pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
// find block in which the term would be
let block_addr = self.sstable_index.get_block_with_ord(ord);
let block_addr = self.sstable_index.get_block_with_ord(ord)?;
let first_ordinal = block_addr.first_ordinal;
// then search inside that block only
@@ -321,7 +320,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Returns the number of terms in the dictionary.
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TSSTable::Value>> {
// find block in which the term would be
let block_addr = self.sstable_index.get_block_with_ord(term_ord);
let block_addr = self.sstable_index.get_block_with_ord(term_ord)?;
let first_ordinal = block_addr.first_ordinal;
// then search inside that block only
@@ -336,7 +335,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Lookups the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref())? {
let sstable_reader = self.sstable_delta_reader_block(block_addr)?;
return self.do_get(key, sstable_reader);
}
@@ -345,7 +344,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Lookups the value corresponding to the key.
pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref())? {
let sstable_reader = self.sstable_delta_reader_block_async(block_addr).await?;
return self.do_get(key, sstable_reader);
}
@@ -357,7 +356,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
key: K,
mut reader: DeltaReader<TSSTable::ValueReader>,
) -> io::Result<Option<TSSTable::Value>> {
if let Some(_ord) = self.decode_up_to_key(key, &mut reader)? {
if let Some(_ord) = decode_up_to_key(key, &mut reader)? {
Ok(Some(reader.value().clone()))
} else {
Ok(None)
@@ -394,6 +393,56 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
}
}
/// Decode a DeltaReader up to key, returning the number of terms traversed
///
/// If the key was not found, returns Ok(None).
/// After calling this function, it is possible to call `DeltaReader::value` to get the
/// associated value.
pub(crate) fn decode_up_to_key<K: AsRef<[u8]>, TValueReader: crate::ValueReader>(
key: K,
sstable_delta_reader: &mut DeltaReader<TValueReader>,
) -> io::Result<Option<TermOrdinal>> {
let mut term_ord = 0;
let key_bytes = key.as_ref();
let mut ok_bytes = 0;
while sstable_delta_reader.advance()? {
let prefix_len = sstable_delta_reader.common_prefix_len();
let suffix = sstable_delta_reader.suffix();
match prefix_len.cmp(&ok_bytes) {
Ordering::Less => return Ok(None), // popped bytes already matched => too far
Ordering::Equal => (),
Ordering::Greater => {
// the ok prefix is less than current entry prefix => continue to next elem
term_ord += 1;
continue;
}
}
// we have ok_bytes byte of common prefix, check if this key adds more
for (key_byte, suffix_byte) in key_bytes[ok_bytes..].iter().zip(suffix) {
match suffix_byte.cmp(key_byte) {
Ordering::Less => break, // byte too small
Ordering::Equal => ok_bytes += 1, // new matching byte
Ordering::Greater => return Ok(None), // too far
}
}
if ok_bytes == key_bytes.len() {
if prefix_len + suffix.len() == ok_bytes {
return Ok(Some(term_ord));
} else {
// current key is a prefix of current element, not a match
return Ok(None);
}
}
term_ord += 1;
}
Ok(None)
}
#[cfg(test)]
mod tests {
use std::ops::Range;
@@ -459,8 +508,6 @@ mod tests {
let dictionary = Dictionary::<MonotonicU64SSTable>::open(slice).unwrap();
// if the last block is id 0, tests are meaningless
assert_ne!(dictionary.sstable_index.locate_with_ord(u64::MAX), 0);
assert_eq!(dictionary.num_terms(), 0x3ffff);
(dictionary, table)
}
@@ -469,7 +516,7 @@ mod tests {
fn test_ord_term_conversion() {
let (dic, slice) = make_test_sstable();
let block = dic.sstable_index.get_block_with_ord(100_000);
let block = dic.sstable_index.get_block_with_ord(100_000).unwrap();
slice.restrict(block.byte_range);
let mut res = Vec::new();
@@ -495,7 +542,11 @@ mod tests {
// end of a block
let ordinal = block.first_ordinal - 1;
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
let new_range = dic
.sstable_index
.get_block_with_ord(ordinal)
.unwrap()
.byte_range;
slice.restrict(new_range);
assert!(dic.ord_to_term(ordinal, &mut res).unwrap());
assert_eq!(res, format!("{ordinal:05X}").into_bytes());
@@ -505,7 +556,7 @@ mod tests {
// before first block
// 1st block must be loaded for key-related operations
let block = dic.sstable_index.get_block_with_ord(0);
let block = dic.sstable_index.get_block_with_ord(0).unwrap();
slice.restrict(block.byte_range);
assert!(dic.get(b"$$$").unwrap().is_none());
@@ -514,7 +565,11 @@ mod tests {
// after last block
// last block must be loaded for ord related operations
let ordinal = 0x40000 + 10;
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
let new_range = dic
.sstable_index
.get_block_with_ord(ordinal)
.unwrap()
.byte_range;
slice.restrict(new_range);
assert!(!dic.ord_to_term(ordinal, &mut res).unwrap());
assert!(dic.term_info_from_ord(ordinal).unwrap().is_none());
@@ -539,11 +594,13 @@ mod tests {
.sstable_index
.get_block_with_key(b"10000")
.unwrap()
.unwrap()
.byte_range;
let end = dic
.sstable_index
.get_block_with_key(b"18000")
.unwrap()
.unwrap()
.byte_range;
slice.restrict(start.start..end.end);

View File

@@ -30,6 +30,12 @@ pub type TermOrdinal = u64;
const DEFAULT_KEY_CAPACITY: usize = 50;
const SSTABLE_VERSION: u32 = 2;
// TODO tune that value. Maybe it's too little?
#[cfg(not(test))]
const DEFAULT_MAX_ROOT_BLOCKS: u64 = 8;
#[cfg(test)]
const DEFAULT_MAX_ROOT_BLOCKS: u64 = 1;
/// Given two byte string returns the length of
/// the longest common prefix.
fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
@@ -55,7 +61,7 @@ pub trait SSTable: Sized {
}
fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
Writer::new(wrt)
Writer::new(wrt, DEFAULT_MAX_ROOT_BLOCKS)
}
fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> {
@@ -178,6 +184,7 @@ where W: io::Write
delta_writer: DeltaWriter<W, TValueWriter>,
num_terms: u64,
first_ordinal_of_the_block: u64,
index_max_root_blocks: u64,
}
impl<W, TValueWriter> Writer<W, TValueWriter>
@@ -190,17 +197,18 @@ where
/// TODO remove this function. (See Issue #1727)
#[doc(hidden)]
pub fn create(wrt: W) -> io::Result<Self> {
Ok(Self::new(wrt))
Ok(Self::new(wrt, DEFAULT_MAX_ROOT_BLOCKS))
}
/// Creates a new `TermDictionaryBuilder`.
pub fn new(wrt: W) -> Self {
pub fn new(wrt: W, index_max_root_blocks: u64) -> Self {
Writer {
previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
num_terms: 0u64,
index_builder: SSTableIndexBuilder::default(),
delta_writer: DeltaWriter::new(wrt),
first_ordinal_of_the_block: 0u64,
index_max_root_blocks,
}
}
@@ -304,7 +312,8 @@ where
let offset = wrt.written_bytes();
self.index_builder.serialize(&mut wrt)?;
self.index_builder
.serialize(&mut wrt, self.index_max_root_blocks)?;
wrt.write_all(&offset.to_le_bytes())?;
wrt.write_all(&self.num_terms.to_le_bytes())?;

View File

@@ -5,77 +5,191 @@ use common::OwnedBytes;
use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct SSTableIndex {
blocks: Vec<BlockMeta>,
root_blocks: Vec<BlockMeta>,
layer_count: u32,
index_bytes: OwnedBytes,
}
impl Default for SSTableIndex {
fn default() -> Self {
SSTableIndex {
root_blocks: Vec::new(),
layer_count: 1,
index_bytes: OwnedBytes::empty(),
}
}
}
impl SSTableIndex {
/// Load an index from its binary representation
pub fn load(data: OwnedBytes) -> Result<SSTableIndex, SSTableDataCorruption> {
let mut reader = IndexSSTable::reader(data);
let mut blocks = Vec::new();
pub fn load(
data: OwnedBytes,
layer_count: u32,
first_layer_offset: usize,
) -> Result<SSTableIndex, SSTableDataCorruption> {
let (index_bytes, first_layer_slice) = data.split(first_layer_offset);
let mut reader = IndexSSTable::reader(first_layer_slice);
let mut root_blocks = Vec::new();
while reader.advance().map_err(|_| SSTableDataCorruption)? {
blocks.push(BlockMeta {
root_blocks.push(BlockMeta {
last_key_or_greater: reader.key().to_vec(),
block_addr: reader.value().clone(),
});
}
Ok(SSTableIndex { blocks })
}
/// Get the [`BlockAddr`] of the requested block.
pub(crate) fn get_block(&self, block_id: usize) -> Option<BlockAddr> {
self.blocks
.get(block_id)
.map(|block_meta| block_meta.block_addr.clone())
}
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
let pos = self
.blocks
.binary_search_by_key(&key, |block| &block.last_key_or_greater);
match pos {
Ok(pos) => Some(pos),
Err(pos) => {
if pos < self.blocks.len() {
Some(pos)
} else {
// after end of last block: no block matches
None
}
}
}
Ok(SSTableIndex {
root_blocks,
layer_count,
index_bytes,
})
}
/// Get the [`BlockAddr`] of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
self.locate_with_key(key).and_then(|id| self.get_block(id))
}
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize {
let pos = self
.blocks
.binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
match pos {
Ok(pos) => pos,
// Err(0) can't happen as the sstable starts with ordinal zero
Err(pos) => pos - 1,
}
pub fn get_block_with_key(&self, key: &[u8]) -> io::Result<Option<BlockAddr>> {
self.iterate_from_key(key).map(|iter| iter.value().cloned())
}
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
// locate_with_ord always returns an index within range
self.get_block(self.locate_with_ord(ord)).unwrap()
pub fn get_block_with_ord(&self, ord: TermOrdinal) -> io::Result<BlockAddr> {
let pos = self
.root_blocks
.binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
let root_pos = match pos {
Ok(pos) => pos,
// Err(0) can't happen as the sstable starts with ordinal zero
Err(pos) => pos - 1,
};
let mut next_layer_block_addr = self.root_blocks[root_pos].block_addr.clone();
let mut last_delta_reader = None;
for _ in 1..self.layer_count {
// we don't enter this loop for 1 layer index
let mut sstable_delta_reader = IndexSSTable::delta_reader(
self.index_bytes.slice(next_layer_block_addr.byte_range),
);
while sstable_delta_reader.advance()? {
if sstable_delta_reader.value().first_ordinal >= ord {
break;
}
}
next_layer_block_addr = sstable_delta_reader.value().clone();
last_delta_reader = Some(sstable_delta_reader);
}
if let Some(sstable_delta_reader) = last_delta_reader {
Ok(sstable_delta_reader.value().clone())
} else {
Ok(self.root_blocks[root_pos].block_addr.clone())
}
}
pub(crate) fn iterate_from_key(&self, key: &[u8]) -> io::Result<ReaderOrSlice<'_>> {
let root_pos = self
.root_blocks
.binary_search_by_key(&key, |block| &block.last_key_or_greater);
let root_pos = match root_pos {
Ok(pos) => pos,
Err(pos) => {
if pos < self.root_blocks.len() {
pos
} else {
// after end of last block: no block matches
return Ok(ReaderOrSlice::End);
}
}
};
let mut next_layer_block_addr = self.root_blocks[root_pos].block_addr.clone();
let mut last_delta_reader = None;
for _ in 1..self.layer_count {
// we don't enter this loop for 1 layer index
let mut sstable_delta_reader = IndexSSTable::delta_reader(
self.index_bytes.slice(next_layer_block_addr.byte_range),
);
if crate::dictionary::decode_up_to_key(key, &mut sstable_delta_reader)?.is_none() {
return Ok(ReaderOrSlice::End);
}
next_layer_block_addr = sstable_delta_reader.value().clone();
last_delta_reader = Some(sstable_delta_reader);
}
if let Some(delta_reader) = last_delta_reader {
// reconstruct the current key. We stopped either on the exact key, or just after
// either way, common_prefix_len is something that did not change between the
// last-key-before-target and the current pos, so those bytes must match the prefix of
// `key`. The next bytes can be obtained from the delta reader
let mut result_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
let common_prefix_len = delta_reader.common_prefix_len();
let suffix = delta_reader.suffix();
let new_len = delta_reader.common_prefix_len() + suffix.len();
result_key.resize(new_len, 0u8);
result_key[..common_prefix_len].copy_from_slice(&key[..common_prefix_len]);
result_key[common_prefix_len..].copy_from_slice(suffix);
let reader = crate::Reader {
key: result_key,
delta_reader,
};
Ok(ReaderOrSlice::Reader(reader))
} else {
// self.layer_count == 1, there is no lvl2 sstable to decode.
Ok(ReaderOrSlice::Iter(&self.root_blocks, root_pos))
}
}
}
pub(crate) enum ReaderOrSlice<'a> {
Reader(crate::Reader<crate::value::index::IndexValueReader>),
Iter(&'a [BlockMeta], usize),
End,
}
impl<'a> ReaderOrSlice<'a> {
pub fn advance(&mut self) -> Result<bool, SSTableDataCorruption> {
match self {
ReaderOrSlice::Reader(reader) => {
let res = reader.advance().map_err(|_| SSTableDataCorruption);
if !matches!(res, Ok(true)) {
*self = ReaderOrSlice::End;
}
res
}
ReaderOrSlice::Iter(slice, index) => {
*index += 1;
if *index < slice.len() {
Ok(true)
} else {
*self = ReaderOrSlice::End;
Ok(false)
}
}
ReaderOrSlice::End => Ok(false),
}
}
/// Get current key. Always Some(_) unless last call to advance returned something else than
/// Ok(true)
pub fn key(&self) -> Option<&[u8]> {
match self {
ReaderOrSlice::Reader(reader) => Some(reader.key()),
ReaderOrSlice::Iter(slice, index) => Some(&slice[*index].last_key_or_greater),
ReaderOrSlice::End => None,
}
}
/// Get current value. Always Some(_) unless last call to advance returned something else than
/// Ok(true)
pub fn value(&self) -> Option<&BlockAddr> {
match self {
ReaderOrSlice::Reader(reader) => Some(reader.value()),
ReaderOrSlice::Iter(slice, index) => Some(&slice[*index].block_addr),
ReaderOrSlice::End => None,
}
}
}
@@ -124,13 +238,13 @@ impl SSTableIndexBuilder {
/// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.index.blocks.last_mut() {
if let Some(last_block) = self.index.root_blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
}
}
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.index.blocks.push(BlockMeta {
self.index.root_blocks.push(BlockMeta {
last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr {
byte_range,
@@ -139,7 +253,11 @@ impl SSTableIndexBuilder {
})
}
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<()> {
pub fn serialize<W: std::io::Write>(
&self,
wrt: W,
_index_max_root_blocks: u64,
) -> io::Result<()> {
// we can't use a plain writer as it would generate an index
let mut sstable_writer = IndexSSTable::delta_writer(wrt);
@@ -148,7 +266,7 @@ impl SSTableIndexBuilder {
sstable_writer.set_block_len(16);
let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
for block in self.index.blocks.iter() {
for block in self.index.root_blocks.iter() {
let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater);
sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]);
@@ -194,28 +312,75 @@ mod tests {
sstable_builder.add_block(b"ccc", 30..40, 10u64);
sstable_builder.add_block(b"dddd", 40..50, 15u64);
let mut buffer: Vec<u8> = Vec::new();
sstable_builder.serialize(&mut buffer).unwrap();
sstable_builder.serialize(&mut buffer, 8).unwrap();
let buffer = OwnedBytes::new(buffer);
let sstable_index = SSTableIndex::load(buffer).unwrap();
let sstable_index = SSTableIndex::load(buffer, 1, 0).unwrap();
assert_eq!(
sstable_index.get_block_with_key(b"bbbde"),
sstable_index.get_block_with_key(b"bbbde").unwrap(),
Some(BlockAddr {
first_ordinal: 10u64,
byte_range: 30..40
})
);
assert_eq!(sstable_index.locate_with_key(b"aa").unwrap(), 0);
assert_eq!(sstable_index.locate_with_key(b"aaa").unwrap(), 0);
assert_eq!(sstable_index.locate_with_key(b"aab").unwrap(), 1);
assert_eq!(sstable_index.locate_with_key(b"ccc").unwrap(), 2);
assert!(sstable_index.locate_with_key(b"e").is_none());
assert_eq!(
sstable_index
.get_block_with_key(b"aa")
.unwrap()
.unwrap()
.first_ordinal,
0
);
assert_eq!(
sstable_index
.get_block_with_key(b"aaa")
.unwrap()
.unwrap()
.first_ordinal,
0
);
assert_eq!(
sstable_index
.get_block_with_key(b"aab")
.unwrap()
.unwrap()
.first_ordinal,
5
);
assert_eq!(
sstable_index
.get_block_with_key(b"ccc")
.unwrap()
.unwrap()
.first_ordinal,
10
);
assert!(sstable_index.get_block_with_key(b"e").unwrap().is_none());
assert_eq!(sstable_index.locate_with_ord(0), 0);
assert_eq!(sstable_index.locate_with_ord(1), 0);
assert_eq!(sstable_index.locate_with_ord(4), 0);
assert_eq!(sstable_index.locate_with_ord(5), 1);
assert_eq!(sstable_index.locate_with_ord(100), 3);
assert_eq!(
sstable_index.get_block_with_ord(0).unwrap().first_ordinal,
0
);
assert_eq!(
sstable_index.get_block_with_ord(1).unwrap().first_ordinal,
0
);
assert_eq!(
sstable_index.get_block_with_ord(4).unwrap().first_ordinal,
0
);
assert_eq!(
sstable_index.get_block_with_ord(5).unwrap().first_ordinal,
5
);
assert_eq!(
sstable_index.get_block_with_ord(6).unwrap().first_ordinal,
5
);
assert_eq!(
sstable_index.get_block_with_ord(100).unwrap().first_ordinal,
15
);
}
#[test]
@@ -226,10 +391,10 @@ mod tests {
sstable_builder.add_block(b"ccc", 30..40, 10u64);
sstable_builder.add_block(b"dddd", 40..50, 15u64);
let mut buffer: Vec<u8> = Vec::new();
sstable_builder.serialize(&mut buffer).unwrap();
sstable_builder.serialize(&mut buffer, 340).unwrap();
buffer[2] = 9u8;
let buffer = OwnedBytes::new(buffer);
let data_corruption_err = SSTableIndex::load(buffer).err().unwrap();
let data_corruption_err = SSTableIndex::load(buffer, 1, 0).err().unwrap();
assert!(matches!(data_corruption_err, SSTableDataCorruption));
}

View File

@@ -110,7 +110,7 @@ where
Bound::Included(key) | Bound::Excluded(key) => self
.term_dict
.sstable_index
.get_block_with_key(key)
.get_block_with_key(key)?
.map(|block| block.first_ordinal)
.unwrap_or(0),
Bound::Unbounded => 0,