From d96a716d209a7ce855c509fc3c9d421b04b9fe41 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 21 Dec 2022 12:16:00 +0900 Subject: [PATCH] Refactoring to prepare for the addition of dynamic fast field --- Cargo.toml | 4 +- bitpacker/src/bitpacker.rs | 2 +- common/Cargo.toml | 5 +- {src/directory => common/src}/file_slice.rs | 75 ++++-- common/src/lib.rs | 3 +- fastfield_codecs/Cargo.toml | 4 +- ownedbytes/Cargo.toml | 2 +- ownedbytes/src/lib.rs | 2 +- src/core/inverted_index_reader.rs | 17 +- src/directory/mod.rs | 3 +- src/error.rs | 22 -- src/lib.rs | 4 - src/store/reader.rs | 2 +- src/termdict/fst_termdict/term_info_store.rs | 2 +- src/termdict/fst_termdict/termdict.rs | 17 +- src/termdict/sstable_termdict/mod.rs | 58 ++-- src/termdict/sstable_termdict/termdict.rs | 253 +----------------- src/termdict/tests.rs | 4 +- sstable/Cargo.toml | 2 +- sstable/src/block_reader.rs | 13 +- sstable/src/delta.rs | 23 +- sstable/src/dictionary.rs | 231 ++++++++++++++++ sstable/src/lib.rs | 130 ++++++--- sstable/src/merge/heap_merge.rs | 12 +- sstable/src/merge/mod.rs | 8 +- sstable/src/sstable_index.rs | 13 +- .../src}/streamer.rs | 126 +++++---- sstable/src/value.rs | 95 ------- sstable/src/value/mod.rs | 75 ++++++ sstable/src/value/range.rs | 95 +++++++ sstable/src/value/u64_monotonic.rs | 73 +++++ sstable/src/value/void.rs | 41 +++ stacker/Cargo.toml | 2 +- 33 files changed, 832 insertions(+), 586 deletions(-) rename {src/directory => common/src}/file_slice.rs (81%) create mode 100644 sstable/src/dictionary.rs rename {src/termdict/sstable_termdict => sstable/src}/streamer.rs (65%) delete mode 100644 sstable/src/value.rs create mode 100644 sstable/src/value/mod.rs create mode 100644 sstable/src/value/range.rs create mode 100644 sstable/src/value/u64_monotonic.rs create mode 100644 sstable/src/value/void.rs diff --git a/Cargo.toml b/Cargo.toml index baf593511..dadb7ee99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,9 +60,9 @@ sstable = { version="0.1", path="./sstable", package ="tantivy-sstable", optiona stacker = { version="0.1", path="./stacker", package ="tantivy-stacker" } tantivy-query-grammar = { version= "0.19.0", path="./query-grammar" } tantivy-bitpacker = { version= "0.3", path="./bitpacker" } -common = { version= "0.4", path = "./common/", package = "tantivy-common" } +common = { version= "0.5", path = "./common/", package = "tantivy-common" } fastfield_codecs = { version= "0.3", path="./fastfield_codecs", default-features = false } -ownedbytes = { version= "0.4", path="./ownedbytes" } +ownedbytes = { version= "0.5", path="./ownedbytes" } [target.'cfg(windows)'.dependencies] winapi = "0.3.9" diff --git a/bitpacker/src/bitpacker.rs b/bitpacker/src/bitpacker.rs index b5058cfac..c8221259f 100644 --- a/bitpacker/src/bitpacker.rs +++ b/bitpacker/src/bitpacker.rs @@ -101,7 +101,7 @@ impl BitUnpacker { .try_into() .unwrap(); let val_unshifted_unmasked: u64 = u64::from_le_bytes(bytes); - let val_shifted = (val_unshifted_unmasked >> bit_shift); + let val_shifted: u64 = val_unshifted_unmasked >> bit_shift; val_shifted & self.mask } } diff --git a/common/Cargo.toml b/common/Cargo.toml index e579a9aab..9d16079e8 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tantivy-common" -version = "0.4.0" +version = "0.5.0" authors = ["Paul Masurel ", "Pascal Seitz "] license = "MIT" edition = "2021" @@ -14,7 +14,8 @@ repository = "https://github.com/quickwit-oss/tantivy" [dependencies] byteorder = "1.4.3" -ownedbytes = { version= "0.4", path="../ownedbytes" } +ownedbytes = { version= "0.5", path="../ownedbytes" } +async-trait = "0.1" [dev-dependencies] proptest = "1.0.0" diff --git a/src/directory/file_slice.rs b/common/src/file_slice.rs similarity index 81% rename from src/directory/file_slice.rs rename to common/src/file_slice.rs index 1d0cd6915..1eb294f88 100644 --- a/src/directory/file_slice.rs +++ b/common/src/file_slice.rs @@ -1,19 +1,18 @@ -use std::ops::{Deref, Range}; +use std::ops::{Deref, Range, RangeBounds}; use std::sync::Arc; use std::{fmt, io}; use async_trait::async_trait; -use common::HasLen; -use stable_deref_trait::StableDeref; +use ownedbytes::{OwnedBytes, StableDeref}; -use crate::directory::OwnedBytes; +use crate::HasLen; /// Objects that represents files sections in tantivy. /// /// By contract, whatever happens to the directory file, as long as a FileHandle /// is alive, the data associated with it cannot be altered or destroyed. /// -/// The underlying behavior is therefore specific to the [`Directory`](crate::Directory) that +/// The underlying behavior is therefore specific to the `Directory` that /// created it. Despite its name, a [`FileSlice`] may or may not directly map to an actual file /// on the filesystem. @@ -24,13 +23,9 @@ pub trait FileHandle: 'static + Send + Sync + HasLen + fmt::Debug { /// This method may panic if the range requested is invalid. fn read_bytes(&self, range: Range) -> io::Result; - #[cfg(feature = "quickwit")] #[doc(hidden)] - async fn read_bytes_async( - &self, - _byte_range: Range, - ) -> crate::AsyncIoResult { - Err(crate::error::AsyncIoError::AsyncUnsupported) + async fn read_bytes_async(&self, byte_range: Range) -> io::Result { + self.read_bytes(byte_range) } } @@ -42,7 +37,7 @@ impl FileHandle for &'static [u8] { } #[cfg(feature = "quickwit")] - async fn read_bytes_async(&self, byte_range: Range) -> crate::AsyncIoResult { + async fn read_bytes_async(&self, byte_range: Range) -> io::Result { Ok(self.read_bytes(byte_range)?) } } @@ -70,6 +65,25 @@ impl fmt::Debug for FileSlice { } } +#[inline] +fn combine_ranges>(orig_range: Range, rel_range: R) -> Range { + let start: usize = orig_range.start + + match rel_range.start_bound().cloned() { + std::ops::Bound::Included(rel_start) => rel_start, + std::ops::Bound::Excluded(rel_start) => rel_start + 1, + std::ops::Bound::Unbounded => 0, + }; + assert!(start <= orig_range.end); + let end: usize = match rel_range.end_bound().cloned() { + std::ops::Bound::Included(rel_end) => orig_range.start + rel_end + 1, + std::ops::Bound::Excluded(rel_end) => orig_range.start + rel_end, + std::ops::Bound::Unbounded => orig_range.end, + }; + assert!(end >= start); + assert!(end <= orig_range.end); + start..end +} + impl FileSlice { /// Wraps a FileHandle. pub fn new(file_handle: Arc) -> Self { @@ -93,11 +107,11 @@ impl FileSlice { /// /// Panics if `byte_range.end` exceeds the filesize. #[must_use] - pub fn slice(&self, byte_range: Range) -> FileSlice { - assert!(byte_range.end <= self.len()); + #[inline] + pub fn slice>(&self, byte_range: R) -> FileSlice { FileSlice { data: self.data.clone(), - range: self.range.start + byte_range.start..self.range.start + byte_range.end, + range: combine_ranges(self.range.clone(), byte_range), } } @@ -117,9 +131,8 @@ impl FileSlice { self.data.read_bytes(self.range.clone()) } - #[cfg(feature = "quickwit")] #[doc(hidden)] - pub async fn read_bytes_async(&self) -> crate::AsyncIoResult { + pub async fn read_bytes_async(&self) -> io::Result { self.data.read_bytes_async(self.range.clone()).await } @@ -137,12 +150,8 @@ impl FileSlice { .read_bytes(self.range.start + range.start..self.range.start + range.end) } - #[cfg(feature = "quickwit")] #[doc(hidden)] - pub async fn read_bytes_slice_async( - &self, - byte_range: Range, - ) -> crate::AsyncIoResult { + pub async fn read_bytes_slice_async(&self, byte_range: Range) -> io::Result { assert!( self.range.start + byte_range.end <= self.range.end, "`to` exceeds the fileslice length" @@ -205,7 +214,7 @@ impl FileHandle for FileSlice { } #[cfg(feature = "quickwit")] - async fn read_bytes_async(&self, byte_range: Range) -> crate::AsyncIoResult { + async fn read_bytes_async(&self, byte_range: Range) -> io::Result { self.read_bytes_slice_async(byte_range).await } } @@ -223,7 +232,7 @@ impl FileHandle for OwnedBytes { } #[cfg(feature = "quickwit")] - async fn read_bytes_async(&self, range: Range) -> crate::AsyncIoResult { + async fn read_bytes_async(&self, range: Range) -> io::Result { let bytes = self.read_bytes(range)?; Ok(bytes) } @@ -234,9 +243,9 @@ mod tests { use std::io; use std::sync::Arc; - use common::HasLen; - use super::{FileHandle, FileSlice}; + use crate::file_slice::combine_ranges; + use crate::HasLen; #[test] fn test_file_slice() -> io::Result<()> { @@ -307,4 +316,18 @@ mod tests { b"bcd" ); } + + #[test] + fn test_combine_range() { + assert_eq!(combine_ranges(1..3, 0..1), 1..2); + assert_eq!(combine_ranges(1..3, 1..), 2..3); + assert_eq!(combine_ranges(1..4, ..2), 1..3); + assert_eq!(combine_ranges(3..10, 2..5), 5..8); + } + + #[test] + #[should_panic] + fn test_combine_range_panics() { + let _ = combine_ranges(3..5, 1..4); + } } diff --git a/common/src/lib.rs b/common/src/lib.rs index 9dac16de1..15aee5691 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -5,11 +5,12 @@ use std::ops::Deref; pub use byteorder::LittleEndian as Endianness; mod bitset; +pub mod file_slice; mod serialize; mod vint; mod writer; - pub use bitset::*; +pub use ownedbytes::OwnedBytes; pub use serialize::{BinarySerializable, DeserializeFrom, FixedSize}; pub use vint::{ deserialize_vint_u128, read_u32_vint, read_u32_vint_no_advance, serialize_vint_u128, diff --git a/fastfield_codecs/Cargo.toml b/fastfield_codecs/Cargo.toml index a56d0f983..76d03bf11 100644 --- a/fastfield_codecs/Cargo.toml +++ b/fastfield_codecs/Cargo.toml @@ -12,9 +12,9 @@ repository = "https://github.com/quickwit-oss/tantivy" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -common = { version = "0.4", path = "../common/", package = "tantivy-common" } +common = { version = "0.5", path = "../common/", package = "tantivy-common" } tantivy-bitpacker = { version= "0.3", path = "../bitpacker/" } -ownedbytes = { version = "0.4.0", path = "../ownedbytes" } +ownedbytes = { version = "0.5", path = "../ownedbytes" } prettytable-rs = {version="0.9.0", optional= true} rand = {version="0.8.3", optional= true} fastdivide = "0.4" diff --git a/ownedbytes/Cargo.toml b/ownedbytes/Cargo.toml index 4bd3206ef..c7cf89301 100644 --- a/ownedbytes/Cargo.toml +++ b/ownedbytes/Cargo.toml @@ -1,7 +1,7 @@ [package] authors = ["Paul Masurel ", "Pascal Seitz "] name = "ownedbytes" -version = "0.4.0" +version = "0.5.0" edition = "2021" description = "Expose data as static slice" license = "MIT" diff --git a/ownedbytes/src/lib.rs b/ownedbytes/src/lib.rs index 622f9e66e..ef0ab72ac 100644 --- a/ownedbytes/src/lib.rs +++ b/ownedbytes/src/lib.rs @@ -3,7 +3,7 @@ use std::ops::{Deref, Range}; use std::sync::Arc; use std::{fmt, io, mem}; -use stable_deref_trait::StableDeref; +pub use stable_deref_trait::StableDeref; /// An OwnedBytes simply wraps an object that owns a slice of data and exposes /// this data as a slice. diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index ee144d68a..b986b7495 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -200,10 +200,7 @@ impl InvertedIndexReader { #[cfg(feature = "quickwit")] impl InvertedIndexReader { - pub(crate) async fn get_term_info_async( - &self, - term: &Term, - ) -> crate::AsyncIoResult> { + pub(crate) async fn get_term_info_async(&self, term: &Term) -> io::Result> { self.termdict.get_async(term.value_bytes()).await } @@ -211,12 +208,8 @@ impl InvertedIndexReader { /// This method is for an advanced usage only. /// /// Most users should prefer using [`Self::read_postings()`] instead. - pub async fn warm_postings( - &self, - term: &Term, - with_positions: bool, - ) -> crate::AsyncIoResult<()> { - let term_info_opt = self.get_term_info_async(term).await?; + pub async fn warm_postings(&self, term: &Term, with_positions: bool) -> io::Result<()> { + let term_info_opt: Option = self.get_term_info_async(term).await?; if let Some(term_info) = term_info_opt { self.postings_file_slice .read_bytes_slice_async(term_info.postings_range.clone()) @@ -234,7 +227,7 @@ impl InvertedIndexReader { /// This method is for an advanced usage only. /// /// If you know which terms to pre-load, prefer using [`Self::warm_postings`] instead. - pub async fn warm_postings_full(&self, with_positions: bool) -> crate::AsyncIoResult<()> { + pub async fn warm_postings_full(&self, with_positions: bool) -> io::Result<()> { self.postings_file_slice.read_bytes_async().await?; if with_positions { self.positions_file_slice.read_bytes_async().await?; @@ -243,7 +236,7 @@ impl InvertedIndexReader { } /// Returns the number of documents containing the term asynchronously. - pub async fn doc_freq_async(&self, term: &Term) -> crate::AsyncIoResult { + pub async fn doc_freq_async(&self, term: &Term) -> io::Result { Ok(self .get_term_info_async(term) .await? diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 6397ea6b0..eb8eedd66 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -5,7 +5,6 @@ mod mmap_directory; mod directory; mod directory_lock; -mod file_slice; mod file_watcher; mod footer; mod managed_directory; @@ -20,13 +19,13 @@ mod composite_file; use std::io::BufWriter; use std::path::PathBuf; +pub use common::file_slice::{FileHandle, FileSlice}; pub use common::{AntiCallToken, TerminatingWrite}; pub use ownedbytes::OwnedBytes; pub(crate) use self::composite_file::{CompositeFile, CompositeWrite}; pub use self::directory::{Directory, DirectoryClone, DirectoryLock}; pub use self::directory_lock::{Lock, INDEX_WRITER_LOCK, META_LOCK}; -pub use self::file_slice::{FileHandle, FileSlice}; pub use self::ram_directory::RamDirectory; pub use self::watch_event_router::{WatchCallback, WatchCallbackList, WatchHandle}; diff --git a/src/error.rs b/src/error.rs index e8136cdef..ec3ceb87f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -104,28 +104,6 @@ pub enum TantivyError { InternalError(String), } -#[cfg(feature = "quickwit")] -#[derive(Error, Debug)] -#[doc(hidden)] -pub enum AsyncIoError { - #[error("io::Error `{0}`")] - Io(#[from] io::Error), - #[error("Asynchronous API is unsupported by this directory")] - AsyncUnsupported, -} - -#[cfg(feature = "quickwit")] -impl From for TantivyError { - fn from(async_io_err: AsyncIoError) -> Self { - match async_io_err { - AsyncIoError::Io(io_err) => TantivyError::from(io_err), - AsyncIoError::AsyncUnsupported => { - TantivyError::SystemError(format!("{:?}", async_io_err)) - } - } - } -} - impl From for TantivyError { fn from(io_err: io::Error) -> TantivyError { TantivyError::IoError(Arc::new(io_err)) diff --git a/src/lib.rs b/src/lib.rs index 53ddde564..79fe68987 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -259,10 +259,6 @@ pub use crate::future_result::FutureResult; /// and instead, refer to this as `crate::Result`. pub type Result = std::result::Result; -/// Result for an Async io operation. -#[cfg(feature = "quickwit")] -pub type AsyncIoResult = std::result::Result; - mod core; mod indexer; diff --git a/src/store/reader.rs b/src/store/reader.rs index c103c37d2..b0a3b9259 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -319,7 +319,7 @@ impl StoreReader { /// In most cases use [`get_async`](Self::get_async) /// /// Loads and decompresses a block asynchronously. - async fn read_block_async(&self, checkpoint: &Checkpoint) -> crate::AsyncIoResult { + async fn read_block_async(&self, checkpoint: &Checkpoint) -> io::Result { let cache_key = checkpoint.byte_range.start; if let Some(block) = self.cache.get_from_cache(checkpoint.byte_range.start) { return Ok(block); diff --git a/src/termdict/fst_termdict/term_info_store.rs b/src/termdict/fst_termdict/term_info_store.rs index fabe47ed1..799479e7d 100644 --- a/src/termdict/fst_termdict/term_info_store.rs +++ b/src/termdict/fst_termdict/term_info_store.rs @@ -121,7 +121,7 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 { } impl TermInfoStore { - pub fn open(term_info_store_file: FileSlice) -> crate::Result { + pub fn open(term_info_store_file: FileSlice) -> io::Result { let (len_slice, main_slice) = term_info_store_file.split(16); let mut bytes = len_slice.read_bytes()?; let len = u64::deserialize(&mut bytes)? as usize; diff --git a/src/termdict/fst_termdict/termdict.rs b/src/termdict/fst_termdict/termdict.rs index 563c9a3bc..01fb9a918 100644 --- a/src/termdict/fst_termdict/termdict.rs +++ b/src/termdict/fst_termdict/termdict.rs @@ -8,7 +8,6 @@ use tantivy_fst::Automaton; use super::term_info_store::{TermInfoStore, TermInfoStoreWriter}; use super::{TermStreamer, TermStreamerBuilder}; use crate::directory::{FileSlice, OwnedBytes}; -use crate::error::DataCorruption; use crate::postings::TermInfo; use crate::termdict::TermOrdinal; @@ -55,7 +54,7 @@ where W: Write /// to insert_key and insert_value. /// /// Prefer using `.insert(key, value)` - pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { + pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { self.fst_builder .insert(key, self.term_ord) .map_err(convert_fst_error)?; @@ -66,7 +65,7 @@ where W: Write /// # Warning /// /// Horribly dangerous internal API. See `.insert_key(...)`. - pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { + pub fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { self.term_info_store_writer.write_term_info(term_info)?; Ok(()) } @@ -86,10 +85,14 @@ where W: Write } } -fn open_fst_index(fst_file: FileSlice) -> crate::Result> { +fn open_fst_index(fst_file: FileSlice) -> io::Result> { let bytes = fst_file.read_bytes()?; - let fst = Fst::new(bytes) - .map_err(|err| DataCorruption::comment_only(format!("Fst data is corrupted: {:?}", err)))?; + let fst = Fst::new(bytes).map_err(|err| { + io::Error::new( + io::ErrorKind::InvalidData, + format!("Fst data is corrupted: {:?}", err), + ) + })?; Ok(tantivy_fst::Map::from(fst)) } @@ -114,7 +117,7 @@ pub struct TermDictionary { impl TermDictionary { /// Opens a `TermDictionary`. - pub fn open(file: FileSlice) -> crate::Result { + pub fn open(file: FileSlice) -> io::Result { let (main_slice, footer_len_slice) = file.split_from_end(8); let mut footer_len_bytes = footer_len_slice.read_bytes()?; let footer_size = u64::deserialize(&mut footer_len_bytes)?; diff --git a/src/termdict/sstable_termdict/mod.rs b/src/termdict/sstable_termdict/mod.rs index 0c08d88c1..4a2a17f9a 100644 --- a/src/termdict/sstable_termdict/mod.rs +++ b/src/termdict/sstable_termdict/mod.rs @@ -1,26 +1,28 @@ use std::io; mod merger; -mod streamer; mod termdict; use std::iter::ExactSizeIterator; use common::VInt; use sstable::value::{ValueReader, ValueWriter}; -use sstable::{BlockReader, SSTable}; +use sstable::SSTable; +use tantivy_fst::automaton::AlwaysMatch; pub use self::merger::TermMerger; -pub use self::streamer::{TermStreamer, TermStreamerBuilder}; -pub use self::termdict::{TermDictionary, TermDictionaryBuilder}; use crate::postings::TermInfo; +pub type TermDictionary = sstable::Dictionary; +pub type TermDictionaryBuilder = sstable::Writer; +pub type TermStreamer<'a, A = AlwaysMatch> = sstable::Streamer<'a, TermSSTable, A>; + pub struct TermSSTable; impl SSTable for TermSSTable { type Value = TermInfo; - type Reader = TermInfoReader; - type Writer = TermInfoWriter; + type ValueReader = TermInfoReader; + type ValueWriter = TermInfoWriter; } #[derive(Default)] @@ -35,15 +37,16 @@ impl ValueReader for TermInfoReader { &self.term_infos[idx] } - fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> { + fn load(&mut self, mut data: &[u8]) -> io::Result { + let len_before = data.len(); self.term_infos.clear(); - let num_els = VInt::deserialize_u64(reader)?; - let mut postings_start = VInt::deserialize_u64(reader)? as usize; - let mut positions_start = VInt::deserialize_u64(reader)? as usize; + let num_els = VInt::deserialize_u64(&mut data)?; + let mut postings_start = VInt::deserialize_u64(&mut data)? as usize; + let mut positions_start = VInt::deserialize_u64(&mut data)? as usize; for _ in 0..num_els { - let doc_freq = VInt::deserialize_u64(reader)? as u32; - let postings_num_bytes = VInt::deserialize_u64(reader)?; - let positions_num_bytes = VInt::deserialize_u64(reader)?; + let doc_freq = VInt::deserialize_u64(&mut data)? as u32; + let postings_num_bytes = VInt::deserialize_u64(&mut data)?; + let positions_num_bytes = VInt::deserialize_u64(&mut data)?; let postings_end = postings_start + postings_num_bytes as usize; let positions_end = positions_start + positions_num_bytes as usize; let term_info = TermInfo { @@ -55,7 +58,8 @@ impl ValueReader for TermInfoReader { postings_start = postings_end; positions_start = positions_end; } - Ok(()) + let consumed_len = len_before - data.len(); + Ok(consumed_len) } } @@ -71,7 +75,7 @@ impl ValueWriter for TermInfoWriter { self.term_infos.push(term_info.clone()); } - fn write_block(&mut self, buffer: &mut Vec) { + fn serialize_block(&mut self, buffer: &mut Vec) { VInt(self.term_infos.len() as u64).serialize_into_vec(buffer); if self.term_infos.is_empty() { return; @@ -89,17 +93,13 @@ impl ValueWriter for TermInfoWriter { #[cfg(test)] mod tests { - use std::io; - use sstable::value::{ValueReader, ValueWriter}; - use super::BlockReader; - use crate::directory::OwnedBytes; use crate::postings::TermInfo; use crate::termdict::sstable_termdict::TermInfoReader; #[test] - fn test_block_terminfos() -> io::Result<()> { + fn test_block_terminfos() { let mut term_info_writer = super::TermInfoWriter::default(); term_info_writer.write(&TermInfo { doc_freq: 120u32, @@ -117,10 +117,10 @@ mod tests { positions_range: 1100..1302, }); let mut buffer = Vec::new(); - term_info_writer.write_block(&mut buffer); - let mut block_reader = make_block_reader(&buffer[..]); + term_info_writer.serialize_block(&mut buffer); + // let mut block_reader = make_block_reader(&buffer[..]); let mut term_info_reader = TermInfoReader::default(); - term_info_reader.read(&mut block_reader)?; + let num_bytes: usize = term_info_reader.load(&buffer[..]).unwrap(); assert_eq!( term_info_reader.value(0), &TermInfo { @@ -129,16 +129,6 @@ mod tests { positions_range: 10..122 } ); - assert!(block_reader.buffer().is_empty()); - Ok(()) - } - - fn make_block_reader(data: &[u8]) -> BlockReader { - let mut buffer = (data.len() as u32).to_le_bytes().to_vec(); - buffer.extend_from_slice(data); - let owned_bytes = OwnedBytes::new(buffer); - let mut block_reader = BlockReader::new(Box::new(owned_bytes)); - block_reader.read_block().unwrap(); - block_reader + assert_eq!(buffer.len(), num_bytes); } } diff --git a/src/termdict/sstable_termdict/termdict.rs b/src/termdict/sstable_termdict/termdict.rs index c618d7ecf..e2119d9a2 100644 --- a/src/termdict/sstable_termdict/termdict.rs +++ b/src/termdict/sstable_termdict/termdict.rs @@ -1,256 +1,11 @@ -use std::io; -use std::sync::Arc; +use sstable::SSTable; -use common::BinarySerializable; -use once_cell::sync::Lazy; -use sstable::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, Writer}; -use tantivy_fst::automaton::AlwaysMatch; -use tantivy_fst::Automaton; - -use crate::directory::{FileSlice, OwnedBytes}; use crate::postings::TermInfo; -use crate::termdict::sstable_termdict::{ - TermInfoReader, TermInfoWriter, TermSSTable, TermStreamer, TermStreamerBuilder, -}; -use crate::termdict::TermOrdinal; -use crate::AsyncIoResult; +use crate::termdict::sstable_termdict::{TermInfoReader, TermInfoWriter}; pub struct TermInfoSSTable; impl SSTable for TermInfoSSTable { type Value = TermInfo; - type Reader = TermInfoReader; - type Writer = TermInfoWriter; -} - -/// Builder for the new term dictionary. -pub struct TermDictionaryBuilder { - sstable_writer: Writer, -} - -impl TermDictionaryBuilder { - /// Creates a new `TermDictionaryBuilder` - pub fn create(w: W) -> io::Result { - let sstable_writer = TermSSTable::writer(w); - Ok(TermDictionaryBuilder { sstable_writer }) - } - - /// Inserts a `(key, value)` pair in the term dictionary. - /// - /// *Keys have to be inserted in order.* - pub fn insert>(&mut self, key_ref: K, value: &TermInfo) -> io::Result<()> { - let key = key_ref.as_ref(); - self.insert_key(key)?; - self.insert_value(value)?; - Ok(()) - } - - /// # Warning - /// Horribly dangerous internal API - /// - /// If used, it must be used by systematically alternating calls - /// to insert_key and insert_value. - /// - /// Prefer using `.insert(key, value)` - #[allow(clippy::unnecessary_wraps)] - pub(crate) fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { - self.sstable_writer.write_key(key); - Ok(()) - } - - /// # Warning - /// - /// Horribly dangerous internal API. See `.insert_key(...)`. - pub(crate) fn insert_value(&mut self, term_info: &TermInfo) -> io::Result<()> { - self.sstable_writer.write_value(term_info) - } - - /// Finalize writing the builder, and returns the underlying - /// `Write` object. - pub fn finish(self) -> io::Result { - self.sstable_writer.finalize() - } -} - -static EMPTY_TERM_DICT_FILE: Lazy = Lazy::new(|| { - let term_dictionary_data: Vec = TermDictionaryBuilder::create(Vec::::new()) - .expect("Creating a TermDictionaryBuilder in a Vec should never fail") - .finish() - .expect("Writing in a Vec should never fail"); - FileSlice::from(term_dictionary_data) -}); - -/// The term dictionary contains all of the terms in -/// `tantivy index` in a sorted manner. -/// -/// The `Fst` crate is used to associate terms to their -/// respective `TermOrdinal`. The `TermInfoStore` then makes it -/// possible to fetch the associated `TermInfo`. -pub struct TermDictionary { - sstable_slice: FileSlice, - sstable_index: SSTableIndex, - num_terms: u64, -} - -impl TermDictionary { - pub(crate) fn sstable_reader(&self) -> io::Result> { - let data = self.sstable_slice.read_bytes()?; - Ok(TermInfoSSTable::reader(data)) - } - - pub(crate) fn sstable_reader_block( - &self, - block_addr: BlockAddr, - ) -> io::Result> { - let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?; - Ok(TermInfoSSTable::reader(data)) - } - - pub(crate) async fn sstable_reader_block_async( - &self, - block_addr: BlockAddr, - ) -> AsyncIoResult> { - let data = self - .sstable_slice - .read_bytes_slice_async(block_addr.byte_range) - .await?; - Ok(TermInfoSSTable::reader(data)) - } - - pub(crate) fn sstable_delta_reader(&self) -> io::Result> { - let data = self.sstable_slice.read_bytes()?; - Ok(TermInfoSSTable::delta_reader(data)) - } - - /// Opens a `TermDictionary`. - pub fn open(term_dictionary_file: FileSlice) -> crate::Result { - let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16); - let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?; - let index_offset = u64::deserialize(&mut footer_len_bytes)?; - let num_terms = u64::deserialize(&mut footer_len_bytes)?; - let (sstable_slice, index_slice) = main_slice.split(index_offset as usize); - let sstable_index_bytes = index_slice.read_bytes()?; - let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice()) - .map_err(|_| crate::error::DataCorruption::comment_only("SSTable corruption"))?; - Ok(TermDictionary { - sstable_slice, - sstable_index, - num_terms, - }) - } - - /// Creates a term dictionary from the supplied bytes. - pub fn from_bytes(owned_bytes: OwnedBytes) -> crate::Result { - TermDictionary::open(FileSlice::new(Arc::new(owned_bytes))) - } - - /// Creates an empty term dictionary which contains no terms. - pub fn empty() -> Self { - TermDictionary::open(EMPTY_TERM_DICT_FILE.clone()).unwrap() - } - - /// Returns the number of terms in the dictionary. - /// Term ordinals range from 0 to `num_terms() - 1`. - pub fn num_terms(&self) -> usize { - self.num_terms as usize - } - - /// Returns the ordinal associated with a given term. - pub fn term_ord>(&self, key: K) -> io::Result> { - let mut term_ord = 0u64; - let key_bytes = key.as_ref(); - let mut sstable_reader = self.sstable_reader()?; - while sstable_reader.advance().unwrap_or(false) { - if sstable_reader.key() == key_bytes { - return Ok(Some(term_ord)); - } - term_ord += 1; - } - Ok(None) - } - - /// Returns the term associated with a given term ordinal. - /// - /// Term ordinals are defined as the position of the term in - /// the sorted list of terms. - /// - /// Returns true if and only if the term has been found. - /// - /// Regardless of whether the term is found or not, - /// the buffer may be modified. - pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec) -> io::Result { - let mut sstable_reader = self.sstable_reader()?; - bytes.clear(); - for _ in 0..(ord + 1) { - if !sstable_reader.advance().unwrap_or(false) { - return Ok(false); - } - } - bytes.extend_from_slice(sstable_reader.key()); - Ok(true) - } - - /// Returns the number of terms in the dictionary. - pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result { - let mut sstable_reader = self.sstable_reader()?; - for _ in 0..(term_ord + 1) { - if !sstable_reader.advance().unwrap_or(false) { - return Ok(TermInfo::default()); - } - } - Ok(sstable_reader.value().clone()) - } - - /// Lookups the value corresponding to the key. - pub fn get>(&self, key: K) -> io::Result> { - if let Some(block_addr) = self.sstable_index.search(key.as_ref()) { - let mut sstable_reader = self.sstable_reader_block(block_addr)?; - let key_bytes = key.as_ref(); - while sstable_reader.advance().unwrap_or(false) { - if sstable_reader.key() == key_bytes { - let term_info = sstable_reader.value().clone(); - return Ok(Some(term_info)); - } - } - } - Ok(None) - } - - /// Lookups the value corresponding to the key. - pub async fn get_async>(&self, key: K) -> AsyncIoResult> { - if let Some(block_addr) = self.sstable_index.search(key.as_ref()) { - let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?; - let key_bytes = key.as_ref(); - while sstable_reader.advance().unwrap_or(false) { - if sstable_reader.key() == key_bytes { - let term_info = sstable_reader.value().clone(); - return Ok(Some(term_info)); - } - } - } - Ok(None) - } - - /// Returns a range builder, to stream all of the terms - /// within an interval. - pub fn range(&self) -> TermStreamerBuilder<'_> { - TermStreamerBuilder::new(self, AlwaysMatch) - } - - /// A stream of all the sorted terms. - pub fn stream(&self) -> io::Result> { - self.range().into_stream() - } - - /// Returns a search builder, to stream all of the terms - /// within the Automaton - pub fn search<'a, A: Automaton + 'a>(&'a self, automaton: A) -> TermStreamerBuilder<'a, A> - where A::State: Clone { - TermStreamerBuilder::::new(self, automaton) - } - - #[doc(hidden)] - pub async fn warm_up_dictionary(&self) -> AsyncIoResult<()> { - self.sstable_slice.read_bytes_async().await?; - Ok(()) - } + type ValueReader = TermInfoReader; + type ValueWriter = TermInfoWriter; } diff --git a/src/termdict/tests.rs b/src/termdict/tests.rs index 50d040998..44f8dbe03 100644 --- a/src/termdict/tests.rs +++ b/src/termdict/tests.rs @@ -1,5 +1,5 @@ use std::path::PathBuf; -use std::str; +use std::{io, str}; use super::{TermDictionary, TermDictionaryBuilder, TermStreamer}; use crate::directory::{Directory, FileSlice, RamDirectory, TerminatingWrite}; @@ -247,7 +247,7 @@ fn test_empty_string() -> crate::Result<()> { Ok(()) } -fn stream_range_test_dict() -> crate::Result { +fn stream_range_test_dict() -> io::Result { let buffer: Vec = { let mut term_dictionary_builder = TermDictionaryBuilder::create(Vec::new())?; for i in 0u8..10u8 { diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml index 1f50efa7d..70d20aa1f 100644 --- a/sstable/Cargo.toml +++ b/sstable/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" [dependencies] common = {path="../common", package="tantivy-common"} ciborium = "0.2" -byteorder = "1" serde = "1" +tantivy-fst = "0.4" [dev-dependencies] proptest = "1" diff --git a/sstable/src/block_reader.rs b/sstable/src/block_reader.rs index 58ccf457a..19fc53980 100644 --- a/sstable/src/block_reader.rs +++ b/sstable/src/block_reader.rs @@ -1,6 +1,4 @@ -use std::io::{self, Read}; - -use byteorder::{LittleEndian, ReadBytesExt}; +use std::io; pub struct BlockReader<'a> { buffer: Vec, @@ -8,6 +6,13 @@ pub struct BlockReader<'a> { offset: usize, } +#[inline] +fn read_u32(read: &mut dyn io::Read) -> io::Result { + let mut buf = [0u8; 4]; + read.read_exact(&mut buf)?; + Ok(u32::from_le_bytes(buf)) +} + impl<'a> BlockReader<'a> { pub fn new(reader: Box) -> BlockReader<'a> { BlockReader { @@ -30,7 +35,7 @@ impl<'a> BlockReader<'a> { pub fn read_block(&mut self) -> io::Result { self.offset = 0; - let block_len_res = self.reader.read_u32::(); + let block_len_res = read_u32(self.reader.as_mut()); if let Err(err) = &block_len_res { if err.kind() == io::ErrorKind::UnexpectedEof { return Ok(false); diff --git a/sstable/src/delta.rs b/sstable/src/delta.rs index 775ebe09b..4cc83caaf 100644 --- a/sstable/src/delta.rs +++ b/sstable/src/delta.rs @@ -44,7 +44,7 @@ where let start_offset = self.write.written_bytes() as usize; // TODO avoid buffer allocation let mut buffer = Vec::new(); - self.value_writer.write_block(&mut buffer); + self.value_writer.serialize_block(&mut buffer); let block_len = buffer.len() + self.block.len(); self.write.write_all(&(block_len as u32).to_le_bytes())?; self.write.write_all(&buffer[..])?; @@ -84,7 +84,7 @@ where Ok(None) } - pub fn finalize(self) -> CountingWriter> { + pub fn finish(self) -> CountingWriter> { self.write } } @@ -112,6 +112,10 @@ where TValueReader: value::ValueReader } } + pub fn empty() -> Self { + DeltaReader::new(&b""[..]) + } + fn deserialize_vint(&mut self) -> u64 { self.block_reader.deserialize_u64() } @@ -156,7 +160,8 @@ where TValueReader: value::ValueReader if !self.block_reader.read_block()? { return Ok(false); } - self.value_reader.read(&mut self.block_reader)?; + let consumed_len = self.value_reader.load(self.block_reader.buffer())?; + self.block_reader.advance(consumed_len); self.idx = 0; } else { self.idx += 1; @@ -180,3 +185,15 @@ where TValueReader: value::ValueReader self.value_reader.value(self.idx) } } + +#[cfg(test)] +mod tests { + use super::DeltaReader; + use crate::value::U64MonotonicReader; + + #[test] + fn test_empty() { + let mut delta_reader: DeltaReader = DeltaReader::empty(); + assert!(!delta_reader.advance().unwrap()); + } +} diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs new file mode 100644 index 000000000..7f0c40164 --- /dev/null +++ b/sstable/src/dictionary.rs @@ -0,0 +1,231 @@ +use std::io; +use std::marker::PhantomData; +use std::ops::{Bound, RangeBounds}; +use std::sync::Arc; + +use common::file_slice::FileSlice; +use common::{BinarySerializable, OwnedBytes}; +use tantivy_fst::automaton::AlwaysMatch; +use tantivy_fst::Automaton; + +use crate::streamer::{Streamer, StreamerBuilder}; +use crate::{BlockAddr, DeltaReader, Reader, SSTable, SSTableIndex, TermOrdinal}; + +/// The term dictionary contains all of the terms in +/// `tantivy index` in a sorted manner. +/// +/// The `Fst` crate is used to associate terms to their +/// respective `TermOrdinal`. The `TermInfoStore` then makes it +/// possible to fetch the associated `TermInfo`. +pub struct Dictionary { + pub sstable_slice: FileSlice, + pub sstable_index: SSTableIndex, + num_terms: u64, + phantom_data: PhantomData, +} + +impl Dictionary { + pub fn builder(wrt: W) -> io::Result> { + Ok(TSSTable::writer(wrt)) + } + + pub(crate) fn sstable_reader(&self) -> io::Result> { + let data = self.sstable_slice.read_bytes()?; + Ok(TSSTable::reader(data)) + } + + pub(crate) fn sstable_reader_block( + &self, + block_addr: BlockAddr, + ) -> io::Result> { + let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?; + Ok(TSSTable::reader(data)) + } + + pub(crate) async fn sstable_reader_block_async( + &self, + block_addr: BlockAddr, + ) -> io::Result> { + let data = self + .sstable_slice + .read_bytes_slice_async(block_addr.byte_range) + .await?; + Ok(TSSTable::reader(data)) + } + + pub(crate) fn sstable_delta_reader_for_key_range( + &self, + key_range: impl RangeBounds<[u8]>, + ) -> io::Result> { + let slice = self.file_slice_for_range(key_range); + let data = slice.read_bytes()?; + Ok(TSSTable::delta_reader(data)) + } + + fn file_slice_for_range(&self, key_range: impl RangeBounds<[u8]>) -> FileSlice { + let start_bound: Bound = match key_range.start_bound() { + Bound::Included(key) | Bound::Excluded(key) => { + let Some(first_block_addr) = self.sstable_index.search_block(key) else { + return FileSlice::empty(); + }; + Bound::Included(first_block_addr.byte_range.start) + } + Bound::Unbounded => Bound::Unbounded, + }; + let end_bound: Bound = match key_range.end_bound() { + Bound::Included(key) | Bound::Excluded(key) => { + if let Some(block_addr) = self.sstable_index.search_block(key) { + Bound::Excluded(block_addr.byte_range.end) + } else { + Bound::Unbounded + } + } + Bound::Unbounded => Bound::Unbounded, + }; + self.sstable_slice.slice((start_bound, end_bound)) + } + + /// Opens a `TermDictionary`. + pub fn open(term_dictionary_file: FileSlice) -> io::Result { + let (main_slice, footer_len_slice) = term_dictionary_file.split_from_end(16); + let mut footer_len_bytes: OwnedBytes = footer_len_slice.read_bytes()?; + let index_offset = u64::deserialize(&mut footer_len_bytes)?; + let num_terms = u64::deserialize(&mut footer_len_bytes)?; + let (sstable_slice, index_slice) = main_slice.split(index_offset as usize); + let sstable_index_bytes = index_slice.read_bytes()?; + let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice()) + .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?; + Ok(Dictionary { + sstable_slice, + sstable_index, + num_terms, + phantom_data: PhantomData, + }) + } + + /// Creates a term dictionary from the supplied bytes. + pub fn from_bytes(owned_bytes: OwnedBytes) -> io::Result { + Dictionary::open(FileSlice::new(Arc::new(owned_bytes))) + } + + /// Creates an empty term dictionary which contains no terms. + pub fn empty() -> Self { + let term_dictionary_data: Vec = Self::builder(Vec::::new()) + .expect("Creating a TermDictionaryBuilder in a Vec should never fail") + .finish() + .expect("Writing in a Vec should never fail"); + let empty_dict_file = FileSlice::from(term_dictionary_data); + Dictionary::open(empty_dict_file).unwrap() + } + + /// Returns the number of terms in the dictionary. + /// Term ordinals range from 0 to `num_terms() - 1`. + pub fn num_terms(&self) -> usize { + self.num_terms as usize + } + + /// Returns the ordinal associated with a given term. + pub fn term_ord>(&self, key: K) -> io::Result> { + let mut term_ord = 0u64; + let key_bytes = key.as_ref(); + let mut sstable_reader = self.sstable_reader()?; + while sstable_reader.advance().unwrap_or(false) { + if sstable_reader.key() == key_bytes { + return Ok(Some(term_ord)); + } + term_ord += 1; + } + Ok(None) + } + + /// Returns the term associated with a given term ordinal. + /// + /// Term ordinals are defined as the position of the term in + /// the sorted list of terms. + /// + /// Returns true if and only if the term has been found. + /// + /// Regardless of whether the term is found or not, + /// the buffer may be modified. + pub fn ord_to_term(&self, ord: TermOrdinal, bytes: &mut Vec) -> io::Result { + let mut sstable_reader = self.sstable_reader()?; + bytes.clear(); + for _ in 0..(ord + 1) { + if !sstable_reader.advance().unwrap_or(false) { + return Ok(false); + } + } + bytes.extend_from_slice(sstable_reader.key()); + Ok(true) + } + + /// Returns the number of terms in the dictionary. + pub fn term_info_from_ord(&self, term_ord: TermOrdinal) -> io::Result> { + let mut sstable_reader = self.sstable_reader()?; + for _ in 0..(term_ord + 1) { + if !sstable_reader.advance().unwrap_or(false) { + return Ok(None); + } + } + Ok(Some(sstable_reader.value().clone())) + } + + /// Lookups the value corresponding to the key. + pub fn get>(&self, key: K) -> io::Result> { + if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) { + let mut sstable_reader = self.sstable_reader_block(block_addr)?; + let key_bytes = key.as_ref(); + while sstable_reader.advance().unwrap_or(false) { + if sstable_reader.key() == key_bytes { + let value = sstable_reader.value().clone(); + return Ok(Some(value)); + } + } + } + Ok(None) + } + + /// Lookups the value corresponding to the key. + pub async fn get_async>(&self, key: K) -> io::Result> { + if let Some(block_addr) = self.sstable_index.search_block(key.as_ref()) { + let mut sstable_reader = self.sstable_reader_block_async(block_addr).await?; + let key_bytes = key.as_ref(); + while sstable_reader.advance().unwrap_or(false) { + if sstable_reader.key() == key_bytes { + let value = sstable_reader.value().clone(); + return Ok(Some(value)); + } + } + } + Ok(None) + } + + /// Returns a range builder, to stream all of the terms + /// within an interval. + pub fn range(&self) -> StreamerBuilder<'_, TSSTable> { + StreamerBuilder::new(self, AlwaysMatch) + } + + /// A stream of all the sorted terms. + pub fn stream(&self) -> io::Result> { + self.range().into_stream() + } + + /// Returns a search builder, to stream all of the terms + /// within the Automaton + pub fn search<'a, A: Automaton + 'a>( + &'a self, + automaton: A, + ) -> StreamerBuilder<'a, TSSTable, A> + where + A::State: Clone, + { + StreamerBuilder::::new(self, automaton) + } + + #[doc(hidden)] + pub async fn warm_up_dictionary(&self) -> io::Result<()> { + self.sstable_slice.read_bytes_async().await?; + Ok(()) + } +} diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index 59288f165..6e861f985 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -1,21 +1,29 @@ use std::io::{self, Write}; +use std::ops::Range; use std::usize; use merge::ValueMerger; mod delta; +mod dictionary; pub mod merge; +mod streamer; pub mod value; mod sstable_index; pub use sstable_index::{BlockAddr, SSTableIndex, SSTableIndexBuilder}; pub(crate) mod vint; +pub use dictionary::Dictionary; +pub use streamer::{Streamer, StreamerBuilder}; mod block_reader; pub use self::block_reader::BlockReader; pub use self::delta::{DeltaReader, DeltaWriter}; pub use self::merge::VoidMerge; use self::value::{U64MonotonicReader, U64MonotonicWriter, ValueReader, ValueWriter}; +use crate::value::{RangeReader, RangeWriter}; + +pub type TermOrdinal = u64; const DEFAULT_KEY_CAPACITY: usize = 50; @@ -31,15 +39,15 @@ fn common_prefix_len(left: &[u8], right: &[u8]) -> usize { pub struct SSTableDataCorruption; pub trait SSTable: Sized { - type Value; - type Reader: ValueReader; - type Writer: ValueWriter; + type Value: Clone; + type ValueReader: ValueReader; + type ValueWriter: ValueWriter; - fn delta_writer(write: W) -> DeltaWriter { + fn delta_writer(write: W) -> DeltaWriter { DeltaWriter::new(write) } - fn writer(write: W) -> Writer { + fn writer(write: W) -> Writer { Writer { previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), num_terms: 0u64, @@ -49,17 +57,22 @@ pub trait SSTable: Sized { } } - fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::Reader> { + fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::ValueReader> { DeltaReader::new(reader) } - fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::Reader> { + fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::ValueReader> { Reader { key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), delta_reader: Self::delta_reader(reader), } } + /// Returns an empty static reader. + fn create_empty_reader() -> Reader<'static, Self::ValueReader> { + Self::reader(&b""[..]) + } + fn merge>( io_readers: Vec, w: W, @@ -76,8 +89,8 @@ pub struct VoidSSTable; impl SSTable for VoidSSTable { type Value = (); - type Reader = value::VoidReader; - type Writer = value::VoidWriter; + type ValueReader = value::VoidReader; + type ValueWriter = value::VoidWriter; } #[allow(dead_code)] @@ -86,9 +99,20 @@ pub struct SSTableMonotonicU64; impl SSTable for SSTableMonotonicU64 { type Value = u64; - type Reader = U64MonotonicReader; + type ValueReader = U64MonotonicReader; - type Writer = U64MonotonicWriter; + type ValueWriter = U64MonotonicWriter; +} + +/// Retpresent +pub struct SSTableRange; + +impl SSTable for SSTableRange { + type Value = Range; + + type ValueReader = RangeReader; + + type ValueWriter = RangeWriter; } pub struct Reader<'a, TValueReader> { @@ -141,11 +165,23 @@ where W: io::Write, TValueWriter: value::ValueWriter, { + pub fn create(wrt: W) -> io::Result { + Ok(Writer { + previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), + num_terms: 0u64, + index_builder: SSTableIndexBuilder::default(), + delta_writer: DeltaWriter::new(wrt), + first_ordinal_of_the_block: 0u64, + }) + } + + #[inline(always)] pub(crate) fn current_key(&self) -> &[u8] { &self.previous_key[..] } - pub fn write_key(&mut self, key: &[u8]) { + #[inline] + pub fn insert_key(&mut self, key: &[u8]) -> io::Result<()> { // If this is the first key in the block, we use it to // shorten the last term in the last block. if self.first_ordinal_of_the_block == self.num_terms { @@ -165,16 +201,22 @@ where self.previous_key.resize(key.len(), 0u8); self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]); self.delta_writer.write_suffix(keep_len, &key[keep_len..]); - } - - #[allow(dead_code)] - pub fn write(&mut self, key: &[u8], value: &TValueWriter::Value) -> io::Result<()> { - self.write_key(key); - self.write_value(value)?; Ok(()) } - pub fn write_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> { + #[inline] + pub fn insert>( + &mut self, + key: K, + value: &TValueWriter::Value, + ) -> io::Result<()> { + self.insert_key(key.as_ref())?; + self.insert_value(value)?; + Ok(()) + } + + #[inline] + pub fn insert_value(&mut self, value: &TValueWriter::Value) -> io::Result<()> { self.delta_writer.write_value(value); self.num_terms += 1u64; self.flush_block_if_required() @@ -193,7 +235,7 @@ where Ok(()) } - pub fn finalize(mut self) -> io::Result { + pub fn finish(mut self) -> io::Result { if let Some(byte_range) = self.delta_writer.flush_block()? { self.index_builder.add_block( &self.previous_key[..], @@ -202,7 +244,7 @@ where ); self.first_ordinal_of_the_block = self.num_terms; } - let mut wrt = self.delta_writer.finalize(); + let mut wrt = self.delta_writer.finish(); wrt.write_all(&0u32.to_le_bytes())?; let offset = wrt.written_bytes(); @@ -246,10 +288,10 @@ mod test { let mut buffer = vec![]; { let mut sstable_writer = VoidSSTable::writer(&mut buffer); - assert!(sstable_writer.write(&long_key[..], &()).is_ok()); - assert!(sstable_writer.write(&[0, 3, 4], &()).is_ok()); - assert!(sstable_writer.write(&long_key2[..], &()).is_ok()); - assert!(sstable_writer.finalize().is_ok()); + assert!(sstable_writer.insert(&long_key[..], &()).is_ok()); + assert!(sstable_writer.insert(&[0, 3, 4], &()).is_ok()); + assert!(sstable_writer.insert(&long_key2[..], &()).is_ok()); + assert!(sstable_writer.finish().is_ok()); } let mut sstable_reader = VoidSSTable::reader(&buffer[..]); assert!(sstable_reader.advance().unwrap()); @@ -266,10 +308,10 @@ mod test { let mut buffer = vec![]; { let mut sstable_writer = VoidSSTable::writer(&mut buffer); - assert!(sstable_writer.write(&[17u8], &()).is_ok()); - assert!(sstable_writer.write(&[17u8, 18u8, 19u8], &()).is_ok()); - assert!(sstable_writer.write(&[17u8, 20u8], &()).is_ok()); - assert!(sstable_writer.finalize().is_ok()); + assert!(sstable_writer.insert(&[17u8], &()).is_ok()); + assert!(sstable_writer.insert(&[17u8, 18u8, 19u8], &()).is_ok()); + assert!(sstable_writer.insert(&[17u8, 20u8], &()).is_ok()); + assert!(sstable_writer.finish().is_ok()); } assert_eq!( &buffer, @@ -304,8 +346,8 @@ mod test { fn test_simple_sstable_non_increasing_key() { let mut buffer = vec![]; let mut sstable_writer = VoidSSTable::writer(&mut buffer); - assert!(sstable_writer.write(&[17u8], &()).is_ok()); - assert!(sstable_writer.write(&[16u8], &()).is_ok()); + assert!(sstable_writer.insert(&[17u8], &()).is_ok()); + assert!(sstable_writer.insert(&[16u8], &()).is_ok()); } #[test] @@ -313,9 +355,9 @@ mod test { let mut buffer = Vec::new(); { let mut writer = VoidSSTable::writer(&mut buffer); - writer.write(b"abcd", &()).unwrap(); - writer.write(b"abe", &()).unwrap(); - writer.finalize().unwrap(); + writer.insert(b"abcd", &()).unwrap(); + writer.insert(b"abe", &()).unwrap(); + writer.finish().unwrap(); } let mut output = Vec::new(); assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok()); @@ -327,9 +369,9 @@ mod test { let mut buffer = Vec::new(); { let mut writer = VoidSSTable::writer(&mut buffer); - writer.write(b"abcd", &()).unwrap(); - writer.write(b"abe", &()).unwrap(); - writer.finalize().unwrap(); + writer.insert(b"abcd", &()).unwrap(); + writer.insert(b"abe", &()).unwrap(); + writer.finish().unwrap(); } let mut output = Vec::new(); assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok()); @@ -340,10 +382,10 @@ mod test { fn test_sstable_u64() -> io::Result<()> { let mut buffer = Vec::new(); let mut writer = SSTableMonotonicU64::writer(&mut buffer); - writer.write(b"abcd", &1u64)?; - writer.write(b"abe", &4u64)?; - writer.write(b"gogo", &4324234234234234u64)?; - writer.finalize()?; + writer.insert(b"abcd", &1u64)?; + writer.insert(b"abe", &4u64)?; + writer.insert(b"gogo", &4324234234234234u64)?; + writer.finish()?; let mut reader = SSTableMonotonicU64::reader(&buffer[..]); assert!(reader.advance()?); assert_eq!(reader.key(), b"abcd"); @@ -357,4 +399,10 @@ mod test { assert!(!reader.advance()?); Ok(()) } + + #[test] + fn test_sstable_empty() { + let mut sstable_range_empty = crate::SSTableRange::create_empty_reader(); + assert!(!sstable_range_empty.advance().unwrap()); + } } diff --git a/sstable/src/merge/heap_merge.rs b/sstable/src/merge/heap_merge.rs index 9e852918d..895a8e082 100644 --- a/sstable/src/merge/heap_merge.rs +++ b/sstable/src/merge/heap_merge.rs @@ -28,11 +28,11 @@ impl> PartialEq for HeapItem { #[allow(dead_code)] pub fn merge_sstable>( - readers: Vec>, - mut writer: Writer, + readers: Vec>, + mut writer: Writer, mut merger: M, ) -> io::Result<()> { - let mut heap: BinaryHeap>> = + let mut heap: BinaryHeap>> = BinaryHeap::with_capacity(readers.len()); for mut reader in readers { if reader.advance()? { @@ -43,7 +43,7 @@ pub fn merge_sstable>( let len = heap.len(); let mut value_merger; if let Some(mut head) = heap.peek_mut() { - writer.write_key(head.0.key()); + writer.insert_key(head.0.key()).unwrap(); value_merger = merger.new_value(head.0.value()); if !head.0.advance()? { PeekMut::pop(head); @@ -64,9 +64,9 @@ pub fn merge_sstable>( break; } let value = value_merger.finish(); - writer.write_value(&value)?; + writer.insert_value(&value)?; writer.flush_block_if_required()?; } - writer.finalize()?; + writer.finish()?; Ok(()) } diff --git a/sstable/src/merge/mod.rs b/sstable/src/merge/mod.rs index 3170500b4..72542e4c4 100644 --- a/sstable/src/merge/mod.rs +++ b/sstable/src/merge/mod.rs @@ -79,9 +79,9 @@ mod tests { { let mut sstable_writer = VoidSSTable::writer(&mut buffer); for &key in keys { - assert!(sstable_writer.write(key.as_bytes(), &()).is_ok()); + assert!(sstable_writer.insert(key.as_bytes(), &()).is_ok()); } - assert!(sstable_writer.finalize().is_ok()); + assert!(sstable_writer.finish().is_ok()); } buffer } @@ -91,9 +91,9 @@ mod tests { { let mut sstable_writer = SSTableMonotonicU64::writer(&mut buffer); for (key, val) in keys { - assert!(sstable_writer.write(key.as_bytes(), val).is_ok()); + assert!(sstable_writer.insert(key.as_bytes(), val).is_ok()); } - assert!(sstable_writer.finalize().is_ok()); + assert!(sstable_writer.finish().is_ok()); } buffer } diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index b283b961c..f08aef23f 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -15,10 +15,17 @@ impl SSTableIndex { ciborium::de::from_reader(data).map_err(|_| SSTableDataCorruption) } - pub fn search(&self, key: &[u8]) -> Option { + pub fn search_block(&self, key: &[u8]) -> Option { + self.search_block_from(key).next() + } + + pub fn search_block_from<'key, 'slf: 'key>( + &'slf self, + key: &'key [u8], + ) -> impl Iterator + Clone + 'key { self.blocks .iter() - .find(|block| &block.last_key_or_greater[..] >= key) + .skip_while(|block| &block.last_key_or_greater[..] < key) .map(|block| block.block_addr.clone()) } } @@ -105,7 +112,7 @@ mod tests { sstable_builder.serialize(&mut buffer).unwrap(); let sstable_index = SSTableIndex::load(&buffer[..]).unwrap(); assert_eq!( - sstable_index.search(b"bbbde"), + sstable_index.search_block(b"bbbde"), Some(BlockAddr { first_ordinal: 10u64, byte_range: 30..40 diff --git a/src/termdict/sstable_termdict/streamer.rs b/sstable/src/streamer.rs similarity index 65% rename from src/termdict/sstable_termdict/streamer.rs rename to sstable/src/streamer.rs index 8289862ec..e8be2050b 100644 --- a/src/termdict/sstable_termdict/streamer.rs +++ b/sstable/src/streamer.rs @@ -4,31 +4,39 @@ use std::ops::Bound; use tantivy_fst::automaton::AlwaysMatch; use tantivy_fst::Automaton; -use super::TermDictionary; -use crate::postings::TermInfo; -use crate::termdict::sstable_termdict::TermInfoReader; -use crate::termdict::TermOrdinal; +use crate::dictionary::Dictionary; +use crate::{SSTable, TermOrdinal}; -/// `TermStreamerBuilder` is a helper object used to define +/// `StreamerBuilder` is a helper object used to define /// a range of terms that should be streamed. -pub struct TermStreamerBuilder<'a, A = AlwaysMatch> +pub struct StreamerBuilder<'a, TSSTable, A = AlwaysMatch> where A: Automaton, A::State: Clone, + TSSTable: SSTable, { - term_dict: &'a TermDictionary, + term_dict: &'a Dictionary, automaton: A, lower: Bound>, upper: Bound>, } -impl<'a, A> TermStreamerBuilder<'a, A> +fn bound_as_byte_slice(bound: &Bound>) -> Bound<&[u8]> { + match bound.as_ref() { + Bound::Included(key) => Bound::Included(key.as_slice()), + Bound::Excluded(key) => Bound::Excluded(key.as_slice()), + Bound::Unbounded => Bound::Unbounded, + } +} + +impl<'a, TSSTable, A> StreamerBuilder<'a, TSSTable, A> where A: Automaton, A::State: Clone, + TSSTable: SSTable, { - pub(crate) fn new(term_dict: &'a TermDictionary, automaton: A) -> Self { - TermStreamerBuilder { + pub(crate) fn new(term_dict: &'a Dictionary, automaton: A) -> Self { + StreamerBuilder { term_dict, automaton, lower: Bound::Unbounded, @@ -61,12 +69,18 @@ where } /// Creates the stream corresponding to the range - /// of terms defined using the `TermStreamerBuilder`. - pub fn into_stream(self) -> io::Result> { + /// of terms defined using the `StreamerBuilder`. + pub fn into_stream(self) -> io::Result> { // TODO Optimize by skipping to the right first block. let start_state = self.automaton.start(); - let delta_reader = self.term_dict.sstable_delta_reader()?; - Ok(TermStreamer { + let key_range = ( + bound_as_byte_slice(&self.lower), + bound_as_byte_slice(&self.upper), + ); + let delta_reader = self + .term_dict + .sstable_delta_reader_for_key_range(key_range)?; + Ok(Streamer { automaton: self.automaton, states: vec![start_state], delta_reader, @@ -78,26 +92,28 @@ where } } -/// `TermStreamer` acts as a cursor over a range of terms of a segment. +/// `Streamer` acts as a cursor over a range of terms of a segment. /// Terms are guaranteed to be sorted. -pub struct TermStreamer<'a, A = AlwaysMatch> +pub struct Streamer<'a, TSSTable, A = AlwaysMatch> where A: Automaton, A::State: Clone, + TSSTable: SSTable, { automaton: A, states: Vec, - delta_reader: sstable::DeltaReader<'a, TermInfoReader>, + delta_reader: crate::DeltaReader<'a, TSSTable::ValueReader>, key: Vec, term_ord: Option, lower_bound: Bound>, upper_bound: Bound>, } -impl<'a, A> TermStreamer<'a, A> +impl<'a, TSSTable, A> Streamer<'a, TSSTable, A> where A: Automaton, A::State: Clone, + TSSTable: SSTable, { /// Advance position the stream on the next item. /// Before the first call to `.advance()`, the stream @@ -174,13 +190,13 @@ where /// /// Calling `.value()` before the first call to `.advance()` returns /// `V::default()`. - pub fn value(&self) -> &TermInfo { + pub fn value(&self) -> &TSSTable::Value { self.delta_reader.value() } /// Return the next `(key, value)` pair. #[allow(clippy::should_implement_trait)] - pub fn next(&mut self) -> Option<(&[u8], &TermInfo)> { + pub fn next(&mut self) -> Option<(&[u8], &TSSTable::Value)> { if self.advance() { Some((self.key(), self.value())) } else { @@ -191,60 +207,54 @@ where #[cfg(test)] mod tests { - use super::super::TermDictionary; - use crate::directory::OwnedBytes; - use crate::postings::TermInfo; + use std::io; - fn make_term_info(i: usize) -> TermInfo { - TermInfo { - doc_freq: 1000u32 + i as u32, - postings_range: (i + 10) * (i * 10)..((i + 1) + 10) * ((i + 1) * 10), - positions_range: i * 500..(i + 1) * 500, - } - } + use common::OwnedBytes; - fn create_test_term_dictionary() -> crate::Result { - let mut term_dict_builder = super::super::TermDictionaryBuilder::create(Vec::new())?; - term_dict_builder.insert(b"abaisance", &make_term_info(0))?; - term_dict_builder.insert(b"abalation", &make_term_info(1))?; - term_dict_builder.insert(b"abalienate", &make_term_info(2))?; - term_dict_builder.insert(b"abandon", &make_term_info(3))?; - let buffer = term_dict_builder.finish()?; + use crate::{Dictionary, SSTableMonotonicU64}; + + fn create_test_dictionary() -> io::Result> { + let mut dict_builder = Dictionary::::builder(Vec::new())?; + dict_builder.insert(b"abaisance", &0)?; + dict_builder.insert(b"abalation", &1)?; + dict_builder.insert(b"abalienate", &2)?; + dict_builder.insert(b"abandon", &3)?; + let buffer = dict_builder.finish()?; let owned_bytes = OwnedBytes::new(buffer); - TermDictionary::from_bytes(owned_bytes) + Dictionary::from_bytes(owned_bytes) } #[test] - fn test_sstable_stream() -> crate::Result<()> { - let term_dict = create_test_term_dictionary()?; - let mut term_streamer = term_dict.stream()?; - assert!(term_streamer.advance()); - assert_eq!(term_streamer.key(), b"abaisance"); - assert_eq!(term_streamer.value().doc_freq, 1000u32); - assert!(term_streamer.advance()); - assert_eq!(term_streamer.key(), b"abalation"); - assert_eq!(term_streamer.value().doc_freq, 1001u32); - assert!(term_streamer.advance()); - assert_eq!(term_streamer.key(), b"abalienate"); - assert_eq!(term_streamer.value().doc_freq, 1002u32); - assert!(term_streamer.advance()); - assert_eq!(term_streamer.key(), b"abandon"); - assert_eq!(term_streamer.value().doc_freq, 1003u32); - assert!(!term_streamer.advance()); + fn test_sstable_stream() -> io::Result<()> { + let dict = create_test_dictionary()?; + let mut streamer = dict.stream()?; + assert!(streamer.advance()); + assert_eq!(streamer.key(), b"abaisance"); + assert_eq!(streamer.value(), &0); + assert!(streamer.advance()); + assert_eq!(streamer.key(), b"abalation"); + assert_eq!(streamer.value(), &1); + assert!(streamer.advance()); + assert_eq!(streamer.key(), b"abalienate"); + assert_eq!(streamer.value(), &2); + assert!(streamer.advance()); + assert_eq!(streamer.key(), b"abandon"); + assert_eq!(streamer.value(), &3); + assert!(!streamer.advance()); Ok(()) } #[test] - fn test_sstable_search() -> crate::Result<()> { - let term_dict = create_test_term_dictionary()?; + fn test_sstable_search() -> io::Result<()> { + let term_dict = create_test_dictionary()?; let ptn = tantivy_fst::Regex::new("ab.*t.*").unwrap(); let mut term_streamer = term_dict.search(ptn).into_stream()?; assert!(term_streamer.advance()); assert_eq!(term_streamer.key(), b"abalation"); - assert_eq!(term_streamer.value().doc_freq, 1001u32); + assert_eq!(term_streamer.value(), &1u64); assert!(term_streamer.advance()); assert_eq!(term_streamer.key(), b"abalienate"); - assert_eq!(term_streamer.value().doc_freq, 1002u32); + assert_eq!(term_streamer.value(), &2u64); assert!(!term_streamer.advance()); Ok(()) } diff --git a/sstable/src/value.rs b/sstable/src/value.rs deleted file mode 100644 index 05d0d12de..000000000 --- a/sstable/src/value.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::io; - -use super::{vint, BlockReader}; - -pub trait ValueReader: Default { - type Value; - - fn value(&self, idx: usize) -> &Self::Value; - - fn read(&mut self, reader: &mut BlockReader) -> io::Result<()>; -} - -pub trait ValueWriter: Default { - type Value; - - fn write(&mut self, val: &Self::Value); - - fn write_block(&mut self, writer: &mut Vec); -} - -#[derive(Default)] -pub struct VoidReader; - -impl ValueReader for VoidReader { - type Value = (); - - fn value(&self, _idx: usize) -> &() { - &() - } - - fn read(&mut self, _reader: &mut BlockReader) -> io::Result<()> { - Ok(()) - } -} - -#[derive(Default)] -pub struct VoidWriter; - -impl ValueWriter for VoidWriter { - type Value = (); - - fn write(&mut self, _val: &()) {} - - fn write_block(&mut self, _writer: &mut Vec) {} -} - -#[derive(Default)] -pub struct U64MonotonicWriter { - vals: Vec, -} - -impl ValueWriter for U64MonotonicWriter { - type Value = u64; - - fn write(&mut self, val: &Self::Value) { - self.vals.push(*val); - } - - fn write_block(&mut self, writer: &mut Vec) { - let mut prev_val = 0u64; - vint::serialize_into_vec(self.vals.len() as u64, writer); - for &val in &self.vals { - let delta = val - prev_val; - vint::serialize_into_vec(delta, writer); - prev_val = val; - } - self.vals.clear(); - } -} - -#[derive(Default)] -pub struct U64MonotonicReader { - vals: Vec, -} - -impl ValueReader for U64MonotonicReader { - type Value = u64; - - fn value(&self, idx: usize) -> &Self::Value { - &self.vals[idx] - } - - fn read(&mut self, reader: &mut BlockReader) -> io::Result<()> { - let len = reader.deserialize_u64() as usize; - self.vals.clear(); - let mut prev_val = 0u64; - for _ in 0..len { - let delta = reader.deserialize_u64(); - let val = prev_val + delta; - self.vals.push(val); - prev_val = val; - } - Ok(()) - } -} diff --git a/sstable/src/value/mod.rs b/sstable/src/value/mod.rs new file mode 100644 index 000000000..dfe98714a --- /dev/null +++ b/sstable/src/value/mod.rs @@ -0,0 +1,75 @@ +mod range; +mod u64_monotonic; +mod void; + +use std::io; + +/// `ValueReader` is a trait describing the contract of something +/// reading blocks of value, and offering random access within this values. +pub trait ValueReader: Default { + /// Type of the value being read. + type Value; + + /// Access the value at index `idx`, in the last block that was read + /// via a call to `ValueReader::read`. + fn value(&self, idx: usize) -> &Self::Value; + + /// Loads a block. + /// + /// Returns the number of bytes that were written. + fn load(&mut self, data: &[u8]) -> io::Result; +} + +pub trait ValueWriter: Default { + /// Type of the value being written. + type Value; + + /// Records a new value. + /// This method usually just accumulates data in a `Vec`, + /// only to be serialized on the call to `ValueWriter::write_block`. + fn write(&mut self, val: &Self::Value); + + /// Serializes the accumulated values into the output buffer. + fn serialize_block(&mut self, output: &mut Vec); +} + +pub use range::{RangeReader, RangeWriter}; +pub use u64_monotonic::{U64MonotonicReader, U64MonotonicWriter}; +pub use void::{VoidReader, VoidWriter}; + +fn deserialize_u64(data: &mut &[u8]) -> u64 { + let (num_bytes, val) = super::vint::deserialize_read(data); + *data = &data[num_bytes..]; + val +} + +#[cfg(test)] +pub(crate) mod tests { + use std::fmt; + + use super::{ValueReader, ValueWriter}; + + pub(crate) fn test_value_reader_writer< + V: Eq + fmt::Debug, + TReader: ValueReader, + TWriter: ValueWriter, + >( + value_block: &[V], + ) { + let mut buffer = Vec::new(); + { + let mut writer = TWriter::default(); + for value in value_block { + writer.write(value); + } + writer.serialize_block(&mut buffer); + } + let data_len = buffer.len(); + buffer.extend_from_slice(&b"extradata"[..]); + let mut reader = TReader::default(); + assert_eq!(reader.load(&buffer[..]).unwrap(), data_len); + for (i, val) in value_block.iter().enumerate() { + assert_eq!(reader.value(i), val); + } + } +} diff --git a/sstable/src/value/range.rs b/sstable/src/value/range.rs new file mode 100644 index 000000000..2cc5adcbb --- /dev/null +++ b/sstable/src/value/range.rs @@ -0,0 +1,95 @@ +use std::io; +use std::ops::Range; + +use crate::value::{deserialize_u64, ValueReader, ValueWriter}; + +#[derive(Default)] +pub struct RangeReader { + vals: Vec>, +} + +impl ValueReader for RangeReader { + type Value = Range; + + fn value(&self, idx: usize) -> &Range { + &self.vals[idx] + } + + fn load(&mut self, mut data: &[u8]) -> io::Result { + self.vals.clear(); + let original_num_bytes = data.len(); + let len = deserialize_u64(&mut data) as usize; + if len != 0 { + let mut prev_val = deserialize_u64(&mut data); + for _ in 1..len { + let next_val = prev_val + deserialize_u64(&mut data); + self.vals.push(prev_val..next_val); + prev_val = next_val; + } + } + Ok(original_num_bytes - data.len()) + } +} + +#[derive(Default)] +pub struct RangeWriter { + vals: Vec, +} + +impl ValueWriter for RangeWriter { + type Value = Range; + + fn write(&mut self, val: &Range) { + if let Some(previous_offset) = self.vals.last().copied() { + assert_eq!(previous_offset, val.start); + self.vals.push(val.end); + } else { + self.vals.push(val.start); + self.vals.push(val.end) + } + } + + fn serialize_block(&mut self, writer: &mut Vec) { + let mut prev_val = 0u64; + crate::vint::serialize_into_vec(self.vals.len() as u64, writer); + for &val in &self.vals { + let delta = val - prev_val; + crate::vint::serialize_into_vec(delta, writer); + prev_val = val; + } + self.vals.clear(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_range_reader_writer() { + crate::value::tests::test_value_reader_writer::<_, RangeReader, RangeWriter>(&[]); + crate::value::tests::test_value_reader_writer::<_, RangeReader, RangeWriter>(&[0..3]); + crate::value::tests::test_value_reader_writer::<_, RangeReader, RangeWriter>(&[ + 0..3, + 3..10, + ]); + crate::value::tests::test_value_reader_writer::<_, RangeReader, RangeWriter>(&[ + 0..0, + 0..10, + ]); + crate::value::tests::test_value_reader_writer::<_, RangeReader, RangeWriter>(&[ + 100..110, + 110..121, + 121..1250, + ]); + } + + #[test] + #[should_panic] + fn test_range_reader_writer_panics() { + crate::value::tests::test_value_reader_writer::<_, RangeReader, RangeWriter>(&[ + 1..3, + 4..10, + ]); + } +} diff --git a/sstable/src/value/u64_monotonic.rs b/sstable/src/value/u64_monotonic.rs new file mode 100644 index 000000000..fb280af79 --- /dev/null +++ b/sstable/src/value/u64_monotonic.rs @@ -0,0 +1,73 @@ +use std::io; + +use crate::value::{deserialize_u64, ValueReader, ValueWriter}; +use crate::vint; + +#[derive(Default)] +pub struct U64MonotonicReader { + vals: Vec, +} + +impl ValueReader for U64MonotonicReader { + type Value = u64; + + fn value(&self, idx: usize) -> &Self::Value { + &self.vals[idx] + } + + fn load(&mut self, mut data: &[u8]) -> io::Result { + let original_num_bytes = data.len(); + let num_vals = deserialize_u64(&mut data) as usize; + self.vals.clear(); + let mut prev_val = 0u64; + for _ in 0..num_vals { + let delta = deserialize_u64(&mut data); + let val = prev_val + delta; + self.vals.push(val); + prev_val = val; + } + Ok(original_num_bytes - data.len()) + } +} + +#[derive(Default)] +pub struct U64MonotonicWriter { + vals: Vec, +} + +impl ValueWriter for U64MonotonicWriter { + type Value = u64; + + fn write(&mut self, val: &Self::Value) { + self.vals.push(*val); + } + + fn serialize_block(&mut self, output: &mut Vec) { + let mut prev_val = 0u64; + vint::serialize_into_vec(self.vals.len() as u64, output); + for &val in &self.vals { + let delta = val - prev_val; + vint::serialize_into_vec(delta, output); + prev_val = val; + } + self.vals.clear(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_u64_monotonic_reader_writer() { + crate::value::tests::test_value_reader_writer::<_, U64MonotonicReader, U64MonotonicWriter>( + &[], + ); + crate::value::tests::test_value_reader_writer::<_, U64MonotonicReader, U64MonotonicWriter>( + &[5], + ); + crate::value::tests::test_value_reader_writer::<_, U64MonotonicReader, U64MonotonicWriter>( + &[1u64, 30u64], + ); + } +} diff --git a/sstable/src/value/void.rs b/sstable/src/value/void.rs new file mode 100644 index 000000000..2da7196b5 --- /dev/null +++ b/sstable/src/value/void.rs @@ -0,0 +1,41 @@ +use std::io; + +use crate::value::{ValueReader, ValueWriter}; + +#[derive(Default)] +pub struct VoidReader; + +impl ValueReader for VoidReader { + type Value = (); + + fn value(&self, _idx: usize) -> &() { + &() + } + + fn load(&mut self, _data: &[u8]) -> io::Result { + Ok(0) + } +} + +#[derive(Default)] +pub struct VoidWriter; + +impl ValueWriter for VoidWriter { + type Value = (); + + fn write(&mut self, _val: &()) {} + + fn serialize_block(&mut self, _output: &mut Vec) {} +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_range_reader_writer() { + crate::value::tests::test_value_reader_writer::<_, VoidReader, VoidWriter>(&[]); + crate::value::tests::test_value_reader_writer::<_, VoidReader, VoidWriter>(&[()]); + crate::value::tests::test_value_reader_writer::<_, VoidReader, VoidWriter>(&[(), (), ()]); + } +} diff --git a/stacker/Cargo.toml b/stacker/Cargo.toml index f683ff5ec..933eb0737 100644 --- a/stacker/Cargo.toml +++ b/stacker/Cargo.toml @@ -6,4 +6,4 @@ edition = "2021" [dependencies] murmurhash32 = "0.2" byteorder = "1" -common = { version = "0.4", path = "../common/", package = "tantivy-common" } +common = { version = "0.5", path = "../common/", package = "tantivy-common" }