Compare commits

...

4 Commits

Author SHA1 Message Date
trinity-1686a
2b686ffa54 fix columnar tests 2023-11-10 11:46:58 +01:00
trinity-1686a
04b3a27a0a increment sstable version number 2023-11-10 11:38:58 +01:00
trinity-1686a
710cf1efa6 implement multilayer sstable writer 2023-11-10 11:09:50 +01:00
trinity-1686a
8103790c16 define and implement reading multi layer index sstable 2023-11-09 15:42:00 +01:00
10 changed files with 561 additions and 233 deletions

View File

@@ -26,7 +26,7 @@ fn test_dataframe_writer_str() {
assert_eq!(columnar.num_columns(), 1); assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap(); let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1); assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 87); assert_eq!(cols[0].num_bytes(), 99);
} }
#[test] #[test]
@@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() {
assert_eq!(columnar.num_columns(), 1); assert_eq!(columnar.num_columns(), 1);
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap(); let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
assert_eq!(cols.len(), 1); assert_eq!(cols.len(), 1);
assert_eq!(cols[0].num_bytes(), 87); assert_eq!(cols[0].num_bytes(), 99);
} }
#[test] #[test]

View File

@@ -1,5 +1,5 @@
use std::convert::TryInto; use std::convert::TryInto;
use std::ops::{Deref, Range}; use std::ops::Deref;
use std::sync::Arc; use std::sync::Arc;
use std::{fmt, io}; use std::{fmt, io};
@@ -37,7 +37,7 @@ impl OwnedBytes {
/// creates a fileslice that is just a view over a slice of the data. /// creates a fileslice that is just a view over a slice of the data.
#[must_use] #[must_use]
#[inline] #[inline]
pub fn slice(&self, range: Range<usize>) -> Self { pub fn slice(&self, range: impl std::slice::SliceIndex<[u8], Output = [u8]>) -> Self {
OwnedBytes { OwnedBytes {
data: &self.data[range], data: &self.data[range],
box_stable_deref: self.box_stable_deref.clone(), box_stable_deref: self.box_stable_deref.clone(),

View File

@@ -131,7 +131,7 @@ mod tests {
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 93); assert_eq!(file.len(), 105);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let column = fast_field_readers let column = fast_field_readers
.u64("field") .u64("field")
@@ -181,7 +181,7 @@ mod tests {
write.terminate().unwrap(); write.terminate().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 121); assert_eq!(file.len(), 133);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers let col = fast_field_readers
.u64("field") .u64("field")
@@ -214,7 +214,7 @@ mod tests {
write.terminate().unwrap(); write.terminate().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 94); assert_eq!(file.len(), 106);
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let fast_field_reader = fast_field_readers let fast_field_reader = fast_field_readers
.u64("field") .u64("field")
@@ -246,7 +246,7 @@ mod tests {
write.terminate().unwrap(); write.terminate().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 4489); assert_eq!(file.len(), 4501);
{ {
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
let col = fast_field_readers let col = fast_field_readers
@@ -279,7 +279,7 @@ mod tests {
write.terminate().unwrap(); write.terminate().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 265); assert_eq!(file.len(), 277);
{ {
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap(); let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
@@ -773,7 +773,7 @@ mod tests {
write.terminate().unwrap(); write.terminate().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 102); assert_eq!(file.len(), 114);
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap(); let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = fast_field_readers.bool("field_bool").unwrap(); let bool_col = fast_field_readers.bool("field_bool").unwrap();
assert_eq!(bool_col.first(0), Some(true)); assert_eq!(bool_col.first(0), Some(true));
@@ -805,7 +805,7 @@ mod tests {
write.terminate().unwrap(); write.terminate().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 114); assert_eq!(file.len(), 126);
let readers = FastFieldReaders::open(file, schema).unwrap(); let readers = FastFieldReaders::open(file, schema).unwrap();
let bool_col = readers.bool("field_bool").unwrap(); let bool_col = readers.bool("field_bool").unwrap();
for i in 0..25 { for i in 0..25 {
@@ -830,7 +830,7 @@ mod tests {
write.terminate().unwrap(); write.terminate().unwrap();
} }
let file = directory.open_read(path).unwrap(); let file = directory.open_read(path).unwrap();
assert_eq!(file.len(), 104); assert_eq!(file.len(), 116);
let fastfield_readers = FastFieldReaders::open(file, schema).unwrap(); let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
let col = fastfield_readers.bool("field_bool").unwrap(); let col = fastfield_readers.bool("field_bool").unwrap();
assert_eq!(col.first(0), None); assert_eq!(col.first(0), None);

View File

@@ -89,15 +89,21 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw
### SSTFooter ### SSTFooter
``` ```
+-------+-------+-----+-------------+---------+---------+ +-------+-------+-----+------------------+------------+-------------+---------+---------+
| Block | Block | ... | IndexOffset | NumTerm | Version | | Block | Block | ... | FirstLayerOffset | LayerCount | IndexOffset | NumTerm | Version |
+-------+-------+-----+-------------+---------+---------+ +-------+-------+-----+------------------+------------+-------------+---------+---------+
|----( # of blocks)---| |----(# of blocks)----|
``` ```
- Block(SSTBlock): uses IndexValue for its Values format - Block(SSTBlock): uses IndexValue for its Values format
- FirstLayerOffset(u64): Offset between the start of the footer and the start of the top level index
- LayerCount(u32): Number of layers of index (min 1) ## TODO do we want to use 0 as a marker for no layers? It makes small sstables 12 bytes more compact (the 0u32 would alias with the "end of sstable marker")
- IndexOffset(u64): Offset to the start of the SSTFooter - IndexOffset(u64): Offset to the start of the SSTFooter
- NumTerm(u64): number of terms in the sstable - NumTerm(u64): number of terms in the sstable
- Version(u32): Currently equal to 2 - Version(u32): Currently equal to 3
Blocks referencing the main table and blocks referencing the index itself are encoded the same way and
are not directly differentiated. Offsets in blocks referencing the index are relative to the start of
the footer; offsets in blocks referencing the main table are relative to the start of that table.
### IndexValue ### IndexValue
``` ```

View File

@@ -20,6 +20,7 @@ where W: io::Write
// Only here to avoid allocations. // Only here to avoid allocations.
stateless_buffer: Vec<u8>, stateless_buffer: Vec<u8>,
block_len: usize, block_len: usize,
compress: bool,
} }
impl<W, TValueWriter> DeltaWriter<W, TValueWriter> impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
@@ -34,6 +35,18 @@ where
value_writer: TValueWriter::default(), value_writer: TValueWriter::default(),
stateless_buffer: Vec::new(), stateless_buffer: Vec::new(),
block_len: BLOCK_LEN, block_len: BLOCK_LEN,
compress: true,
}
}
pub fn new_no_compression(wrt: W) -> Self {
DeltaWriter {
block: Vec::with_capacity(BLOCK_LEN * 2),
write: CountingWriter::wrap(BufWriter::new(wrt)),
value_writer: TValueWriter::default(),
stateless_buffer: Vec::new(),
block_len: BLOCK_LEN,
compress: false,
} }
} }
@@ -53,7 +66,7 @@ where
let block_len = buffer.len() + self.block.len(); let block_len = buffer.len() + self.block.len();
if block_len > 2048 { if block_len > 2048 && self.compress {
buffer.extend_from_slice(&self.block); buffer.extend_from_slice(&self.block);
self.block.clear(); self.block.clear();

View File

@@ -128,52 +128,86 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
key_range: impl RangeBounds<[u8]>, key_range: impl RangeBounds<[u8]>,
limit: Option<u64>, limit: Option<u64>,
) -> FileSlice { ) -> FileSlice {
let first_block_id = match key_range.start_bound() { // we don't perform great when limit is set to a large value, and sometime we use u64::MAX
Bound::Included(key) | Bound::Excluded(key) => { // as a marker for no limit, so we'd better capture that.
let Some(first_block_id) = self.sstable_index.locate_with_key(key) else { // (not great means we decode up to the whole bottom layer index, which can take dozens of
return FileSlice::empty(); // ms on a 100m term dictionary)
}; let limit = limit.filter(|limit| *limit != u64::MAX);
Some(first_block_id)
} // TODO replace unwraps with proper error handling
let start_key = match key_range.start_bound() {
Bound::Included(key) | Bound::Excluded(key) => key,
Bound::Unbounded => &[],
};
let end_key = match key_range.end_bound() {
Bound::Included(key) | Bound::Excluded(key) => Some(key),
Bound::Unbounded => None, Bound::Unbounded => None,
}; };
let bounds = if let Some(limit) = limit {
let last_block_id = match key_range.end_bound() { let mut sstable_iterator = self.sstable_index.iterate_from_key(start_key).unwrap();
Bound::Included(key) | Bound::Excluded(key) => self.sstable_index.locate_with_key(key), let Some(start_block) = sstable_iterator.value() else {
Bound::Unbounded => None, // range_start is after end of table
};
let start_bound = if let Some(first_block_id) = first_block_id {
let Some(block_addr) = self.sstable_index.get_block(first_block_id) else {
return FileSlice::empty(); return FileSlice::empty();
}; };
Bound::Included(block_addr.byte_range.start) if let Some(end_key) = end_key {
} else { if sstable_iterator.key().unwrap() >= end_key {
Bound::Unbounded // the start and end keys are in the same block, return just that block
}; return self.sstable_slice.slice(start_block.byte_range.clone());
}
}
let start_bound = start_block.byte_range.start;
let last_block_id = if let Some(limit) = limit { sstable_iterator.advance().unwrap();
let second_block_id = first_block_id.map(|id| id + 1).unwrap_or(0); let Some(second_block) = sstable_iterator.value() else {
if let Some(block_addr) = self.sstable_index.get_block(second_block_id) { // we reached the end of the sstable, return everything from start_bound
let ordinal_limit = block_addr.first_ordinal + limit; return self.sstable_slice.slice(start_bound..);
let last_block_limit = self.sstable_index.locate_with_ord(ordinal_limit); };
if let Some(last_block_id) = last_block_id { let mut end_bound = second_block.byte_range.end;
Some(last_block_id.min(last_block_limit)) if let Some(end_key) = end_key {
if sstable_iterator.key().unwrap() >= end_key {
return self.sstable_slice.slice(start_bound..end_bound);
}
}
let target_ord = second_block.first_ordinal + limit;
while sstable_iterator.advance().unwrap() {
let block = sstable_iterator.value().unwrap();
if block.first_ordinal >= target_ord {
break;
}
end_bound = block.byte_range.end;
if let Some(end_key) = end_key {
if sstable_iterator.key().unwrap() >= end_key {
break;
}
}
}
let start_bound = Bound::Included(start_bound);
let end_bound = Bound::Excluded(end_bound);
(start_bound, end_bound)
} else {
let Some(start_block) = self.sstable_index.get_block_with_key(start_key).unwrap()
else {
// range_start is after end of table
return FileSlice::empty();
};
let start_bound = Bound::Included(start_block.byte_range.start);
let end_bound = if let Some(end_key) = end_key {
if let Some(end_block) = self.sstable_index.get_block_with_key(end_key).unwrap() {
Bound::Excluded(end_block.byte_range.end)
} else { } else {
Some(last_block_limit) Bound::Unbounded
} }
} else { } else {
last_block_id Bound::Unbounded
} };
} else { (start_bound, end_bound)
last_block_id
}; };
let end_bound = last_block_id
.and_then(|block_id| self.sstable_index.get_block(block_id))
.map(|block_addr| Bound::Excluded(block_addr.byte_range.end))
.unwrap_or(Bound::Unbounded);
self.sstable_slice.slice((start_bound, end_bound)) self.sstable_slice.slice(bounds)
} }
/// Opens a `TermDictionary`. /// Opens a `TermDictionary`.
@@ -181,29 +215,57 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20); let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?; let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
// let layer_count = u32::deserialize(&mut footer_len_bytes)?;
let index_offset = u64::deserialize(&mut footer_len_bytes)?; let index_offset = u64::deserialize(&mut footer_len_bytes)?;
let num_terms = u64::deserialize(&mut footer_len_bytes)?; let num_terms = u64::deserialize(&mut footer_len_bytes)?;
let version = u32::deserialize(&mut footer_len_bytes)?; let version = u32::deserialize(&mut footer_len_bytes)?;
if version != crate::SSTABLE_VERSION {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Unsuported sstable version, expected {version}, found {}",
crate::SSTABLE_VERSION,
),
));
}
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize); let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
let sstable_index_bytes = index_slice.read_bytes()?; match version {
let sstable_index = SSTableIndex::load(sstable_index_bytes) 2 => {
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?; // previous format, kept for backward compatibility
Ok(Dictionary { let sstable_index_bytes = index_slice.read_bytes()?;
sstable_slice, // on the old format, the 1st layer necessarily start immediately, and there is
sstable_index, // only a single layer
num_terms, let sstable_index =
phantom_data: PhantomData, SSTableIndex::load(sstable_index_bytes, 1, 0).map_err(|_| {
}) io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
})?;
Ok(Dictionary {
sstable_slice,
sstable_index,
num_terms,
phantom_data: PhantomData,
})
}
3 => {
let sstable_index_bytes = index_slice.read_bytes()?;
let (sstable_index_bytes, mut v3_footer_bytes) = sstable_index_bytes.rsplit(12);
let first_layer_offset = v3_footer_bytes.read_u64();
let layer_count = v3_footer_bytes.read_u32();
let sstable_index = SSTableIndex::load(
sstable_index_bytes,
layer_count,
first_layer_offset as usize,
)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
Ok(Dictionary {
sstable_slice,
sstable_index,
num_terms,
phantom_data: PhantomData,
})
}
_ => {
return Err(io::Error::new(
io::ErrorKind::Other,
format!(
"Unsuported sstable version, expected {}, found {version}",
crate::SSTABLE_VERSION,
),
))
}
}
} }
/// Creates a term dictionary from the supplied bytes. /// Creates a term dictionary from the supplied bytes.
@@ -227,68 +289,17 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
self.num_terms as usize self.num_terms as usize
} }
/// Decode a DeltaReader up to key, returning the number of terms traversed
///
/// If the key was not found, returns Ok(None).
/// After calling this function, it is possible to call `DeltaReader::value` to get the
/// associated value.
fn decode_up_to_key<K: AsRef<[u8]>>(
&self,
key: K,
sstable_delta_reader: &mut DeltaReader<TSSTable::ValueReader>,
) -> io::Result<Option<TermOrdinal>> {
let mut term_ord = 0;
let key_bytes = key.as_ref();
let mut ok_bytes = 0;
while sstable_delta_reader.advance()? {
let prefix_len = sstable_delta_reader.common_prefix_len();
let suffix = sstable_delta_reader.suffix();
match prefix_len.cmp(&ok_bytes) {
Ordering::Less => return Ok(None), // popped bytes already matched => too far
Ordering::Equal => (),
Ordering::Greater => {
// the ok prefix is less than current entry prefix => continue to next elem
term_ord += 1;
continue;
}
}
// we have ok_bytes byte of common prefix, check if this key adds more
for (key_byte, suffix_byte) in key_bytes[ok_bytes..].iter().zip(suffix) {
match suffix_byte.cmp(key_byte) {
Ordering::Less => break, // byte too small
Ordering::Equal => ok_bytes += 1, // new matching byte
Ordering::Greater => return Ok(None), // too far
}
}
if ok_bytes == key_bytes.len() {
if prefix_len + suffix.len() == ok_bytes {
return Ok(Some(term_ord));
} else {
// current key is a prefix of current element, not a match
return Ok(None);
}
}
term_ord += 1;
}
Ok(None)
}
/// Returns the ordinal associated with a given term. /// Returns the ordinal associated with a given term.
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> { pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
let key_bytes = key.as_ref(); let key_bytes = key.as_ref();
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else { let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes)? else {
return Ok(None); return Ok(None);
}; };
let first_ordinal = block_addr.first_ordinal; let first_ordinal = block_addr.first_ordinal;
let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?; let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?;
self.decode_up_to_key(key_bytes, &mut sstable_delta_reader) decode_up_to_key(key_bytes, &mut sstable_delta_reader)
.map(|opt| opt.map(|ord| ord + first_ordinal)) .map(|opt| opt.map(|ord| ord + first_ordinal))
} }
@@ -303,7 +314,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// the buffer may be modified. /// the buffer may be modified.
pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> { pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
// find block in which the term would be // find block in which the term would be
let block_addr = self.sstable_index.get_block_with_ord(ord); let block_addr = self.sstable_index.get_block_with_ord(ord)?;
let first_ordinal = block_addr.first_ordinal; let first_ordinal = block_addr.first_ordinal;
// then search inside that block only // then search inside that block only
@@ -321,7 +332,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Returns the number of terms in the dictionary. /// Returns the number of terms in the dictionary.
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TSSTable::Value>> { pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TSSTable::Value>> {
// find block in which the term would be // find block in which the term would be
let block_addr = self.sstable_index.get_block_with_ord(term_ord); let block_addr = self.sstable_index.get_block_with_ord(term_ord)?;
let first_ordinal = block_addr.first_ordinal; let first_ordinal = block_addr.first_ordinal;
// then search inside that block only // then search inside that block only
@@ -336,7 +347,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Lookups the value corresponding to the key. /// Lookups the value corresponding to the key.
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> { pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) { if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref())? {
let sstable_reader = self.sstable_delta_reader_block(block_addr)?; let sstable_reader = self.sstable_delta_reader_block(block_addr)?;
return self.do_get(key, sstable_reader); return self.do_get(key, sstable_reader);
} }
@@ -345,7 +356,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
/// Lookups the value corresponding to the key. /// Lookups the value corresponding to the key.
pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> { pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) { if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref())? {
let sstable_reader = self.sstable_delta_reader_block_async(block_addr).await?; let sstable_reader = self.sstable_delta_reader_block_async(block_addr).await?;
return self.do_get(key, sstable_reader); return self.do_get(key, sstable_reader);
} }
@@ -357,7 +368,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
key: K, key: K,
mut reader: DeltaReader<TSSTable::ValueReader>, mut reader: DeltaReader<TSSTable::ValueReader>,
) -> io::Result<Option<TSSTable::Value>> { ) -> io::Result<Option<TSSTable::Value>> {
if let Some(_ord) = self.decode_up_to_key(key, &mut reader)? { if let Some(_ord) = decode_up_to_key(key, &mut reader)? {
Ok(Some(reader.value().clone())) Ok(Some(reader.value().clone()))
} else { } else {
Ok(None) Ok(None)
@@ -394,6 +405,56 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
} }
} }
/// Decode a DeltaReader up to key, returning the number of terms traversed
///
/// If the key was not found, returns Ok(None).
/// After calling this function, it is possible to call `DeltaReader::value` to get the
/// associated value.
pub(crate) fn decode_up_to_key<K: AsRef<[u8]>, TValueReader: crate::ValueReader>(
key: K,
sstable_delta_reader: &mut DeltaReader<TValueReader>,
) -> io::Result<Option<TermOrdinal>> {
let mut term_ord = 0;
let key_bytes = key.as_ref();
let mut ok_bytes = 0;
while sstable_delta_reader.advance()? {
let prefix_len = sstable_delta_reader.common_prefix_len();
let suffix = sstable_delta_reader.suffix();
match prefix_len.cmp(&ok_bytes) {
Ordering::Less => return Ok(None), // popped bytes already matched => too far
Ordering::Equal => (),
Ordering::Greater => {
// the ok prefix is less than current entry prefix => continue to next elem
term_ord += 1;
continue;
}
}
// we have ok_bytes byte of common prefix, check if this key adds more
for (key_byte, suffix_byte) in key_bytes[ok_bytes..].iter().zip(suffix) {
match suffix_byte.cmp(key_byte) {
Ordering::Less => break, // byte too small
Ordering::Equal => ok_bytes += 1, // new matching byte
Ordering::Greater => return Ok(None), // too far
}
}
if ok_bytes == key_bytes.len() {
if prefix_len + suffix.len() == ok_bytes {
return Ok(Some(term_ord));
} else {
// current key is a prefix of current element, not a match
return Ok(None);
}
}
term_ord += 1;
}
Ok(None)
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use std::ops::Range; use std::ops::Range;
@@ -459,8 +520,6 @@ mod tests {
let dictionary = Dictionary::<MonotonicU64SSTable>::open(slice).unwrap(); let dictionary = Dictionary::<MonotonicU64SSTable>::open(slice).unwrap();
// if the last block is id 0, tests are meaningless
assert_ne!(dictionary.sstable_index.locate_with_ord(u64::MAX), 0);
assert_eq!(dictionary.num_terms(), 0x3ffff); assert_eq!(dictionary.num_terms(), 0x3ffff);
(dictionary, table) (dictionary, table)
} }
@@ -469,7 +528,7 @@ mod tests {
fn test_ord_term_conversion() { fn test_ord_term_conversion() {
let (dic, slice) = make_test_sstable(); let (dic, slice) = make_test_sstable();
let block = dic.sstable_index.get_block_with_ord(100_000); let block = dic.sstable_index.get_block_with_ord(100_000).unwrap();
slice.restrict(block.byte_range); slice.restrict(block.byte_range);
let mut res = Vec::new(); let mut res = Vec::new();
@@ -495,7 +554,11 @@ mod tests {
// end of a block // end of a block
let ordinal = block.first_ordinal - 1; let ordinal = block.first_ordinal - 1;
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range; let new_range = dic
.sstable_index
.get_block_with_ord(ordinal)
.unwrap()
.byte_range;
slice.restrict(new_range); slice.restrict(new_range);
assert!(dic.ord_to_term(ordinal, &mut res).unwrap()); assert!(dic.ord_to_term(ordinal, &mut res).unwrap());
assert_eq!(res, format!("{ordinal:05X}").into_bytes()); assert_eq!(res, format!("{ordinal:05X}").into_bytes());
@@ -505,7 +568,7 @@ mod tests {
// before first block // before first block
// 1st block must be loaded for key-related operations // 1st block must be loaded for key-related operations
let block = dic.sstable_index.get_block_with_ord(0); let block = dic.sstable_index.get_block_with_ord(0).unwrap();
slice.restrict(block.byte_range); slice.restrict(block.byte_range);
assert!(dic.get(b"$$$").unwrap().is_none()); assert!(dic.get(b"$$$").unwrap().is_none());
@@ -514,7 +577,11 @@ mod tests {
// after last block // after last block
// last block must be loaded for ord related operations // last block must be loaded for ord related operations
let ordinal = 0x40000 + 10; let ordinal = 0x40000 + 10;
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range; let new_range = dic
.sstable_index
.get_block_with_ord(ordinal)
.unwrap()
.byte_range;
slice.restrict(new_range); slice.restrict(new_range);
assert!(!dic.ord_to_term(ordinal, &mut res).unwrap()); assert!(!dic.ord_to_term(ordinal, &mut res).unwrap());
assert!(dic.term_info_from_ord(ordinal).unwrap().is_none()); assert!(dic.term_info_from_ord(ordinal).unwrap().is_none());
@@ -539,11 +606,13 @@ mod tests {
.sstable_index .sstable_index
.get_block_with_key(b"10000") .get_block_with_key(b"10000")
.unwrap() .unwrap()
.unwrap()
.byte_range; .byte_range;
let end = dic let end = dic
.sstable_index .sstable_index
.get_block_with_key(b"18000") .get_block_with_key(b"18000")
.unwrap() .unwrap()
.unwrap()
.byte_range; .byte_range;
slice.restrict(start.start..end.end); slice.restrict(start.start..end.end);

View File

@@ -1,6 +1,6 @@
use std::io::{self, Write}; use std::io::{self, Write};
use std::num::NonZeroU64;
use std::ops::Range; use std::ops::Range;
use std::usize;
use merge::ValueMerger; use merge::ValueMerger;
@@ -28,7 +28,13 @@ use crate::value::{RangeValueReader, RangeValueWriter};
pub type TermOrdinal = u64; pub type TermOrdinal = u64;
const DEFAULT_KEY_CAPACITY: usize = 50; const DEFAULT_KEY_CAPACITY: usize = 50;
const SSTABLE_VERSION: u32 = 2; const SSTABLE_VERSION: u32 = 3;
// TODO tune that value. Maybe it's too little?
#[cfg(not(test))]
const DEFAULT_MAX_ROOT_BLOCKS: NonZeroU64 = unsafe { NonZeroU64::new_unchecked(32) };
#[cfg(test)]
const DEFAULT_MAX_ROOT_BLOCKS: NonZeroU64 = unsafe { NonZeroU64::new_unchecked(1) };
/// Given two byte string returns the length of /// Given two byte string returns the length of
/// the longest common prefix. /// the longest common prefix.
@@ -55,7 +61,7 @@ pub trait SSTable: Sized {
} }
fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> { fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
Writer::new(wrt) Writer::new(wrt, DEFAULT_MAX_ROOT_BLOCKS)
} }
fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> { fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> {
@@ -178,6 +184,7 @@ where W: io::Write
delta_writer: DeltaWriter<W, TValueWriter>, delta_writer: DeltaWriter<W, TValueWriter>,
num_terms: u64, num_terms: u64,
first_ordinal_of_the_block: u64, first_ordinal_of_the_block: u64,
index_max_root_blocks: NonZeroU64,
} }
impl<W, TValueWriter> Writer<W, TValueWriter> impl<W, TValueWriter> Writer<W, TValueWriter>
@@ -190,17 +197,18 @@ where
/// TODO remove this function. (See Issue #1727) /// TODO remove this function. (See Issue #1727)
#[doc(hidden)] #[doc(hidden)]
pub fn create(wrt: W) -> io::Result<Self> { pub fn create(wrt: W) -> io::Result<Self> {
Ok(Self::new(wrt)) Ok(Self::new(wrt, DEFAULT_MAX_ROOT_BLOCKS))
} }
/// Creates a new `TermDictionaryBuilder`. /// Creates a new `TermDictionaryBuilder`.
pub fn new(wrt: W) -> Self { pub fn new(wrt: W, index_max_root_blocks: NonZeroU64) -> Self {
Writer { Writer {
previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
num_terms: 0u64, num_terms: 0u64,
index_builder: SSTableIndexBuilder::default(), index_builder: SSTableIndexBuilder::default(),
delta_writer: DeltaWriter::new(wrt), delta_writer: DeltaWriter::new(wrt),
first_ordinal_of_the_block: 0u64, first_ordinal_of_the_block: 0u64,
index_max_root_blocks,
} }
} }
@@ -302,10 +310,15 @@ where
// add a final empty block as an end marker // add a final empty block as an end marker
wrt.write_all(&0u32.to_le_bytes())?; wrt.write_all(&0u32.to_le_bytes())?;
let offset = wrt.written_bytes(); let index_offset = wrt.written_bytes();
self.index_builder.serialize(&mut wrt)?; let (layer_count, layer_offset): (u32, u64) = self
wrt.write_all(&offset.to_le_bytes())?; .index_builder
.serialize(&mut wrt, self.index_max_root_blocks)?;
wrt.write_all(&layer_offset.to_le_bytes())?;
wrt.write_all(&layer_count.to_le_bytes())?;
wrt.write_all(&index_offset.to_le_bytes())?;
wrt.write_all(&self.num_terms.to_le_bytes())?; wrt.write_all(&self.num_terms.to_le_bytes())?;
SSTABLE_VERSION.serialize(&mut wrt)?; SSTABLE_VERSION.serialize(&mut wrt)?;
@@ -389,9 +402,11 @@ mod test {
0, // compression 0, // compression
1, 0, 12, 0, 32, 17, 20, // index block 1, 0, 12, 0, 32, 17, 20, // index block
0, 0, 0, 0, // no more index block 0, 0, 0, 0, // no more index block
0, 0, 0, 0, 0, 0, 0, 0, // first layer offset
1, 0, 0, 0, // layer count
16, 0, 0, 0, 0, 0, 0, 0, // index start offset 16, 0, 0, 0, 0, 0, 0, 0, // index start offset
3, 0, 0, 0, 0, 0, 0, 0, // num term 3, 0, 0, 0, 0, 0, 0, 0, // num term
2, 0, 0, 0, // version 3, 0, 0, 0, // version
] ]
); );
let buffer = OwnedBytes::new(buffer); let buffer = OwnedBytes::new(buffer);

View File

@@ -5,77 +5,188 @@ use common::OwnedBytes;
use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal}; use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
#[derive(Default, Debug, Clone)] #[derive(Debug, Clone)]
pub struct SSTableIndex { pub struct SSTableIndex {
blocks: Vec<BlockMeta>, root_blocks: Vec<BlockMeta>,
layer_count: u32,
index_bytes: OwnedBytes,
}
impl Default for SSTableIndex {
fn default() -> Self {
SSTableIndex {
root_blocks: Vec::new(),
layer_count: 1,
index_bytes: OwnedBytes::empty(),
}
}
} }
impl SSTableIndex { impl SSTableIndex {
/// Load an index from its binary representation /// Load an index from its binary representation
pub fn load(data: OwnedBytes) -> Result<SSTableIndex, SSTableDataCorruption> { pub fn load(
let mut reader = IndexSSTable::reader(data); data: OwnedBytes,
let mut blocks = Vec::new(); layer_count: u32,
first_layer_offset: usize,
) -> Result<SSTableIndex, SSTableDataCorruption> {
let (index_bytes, first_layer_slice) = data.split(first_layer_offset);
let mut reader = IndexSSTable::reader(first_layer_slice);
let mut root_blocks = Vec::new();
while reader.advance().map_err(|_| SSTableDataCorruption)? { while reader.advance().map_err(|_| SSTableDataCorruption)? {
blocks.push(BlockMeta { root_blocks.push(BlockMeta {
last_key_or_greater: reader.key().to_vec(), last_key_or_greater: reader.key().to_vec(),
block_addr: reader.value().clone(), block_addr: reader.value().clone(),
}); });
} }
Ok(SSTableIndex { blocks }) Ok(SSTableIndex {
} root_blocks,
layer_count,
/// Get the [`BlockAddr`] of the requested block. index_bytes,
pub(crate) fn get_block(&self, block_id: usize) -> Option<BlockAddr> { // index_bytes: OwnedBytes::empty(),
self.blocks })
.get(block_id)
.map(|block_meta| block_meta.block_addr.clone())
}
/// Get the block id of the block that would contain `key`.
///
/// Returns None if `key` is lexicographically after the last key recorded.
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
let pos = self
.blocks
.binary_search_by_key(&key, |block| &block.last_key_or_greater);
match pos {
Ok(pos) => Some(pos),
Err(pos) => {
if pos < self.blocks.len() {
Some(pos)
} else {
// after end of last block: no block matches
None
}
}
}
} }
/// Get the [`BlockAddr`] of the block that would contain `key`. /// Get the [`BlockAddr`] of the block that would contain `key`.
/// ///
/// Returns None if `key` is lexicographically after the last key recorded. /// Returns None if `key` is lexicographically after the last key recorded.
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> { pub fn get_block_with_key(&self, key: &[u8]) -> io::Result<Option<BlockAddr>> {
self.locate_with_key(key).and_then(|id| self.get_block(id)) self.iterate_from_key(key).map(|iter| iter.value().cloned())
}
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize {
let pos = self
.blocks
.binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
match pos {
Ok(pos) => pos,
// Err(0) can't happen as the sstable starts with ordinal zero
Err(pos) => pos - 1,
}
} }
/// Get the [`BlockAddr`] of the block containing the `ord`-th term. /// Get the [`BlockAddr`] of the block containing the `ord`-th term.
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr { pub fn get_block_with_ord(&self, ord: TermOrdinal) -> io::Result<BlockAddr> {
// locate_with_ord always returns an index within range let pos = self
self.get_block(self.locate_with_ord(ord)).unwrap() .root_blocks
.binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
let root_pos = match pos {
Ok(pos) => pos,
// Err(0) can't happen as the sstable starts with ordinal zero
Err(pos) => pos - 1,
};
if self.layer_count == 1 {
return Ok(self.root_blocks[root_pos].block_addr.clone());
}
let mut next_layer_block_addr = self.root_blocks[root_pos].block_addr.clone();
for _ in 1..self.layer_count {
let mut sstable_delta_reader = IndexSSTable::delta_reader(
self.index_bytes
.slice(next_layer_block_addr.byte_range.clone()),
);
while sstable_delta_reader.advance()? {
if sstable_delta_reader.value().first_ordinal > ord {
break;
}
next_layer_block_addr = sstable_delta_reader.value().clone();
}
}
Ok(next_layer_block_addr)
}
/// Builds an iterator over index entries, positioned according to `key`.
///
/// The root layer is binary-searched for the first block whose
/// `last_key_or_greater` is `>= key`:
/// - if there is none, [`ReaderOrSlice::End`] is returned;
/// - for a single layer index, the result iterates over `root_blocks`
///   starting at that position;
/// - otherwise each lower layer is opened in turn through the block address
///   of the current entry and `decode_up_to_key` is run on it; the reader of
///   the deepest layer is returned wrapped in a `crate::Reader`.
///
/// # Errors
///
/// Propagates any `io::Error` reported by `decode_up_to_key`.
pub(crate) fn iterate_from_key(&self, key: &[u8]) -> io::Result<ReaderOrSlice<'_>> {
    let search_result = self
        .root_blocks
        .binary_search_by_key(&key, |block| &block.last_key_or_greater);
    let root_pos = match search_result {
        Ok(exact) => exact,
        Err(insertion) if insertion < self.root_blocks.len() => insertion,
        // after end of last block: no block matches
        Err(_) => return Ok(ReaderOrSlice::End),
    };

    let mut child_addr = self.root_blocks[root_pos].block_addr.clone();
    let mut deepest_reader = None;
    // this loop is not entered for a single layer index
    for _ in 1..self.layer_count {
        let child_bytes = self.index_bytes.slice(child_addr.byte_range);
        let mut layer_reader = IndexSSTable::delta_reader(child_bytes);
        crate::dictionary::decode_up_to_key(key, &mut layer_reader)?;
        child_addr = layer_reader.value().clone();
        deepest_reader = Some(layer_reader);
    }

    let delta_reader = match deepest_reader {
        Some(reader) => reader,
        // self.layer_count == 1, there is no lvl2 sstable to decode.
        None => return Ok(ReaderOrSlice::Iter(&self.root_blocks, root_pos)),
    };

    // Rebuild the key the reader currently stands on. We stopped either on the
    // exact key or just after it; in both cases the first `common_prefix_len`
    // bytes did not change between the last key before the target and the
    // current position, so they must be a prefix of `key`. The remaining bytes
    // are the suffix held by the delta reader.
    let shared_len = delta_reader.common_prefix_len();
    let tail = delta_reader.suffix();
    let mut result_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
    result_key.extend_from_slice(&key[..shared_len]);
    result_key.extend_from_slice(tail);

    Ok(ReaderOrSlice::Reader(crate::Reader {
        key: result_key,
        delta_reader,
    }))
}
}
/// Cursor over index entries, as returned by `SSTableIndex::iterate_from_key`.
///
/// The variant depends on where the entries live: in a decoded lower layer,
/// or directly in the in-memory root blocks.
pub(crate) enum ReaderOrSlice<'a> {
/// Entries are read through a `crate::Reader` wrapping a delta reader
/// (used when the index has more than one layer).
Reader(crate::Reader<crate::value::index::IndexValueReader>),
/// Entries are taken from a slice of block metadata; the `usize` is the
/// index of the current entry in that slice (used for a single layer index).
Iter(&'a [BlockMeta], usize),
/// No current entry: the key was past the last block, or iteration finished
/// or failed.
End,
}
impl<'a> ReaderOrSlice<'a> {
/// Moves to the next entry.
///
/// Returns `Ok(true)` when a new current entry is available. On any other
/// outcome (`Ok(false)` or an error) the cursor becomes
/// [`ReaderOrSlice::End`], and later calls return `Ok(false)`.
///
/// # Errors
///
/// Any error reported by the underlying reader is surfaced as
/// `SSTableDataCorruption`.
pub fn advance(&mut self) -> Result<bool, SSTableDataCorruption> {
    let outcome = match self {
        ReaderOrSlice::End => return Ok(false),
        ReaderOrSlice::Reader(reader) => reader.advance().map_err(|_| SSTableDataCorruption),
        ReaderOrSlice::Iter(blocks, pos) => {
            *pos += 1;
            Ok(*pos < blocks.len())
        }
    };
    // anything but a successful step terminates the iteration for good
    if !matches!(outcome, Ok(true)) {
        *self = ReaderOrSlice::End;
    }
    outcome
}
/// Key of the current entry.
///
/// This is `None` only once the cursor has reached [`ReaderOrSlice::End`],
/// i.e. after `advance` returned anything other than `Ok(true)` or when no
/// block matched in the first place.
pub fn key(&self) -> Option<&[u8]> {
    match self {
        ReaderOrSlice::End => None,
        ReaderOrSlice::Iter(blocks, pos) => Some(blocks[*pos].last_key_or_greater.as_slice()),
        ReaderOrSlice::Reader(inner) => Some(inner.key()),
    }
}
/// Get current value. Always Some(_) unless last call to advance returned something else than
/// Ok(true)
pub fn value(&self) -> Option<&BlockAddr> {
match self {
ReaderOrSlice::Reader(reader) => Some(reader.value()),
ReaderOrSlice::Iter(slice, index) => Some(&slice[*index].block_addr),
ReaderOrSlice::End => None,
}
} }
} }
@@ -124,13 +235,13 @@ impl SSTableIndexBuilder {
/// try to find a shorter alternative to the last key of the last block /// try to find a shorter alternative to the last key of the last block
/// that is still smaller than the next key. /// that is still smaller than the next key.
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) { pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
if let Some(last_block) = self.index.blocks.last_mut() { if let Some(last_block) = self.index.root_blocks.last_mut() {
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key); find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
} }
} }
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) { pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
self.index.blocks.push(BlockMeta { self.index.root_blocks.push(BlockMeta {
last_key_or_greater: last_key.to_vec(), last_key_or_greater: last_key.to_vec(),
block_addr: BlockAddr { block_addr: BlockAddr {
byte_range, byte_range,
@@ -139,31 +250,90 @@ impl SSTableIndexBuilder {
}) })
} }
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<()> { pub fn serialize<W: std::io::Write>(
// we can't use a plain writer as it would generate an index &self,
let mut sstable_writer = IndexSSTable::delta_writer(wrt); wrt: W,
index_max_root_blocks: std::num::NonZeroU64,
) -> io::Result<(u32, u64)> {
let index_max_root_blocks = index_max_root_blocks.get();
// in tests, set a smaller block size to stress-test let mut wrt = common::CountingWriter::wrap(wrt);
#[cfg(test)] let mut next_layer = write_sstable_layer(&mut wrt, &self.index.root_blocks, 0)?;
sstable_writer.set_block_len(16);
let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY); let mut layer_count = 1;
for block in self.index.blocks.iter() { let mut offset = 0;
let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater); while next_layer.len() as u64 > index_max_root_blocks {
offset = wrt.written_bytes();
layer_count += 1;
sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]); next_layer = write_sstable_layer(&mut wrt, &next_layer, offset as usize)?;
sstable_writer.write_value(&block.block_addr);
sstable_writer.flush_block_if_required()?;
previous_key.clear();
previous_key.extend_from_slice(&block.last_key_or_greater);
} }
sstable_writer.flush_block()?; Ok((layer_count, offset))
sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
Ok(())
} }
} }
/// Writes one layer of the index and returns the metadata of the blocks it
/// produced, i.e. the content of the layer above it.
///
/// Every entry of `layer_content` is written as a delta-encoded key
/// (`last_key_or_greater`) with its `block_addr` as value. Each time the
/// writer flushes a block, a [`BlockMeta`] is recorded with:
/// - `last_key_or_greater`: the key of the last entry written into that block,
/// - `byte_range`: the range reported by the writer, shifted by `offset`,
/// - `first_ordinal`: the `first_ordinal` of the first entry of that block.
///
/// The layer is terminated by a zero `u32` (4 bytes, little endian).
///
/// # Errors
///
/// Propagates any `io::Error` raised while flushing blocks or writing the
/// terminator.
///
/// # Panics
///
/// Panics if the writer reports a flushed block although no entry was written
/// since the previous flush.
fn write_sstable_layer<W: std::io::Write>(
    wrt: W,
    layer_content: &[BlockMeta],
    offset: usize,
) -> io::Result<Vec<BlockMeta>> {
    // we can't use a plain writer as it would generate an index
    // also disable compression, the index is small anyway, and it's the most costly part of
    // opening that kind of sstable
    let mut sstable_writer =
        crate::DeltaWriter::<_, crate::value::index::IndexValueWriter>::new_no_compression(wrt);

    // in tests, set a smaller block size to stress-test
    #[cfg(test)]
    sstable_writer.set_block_len(16);

    // Describes a block that was just flushed, with its range made relative to
    // the start of the whole index rather than to the start of this layer.
    let block_meta_for =
        |last_key: &[u8], range: std::ops::Range<usize>, first_ordinal: u64| BlockMeta {
            last_key_or_greater: last_key.to_vec(),
            block_addr: BlockAddr {
                byte_range: (range.start + offset)..(range.end + offset),
                first_ordinal,
            },
        };

    let mut next_layer = Vec::new();
    let mut previous_key: Vec<u8> = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
    // first ordinal of the block currently being filled, `None` right after a flush
    let mut first_ordinal = None;

    for block in layer_content {
        if first_ordinal.is_none() {
            first_ordinal = Some(block.block_addr.first_ordinal);
        }
        let key = &block.last_key_or_greater;
        let keep_len = common_prefix_len(&previous_key, key);

        sstable_writer.write_suffix(keep_len, &key[keep_len..]);
        sstable_writer.write_value(&block.block_addr);

        if let Some(range) = sstable_writer.flush_block_if_required()? {
            let block_first_ordinal = first_ordinal
                .take()
                .expect("a flushed block contains at least the entry that was just written");
            next_layer.push(block_meta_for(key, range, block_first_ordinal));
            // a new block starts from scratch: its first key shares no prefix
            previous_key.clear();
        } else {
            // `previous_key[..keep_len]` already equals `key[..keep_len]`, so only
            // the suffix needs to be replaced to make `previous_key == key`.
            previous_key.truncate(keep_len);
            previous_key.extend_from_slice(&key[keep_len..]);
        }
    }

    if let Some(range) = sstable_writer.flush_block()? {
        if let Some(last_block) = layer_content.last() {
            // not going here means an empty table (?!)
            let block_first_ordinal = first_ordinal
                .take()
                .expect("a flushed block contains at least one entry");
            next_layer.push(block_meta_for(
                &last_block.last_key_or_greater,
                range,
                block_first_ordinal,
            ));
        }
    }

    sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
    Ok(next_layer)
}
/// SSTable representing an index /// SSTable representing an index
/// ///
/// `last_key_or_greater` is used as the key, the value contains the /// `last_key_or_greater` is used as the key, the value contains the
@@ -194,28 +364,77 @@ mod tests {
sstable_builder.add_block(b"ccc", 30..40, 10u64); sstable_builder.add_block(b"ccc", 30..40, 10u64);
sstable_builder.add_block(b"dddd", 40..50, 15u64); sstable_builder.add_block(b"dddd", 40..50, 15u64);
let mut buffer: Vec<u8> = Vec::new(); let mut buffer: Vec<u8> = Vec::new();
sstable_builder.serialize(&mut buffer).unwrap(); sstable_builder
.serialize(&mut buffer, crate::DEFAULT_MAX_ROOT_BLOCKS)
.unwrap();
let buffer = OwnedBytes::new(buffer); let buffer = OwnedBytes::new(buffer);
let sstable_index = SSTableIndex::load(buffer).unwrap(); let sstable_index = SSTableIndex::load(buffer, 1, 0).unwrap();
assert_eq!( assert_eq!(
sstable_index.get_block_with_key(b"bbbde"), sstable_index.get_block_with_key(b"bbbde").unwrap(),
Some(BlockAddr { Some(BlockAddr {
first_ordinal: 10u64, first_ordinal: 10u64,
byte_range: 30..40 byte_range: 30..40
}) })
); );
assert_eq!(sstable_index.locate_with_key(b"aa").unwrap(), 0); assert_eq!(
assert_eq!(sstable_index.locate_with_key(b"aaa").unwrap(), 0); sstable_index
assert_eq!(sstable_index.locate_with_key(b"aab").unwrap(), 1); .get_block_with_key(b"aa")
assert_eq!(sstable_index.locate_with_key(b"ccc").unwrap(), 2); .unwrap()
assert!(sstable_index.locate_with_key(b"e").is_none()); .unwrap()
.first_ordinal,
0
);
assert_eq!(
sstable_index
.get_block_with_key(b"aaa")
.unwrap()
.unwrap()
.first_ordinal,
0
);
assert_eq!(
sstable_index
.get_block_with_key(b"aab")
.unwrap()
.unwrap()
.first_ordinal,
5
);
assert_eq!(
sstable_index
.get_block_with_key(b"ccc")
.unwrap()
.unwrap()
.first_ordinal,
10
);
assert!(sstable_index.get_block_with_key(b"e").unwrap().is_none());
assert_eq!(sstable_index.locate_with_ord(0), 0); assert_eq!(
assert_eq!(sstable_index.locate_with_ord(1), 0); sstable_index.get_block_with_ord(0).unwrap().first_ordinal,
assert_eq!(sstable_index.locate_with_ord(4), 0); 0
assert_eq!(sstable_index.locate_with_ord(5), 1); );
assert_eq!(sstable_index.locate_with_ord(100), 3); assert_eq!(
sstable_index.get_block_with_ord(1).unwrap().first_ordinal,
0
);
assert_eq!(
sstable_index.get_block_with_ord(4).unwrap().first_ordinal,
0
);
assert_eq!(
sstable_index.get_block_with_ord(5).unwrap().first_ordinal,
5
);
assert_eq!(
sstable_index.get_block_with_ord(6).unwrap().first_ordinal,
5
);
assert_eq!(
sstable_index.get_block_with_ord(100).unwrap().first_ordinal,
15
);
} }
#[test] #[test]
@@ -226,10 +445,12 @@ mod tests {
sstable_builder.add_block(b"ccc", 30..40, 10u64); sstable_builder.add_block(b"ccc", 30..40, 10u64);
sstable_builder.add_block(b"dddd", 40..50, 15u64); sstable_builder.add_block(b"dddd", 40..50, 15u64);
let mut buffer: Vec<u8> = Vec::new(); let mut buffer: Vec<u8> = Vec::new();
sstable_builder.serialize(&mut buffer).unwrap(); sstable_builder
.serialize(&mut buffer, crate::DEFAULT_MAX_ROOT_BLOCKS)
.unwrap();
buffer[2] = 9u8; buffer[2] = 9u8;
let buffer = OwnedBytes::new(buffer); let buffer = OwnedBytes::new(buffer);
let data_corruption_err = SSTableIndex::load(buffer).err().unwrap(); let data_corruption_err = SSTableIndex::load(buffer, 1, 0).err().unwrap();
assert!(matches!(data_corruption_err, SSTableDataCorruption)); assert!(matches!(data_corruption_err, SSTableDataCorruption));
} }

View File

@@ -110,7 +110,7 @@ where
Bound::Included(key) | Bound::Excluded(key) => self Bound::Included(key) | Bound::Excluded(key) => self
.term_dict .term_dict
.sstable_index .sstable_index
.get_block_with_key(key) .get_block_with_key(key)?
.map(|block| block.first_ordinal) .map(|block| block.first_ordinal)
.unwrap_or(0), .unwrap_or(0),
Bound::Unbounded => 0, Bound::Unbounded => 0,

View File

@@ -3,6 +3,10 @@ use std::io;
use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter}; use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter};
use crate::{vint, BlockAddr}; use crate::{vint, BlockAddr};
// TODO define a LazyIndexValueReader?
// one which keeps state could be useful for ord_to_block fns,
// one which doesn't at all would be perfect for term_to_block fns
// pending bench to assess real impact
#[derive(Default)] #[derive(Default)]
pub(crate) struct IndexValueReader { pub(crate) struct IndexValueReader {
vals: Vec<BlockAddr>, vals: Vec<BlockAddr>,