mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2025-12-28 04:52:55 +00:00
Compare commits
4 Commits
ownedbytes
...
trinity--m
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b686ffa54 | ||
|
|
04b3a27a0a | ||
|
|
710cf1efa6 | ||
|
|
8103790c16 |
@@ -26,7 +26,7 @@ fn test_dataframe_writer_str() {
|
||||
assert_eq!(columnar.num_columns(), 1);
|
||||
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
|
||||
assert_eq!(cols.len(), 1);
|
||||
assert_eq!(cols[0].num_bytes(), 87);
|
||||
assert_eq!(cols[0].num_bytes(), 99);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() {
|
||||
assert_eq!(columnar.num_columns(), 1);
|
||||
let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
|
||||
assert_eq!(cols.len(), 1);
|
||||
assert_eq!(cols[0].num_bytes(), 87);
|
||||
assert_eq!(cols[0].num_bytes(), 99);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use std::convert::TryInto;
|
||||
use std::ops::{Deref, Range};
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::{fmt, io};
|
||||
|
||||
@@ -37,7 +37,7 @@ impl OwnedBytes {
|
||||
/// creates a fileslice that is just a view over a slice of the data.
|
||||
#[must_use]
|
||||
#[inline]
|
||||
pub fn slice(&self, range: Range<usize>) -> Self {
|
||||
pub fn slice(&self, range: impl std::slice::SliceIndex<[u8], Output = [u8]>) -> Self {
|
||||
OwnedBytes {
|
||||
data: &self.data[range],
|
||||
box_stable_deref: self.box_stable_deref.clone(),
|
||||
|
||||
@@ -131,7 +131,7 @@ mod tests {
|
||||
}
|
||||
let file = directory.open_read(path).unwrap();
|
||||
|
||||
assert_eq!(file.len(), 93);
|
||||
assert_eq!(file.len(), 105);
|
||||
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
|
||||
let column = fast_field_readers
|
||||
.u64("field")
|
||||
@@ -181,7 +181,7 @@ mod tests {
|
||||
write.terminate().unwrap();
|
||||
}
|
||||
let file = directory.open_read(path).unwrap();
|
||||
assert_eq!(file.len(), 121);
|
||||
assert_eq!(file.len(), 133);
|
||||
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
|
||||
let col = fast_field_readers
|
||||
.u64("field")
|
||||
@@ -214,7 +214,7 @@ mod tests {
|
||||
write.terminate().unwrap();
|
||||
}
|
||||
let file = directory.open_read(path).unwrap();
|
||||
assert_eq!(file.len(), 94);
|
||||
assert_eq!(file.len(), 106);
|
||||
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
|
||||
let fast_field_reader = fast_field_readers
|
||||
.u64("field")
|
||||
@@ -246,7 +246,7 @@ mod tests {
|
||||
write.terminate().unwrap();
|
||||
}
|
||||
let file = directory.open_read(path).unwrap();
|
||||
assert_eq!(file.len(), 4489);
|
||||
assert_eq!(file.len(), 4501);
|
||||
{
|
||||
let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
|
||||
let col = fast_field_readers
|
||||
@@ -279,7 +279,7 @@ mod tests {
|
||||
write.terminate().unwrap();
|
||||
}
|
||||
let file = directory.open_read(path).unwrap();
|
||||
assert_eq!(file.len(), 265);
|
||||
assert_eq!(file.len(), 277);
|
||||
|
||||
{
|
||||
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
|
||||
@@ -773,7 +773,7 @@ mod tests {
|
||||
write.terminate().unwrap();
|
||||
}
|
||||
let file = directory.open_read(path).unwrap();
|
||||
assert_eq!(file.len(), 102);
|
||||
assert_eq!(file.len(), 114);
|
||||
let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
|
||||
let bool_col = fast_field_readers.bool("field_bool").unwrap();
|
||||
assert_eq!(bool_col.first(0), Some(true));
|
||||
@@ -805,7 +805,7 @@ mod tests {
|
||||
write.terminate().unwrap();
|
||||
}
|
||||
let file = directory.open_read(path).unwrap();
|
||||
assert_eq!(file.len(), 114);
|
||||
assert_eq!(file.len(), 126);
|
||||
let readers = FastFieldReaders::open(file, schema).unwrap();
|
||||
let bool_col = readers.bool("field_bool").unwrap();
|
||||
for i in 0..25 {
|
||||
@@ -830,7 +830,7 @@ mod tests {
|
||||
write.terminate().unwrap();
|
||||
}
|
||||
let file = directory.open_read(path).unwrap();
|
||||
assert_eq!(file.len(), 104);
|
||||
assert_eq!(file.len(), 116);
|
||||
let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
|
||||
let col = fastfield_readers.bool("field_bool").unwrap();
|
||||
assert_eq!(col.first(0), None);
|
||||
|
||||
@@ -89,15 +89,21 @@ Note: as the SSTable does not support redundant keys, there is no ambiguity betw
|
||||
|
||||
### SSTFooter
|
||||
```
|
||||
+-------+-------+-----+-------------+---------+---------+
|
||||
| Block | Block | ... | IndexOffset | NumTerm | Version |
|
||||
+-------+-------+-----+-------------+---------+---------+
|
||||
|----( # of blocks)---|
|
||||
+-------+-------+-----+------------------+------------+-------------+---------+---------+
|
||||
| Block | Block | ... | FirstLayerOffset | LayerCount | IndexOffset | NumTerm | Version |
|
||||
+-------+-------+-----+------------------+------------+-------------+---------+---------+
|
||||
|----(# of blocks)----|
|
||||
```
|
||||
- Block(SSTBlock): uses IndexValue for its Values format
|
||||
- FirstLayerOffset(u64): Offset between the start of the footer and the start of the top level index
|
||||
- LayerCount(u32): Number of layers of index (min 1) ## TODO do we want to use 0 as a marker for no layers? It makes small sstables 12 bytes more compact (the 0u32 would alias with the "end of sstable marker")
|
||||
- IndexOffset(u64): Offset to the start of the SSTFooter
|
||||
- NumTerm(u64): number of terms in the sstable
|
||||
- Version(u32): Currently equal to 2
|
||||
- Version(u32): Currently equal to 3
|
||||
|
||||
Blocks referencing the main table and block referencing the index itself are encoded the same way and
|
||||
are not directly differentiated. Offsets in blocks referencing the index are relative to the start of
|
||||
the footer, blocks referencing the main table are relative to the start of that table.
|
||||
|
||||
### IndexValue
|
||||
```
|
||||
|
||||
@@ -20,6 +20,7 @@ where W: io::Write
|
||||
// Only here to avoid allocations.
|
||||
stateless_buffer: Vec<u8>,
|
||||
block_len: usize,
|
||||
compress: bool,
|
||||
}
|
||||
|
||||
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
|
||||
@@ -34,6 +35,18 @@ where
|
||||
value_writer: TValueWriter::default(),
|
||||
stateless_buffer: Vec::new(),
|
||||
block_len: BLOCK_LEN,
|
||||
compress: true,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_no_compression(wrt: W) -> Self {
|
||||
DeltaWriter {
|
||||
block: Vec::with_capacity(BLOCK_LEN * 2),
|
||||
write: CountingWriter::wrap(BufWriter::new(wrt)),
|
||||
value_writer: TValueWriter::default(),
|
||||
stateless_buffer: Vec::new(),
|
||||
block_len: BLOCK_LEN,
|
||||
compress: false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +66,7 @@ where
|
||||
|
||||
let block_len = buffer.len() + self.block.len();
|
||||
|
||||
if block_len > 2048 {
|
||||
if block_len > 2048 && self.compress {
|
||||
buffer.extend_from_slice(&self.block);
|
||||
self.block.clear();
|
||||
|
||||
|
||||
@@ -128,52 +128,86 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
key_range: impl RangeBounds<[u8]>,
|
||||
limit: Option<u64>,
|
||||
) -> FileSlice {
|
||||
let first_block_id = match key_range.start_bound() {
|
||||
Bound::Included(key) | Bound::Excluded(key) => {
|
||||
let Some(first_block_id) = self.sstable_index.locate_with_key(key) else {
|
||||
return FileSlice::empty();
|
||||
};
|
||||
Some(first_block_id)
|
||||
}
|
||||
// we don't perform great when limit is set to a large value, and sometime we use u64::MAX
|
||||
// as a marker for no limit, so we'd better capture that.
|
||||
// (not great means we decode up to the whole bottom layer index, which can take dozens of
|
||||
// ms on a 100m term dictionary)
|
||||
let limit = limit.filter(|limit| *limit != u64::MAX);
|
||||
|
||||
// TODO replace unwraps with proper error handling
|
||||
let start_key = match key_range.start_bound() {
|
||||
Bound::Included(key) | Bound::Excluded(key) => key,
|
||||
Bound::Unbounded => &[],
|
||||
};
|
||||
let end_key = match key_range.end_bound() {
|
||||
Bound::Included(key) | Bound::Excluded(key) => Some(key),
|
||||
Bound::Unbounded => None,
|
||||
};
|
||||
|
||||
let last_block_id = match key_range.end_bound() {
|
||||
Bound::Included(key) | Bound::Excluded(key) => self.sstable_index.locate_with_key(key),
|
||||
Bound::Unbounded => None,
|
||||
};
|
||||
|
||||
let start_bound = if let Some(first_block_id) = first_block_id {
|
||||
let Some(block_addr) = self.sstable_index.get_block(first_block_id) else {
|
||||
let bounds = if let Some(limit) = limit {
|
||||
let mut sstable_iterator = self.sstable_index.iterate_from_key(start_key).unwrap();
|
||||
let Some(start_block) = sstable_iterator.value() else {
|
||||
// range_start is after end of table
|
||||
return FileSlice::empty();
|
||||
};
|
||||
Bound::Included(block_addr.byte_range.start)
|
||||
} else {
|
||||
Bound::Unbounded
|
||||
};
|
||||
if let Some(end_key) = end_key {
|
||||
if sstable_iterator.key().unwrap() >= end_key {
|
||||
// the start and end keys are in the same block, return just that block
|
||||
return self.sstable_slice.slice(start_block.byte_range.clone());
|
||||
}
|
||||
}
|
||||
let start_bound = start_block.byte_range.start;
|
||||
|
||||
let last_block_id = if let Some(limit) = limit {
|
||||
let second_block_id = first_block_id.map(|id| id + 1).unwrap_or(0);
|
||||
if let Some(block_addr) = self.sstable_index.get_block(second_block_id) {
|
||||
let ordinal_limit = block_addr.first_ordinal + limit;
|
||||
let last_block_limit = self.sstable_index.locate_with_ord(ordinal_limit);
|
||||
if let Some(last_block_id) = last_block_id {
|
||||
Some(last_block_id.min(last_block_limit))
|
||||
sstable_iterator.advance().unwrap();
|
||||
let Some(second_block) = sstable_iterator.value() else {
|
||||
// we reached the end of the sstable, return everything from start_bound
|
||||
return self.sstable_slice.slice(start_bound..);
|
||||
};
|
||||
let mut end_bound = second_block.byte_range.end;
|
||||
if let Some(end_key) = end_key {
|
||||
if sstable_iterator.key().unwrap() >= end_key {
|
||||
return self.sstable_slice.slice(start_bound..end_bound);
|
||||
}
|
||||
}
|
||||
|
||||
let target_ord = second_block.first_ordinal + limit;
|
||||
|
||||
while sstable_iterator.advance().unwrap() {
|
||||
let block = sstable_iterator.value().unwrap();
|
||||
if block.first_ordinal >= target_ord {
|
||||
break;
|
||||
}
|
||||
end_bound = block.byte_range.end;
|
||||
if let Some(end_key) = end_key {
|
||||
if sstable_iterator.key().unwrap() >= end_key {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let start_bound = Bound::Included(start_bound);
|
||||
let end_bound = Bound::Excluded(end_bound);
|
||||
|
||||
(start_bound, end_bound)
|
||||
} else {
|
||||
let Some(start_block) = self.sstable_index.get_block_with_key(start_key).unwrap()
|
||||
else {
|
||||
// range_start is after end of table
|
||||
return FileSlice::empty();
|
||||
};
|
||||
let start_bound = Bound::Included(start_block.byte_range.start);
|
||||
let end_bound = if let Some(end_key) = end_key {
|
||||
if let Some(end_block) = self.sstable_index.get_block_with_key(end_key).unwrap() {
|
||||
Bound::Excluded(end_block.byte_range.end)
|
||||
} else {
|
||||
Some(last_block_limit)
|
||||
Bound::Unbounded
|
||||
}
|
||||
} else {
|
||||
last_block_id
|
||||
}
|
||||
} else {
|
||||
last_block_id
|
||||
Bound::Unbounded
|
||||
};
|
||||
(start_bound, end_bound)
|
||||
};
|
||||
let end_bound = last_block_id
|
||||
.and_then(|block_id| self.sstable_index.get_block(block_id))
|
||||
.map(|block_addr| Bound::Excluded(block_addr.byte_range.end))
|
||||
.unwrap_or(Bound::Unbounded);
|
||||
|
||||
self.sstable_slice.slice((start_bound, end_bound))
|
||||
self.sstable_slice.slice(bounds)
|
||||
}
|
||||
|
||||
/// Opens a `TermDictionary`.
|
||||
@@ -181,29 +215,57 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(20);
|
||||
let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?;
|
||||
|
||||
// let layer_count = u32::deserialize(&mut footer_len_bytes)?;
|
||||
let index_offset = u64::deserialize(&mut footer_len_bytes)?;
|
||||
let num_terms = u64::deserialize(&mut footer_len_bytes)?;
|
||||
let version = u32::deserialize(&mut footer_len_bytes)?;
|
||||
if version != crate::SSTABLE_VERSION {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Unsuported sstable version, expected {version}, found {}",
|
||||
crate::SSTABLE_VERSION,
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
|
||||
let sstable_index_bytes = index_slice.read_bytes()?;
|
||||
let sstable_index = SSTableIndex::load(sstable_index_bytes)
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
|
||||
Ok(Dictionary {
|
||||
sstable_slice,
|
||||
sstable_index,
|
||||
num_terms,
|
||||
phantom_data: PhantomData,
|
||||
})
|
||||
match version {
|
||||
2 => {
|
||||
// previous format, kept for backward compatibility
|
||||
let sstable_index_bytes = index_slice.read_bytes()?;
|
||||
// on the old format, the 1st layer necessarily start immediately, and there is
|
||||
// only a single layer
|
||||
let sstable_index =
|
||||
SSTableIndex::load(sstable_index_bytes, 1, 0).map_err(|_| {
|
||||
io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption")
|
||||
})?;
|
||||
Ok(Dictionary {
|
||||
sstable_slice,
|
||||
sstable_index,
|
||||
num_terms,
|
||||
phantom_data: PhantomData,
|
||||
})
|
||||
}
|
||||
3 => {
|
||||
let sstable_index_bytes = index_slice.read_bytes()?;
|
||||
let (sstable_index_bytes, mut v3_footer_bytes) = sstable_index_bytes.rsplit(12);
|
||||
let first_layer_offset = v3_footer_bytes.read_u64();
|
||||
let layer_count = v3_footer_bytes.read_u32();
|
||||
|
||||
let sstable_index = SSTableIndex::load(
|
||||
sstable_index_bytes,
|
||||
layer_count,
|
||||
first_layer_offset as usize,
|
||||
)
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
|
||||
Ok(Dictionary {
|
||||
sstable_slice,
|
||||
sstable_index,
|
||||
num_terms,
|
||||
phantom_data: PhantomData,
|
||||
})
|
||||
}
|
||||
_ => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Unsuported sstable version, expected {}, found {version}",
|
||||
crate::SSTABLE_VERSION,
|
||||
),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a term dictionary from the supplied bytes.
|
||||
@@ -227,68 +289,17 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
self.num_terms as usize
|
||||
}
|
||||
|
||||
/// Decode a DeltaReader up to key, returning the number of terms traversed
|
||||
///
|
||||
/// If the key was not found, returns Ok(None).
|
||||
/// After calling this function, it is possible to call `DeltaReader::value` to get the
|
||||
/// associated value.
|
||||
fn decode_up_to_key<K: AsRef<[u8]>>(
|
||||
&self,
|
||||
key: K,
|
||||
sstable_delta_reader: &mut DeltaReader<TSSTable::ValueReader>,
|
||||
) -> io::Result<Option<TermOrdinal>> {
|
||||
let mut term_ord = 0;
|
||||
let key_bytes = key.as_ref();
|
||||
let mut ok_bytes = 0;
|
||||
while sstable_delta_reader.advance()? {
|
||||
let prefix_len = sstable_delta_reader.common_prefix_len();
|
||||
let suffix = sstable_delta_reader.suffix();
|
||||
|
||||
match prefix_len.cmp(&ok_bytes) {
|
||||
Ordering::Less => return Ok(None), // popped bytes already matched => too far
|
||||
Ordering::Equal => (),
|
||||
Ordering::Greater => {
|
||||
// the ok prefix is less than current entry prefix => continue to next elem
|
||||
term_ord += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// we have ok_bytes byte of common prefix, check if this key adds more
|
||||
for (key_byte, suffix_byte) in key_bytes[ok_bytes..].iter().zip(suffix) {
|
||||
match suffix_byte.cmp(key_byte) {
|
||||
Ordering::Less => break, // byte too small
|
||||
Ordering::Equal => ok_bytes += 1, // new matching byte
|
||||
Ordering::Greater => return Ok(None), // too far
|
||||
}
|
||||
}
|
||||
|
||||
if ok_bytes == key_bytes.len() {
|
||||
if prefix_len + suffix.len() == ok_bytes {
|
||||
return Ok(Some(term_ord));
|
||||
} else {
|
||||
// current key is a prefix of current element, not a match
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
term_ord += 1;
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
/// Returns the ordinal associated with a given term.
|
||||
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
|
||||
let key_bytes = key.as_ref();
|
||||
|
||||
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else {
|
||||
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes)? else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let first_ordinal = block_addr.first_ordinal;
|
||||
let mut sstable_delta_reader = self.sstable_delta_reader_block(block_addr)?;
|
||||
self.decode_up_to_key(key_bytes, &mut sstable_delta_reader)
|
||||
decode_up_to_key(key_bytes, &mut sstable_delta_reader)
|
||||
.map(|opt| opt.map(|ord| ord + first_ordinal))
|
||||
}
|
||||
|
||||
@@ -303,7 +314,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
/// the buffer may be modified.
|
||||
pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
|
||||
// find block in which the term would be
|
||||
let block_addr = self.sstable_index.get_block_with_ord(ord);
|
||||
let block_addr = self.sstable_index.get_block_with_ord(ord)?;
|
||||
let first_ordinal = block_addr.first_ordinal;
|
||||
|
||||
// then search inside that block only
|
||||
@@ -321,7 +332,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
/// Returns the number of terms in the dictionary.
|
||||
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TSSTable::Value>> {
|
||||
// find block in which the term would be
|
||||
let block_addr = self.sstable_index.get_block_with_ord(term_ord);
|
||||
let block_addr = self.sstable_index.get_block_with_ord(term_ord)?;
|
||||
let first_ordinal = block_addr.first_ordinal;
|
||||
|
||||
// then search inside that block only
|
||||
@@ -336,7 +347,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
|
||||
/// Lookups the value corresponding to the key.
|
||||
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
|
||||
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
|
||||
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref())? {
|
||||
let sstable_reader = self.sstable_delta_reader_block(block_addr)?;
|
||||
return self.do_get(key, sstable_reader);
|
||||
}
|
||||
@@ -345,7 +356,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
|
||||
/// Lookups the value corresponding to the key.
|
||||
pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
|
||||
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
|
||||
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref())? {
|
||||
let sstable_reader = self.sstable_delta_reader_block_async(block_addr).await?;
|
||||
return self.do_get(key, sstable_reader);
|
||||
}
|
||||
@@ -357,7 +368,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
key: K,
|
||||
mut reader: DeltaReader<TSSTable::ValueReader>,
|
||||
) -> io::Result<Option<TSSTable::Value>> {
|
||||
if let Some(_ord) = self.decode_up_to_key(key, &mut reader)? {
|
||||
if let Some(_ord) = decode_up_to_key(key, &mut reader)? {
|
||||
Ok(Some(reader.value().clone()))
|
||||
} else {
|
||||
Ok(None)
|
||||
@@ -394,6 +405,56 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Decode a DeltaReader up to key, returning the number of terms traversed
|
||||
///
|
||||
/// If the key was not found, returns Ok(None).
|
||||
/// After calling this function, it is possible to call `DeltaReader::value` to get the
|
||||
/// associated value.
|
||||
pub(crate) fn decode_up_to_key<K: AsRef<[u8]>, TValueReader: crate::ValueReader>(
|
||||
key: K,
|
||||
sstable_delta_reader: &mut DeltaReader<TValueReader>,
|
||||
) -> io::Result<Option<TermOrdinal>> {
|
||||
let mut term_ord = 0;
|
||||
let key_bytes = key.as_ref();
|
||||
let mut ok_bytes = 0;
|
||||
while sstable_delta_reader.advance()? {
|
||||
let prefix_len = sstable_delta_reader.common_prefix_len();
|
||||
let suffix = sstable_delta_reader.suffix();
|
||||
|
||||
match prefix_len.cmp(&ok_bytes) {
|
||||
Ordering::Less => return Ok(None), // popped bytes already matched => too far
|
||||
Ordering::Equal => (),
|
||||
Ordering::Greater => {
|
||||
// the ok prefix is less than current entry prefix => continue to next elem
|
||||
term_ord += 1;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
// we have ok_bytes byte of common prefix, check if this key adds more
|
||||
for (key_byte, suffix_byte) in key_bytes[ok_bytes..].iter().zip(suffix) {
|
||||
match suffix_byte.cmp(key_byte) {
|
||||
Ordering::Less => break, // byte too small
|
||||
Ordering::Equal => ok_bytes += 1, // new matching byte
|
||||
Ordering::Greater => return Ok(None), // too far
|
||||
}
|
||||
}
|
||||
|
||||
if ok_bytes == key_bytes.len() {
|
||||
if prefix_len + suffix.len() == ok_bytes {
|
||||
return Ok(Some(term_ord));
|
||||
} else {
|
||||
// current key is a prefix of current element, not a match
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
|
||||
term_ord += 1;
|
||||
}
|
||||
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::ops::Range;
|
||||
@@ -459,8 +520,6 @@ mod tests {
|
||||
|
||||
let dictionary = Dictionary::<MonotonicU64SSTable>::open(slice).unwrap();
|
||||
|
||||
// if the last block is id 0, tests are meaningless
|
||||
assert_ne!(dictionary.sstable_index.locate_with_ord(u64::MAX), 0);
|
||||
assert_eq!(dictionary.num_terms(), 0x3ffff);
|
||||
(dictionary, table)
|
||||
}
|
||||
@@ -469,7 +528,7 @@ mod tests {
|
||||
fn test_ord_term_conversion() {
|
||||
let (dic, slice) = make_test_sstable();
|
||||
|
||||
let block = dic.sstable_index.get_block_with_ord(100_000);
|
||||
let block = dic.sstable_index.get_block_with_ord(100_000).unwrap();
|
||||
slice.restrict(block.byte_range);
|
||||
|
||||
let mut res = Vec::new();
|
||||
@@ -495,7 +554,11 @@ mod tests {
|
||||
|
||||
// end of a block
|
||||
let ordinal = block.first_ordinal - 1;
|
||||
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
|
||||
let new_range = dic
|
||||
.sstable_index
|
||||
.get_block_with_ord(ordinal)
|
||||
.unwrap()
|
||||
.byte_range;
|
||||
slice.restrict(new_range);
|
||||
assert!(dic.ord_to_term(ordinal, &mut res).unwrap());
|
||||
assert_eq!(res, format!("{ordinal:05X}").into_bytes());
|
||||
@@ -505,7 +568,7 @@ mod tests {
|
||||
|
||||
// before first block
|
||||
// 1st block must be loaded for key-related operations
|
||||
let block = dic.sstable_index.get_block_with_ord(0);
|
||||
let block = dic.sstable_index.get_block_with_ord(0).unwrap();
|
||||
slice.restrict(block.byte_range);
|
||||
|
||||
assert!(dic.get(b"$$$").unwrap().is_none());
|
||||
@@ -514,7 +577,11 @@ mod tests {
|
||||
// after last block
|
||||
// last block must be loaded for ord related operations
|
||||
let ordinal = 0x40000 + 10;
|
||||
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
|
||||
let new_range = dic
|
||||
.sstable_index
|
||||
.get_block_with_ord(ordinal)
|
||||
.unwrap()
|
||||
.byte_range;
|
||||
slice.restrict(new_range);
|
||||
assert!(!dic.ord_to_term(ordinal, &mut res).unwrap());
|
||||
assert!(dic.term_info_from_ord(ordinal).unwrap().is_none());
|
||||
@@ -539,11 +606,13 @@ mod tests {
|
||||
.sstable_index
|
||||
.get_block_with_key(b"10000")
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.byte_range;
|
||||
let end = dic
|
||||
.sstable_index
|
||||
.get_block_with_key(b"18000")
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.byte_range;
|
||||
slice.restrict(start.start..end.end);
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::io::{self, Write};
|
||||
use std::num::NonZeroU64;
|
||||
use std::ops::Range;
|
||||
use std::usize;
|
||||
|
||||
use merge::ValueMerger;
|
||||
|
||||
@@ -28,7 +28,13 @@ use crate::value::{RangeValueReader, RangeValueWriter};
|
||||
pub type TermOrdinal = u64;
|
||||
|
||||
const DEFAULT_KEY_CAPACITY: usize = 50;
|
||||
const SSTABLE_VERSION: u32 = 2;
|
||||
const SSTABLE_VERSION: u32 = 3;
|
||||
|
||||
// TODO tune that value. Maybe it's too little?
|
||||
#[cfg(not(test))]
|
||||
const DEFAULT_MAX_ROOT_BLOCKS: NonZeroU64 = unsafe { NonZeroU64::new_unchecked(32) };
|
||||
#[cfg(test)]
|
||||
const DEFAULT_MAX_ROOT_BLOCKS: NonZeroU64 = unsafe { NonZeroU64::new_unchecked(1) };
|
||||
|
||||
/// Given two byte string returns the length of
|
||||
/// the longest common prefix.
|
||||
@@ -55,7 +61,7 @@ pub trait SSTable: Sized {
|
||||
}
|
||||
|
||||
fn writer<W: io::Write>(wrt: W) -> Writer<W, Self::ValueWriter> {
|
||||
Writer::new(wrt)
|
||||
Writer::new(wrt, DEFAULT_MAX_ROOT_BLOCKS)
|
||||
}
|
||||
|
||||
fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> {
|
||||
@@ -178,6 +184,7 @@ where W: io::Write
|
||||
delta_writer: DeltaWriter<W, TValueWriter>,
|
||||
num_terms: u64,
|
||||
first_ordinal_of_the_block: u64,
|
||||
index_max_root_blocks: NonZeroU64,
|
||||
}
|
||||
|
||||
impl<W, TValueWriter> Writer<W, TValueWriter>
|
||||
@@ -190,17 +197,18 @@ where
|
||||
/// TODO remove this function. (See Issue #1727)
|
||||
#[doc(hidden)]
|
||||
pub fn create(wrt: W) -> io::Result<Self> {
|
||||
Ok(Self::new(wrt))
|
||||
Ok(Self::new(wrt, DEFAULT_MAX_ROOT_BLOCKS))
|
||||
}
|
||||
|
||||
/// Creates a new `TermDictionaryBuilder`.
|
||||
pub fn new(wrt: W) -> Self {
|
||||
pub fn new(wrt: W, index_max_root_blocks: NonZeroU64) -> Self {
|
||||
Writer {
|
||||
previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
|
||||
num_terms: 0u64,
|
||||
index_builder: SSTableIndexBuilder::default(),
|
||||
delta_writer: DeltaWriter::new(wrt),
|
||||
first_ordinal_of_the_block: 0u64,
|
||||
index_max_root_blocks,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -302,10 +310,15 @@ where
|
||||
// add a final empty block as an end marker
|
||||
wrt.write_all(&0u32.to_le_bytes())?;
|
||||
|
||||
let offset = wrt.written_bytes();
|
||||
let index_offset = wrt.written_bytes();
|
||||
|
||||
self.index_builder.serialize(&mut wrt)?;
|
||||
wrt.write_all(&offset.to_le_bytes())?;
|
||||
let (layer_count, layer_offset): (u32, u64) = self
|
||||
.index_builder
|
||||
.serialize(&mut wrt, self.index_max_root_blocks)?;
|
||||
wrt.write_all(&layer_offset.to_le_bytes())?;
|
||||
wrt.write_all(&layer_count.to_le_bytes())?;
|
||||
|
||||
wrt.write_all(&index_offset.to_le_bytes())?;
|
||||
wrt.write_all(&self.num_terms.to_le_bytes())?;
|
||||
|
||||
SSTABLE_VERSION.serialize(&mut wrt)?;
|
||||
@@ -389,9 +402,11 @@ mod test {
|
||||
0, // compression
|
||||
1, 0, 12, 0, 32, 17, 20, // index block
|
||||
0, 0, 0, 0, // no more index block
|
||||
0, 0, 0, 0, 0, 0, 0, 0, // first layer offset
|
||||
1, 0, 0, 0, // layer count
|
||||
16, 0, 0, 0, 0, 0, 0, 0, // index start offset
|
||||
3, 0, 0, 0, 0, 0, 0, 0, // num term
|
||||
2, 0, 0, 0, // version
|
||||
3, 0, 0, 0, // version
|
||||
]
|
||||
);
|
||||
let buffer = OwnedBytes::new(buffer);
|
||||
|
||||
@@ -5,77 +5,188 @@ use common::OwnedBytes;
|
||||
|
||||
use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
|
||||
|
||||
#[derive(Default, Debug, Clone)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct SSTableIndex {
|
||||
blocks: Vec<BlockMeta>,
|
||||
root_blocks: Vec<BlockMeta>,
|
||||
layer_count: u32,
|
||||
index_bytes: OwnedBytes,
|
||||
}
|
||||
|
||||
impl Default for SSTableIndex {
|
||||
fn default() -> Self {
|
||||
SSTableIndex {
|
||||
root_blocks: Vec::new(),
|
||||
layer_count: 1,
|
||||
index_bytes: OwnedBytes::empty(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl SSTableIndex {
|
||||
/// Load an index from its binary representation
|
||||
pub fn load(data: OwnedBytes) -> Result<SSTableIndex, SSTableDataCorruption> {
|
||||
let mut reader = IndexSSTable::reader(data);
|
||||
let mut blocks = Vec::new();
|
||||
pub fn load(
|
||||
data: OwnedBytes,
|
||||
layer_count: u32,
|
||||
first_layer_offset: usize,
|
||||
) -> Result<SSTableIndex, SSTableDataCorruption> {
|
||||
let (index_bytes, first_layer_slice) = data.split(first_layer_offset);
|
||||
let mut reader = IndexSSTable::reader(first_layer_slice);
|
||||
let mut root_blocks = Vec::new();
|
||||
|
||||
while reader.advance().map_err(|_| SSTableDataCorruption)? {
|
||||
blocks.push(BlockMeta {
|
||||
root_blocks.push(BlockMeta {
|
||||
last_key_or_greater: reader.key().to_vec(),
|
||||
block_addr: reader.value().clone(),
|
||||
});
|
||||
}
|
||||
|
||||
Ok(SSTableIndex { blocks })
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the requested block.
|
||||
pub(crate) fn get_block(&self, block_id: usize) -> Option<BlockAddr> {
|
||||
self.blocks
|
||||
.get(block_id)
|
||||
.map(|block_meta| block_meta.block_addr.clone())
|
||||
}
|
||||
|
||||
/// Get the block id of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
|
||||
let pos = self
|
||||
.blocks
|
||||
.binary_search_by_key(&key, |block| &block.last_key_or_greater);
|
||||
match pos {
|
||||
Ok(pos) => Some(pos),
|
||||
Err(pos) => {
|
||||
if pos < self.blocks.len() {
|
||||
Some(pos)
|
||||
} else {
|
||||
// after end of last block: no block matches
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(SSTableIndex {
|
||||
root_blocks,
|
||||
layer_count,
|
||||
index_bytes,
|
||||
// index_bytes: OwnedBytes::empty(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
|
||||
self.locate_with_key(key).and_then(|id| self.get_block(id))
|
||||
}
|
||||
|
||||
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize {
|
||||
let pos = self
|
||||
.blocks
|
||||
.binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
|
||||
|
||||
match pos {
|
||||
Ok(pos) => pos,
|
||||
// Err(0) can't happen as the sstable starts with ordinal zero
|
||||
Err(pos) => pos - 1,
|
||||
}
|
||||
pub fn get_block_with_key(&self, key: &[u8]) -> io::Result<Option<BlockAddr>> {
|
||||
self.iterate_from_key(key).map(|iter| iter.value().cloned())
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||
// locate_with_ord always returns an index within range
|
||||
self.get_block(self.locate_with_ord(ord)).unwrap()
|
||||
pub fn get_block_with_ord(&self, ord: TermOrdinal) -> io::Result<BlockAddr> {
|
||||
let pos = self
|
||||
.root_blocks
|
||||
.binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
|
||||
|
||||
let root_pos = match pos {
|
||||
Ok(pos) => pos,
|
||||
// Err(0) can't happen as the sstable starts with ordinal zero
|
||||
Err(pos) => pos - 1,
|
||||
};
|
||||
|
||||
if self.layer_count == 1 {
|
||||
return Ok(self.root_blocks[root_pos].block_addr.clone());
|
||||
}
|
||||
|
||||
let mut next_layer_block_addr = self.root_blocks[root_pos].block_addr.clone();
|
||||
for _ in 1..self.layer_count {
|
||||
let mut sstable_delta_reader = IndexSSTable::delta_reader(
|
||||
self.index_bytes
|
||||
.slice(next_layer_block_addr.byte_range.clone()),
|
||||
);
|
||||
while sstable_delta_reader.advance()? {
|
||||
if sstable_delta_reader.value().first_ordinal > ord {
|
||||
break;
|
||||
}
|
||||
next_layer_block_addr = sstable_delta_reader.value().clone();
|
||||
}
|
||||
}
|
||||
Ok(next_layer_block_addr)
|
||||
}
|
||||
|
||||
pub(crate) fn iterate_from_key(&self, key: &[u8]) -> io::Result<ReaderOrSlice<'_>> {
|
||||
let root_pos = self
|
||||
.root_blocks
|
||||
.binary_search_by_key(&key, |block| &block.last_key_or_greater);
|
||||
let root_pos = match root_pos {
|
||||
Ok(pos) => pos,
|
||||
Err(pos) => {
|
||||
if pos < self.root_blocks.len() {
|
||||
pos
|
||||
} else {
|
||||
// after end of last block: no block matches
|
||||
return Ok(ReaderOrSlice::End);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let mut next_layer_block_addr = self.root_blocks[root_pos].block_addr.clone();
|
||||
let mut last_delta_reader = None;
|
||||
for _ in 1..self.layer_count {
|
||||
// we don't enter this loop for 1 layer index
|
||||
let mut sstable_delta_reader = IndexSSTable::delta_reader(
|
||||
self.index_bytes.slice(next_layer_block_addr.byte_range),
|
||||
);
|
||||
crate::dictionary::decode_up_to_key(key, &mut sstable_delta_reader)?;
|
||||
next_layer_block_addr = sstable_delta_reader.value().clone();
|
||||
last_delta_reader = Some(sstable_delta_reader);
|
||||
}
|
||||
|
||||
if let Some(delta_reader) = last_delta_reader {
|
||||
// reconstruct the current key. We stopped either on the exact key, or just after
|
||||
// either way, common_prefix_len is something that did not change between the
|
||||
// last-key-before-target and the current pos, so those bytes must match the prefix of
|
||||
// `key`. The next bytes can be obtained from the delta reader
|
||||
let mut result_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
|
||||
let common_prefix_len = delta_reader.common_prefix_len();
|
||||
let suffix = delta_reader.suffix();
|
||||
let new_len = delta_reader.common_prefix_len() + suffix.len();
|
||||
result_key.resize(new_len, 0u8);
|
||||
result_key[..common_prefix_len].copy_from_slice(&key[..common_prefix_len]);
|
||||
result_key[common_prefix_len..].copy_from_slice(suffix);
|
||||
|
||||
let reader = crate::Reader {
|
||||
key: result_key,
|
||||
delta_reader,
|
||||
};
|
||||
Ok(ReaderOrSlice::Reader(reader))
|
||||
} else {
|
||||
// self.layer_count == 1, there is no lvl2 sstable to decode.
|
||||
Ok(ReaderOrSlice::Iter(&self.root_blocks, root_pos))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) enum ReaderOrSlice<'a> {
|
||||
Reader(crate::Reader<crate::value::index::IndexValueReader>),
|
||||
Iter(&'a [BlockMeta], usize),
|
||||
End,
|
||||
}
|
||||
|
||||
impl<'a> ReaderOrSlice<'a> {
|
||||
pub fn advance(&mut self) -> Result<bool, SSTableDataCorruption> {
|
||||
match self {
|
||||
ReaderOrSlice::Reader(reader) => {
|
||||
let res = reader.advance().map_err(|_| SSTableDataCorruption);
|
||||
if !matches!(res, Ok(true)) {
|
||||
*self = ReaderOrSlice::End;
|
||||
}
|
||||
res
|
||||
}
|
||||
ReaderOrSlice::Iter(slice, index) => {
|
||||
*index += 1;
|
||||
if *index < slice.len() {
|
||||
Ok(true)
|
||||
} else {
|
||||
*self = ReaderOrSlice::End;
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
ReaderOrSlice::End => Ok(false),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get current key. Always Some(_) unless last call to advance returned something else than
|
||||
/// Ok(true)
|
||||
pub fn key(&self) -> Option<&[u8]> {
|
||||
match self {
|
||||
ReaderOrSlice::Reader(reader) => Some(reader.key()),
|
||||
ReaderOrSlice::Iter(slice, index) => Some(&slice[*index].last_key_or_greater),
|
||||
ReaderOrSlice::End => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get current value. Always Some(_) unless last call to advance returned something else than
|
||||
/// Ok(true)
|
||||
pub fn value(&self) -> Option<&BlockAddr> {
|
||||
match self {
|
||||
ReaderOrSlice::Reader(reader) => Some(reader.value()),
|
||||
ReaderOrSlice::Iter(slice, index) => Some(&slice[*index].block_addr),
|
||||
ReaderOrSlice::End => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -124,13 +235,13 @@ impl SSTableIndexBuilder {
|
||||
/// try to find a shorter alternative to the last key of the last block
|
||||
/// that is still smaller than the next key.
|
||||
pub(crate) fn shorten_last_block_key_given_next_key(&mut self, next_key: &[u8]) {
|
||||
if let Some(last_block) = self.index.blocks.last_mut() {
|
||||
if let Some(last_block) = self.index.root_blocks.last_mut() {
|
||||
find_shorter_str_in_between(&mut last_block.last_key_or_greater, next_key);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_block(&mut self, last_key: &[u8], byte_range: Range<usize>, first_ordinal: u64) {
|
||||
self.index.blocks.push(BlockMeta {
|
||||
self.index.root_blocks.push(BlockMeta {
|
||||
last_key_or_greater: last_key.to_vec(),
|
||||
block_addr: BlockAddr {
|
||||
byte_range,
|
||||
@@ -139,31 +250,90 @@ impl SSTableIndexBuilder {
|
||||
})
|
||||
}
|
||||
|
||||
pub fn serialize<W: std::io::Write>(&self, wrt: W) -> io::Result<()> {
|
||||
// we can't use a plain writer as it would generate an index
|
||||
let mut sstable_writer = IndexSSTable::delta_writer(wrt);
|
||||
pub fn serialize<W: std::io::Write>(
|
||||
&self,
|
||||
wrt: W,
|
||||
index_max_root_blocks: std::num::NonZeroU64,
|
||||
) -> io::Result<(u32, u64)> {
|
||||
let index_max_root_blocks = index_max_root_blocks.get();
|
||||
|
||||
// in tests, set a smaller block size to stress-test
|
||||
#[cfg(test)]
|
||||
sstable_writer.set_block_len(16);
|
||||
let mut wrt = common::CountingWriter::wrap(wrt);
|
||||
let mut next_layer = write_sstable_layer(&mut wrt, &self.index.root_blocks, 0)?;
|
||||
|
||||
let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
|
||||
for block in self.index.blocks.iter() {
|
||||
let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater);
|
||||
let mut layer_count = 1;
|
||||
let mut offset = 0;
|
||||
while next_layer.len() as u64 > index_max_root_blocks {
|
||||
offset = wrt.written_bytes();
|
||||
layer_count += 1;
|
||||
|
||||
sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]);
|
||||
sstable_writer.write_value(&block.block_addr);
|
||||
sstable_writer.flush_block_if_required()?;
|
||||
|
||||
previous_key.clear();
|
||||
previous_key.extend_from_slice(&block.last_key_or_greater);
|
||||
next_layer = write_sstable_layer(&mut wrt, &next_layer, offset as usize)?;
|
||||
}
|
||||
sstable_writer.flush_block()?;
|
||||
sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
|
||||
Ok(())
|
||||
Ok((layer_count, offset))
|
||||
}
|
||||
}
|
||||
|
||||
fn write_sstable_layer<W: std::io::Write>(
|
||||
wrt: W,
|
||||
layer_content: &[BlockMeta],
|
||||
offset: usize,
|
||||
) -> io::Result<Vec<BlockMeta>> {
|
||||
// we can't use a plain writer as it would generate an index
|
||||
// also disable compression, the index is small anyway, and it's the most costly part of
|
||||
// opening that kind of sstable
|
||||
let mut sstable_writer =
|
||||
crate::DeltaWriter::<_, crate::value::index::IndexValueWriter>::new_no_compression(wrt);
|
||||
|
||||
// in tests, set a smaller block size to stress-test
|
||||
#[cfg(test)]
|
||||
sstable_writer.set_block_len(16);
|
||||
|
||||
let mut next_layer = Vec::new();
|
||||
let mut previous_key = Vec::with_capacity(crate::DEFAULT_KEY_CAPACITY);
|
||||
let mut first_ordinal = None;
|
||||
for block in layer_content.iter() {
|
||||
if first_ordinal.is_none() {
|
||||
first_ordinal = Some(block.block_addr.first_ordinal);
|
||||
}
|
||||
let keep_len = common_prefix_len(&previous_key, &block.last_key_or_greater);
|
||||
|
||||
sstable_writer.write_suffix(keep_len, &block.last_key_or_greater[keep_len..]);
|
||||
sstable_writer.write_value(&block.block_addr);
|
||||
if let Some(range) = sstable_writer.flush_block_if_required()? {
|
||||
let real_range = (range.start + offset)..(range.end + offset);
|
||||
let block_meta = BlockMeta {
|
||||
last_key_or_greater: block.last_key_or_greater.clone(),
|
||||
block_addr: BlockAddr {
|
||||
byte_range: real_range,
|
||||
first_ordinal: first_ordinal.take().unwrap(),
|
||||
},
|
||||
};
|
||||
next_layer.push(block_meta);
|
||||
previous_key.clear();
|
||||
} else {
|
||||
previous_key.extend_from_slice(&block.last_key_or_greater);
|
||||
previous_key.resize(block.last_key_or_greater.len(), 0u8);
|
||||
previous_key[keep_len..].copy_from_slice(&block.last_key_or_greater[keep_len..]);
|
||||
}
|
||||
}
|
||||
if let Some(range) = sstable_writer.flush_block()? {
|
||||
if let Some(last_block) = layer_content.last() {
|
||||
// not going here means an empty table (?!)
|
||||
let real_range = (range.start + offset)..(range.end + offset);
|
||||
let block_meta = BlockMeta {
|
||||
last_key_or_greater: last_block.last_key_or_greater.clone(),
|
||||
block_addr: BlockAddr {
|
||||
byte_range: real_range,
|
||||
first_ordinal: first_ordinal.take().unwrap(),
|
||||
},
|
||||
};
|
||||
next_layer.push(block_meta);
|
||||
}
|
||||
}
|
||||
sstable_writer.finish().write_all(&0u32.to_le_bytes())?;
|
||||
|
||||
Ok(next_layer)
|
||||
}
|
||||
|
||||
/// SSTable representing an index
|
||||
///
|
||||
/// `last_key_or_greater` is used as the key, the value contains the
|
||||
@@ -194,28 +364,77 @@ mod tests {
|
||||
sstable_builder.add_block(b"ccc", 30..40, 10u64);
|
||||
sstable_builder.add_block(b"dddd", 40..50, 15u64);
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
sstable_builder.serialize(&mut buffer).unwrap();
|
||||
sstable_builder
|
||||
.serialize(&mut buffer, crate::DEFAULT_MAX_ROOT_BLOCKS)
|
||||
.unwrap();
|
||||
let buffer = OwnedBytes::new(buffer);
|
||||
let sstable_index = SSTableIndex::load(buffer).unwrap();
|
||||
let sstable_index = SSTableIndex::load(buffer, 1, 0).unwrap();
|
||||
assert_eq!(
|
||||
sstable_index.get_block_with_key(b"bbbde"),
|
||||
sstable_index.get_block_with_key(b"bbbde").unwrap(),
|
||||
Some(BlockAddr {
|
||||
first_ordinal: 10u64,
|
||||
byte_range: 30..40
|
||||
})
|
||||
);
|
||||
|
||||
assert_eq!(sstable_index.locate_with_key(b"aa").unwrap(), 0);
|
||||
assert_eq!(sstable_index.locate_with_key(b"aaa").unwrap(), 0);
|
||||
assert_eq!(sstable_index.locate_with_key(b"aab").unwrap(), 1);
|
||||
assert_eq!(sstable_index.locate_with_key(b"ccc").unwrap(), 2);
|
||||
assert!(sstable_index.locate_with_key(b"e").is_none());
|
||||
assert_eq!(
|
||||
sstable_index
|
||||
.get_block_with_key(b"aa")
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.first_ordinal,
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
sstable_index
|
||||
.get_block_with_key(b"aaa")
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.first_ordinal,
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
sstable_index
|
||||
.get_block_with_key(b"aab")
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.first_ordinal,
|
||||
5
|
||||
);
|
||||
assert_eq!(
|
||||
sstable_index
|
||||
.get_block_with_key(b"ccc")
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.first_ordinal,
|
||||
10
|
||||
);
|
||||
assert!(sstable_index.get_block_with_key(b"e").unwrap().is_none());
|
||||
|
||||
assert_eq!(sstable_index.locate_with_ord(0), 0);
|
||||
assert_eq!(sstable_index.locate_with_ord(1), 0);
|
||||
assert_eq!(sstable_index.locate_with_ord(4), 0);
|
||||
assert_eq!(sstable_index.locate_with_ord(5), 1);
|
||||
assert_eq!(sstable_index.locate_with_ord(100), 3);
|
||||
assert_eq!(
|
||||
sstable_index.get_block_with_ord(0).unwrap().first_ordinal,
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
sstable_index.get_block_with_ord(1).unwrap().first_ordinal,
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
sstable_index.get_block_with_ord(4).unwrap().first_ordinal,
|
||||
0
|
||||
);
|
||||
assert_eq!(
|
||||
sstable_index.get_block_with_ord(5).unwrap().first_ordinal,
|
||||
5
|
||||
);
|
||||
assert_eq!(
|
||||
sstable_index.get_block_with_ord(6).unwrap().first_ordinal,
|
||||
5
|
||||
);
|
||||
assert_eq!(
|
||||
sstable_index.get_block_with_ord(100).unwrap().first_ordinal,
|
||||
15
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -226,10 +445,12 @@ mod tests {
|
||||
sstable_builder.add_block(b"ccc", 30..40, 10u64);
|
||||
sstable_builder.add_block(b"dddd", 40..50, 15u64);
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
sstable_builder.serialize(&mut buffer).unwrap();
|
||||
sstable_builder
|
||||
.serialize(&mut buffer, crate::DEFAULT_MAX_ROOT_BLOCKS)
|
||||
.unwrap();
|
||||
buffer[2] = 9u8;
|
||||
let buffer = OwnedBytes::new(buffer);
|
||||
let data_corruption_err = SSTableIndex::load(buffer).err().unwrap();
|
||||
let data_corruption_err = SSTableIndex::load(buffer, 1, 0).err().unwrap();
|
||||
assert!(matches!(data_corruption_err, SSTableDataCorruption));
|
||||
}
|
||||
|
||||
|
||||
@@ -110,7 +110,7 @@ where
|
||||
Bound::Included(key) | Bound::Excluded(key) => self
|
||||
.term_dict
|
||||
.sstable_index
|
||||
.get_block_with_key(key)
|
||||
.get_block_with_key(key)?
|
||||
.map(|block| block.first_ordinal)
|
||||
.unwrap_or(0),
|
||||
Bound::Unbounded => 0,
|
||||
|
||||
@@ -3,6 +3,10 @@ use std::io;
|
||||
use crate::value::{deserialize_vint_u64, ValueReader, ValueWriter};
|
||||
use crate::{vint, BlockAddr};
|
||||
|
||||
// TODO define a LazyIndexValueReader?
|
||||
// one which keeps state could be useful for ord_to_block fns,
|
||||
// one which doesn't at all woud be perfect for term_to_block fns
|
||||
// pending bench to asses real impact
|
||||
#[derive(Default)]
|
||||
pub(crate) struct IndexValueReader {
|
||||
vals: Vec<BlockAddr>,
|
||||
|
||||
Reference in New Issue
Block a user