diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index 594bc0dc3..18559ba9b 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -69,14 +69,16 @@ impl Dictionary { pub(crate) fn sstable_delta_reader_for_key_range( &self, key_range: impl RangeBounds<[u8]>, + limit: Option, ) -> io::Result> { - let slice = self.file_slice_for_range(key_range); + let slice = self.file_slice_for_range(key_range, limit); let data = slice.read_bytes()?; Ok(TSSTable::delta_reader(data)) } /// This function returns a file slice covering a set of sstable blocks - /// that include the key range passed in arguments. + /// that include the key range passed in arguments. Optionally returns + /// only block for up to `limit` matching terms. /// /// It works by identifying /// - `first_block`: the block containing the start boudary key @@ -92,26 +94,56 @@ impl Dictionary { /// On the rare edge case where a user asks for `(start_key, end_key]` /// and `start_key` happens to be the last key of a block, we return a /// slice that is the first block was not necessary. - pub fn file_slice_for_range(&self, key_range: impl RangeBounds<[u8]>) -> FileSlice { - let start_bound: Bound = match key_range.start_bound() { + pub fn file_slice_for_range( + &self, + key_range: impl RangeBounds<[u8]>, + limit: Option, + ) -> FileSlice { + let first_block_id = match key_range.start_bound() { Bound::Included(key) | Bound::Excluded(key) => { - let Some(first_block_addr) = self.sstable_index.search_block(key) else { + let Some(first_block_id) = self.sstable_index.locate_with_key(key) else { return FileSlice::empty(); }; - Bound::Included(first_block_addr.byte_range.start) + Some(first_block_id) } - Bound::Unbounded => Bound::Unbounded, + Bound::Unbounded => None, }; - let end_bound: Bound = match key_range.end_bound() { - Bound::Included(key) | Bound::Excluded(key) => { - if let Some(block_addr) = self.sstable_index.search_block(key) { - Bound::Excluded(block_addr.byte_range.end) + + let last_block_id = match key_range.end_bound() { + Bound::Included(key) | Bound::Excluded(key) => self.sstable_index.locate_with_key(key), + Bound::Unbounded => None, + }; + + let start_bound = if let Some(first_block_id) = first_block_id { + let Some(block_addr) = self.sstable_index.get_block(first_block_id) else { + return FileSlice::empty(); + }; + Bound::Included(block_addr.byte_range.start) + } else { + Bound::Unbounded + }; + + let last_block_id = if let Some(limit) = limit { + let second_block_id = first_block_id.map(|id| id + 1).unwrap_or(0); + if let Some(block_addr) = self.sstable_index.get_block(second_block_id) { + let ordinal_limit = block_addr.first_ordinal + limit; + let last_block_limit = self.sstable_index.locate_with_ord(ordinal_limit); + if let Some(last_block_id) = last_block_id { + Some(last_block_id.min(last_block_limit)) } else { - Bound::Unbounded + Some(last_block_limit) } + } else { + last_block_id } - Bound::Unbounded => Bound::Unbounded, + } else { + last_block_id }; + let end_bound = last_block_id + .and_then(|block_id| self.sstable_index.get_block(block_id)) + .map(|block_addr| Bound::Excluded(block_addr.byte_range.end)) + .unwrap_or(Bound::Unbounded); + self.sstable_slice.slice((start_bound, end_bound)) } @@ -156,10 +188,15 @@ impl Dictionary { /// Returns the ordinal associated with a given term. pub fn term_ord>(&self, key: K) -> io::Result> { - let mut term_ord = 0u64; let key_bytes = key.as_ref(); - let mut sstable_reader = self.sstable_reader()?; - while sstable_reader.advance().unwrap_or(false) { + + let Some(block_addr) = self.sstable_index.get_block_with_key(key_bytes) else { + return Ok(None); + }; + + let mut term_ord = block_addr.first_ordinal; + let mut sstable_reader = self.sstable_reader_block(block_addr)?; + while sstable_reader.advance()? { if sstable_reader.key() == key_bytes { return Ok(Some(term_ord)); } @@ -178,22 +215,32 @@ impl Dictionary { /// Regardless of whether the term is found or not, /// the buffer may be modified. pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec) -> io::Result { - let mut sstable_reader = self.sstable_reader()?; - bytes.clear(); - for _ in 0..(ord + 1) { - if !sstable_reader.advance().unwrap_or(false) { + // find block in which the term would be + let block_addr = self.sstable_index.get_block_with_ord(ord); + let first_ordinal = block_addr.first_ordinal; + + // then search inside that block only + let mut sstable_reader = self.sstable_reader_block(block_addr)?; + for _ in first_ordinal..=ord { + if !sstable_reader.advance()? { return Ok(false); } } + bytes.clear(); bytes.extend_from_slice(sstable_reader.key()); Ok(true) } /// Returns the number of terms in the dictionary. pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result> { - let mut sstable_reader = self.sstable_reader()?; - for _ in 0..(term_ord + 1) { - if !sstable_reader.advance().unwrap_or(false) { + // find block in which the term would be + let block_addr = self.sstable_index.get_block_with_ord(term_ord); + let first_ordinal = block_addr.first_ordinal; + + // then search inside that block only + let mut sstable_reader = self.sstable_reader_block(block_addr)?; + for _ in first_ordinal..=term_ord { + if !sstable_reader.advance()? { return Ok(None); } } @@ -202,10 +249,10 @@ impl Dictionary { /// Lookups the value corresponding to the key. pub fn get>(&self, key: K) -> io::Result> { - if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) { + if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) { let mut sstable_reader = self.sstable_reader_block(block_addr)?; let key_bytes = key.as_ref(); - while sstable_reader.advance().unwrap_or(false) { + while sstable_reader.advance()? { if sstable_reader.key() == key_bytes { let value = sstable_reader.value().clone(); return Ok(Some(value)); @@ -217,10 +264,10 @@ impl Dictionary { /// Lookups the value corresponding to the key. pub async fn get_async>(&self, key: K) -> io::Result> { - if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) { + if let Some(block_addr) = self.sstable_index.get_block_with_key(key.as_ref()) { let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?; let key_bytes = key.as_ref(); - while sstable_reader.advance().unwrap_or(false) { + while sstable_reader.advance()? { if sstable_reader.key() == key_bytes { let value = sstable_reader.value().clone(); return Ok(Some(value)); @@ -259,3 +306,192 @@ impl Dictionary { Ok(()) } } + +#[cfg(test)] +mod tests { + use std::ops::Range; + use std::sync::{Arc, Mutex}; + + use common::OwnedBytes; + + use super::Dictionary; + use crate::MonotonicU64SSTable; + + #[derive(Debug)] + struct PermissionedHandle { + bytes: OwnedBytes, + allowed_range: Mutex>, + } + + impl PermissionedHandle { + fn new(bytes: Vec) -> Self { + let bytes = OwnedBytes::new(bytes); + PermissionedHandle { + allowed_range: Mutex::new(0..bytes.len()), + bytes, + } + } + + fn restrict(&self, range: Range) { + *self.allowed_range.lock().unwrap() = range; + } + } + + impl common::HasLen for PermissionedHandle { + fn len(&self) -> usize { + self.bytes.len() + } + } + + impl common::file_slice::FileHandle for PermissionedHandle { + fn read_bytes(&self, range: Range) -> std::io::Result { + let allowed_range = self.allowed_range.lock().unwrap(); + if !allowed_range.contains(&range.start) || !allowed_range.contains(&(range.end - 1)) { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + format!("invalid range, allowed {allowed_range:?}, requested {range:?}"), + )); + } + + Ok(self.bytes.slice(range)) + } + } + + fn make_test_sstable() -> (Dictionary, Arc) { + let mut builder = Dictionary::::builder(Vec::new()).unwrap(); + + // this makes 256k keys, enough to fill multiple blocks. + for elem in 0..0x3ffff { + let key = format!("{elem:05X}").into_bytes(); + builder.insert_cannot_fail(&key, &elem); + } + + let table = builder.finish().unwrap(); + let table = Arc::new(PermissionedHandle::new(table)); + let slice = common::file_slice::FileSlice::new(table.clone()); + + let dictionary = Dictionary::::open(slice).unwrap(); + + // if the last block is id 0, tests are meaningless + assert_ne!(dictionary.sstable_index.locate_with_ord(u64::MAX), 0); + assert_eq!(dictionary.num_terms(), 0x3ffff); + (dictionary, table) + } + + #[test] + fn test_ord_term_conversion() { + let (dic, slice) = make_test_sstable(); + + let block = dic.sstable_index.get_block_with_ord(100_000); + slice.restrict(block.byte_range); + + let mut res = Vec::new(); + + // middle of a block + assert!(dic.ord_to_term(100_000, &mut res).unwrap()); + assert_eq!(res, format!("{:05X}", 100_000).into_bytes()); + assert_eq!(dic.term_info_from_ord(100_000).unwrap().unwrap(), 100_000); + assert_eq!(dic.get(&res).unwrap().unwrap(), 100_000); + assert_eq!(dic.term_ord(&res).unwrap().unwrap(), 100_000); + + // start of a block + assert!(dic.ord_to_term(block.first_ordinal, &mut res).unwrap()); + assert_eq!(res, format!("{:05X}", block.first_ordinal).into_bytes()); + assert_eq!( + dic.term_info_from_ord(block.first_ordinal) + .unwrap() + .unwrap(), + block.first_ordinal + ); + assert_eq!(dic.get(&res).unwrap().unwrap(), block.first_ordinal); + assert_eq!(dic.term_ord(&res).unwrap().unwrap(), block.first_ordinal); + + // end of a block + let ordinal = block.first_ordinal - 1; + let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range; + slice.restrict(new_range); + assert!(dic.ord_to_term(ordinal, &mut res).unwrap()); + assert_eq!(res, format!("{:05X}", ordinal).into_bytes()); + assert_eq!(dic.term_info_from_ord(ordinal).unwrap().unwrap(), ordinal); + assert_eq!(dic.get(&res).unwrap().unwrap(), ordinal); + assert_eq!(dic.term_ord(&res).unwrap().unwrap(), ordinal); + + // before first block + // 1st block must be loaded for key-related operations + let block = dic.sstable_index.get_block_with_ord(0); + slice.restrict(block.byte_range); + + assert!(dic.get(&b"$$$").unwrap().is_none()); + assert!(dic.term_ord(&b"$$$").unwrap().is_none()); + + // after last block + // last block must be loaded for ord related operations + let ordinal = 0x40000 + 10; + let new_range = dic.sstable_index.get_block_with_ord(ordinal).byte_range; + slice.restrict(new_range); + assert!(!dic.ord_to_term(ordinal, &mut res).unwrap()); + assert!(dic.term_info_from_ord(ordinal).unwrap().is_none()); + + // last block isn't required to be loaded for key related operations + slice.restrict(0..0); + assert!(dic.get(&b"~~~").unwrap().is_none()); + assert!(dic.term_ord(&b"~~~").unwrap().is_none()); + } + + #[test] + fn test_range() { + let (dic, slice) = make_test_sstable(); + + let start = dic + .sstable_index + .get_block_with_key(b"10000") + .unwrap() + .byte_range; + let end = dic + .sstable_index + .get_block_with_key(b"18000") + .unwrap() + .byte_range; + slice.restrict(start.start..end.end); + + let mut stream = dic.range().ge(b"10000").lt(b"18000").into_stream().unwrap(); + + for i in 0x10000..0x18000 { + assert!(stream.advance()); + assert_eq!(stream.term_ord(), i); + assert_eq!(stream.value(), &i); + assert_eq!(stream.key(), format!("{i:05X}").into_bytes()); + } + assert!(!stream.advance()); + + // verify limiting the number of results reduce the size read + slice.restrict(start.start..(end.end - 1)); + + let mut stream = dic + .range() + .ge(b"10000") + .lt(b"18000") + .limit(0xfff) + .into_stream() + .unwrap(); + + for i in 0x10000..0x10fff { + assert!(stream.advance()); + assert_eq!(stream.term_ord(), i); + assert_eq!(stream.value(), &i); + assert_eq!(stream.key(), format!("{i:05X}").into_bytes()); + } + // there might be more successful elements after, though how many is undefined + + slice.restrict(0..slice.bytes.len()); + + let mut stream = dic.stream().unwrap(); + for i in 0..0x3ffff { + assert!(stream.advance()); + assert_eq!(stream.term_ord(), i); + assert_eq!(stream.value(), &i); + assert_eq!(stream.key(), format!("{i:05X}").into_bytes()); + } + assert!(!stream.advance()); + } +} diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index 8e73918f9..ccf997f05 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -3,7 +3,7 @@ use std::ops::Range; use serde::{Deserialize, Serialize}; -use crate::{common_prefix_len, SSTableDataCorruption}; +use crate::{common_prefix_len, SSTableDataCorruption, TermOrdinal}; #[derive(Default, Debug, Serialize, Deserialize)] pub struct SSTableIndex { @@ -11,15 +11,61 @@ pub struct SSTableIndex { } impl SSTableIndex { + /// Load an index from its binary representation pub fn load(data: &[u8]) -> Result { ciborium::de::from_reader(data).map_err(|_| SSTableDataCorruption) } - pub fn search_block(&self, key: &[u8]) -> Option { + /// Get the [`BlockAddr`] of the requested block. + pub(crate) fn get_block(&self, block_id: usize) -> Option { self.blocks - .iter() - .find(|block| &block.last_key_or_greater[..] >= key) - .map(|block| block.block_addr.clone()) + .get(block_id) + .map(|block_meta| block_meta.block_addr.clone()) + } + + /// Get the block id of the block that woudl contain `key`. + /// + /// Returns None if `key` is lexicographically after the last key recorded. + pub(crate) fn locate_with_key(&self, key: &[u8]) -> Option { + let pos = self + .blocks + .binary_search_by_key(&key, |block| &block.last_key_or_greater); + match pos { + Ok(pos) => Some(pos), + Err(pos) => { + if pos < self.blocks.len() { + Some(pos) + } else { + // after end of last block: no block matches + None + } + } + } + } + + /// Get the [`BlockAddr`] of the block that would contain `key`. + /// + /// Returns None if `key` is lexicographically after the last key recorded. + pub fn get_block_with_key(&self, key: &[u8]) -> Option { + self.locate_with_key(key).and_then(|id| self.get_block(id)) + } + + pub(crate) fn locate_with_ord(&self, ord: TermOrdinal) -> usize { + let pos = self + .blocks + .binary_search_by_key(&ord, |block| block.block_addr.first_ordinal); + + match pos { + Ok(pos) => pos, + // Err(0) can't happen as the sstable starts with ordinal zero + Err(pos) => pos - 1, + } + } + + /// Get the [`BlockAddr`] of the block containing the `ord`-th term. + pub(crate) fn get_block_with_ord(&self, ord: TermOrdinal) -> BlockAddr { + // locate_with_ord always returns an index within range + self.get_block(self.locate_with_ord(ord)).unwrap() } } @@ -30,7 +76,7 @@ pub struct BlockAddr { } #[derive(Debug, Serialize, Deserialize)] -struct BlockMeta { +pub(crate) struct BlockMeta { /// Any byte string that is lexicographically greater or equal to /// the last key in the block, /// and yet strictly smaller than the first key in the next block. @@ -98,26 +144,38 @@ mod tests { fn test_sstable_index() { let mut sstable_builder = SSTableIndexBuilder::default(); sstable_builder.add_block(b"aaa", 10..20, 0u64); - sstable_builder.add_block(b"bbbbbbb", 20..30, 564); + sstable_builder.add_block(b"bbbbbbb", 20..30, 5u64); sstable_builder.add_block(b"ccc", 30..40, 10u64); sstable_builder.add_block(b"dddd", 40..50, 15u64); let mut buffer: Vec = Vec::new(); sstable_builder.serialize(&mut buffer).unwrap(); let sstable_index = SSTableIndex::load(&buffer[..]).unwrap(); assert_eq!( - sstable_index.search_block(b"bbbde"), + sstable_index.get_block_with_key(b"bbbde"), Some(BlockAddr { first_ordinal: 10u64, byte_range: 30..40 }) ); + + assert_eq!(sstable_index.locate_with_key(b"aa").unwrap(), 0); + assert_eq!(sstable_index.locate_with_key(b"aaa").unwrap(), 0); + assert_eq!(sstable_index.locate_with_key(b"aab").unwrap(), 1); + assert_eq!(sstable_index.locate_with_key(b"ccc").unwrap(), 2); + assert!(sstable_index.locate_with_key(b"e").is_none()); + + assert_eq!(sstable_index.locate_with_ord(0), 0); + assert_eq!(sstable_index.locate_with_ord(1), 0); + assert_eq!(sstable_index.locate_with_ord(4), 0); + assert_eq!(sstable_index.locate_with_ord(5), 1); + assert_eq!(sstable_index.locate_with_ord(100), 3); } #[test] fn test_sstable_with_corrupted_data() { let mut sstable_builder = SSTableIndexBuilder::default(); sstable_builder.add_block(b"aaa", 10..20, 0u64); - sstable_builder.add_block(b"bbbbbbb", 20..30, 564); + sstable_builder.add_block(b"bbbbbbb", 20..30, 5u64); sstable_builder.add_block(b"ccc", 30..40, 10u64); sstable_builder.add_block(b"dddd", 40..50, 15u64); let mut buffer: Vec = Vec::new(); diff --git a/sstable/src/streamer.rs b/sstable/src/streamer.rs index 752b39b64..889f0a4b4 100644 --- a/sstable/src/streamer.rs +++ b/sstable/src/streamer.rs @@ -19,6 +19,7 @@ where automaton: A, lower: Bound>, upper: Bound>, + limit: Option, } fn bound_as_byte_slice(bound: &Bound>) -> Bound<&[u8]> { @@ -41,6 +42,7 @@ where automaton, lower: Bound::Unbounded, upper: Bound::Unbounded, + limit: None, } } @@ -68,24 +70,46 @@ where self } + /// Load no more data than what's required to to get `limit` + /// matching entries. + /// + /// The resulting [`Streamer`] can still return marginaly + /// more than `limit` elements. + pub fn limit(mut self, limit: u64) -> Self { + self.limit = Some(limit); + self + } + /// Creates the stream corresponding to the range /// of terms defined using the `StreamerBuilder`. pub fn into_stream(self) -> io::Result> { // TODO Optimize by skipping to the right first block. let start_state = self.automaton.start(); + let key_range = ( bound_as_byte_slice(&self.lower), bound_as_byte_slice(&self.upper), ); + + let first_term = match &key_range.0 { + Bound::Included(key) | Bound::Excluded(key) => self + .term_dict + .sstable_index + .get_block_with_key(key) + .map(|block| block.first_ordinal) + .unwrap_or(0), + Bound::Unbounded => 0, + }; + let delta_reader = self .term_dict - .sstable_delta_reader_for_key_range(key_range)?; + .sstable_delta_reader_for_key_range(key_range, self.limit)?; Ok(Streamer { automaton: self.automaton, states: vec![start_state], delta_reader, key: Vec::new(), - term_ord: None, + term_ord: first_term.checked_sub(1), lower_bound: self.lower, upper_bound: self.upper, }) diff --git a/sstable/src/value/mod.rs b/sstable/src/value/mod.rs index 54106ec92..35fb940ea 100644 --- a/sstable/src/value/mod.rs +++ b/sstable/src/value/mod.rs @@ -16,7 +16,7 @@ pub trait ValueReader: Default { /// Loads a block. /// - /// Returns the number of bytes that were written. + /// Returns the number of bytes that were read. fn load(&mut self, data: &[u8]) -> io::Result; }