mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-04 00:02:55 +00:00
modify getters for sstable metadata (#1793)
* add way to get up to `limit` terms from sstable * make some function of sstable load less data * add some tests to sstable * add tests on sstable dictionary * fix some bugs with sstable
This commit is contained in:
@@ -69,14 +69,16 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
pub(crate) fn sstable_delta_reader_for_key_range(
|
||||
&self,
|
||||
key_range: impl RangeBounds<[u8]>,
|
||||
limit: Option<u64>,
|
||||
) -> io::Result<DeltaReader<'static, TSSTable::ValueReader>> {
|
||||
let slice = self.file_slice_for_range(key_range);
|
||||
let slice = self.file_slice_for_range(key_range, limit);
|
||||
let data = slice.read_bytes()?;
|
||||
Ok(TSSTable::delta_reader(data))
|
||||
}
|
||||
|
||||
/// This function returns a file slice covering a set of sstable blocks
|
||||
/// that include the key range passed in arguments.
|
||||
/// that include the key range passed in arguments. Optionally returns
|
||||
/// only the blocks needed for up to `limit` matching terms.
|
||||
///
|
||||
/// It works by identifying
|
||||
/// - `first_block`: the block containing the start boundary key
|
||||
@@ -92,26 +94,56 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
/// On the rare edge case where a user asks for `(start_key, end_key]`
|
||||
/// and `start_key` happens to be the last key of a block, we return a
|
||||
/// slice that includes the first block even though it was not necessary.
|
||||
pub fn file_slice_for_range(&self, key_range: impl RangeBounds<[u8]>) -> FileSlice {
|
||||
let start_bound: Bound<usize> = match key_range.start_bound() {
|
||||
pub fn file_slice_for_range(
|
||||
&self,
|
||||
key_range: impl RangeBounds<[u8]>,
|
||||
limit: Option<u64>,
|
||||
) -> FileSlice {
|
||||
let first_block_id = match key_range.start_bound() {
|
||||
Bound::Included(key) | Bound::Excluded(key) => {
|
||||
let Some(first_block_addr) = self.sstable_index.search_block(key) else {
|
||||
let Some(first_block_id) = self.sstable_index.locate_with_key(key) else {
|
||||
return FileSlice::empty();
|
||||
};
|
||||
Bound::Included(first_block_addr.byte_range.start)
|
||||
Some(first_block_id)
|
||||
}
|
||||
Bound::Unbounded => Bound::Unbounded,
|
||||
Bound::Unbounded => None,
|
||||
};
|
||||
let end_bound: Bound<usize> = match key_range.end_bound() {
|
||||
Bound::Included(key) | Bound::Excluded(key) => {
|
||||
if let Some(block_addr) = self.sstable_index.search_block(key) {
|
||||
Bound::Excluded(block_addr.byte_range.end)
|
||||
|
||||
let last_block_id = match key_range.end_bound() {
|
||||
Bound::Included(key) | Bound::Excluded(key) => self.sstable_index.locate_with_key(key),
|
||||
Bound::Unbounded => None,
|
||||
};
|
||||
|
||||
let start_bound = if let Some(first_block_id) = first_block_id {
|
||||
let Some(block_addr) = self.sstable_index.get_block(first_block_id) else {
|
||||
return FileSlice::empty();
|
||||
};
|
||||
Bound::Included(block_addr.byte_range.start)
|
||||
} else {
|
||||
Bound::Unbounded
|
||||
};
|
||||
|
||||
let last_block_id = if let Some(limit) = limit {
|
||||
let second_block_id = first_block_id.map(|id| id + 1).unwrap_or(0);
|
||||
if let Some(block_addr) = self.sstable_index.get_block(second_block_id) {
|
||||
let ordinal_limit = block_addr.first_ordinal + limit;
|
||||
let last_block_limit = self.sstable_index.locate_with_ord(ordinal_limit);
|
||||
if let Some(last_block_id) = last_block_id {
|
||||
Some(last_block_id.min(last_block_limit))
|
||||
} else {
|
||||
Bound::Unbounded
|
||||
Some(last_block_limit)
|
||||
}
|
||||
} else {
|
||||
last_block_id
|
||||
}
|
||||
Bound::Unbounded => Bound::Unbounded,
|
||||
} else {
|
||||
last_block_id
|
||||
};
|
||||
let end_bound = last_block_id
|
||||
.and_then(|block_id| self.sstable_index.get_block(block_id))
|
||||
.map(|block_addr| Bound::Excluded(block_addr.byte_range.end))
|
||||
.unwrap_or(Bound::Unbounded);
|
||||
|
||||
self.sstable_slice.slice((start_bound, end_bound))
|
||||
}
|
||||
|
||||
@@ -156,10 +188,15 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
|
||||
/// Returns the ordinal associated with a given term.
|
||||
pub fn term_ord<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TermOrdinal>> {
|
||||
let mut term_ord = 0u64;
|
||||
let key_bytes = key.as_ref();
|
||||
let mut sstable_reader = self.sstable_reader()?;
|
||||
while sstable_reader.advance().unwrap_or(false) {
|
||||
|
||||
let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let mut term_ord = block_addr.first_ordinal;
|
||||
let mut sstable_reader = self.sstable_reader_block(block_addr)?;
|
||||
while sstable_reader.advance()? {
|
||||
if sstable_reader.key() == key_bytes {
|
||||
return Ok(Some(term_ord));
|
||||
}
|
||||
@@ -178,22 +215,32 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
/// Regardless of whether the term is found or not,
|
||||
/// the buffer may be modified.
|
||||
pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec<u8>) -> io::Result<bool> {
|
||||
let mut sstable_reader = self.sstable_reader()?;
|
||||
bytes.clear();
|
||||
for _ in 0..(ord + 1) {
|
||||
if !sstable_reader.advance().unwrap_or(false) {
|
||||
// find block in which the term would be
|
||||
let block_addr = self.sstable_index.get_block_with_ord(ord);
|
||||
let first_ordinal = block_addr.first_ordinal;
|
||||
|
||||
// then search inside that block only
|
||||
let mut sstable_reader = self.sstable_reader_block(block_addr)?;
|
||||
for _ in first_ordinal..=ord {
|
||||
if !sstable_reader.advance()? {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
bytes.clear();
|
||||
bytes.extend_from_slice(sstable_reader.key());
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Returns the value associated with the term of ordinal `term_ord`, or `None` if there is no such term.
|
||||
pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result<Option<TSSTable::Value>> {
|
||||
let mut sstable_reader = self.sstable_reader()?;
|
||||
for _ in 0..(term_ord + 1) {
|
||||
if !sstable_reader.advance().unwrap_or(false) {
|
||||
// find block in which the term would be
|
||||
let block_addr = self.sstable_index.get_block_with_ord(term_ord);
|
||||
let first_ordinal = block_addr.first_ordinal;
|
||||
|
||||
// then search inside that block only
|
||||
let mut sstable_reader = self.sstable_reader_block(block_addr)?;
|
||||
for _ in first_ordinal..=term_ord {
|
||||
if !sstable_reader.advance()? {
|
||||
return Ok(None);
|
||||
}
|
||||
}
|
||||
@@ -202,10 +249,10 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
|
||||
/// Looks up the value corresponding to the key.
|
||||
pub fn get<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
|
||||
if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) {
|
||||
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
|
||||
let mut sstable_reader = self.sstable_reader_block(block_addr)?;
|
||||
let key_bytes = key.as_ref();
|
||||
while sstable_reader.advance().unwrap_or(false) {
|
||||
while sstable_reader.advance()? {
|
||||
if sstable_reader.key() == key_bytes {
|
||||
let value = sstable_reader.value().clone();
|
||||
return Ok(Some(value));
|
||||
@@ -217,10 +264,10 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
|
||||
/// Looks up the value corresponding to the key.
|
||||
pub async fn get_async<K: AsRef<[u8]>>(&self, key: K) -> io::Result<Option<TSSTable::Value>> {
|
||||
if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) {
|
||||
if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) {
|
||||
let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?;
|
||||
let key_bytes = key.as_ref();
|
||||
while sstable_reader.advance().unwrap_or(false) {
|
||||
while sstable_reader.advance()? {
|
||||
if sstable_reader.key() == key_bytes {
|
||||
let value = sstable_reader.value().clone();
|
||||
return Ok(Some(value));
|
||||
@@ -259,3 +306,192 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::ops::Range;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use common::OwnedBytes;
|
||||
|
||||
use super::Dictionary;
|
||||
use crate::MonotonicU64SSTable;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct PermissionedHandle {
|
||||
bytes: OwnedBytes,
|
||||
allowed_range: Mutex<Range<usize>>,
|
||||
}
|
||||
|
||||
impl PermissionedHandle {
|
||||
fn new(bytes: Vec<u8>) -> Self {
|
||||
let bytes = OwnedBytes::new(bytes);
|
||||
PermissionedHandle {
|
||||
allowed_range: Mutex::new(0..bytes.len()),
|
||||
bytes,
|
||||
}
|
||||
}
|
||||
|
||||
fn restrict(&self, range: Range<usize>) {
|
||||
*self.allowed_range.lock().unwrap() = range;
|
||||
}
|
||||
}
|
||||
|
||||
impl common::HasLen for PermissionedHandle {
|
||||
fn len(&self) -> usize {
|
||||
self.bytes.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl common::file_slice::FileHandle for PermissionedHandle {
|
||||
fn read_bytes(&self, range: Range<usize>) -> std::io::Result<OwnedBytes> {
|
||||
let allowed_range = self.allowed_range.lock().unwrap();
|
||||
if !allowed_range.contains(&range.start) || !allowed_range.contains(&(range.end - 1)) {
|
||||
return Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("invalid range, allowed {allowed_range:?}, requested {range:?}"),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(self.bytes.slice(range))
|
||||
}
|
||||
}
|
||||
|
||||
fn make_test_sstable() -> (Dictionary<MonotonicU64SSTable>, Arc<PermissionedHandle>) {
|
||||
let mut builder = Dictionary::<MonotonicU64SSTable>::builder(Vec::new()).unwrap();
|
||||
|
||||
// this makes 256k keys, enough to fill multiple blocks.
|
||||
for elem in 0..0x3ffff {
|
||||
let key = format!("{elem:05X}").into_bytes();
|
||||
builder.insert_cannot_fail(&key, &elem);
|
||||
}
|
||||
|
||||
let table = builder.finish().unwrap();
|
||||
let table = Arc::new(PermissionedHandle::new(table));
|
||||
let slice = common::file_slice::FileSlice::new(table.clone());
|
||||
|
||||
let dictionary = Dictionary::<MonotonicU64SSTable>::open(slice).unwrap();
|
||||
|
||||
// if the last block is id 0, tests are meaningless
|
||||
assert_ne!(dictionary.sstable_index.locate_with_ord(u64::MAX), 0);
|
||||
assert_eq!(dictionary.num_terms(), 0x3ffff);
|
||||
(dictionary, table)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_ord_term_conversion() {
|
||||
let (dic, slice) = make_test_sstable();
|
||||
|
||||
let block = dic.sstable_index.get_block_with_ord(100_000);
|
||||
slice.restrict(block.byte_range);
|
||||
|
||||
let mut res = Vec::new();
|
||||
|
||||
// middle of a block
|
||||
assert!(dic.ord_to_term(100_000, &mut res).unwrap());
|
||||
assert_eq!(res, format!("{:05X}", 100_000).into_bytes());
|
||||
assert_eq!(dic.term_info_from_ord(100_000).unwrap().unwrap(), 100_000);
|
||||
assert_eq!(dic.get(&res).unwrap().unwrap(), 100_000);
|
||||
assert_eq!(dic.term_ord(&res).unwrap().unwrap(), 100_000);
|
||||
|
||||
// start of a block
|
||||
assert!(dic.ord_to_term(block.first_ordinal, &mut res).unwrap());
|
||||
assert_eq!(res, format!("{:05X}", block.first_ordinal).into_bytes());
|
||||
assert_eq!(
|
||||
dic.term_info_from_ord(block.first_ordinal)
|
||||
.unwrap()
|
||||
.unwrap(),
|
||||
block.first_ordinal
|
||||
);
|
||||
assert_eq!(dic.get(&res).unwrap().unwrap(), block.first_ordinal);
|
||||
assert_eq!(dic.term_ord(&res).unwrap().unwrap(), block.first_ordinal);
|
||||
|
||||
// end of a block
|
||||
let ordinal = block.first_ordinal - 1;
|
||||
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
|
||||
slice.restrict(new_range);
|
||||
assert!(dic.ord_to_term(ordinal, &mut res).unwrap());
|
||||
assert_eq!(res, format!("{:05X}", ordinal).into_bytes());
|
||||
assert_eq!(dic.term_info_from_ord(ordinal).unwrap().unwrap(), ordinal);
|
||||
assert_eq!(dic.get(&res).unwrap().unwrap(), ordinal);
|
||||
assert_eq!(dic.term_ord(&res).unwrap().unwrap(), ordinal);
|
||||
|
||||
// before first block
|
||||
// 1st block must be loaded for key-related operations
|
||||
let block = dic.sstable_index.get_block_with_ord(0);
|
||||
slice.restrict(block.byte_range);
|
||||
|
||||
assert!(dic.get(&b"$$$").unwrap().is_none());
|
||||
assert!(dic.term_ord(&b"$$$").unwrap().is_none());
|
||||
|
||||
// after last block
|
||||
// last block must be loaded for ord related operations
|
||||
let ordinal = 0x40000 + 10;
|
||||
let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range;
|
||||
slice.restrict(new_range);
|
||||
assert!(!dic.ord_to_term(ordinal, &mut res).unwrap());
|
||||
assert!(dic.term_info_from_ord(ordinal).unwrap().is_none());
|
||||
|
||||
// last block isn't required to be loaded for key related operations
|
||||
slice.restrict(0..0);
|
||||
assert!(dic.get(&b"~~~").unwrap().is_none());
|
||||
assert!(dic.term_ord(&b"~~~").unwrap().is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_range() {
|
||||
let (dic, slice) = make_test_sstable();
|
||||
|
||||
let start = dic
|
||||
.sstable_index
|
||||
.get_block_with_key(b"10000")
|
||||
.unwrap()
|
||||
.byte_range;
|
||||
let end = dic
|
||||
.sstable_index
|
||||
.get_block_with_key(b"18000")
|
||||
.unwrap()
|
||||
.byte_range;
|
||||
slice.restrict(start.start..end.end);
|
||||
|
||||
let mut stream = dic.range().ge(b"10000").lt(b"18000").into_stream().unwrap();
|
||||
|
||||
for i in 0x10000..0x18000 {
|
||||
assert!(stream.advance());
|
||||
assert_eq!(stream.term_ord(), i);
|
||||
assert_eq!(stream.value(), &i);
|
||||
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
|
||||
}
|
||||
assert!(!stream.advance());
|
||||
|
||||
// verify limiting the number of results reduces the size read
|
||||
slice.restrict(start.start..(end.end - 1));
|
||||
|
||||
let mut stream = dic
|
||||
.range()
|
||||
.ge(b"10000")
|
||||
.lt(b"18000")
|
||||
.limit(0xfff)
|
||||
.into_stream()
|
||||
.unwrap();
|
||||
|
||||
for i in 0x10000..0x10fff {
|
||||
assert!(stream.advance());
|
||||
assert_eq!(stream.term_ord(), i);
|
||||
assert_eq!(stream.value(), &i);
|
||||
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
|
||||
}
|
||||
// there might be more successful elements after, though how many is undefined
|
||||
|
||||
slice.restrict(0..slice.bytes.len());
|
||||
|
||||
let mut stream = dic.stream().unwrap();
|
||||
for i in 0..0x3ffff {
|
||||
assert!(stream.advance());
|
||||
assert_eq!(stream.term_ord(), i);
|
||||
assert_eq!(stream.value(), &i);
|
||||
assert_eq!(stream.key(), format!("{i:05X}").into_bytes());
|
||||
}
|
||||
assert!(!stream.advance());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use std::ops::Range;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{common_prefix_len, SSTableDataCorruption};
|
||||
use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal};
|
||||
|
||||
#[derive(Default, Debug, Serialize, Deserialize)]
|
||||
pub struct SSTableIndex {
|
||||
@@ -11,15 +11,61 @@ pub struct SSTableIndex {
|
||||
}
|
||||
|
||||
impl SSTableIndex {
|
||||
/// Load an index from its binary representation
|
||||
pub fn load(data: &[u8]) -> Result<SSTableIndex, SSTableDataCorruption> {
|
||||
ciborium::de::from_reader(data).map_err(|_| SSTableDataCorruption)
|
||||
}
|
||||
|
||||
pub fn search_block(&self, key: &[u8]) -> Option<BlockAddr> {
|
||||
/// Get the [`BlockAddr`] of the requested block.
|
||||
pub(crate) fn get_block(&self, block_id: usize) -> Option<BlockAddr> {
|
||||
self.blocks
|
||||
.iter()
|
||||
.find(|block| &block.last_key_or_greater[..] >= key)
|
||||
.map(|block| block.block_addr.clone())
|
||||
.get(block_id)
|
||||
.map(|block_meta| block_meta.block_addr.clone())
|
||||
}
|
||||
|
||||
/// Get the block id of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option<usize> {
|
||||
let pos = self
|
||||
.blocks
|
||||
.binary_search_by_key(&key, |block| &block.last_key_or_greater);
|
||||
match pos {
|
||||
Ok(pos) => Some(pos),
|
||||
Err(pos) => {
|
||||
if pos < self.blocks.len() {
|
||||
Some(pos)
|
||||
} else {
|
||||
// after end of last block: no block matches
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block that would contain `key`.
|
||||
///
|
||||
/// Returns None if `key` is lexicographically after the last key recorded.
|
||||
pub fn get_block_with_key(&self, key: &[u8]) -> Option<BlockAddr> {
|
||||
self.locate_with_key(key).and_then(|id| self.get_block(id))
|
||||
}
|
||||
|
||||
pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize {
|
||||
let pos = self
|
||||
.blocks
|
||||
.binary_search_by_key(&ord, |block| block.block_addr.first_ordinal);
|
||||
|
||||
match pos {
|
||||
Ok(pos) => pos,
|
||||
// Err(0) can't happen as the sstable starts with ordinal zero
|
||||
Err(pos) => pos - 1,
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the [`BlockAddr`] of the block containing the `ord`-th term.
|
||||
pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr {
|
||||
// locate_with_ord always returns an index within range
|
||||
self.get_block(self.locate_with_ord(ord)).unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -30,7 +76,7 @@ pub struct BlockAddr {
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct BlockMeta {
|
||||
pub(crate) struct BlockMeta {
|
||||
/// Any byte string that is lexicographically greater or equal to
|
||||
/// the last key in the block,
|
||||
/// and yet strictly smaller than the first key in the next block.
|
||||
@@ -98,26 +144,38 @@ mod tests {
|
||||
fn test_sstable_index() {
|
||||
let mut sstable_builder = SSTableIndexBuilder::default();
|
||||
sstable_builder.add_block(b"aaa", 10..20, 0u64);
|
||||
sstable_builder.add_block(b"bbbbbbb", 20..30, 564);
|
||||
sstable_builder.add_block(b"bbbbbbb", 20..30, 5u64);
|
||||
sstable_builder.add_block(b"ccc", 30..40, 10u64);
|
||||
sstable_builder.add_block(b"dddd", 40..50, 15u64);
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
sstable_builder.serialize(&mut buffer).unwrap();
|
||||
let sstable_index = SSTableIndex::load(&buffer[..]).unwrap();
|
||||
assert_eq!(
|
||||
sstable_index.search_block(b"bbbde"),
|
||||
sstable_index.get_block_with_key(b"bbbde"),
|
||||
Some(BlockAddr {
|
||||
first_ordinal: 10u64,
|
||||
byte_range: 30..40
|
||||
})
|
||||
);
|
||||
|
||||
assert_eq!(sstable_index.locate_with_key(b"aa").unwrap(), 0);
|
||||
assert_eq!(sstable_index.locate_with_key(b"aaa").unwrap(), 0);
|
||||
assert_eq!(sstable_index.locate_with_key(b"aab").unwrap(), 1);
|
||||
assert_eq!(sstable_index.locate_with_key(b"ccc").unwrap(), 2);
|
||||
assert!(sstable_index.locate_with_key(b"e").is_none());
|
||||
|
||||
assert_eq!(sstable_index.locate_with_ord(0), 0);
|
||||
assert_eq!(sstable_index.locate_with_ord(1), 0);
|
||||
assert_eq!(sstable_index.locate_with_ord(4), 0);
|
||||
assert_eq!(sstable_index.locate_with_ord(5), 1);
|
||||
assert_eq!(sstable_index.locate_with_ord(100), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_sstable_with_corrupted_data() {
|
||||
let mut sstable_builder = SSTableIndexBuilder::default();
|
||||
sstable_builder.add_block(b"aaa", 10..20, 0u64);
|
||||
sstable_builder.add_block(b"bbbbbbb", 20..30, 564);
|
||||
sstable_builder.add_block(b"bbbbbbb", 20..30, 5u64);
|
||||
sstable_builder.add_block(b"ccc", 30..40, 10u64);
|
||||
sstable_builder.add_block(b"dddd", 40..50, 15u64);
|
||||
let mut buffer: Vec<u8> = Vec::new();
|
||||
|
||||
@@ -19,6 +19,7 @@ where
|
||||
automaton: A,
|
||||
lower: Bound<Vec<u8>>,
|
||||
upper: Bound<Vec<u8>>,
|
||||
limit: Option<u64>,
|
||||
}
|
||||
|
||||
fn bound_as_byte_slice(bound: &Bound<Vec<u8>>) -> Bound<&[u8]> {
|
||||
@@ -41,6 +42,7 @@ where
|
||||
automaton,
|
||||
lower: Bound::Unbounded,
|
||||
upper: Bound::Unbounded,
|
||||
limit: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -68,24 +70,46 @@ where
|
||||
self
|
||||
}
|
||||
|
||||
/// Load no more data than what's required to get `limit`
|
||||
/// matching entries.
|
||||
///
|
||||
/// The resulting [`Streamer`] can still return marginally
|
||||
/// more than `limit` elements.
|
||||
pub fn limit(mut self, limit: u64) -> Self {
|
||||
self.limit = Some(limit);
|
||||
self
|
||||
}
|
||||
|
||||
/// Creates the stream corresponding to the range
|
||||
/// of terms defined using the `StreamerBuilder`.
|
||||
pub fn into_stream(self) -> io::Result<Streamer<'a, TSSTable, A>> {
|
||||
// TODO Optimize by skipping to the right first block.
|
||||
let start_state = self.automaton.start();
|
||||
|
||||
let key_range = (
|
||||
bound_as_byte_slice(&self.lower),
|
||||
bound_as_byte_slice(&self.upper),
|
||||
);
|
||||
|
||||
let first_term = match &key_range.0 {
|
||||
Bound::Included(key) | Bound::Excluded(key) => self
|
||||
.term_dict
|
||||
.sstable_index
|
||||
.get_block_with_key(key)
|
||||
.map(|block| block.first_ordinal)
|
||||
.unwrap_or(0),
|
||||
Bound::Unbounded => 0,
|
||||
};
|
||||
|
||||
let delta_reader = self
|
||||
.term_dict
|
||||
.sstable_delta_reader_for_key_range(key_range)?;
|
||||
.sstable_delta_reader_for_key_range(key_range, self.limit)?;
|
||||
Ok(Streamer {
|
||||
automaton: self.automaton,
|
||||
states: vec![start_state],
|
||||
delta_reader,
|
||||
key: Vec::new(),
|
||||
term_ord: None,
|
||||
term_ord: first_term.checked_sub(1),
|
||||
lower_bound: self.lower,
|
||||
upper_bound: self.upper,
|
||||
})
|
||||
|
||||
@@ -16,7 +16,7 @@ pub trait ValueReader: Default {
|
||||
|
||||
/// Loads a block.
|
||||
///
|
||||
/// Returns the number of bytes that were written.
|
||||
/// Returns the number of bytes that were read.
|
||||
fn load(&mut self, data: &[u8]) -> io::Result<usize>;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user