diff --git a/columnar/src/tests.rs b/columnar/src/tests.rs index 117ebc663..88e97186c 100644 --- a/columnar/src/tests.rs +++ b/columnar/src/tests.rs @@ -26,7 +26,7 @@ fn test_dataframe_writer_str() { assert_eq!(columnar.num_columns(), 1); let cols: Vec = columnar.read_columns("my_string").unwrap(); assert_eq!(cols.len(), 1); - assert_eq!(cols[0].num_bytes(), 85); + assert_eq!(cols[0].num_bytes(), 87); } #[test] @@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() { assert_eq!(columnar.num_columns(), 1); let cols: Vec = columnar.read_columns("my_string").unwrap(); assert_eq!(cols.len(), 1); - assert_eq!(cols[0].num_bytes(), 85); + assert_eq!(cols[0].num_bytes(), 87); } #[test] diff --git a/ownedbytes/src/lib.rs b/ownedbytes/src/lib.rs index ef0ab72ac..1b6046e2e 100644 --- a/ownedbytes/src/lib.rs +++ b/ownedbytes/src/lib.rs @@ -139,6 +139,16 @@ impl OwnedBytes { self.advance(8); u64::from_le_bytes(octlet) } + + /// Reads an `u32` encoded as little-endian from the `OwnedBytes` and advance by 4 bytes. + #[inline] + pub fn read_u32(&mut self) -> u32 { + assert!(self.len() > 3); + + let quad: [u8; 4] = self.as_slice()[..4].try_into().unwrap(); + self.advance(4); + u32::from_le_bytes(quad) + } } impl fmt::Debug for OwnedBytes { diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs index 582048bdf..93d16d1bb 100644 --- a/src/fastfield/mod.rs +++ b/src/fastfield/mod.rs @@ -130,7 +130,7 @@ mod tests { } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 91); + assert_eq!(file.len(), 93); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let column = fast_field_readers .u64("field") @@ -180,7 +180,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 119); + assert_eq!(file.len(), 121); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let col = fast_field_readers .u64("field") @@ -213,7 +213,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 92); + assert_eq!(file.len(), 94); let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let fast_field_reader = fast_field_readers .u64("field") @@ -245,7 +245,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 4487); + assert_eq!(file.len(), 4489); { let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap(); let col = fast_field_readers @@ -278,7 +278,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 263); + assert_eq!(file.len(), 265); { let fast_field_readers = FastFieldReaders::open(file, schema).unwrap(); @@ -772,7 +772,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 100); + assert_eq!(file.len(), 102); let fast_field_readers = FastFieldReaders::open(file, schema).unwrap(); let bool_col = fast_field_readers.bool("field_bool").unwrap(); assert_eq!(bool_col.first(0), Some(true)); @@ -804,7 +804,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 112); + assert_eq!(file.len(), 114); let readers = FastFieldReaders::open(file, schema).unwrap(); let bool_col = readers.bool("field_bool").unwrap(); for i in 0..25 { @@ -829,7 +829,7 @@ mod tests { write.terminate().unwrap(); } let file = directory.open_read(path).unwrap(); - assert_eq!(file.len(), 102); + assert_eq!(file.len(), 104); let fastfield_readers = FastFieldReaders::open(file, schema).unwrap(); let col = fastfield_readers.bool("field_bool").unwrap(); assert_eq!(col.first(0), None); diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml index 6b5664e3f..fe8c19b93 100644 --- a/sstable/Cargo.toml +++ b/sstable/Cargo.toml @@ -7,6 +7,8 @@ license = "MIT" [dependencies] common = {path="../common", package="tantivy-common"} tantivy-fst = "0.4" +# experimental gives us access to Decompressor::upper_bound +zstd = { version = "0.12", features = ["experimental"] } [dev-dependencies] proptest = "1" diff --git a/sstable/README.md b/sstable/README.md index 83d3c8139..bec6d70f9 100644 --- a/sstable/README.md +++ b/sstable/README.md @@ -43,12 +43,14 @@ Overview of the SSTable format. Unless noted otherwise, numbers are little-endia ### SSTBlock ``` -+----------+--------+-------+-------+-----+ -| BlockLen | Values | Delta | Delta | ... | -+----------+--------+-------+-------+-----+ - |----( # of deltas)---| ++----------+----------+--------+-------+-------+-----+ +| BlockLen | Compress | Values | Delta | Delta | ... | ++----------+----------+--------+-------+-------+-----+ + | |----( # of deltas)---| + |------(maybe compressed)------| ``` -- BlockLen(u32): length of the block +- BlockLen(u32): length of the block, including the compress byte. +- Compress(u8): indicate whether block is compressed. 0 if not compressed, 1 if compressed. - Values: an application defined format storing a sequence of value, capable of determining it own length - Delta @@ -83,7 +85,7 @@ Otherwise: - Keep(VInt): number of bytes to pop -Note: there is no ambiguity between both representation as Add is always guarantee to be non-zero, except for the very first key of an SSTable, where Keep is guaranteed to be zero. +Note: as the SSTable does not support redundant keys, there is no ambiguity between both representation. Add is always guaranteed to be non-zero, except for the very first key of an SSTable, where Keep is guaranteed to be zero. ### SSTFooter ``` @@ -95,7 +97,7 @@ Note: there is no ambiguity between both representation as Add is always guarant - Block(SSTBlock): uses IndexValue for its Values format - IndexOffset(u64): Offset to the start of the SSTFooter - NumTerm(u64): number of terms in the sstable -- Version(u32): Currently defined to 0x00\_00\_00\_01 +- Version(u32): Currently equal to 2 ### IndexValue ``` diff --git a/sstable/src/block_reader.rs b/sstable/src/block_reader.rs index c16440afc..ee01ed570 100644 --- a/sstable/src/block_reader.rs +++ b/sstable/src/block_reader.rs @@ -1,21 +1,17 @@ -use std::io; +use std::io::{self, Read}; use std::ops::Range; -pub struct BlockReader<'a> { +use common::OwnedBytes; +use zstd::bulk::Decompressor; + +pub struct BlockReader { buffer: Vec, - reader: Box, + reader: OwnedBytes, offset: usize, } -#[inline] -fn read_u32(read: &mut dyn io::Read) -> io::Result { - let mut buf = [0u8; 4]; - read.read_exact(&mut buf)?; - Ok(u32::from_le_bytes(buf)) -} - -impl<'a> BlockReader<'a> { - pub fn new(reader: Box) -> BlockReader<'a> { +impl BlockReader { + pub fn new(reader: OwnedBytes) -> BlockReader { BlockReader { buffer: Vec::new(), reader, @@ -36,19 +32,43 @@ impl<'a> BlockReader<'a> { pub fn read_block(&mut self) -> io::Result { self.offset = 0; - let block_len_res = read_u32(self.reader.as_mut()); - if let Err(err) = &block_len_res { - if err.kind() == io::ErrorKind::UnexpectedEof { - return Ok(false); + self.buffer.clear(); + + let block_len = match self.reader.len() { + 0 => return Ok(false), + 1..=3 => { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to read block_len", + )) } - } - let block_len = block_len_res?; - if block_len == 0u32 { - self.buffer.clear(); + _ => self.reader.read_u32() as usize, + }; + if block_len <= 1 { return Ok(false); } - self.buffer.resize(block_len as usize, 0u8); - self.reader.read_exact(&mut self.buffer[..])?; + let compress = self.reader.read_u8(); + let block_len = block_len - 1; + + if self.reader.len() < block_len { + return Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to read block content", + )); + } + if compress == 1 { + let required_capacity = + Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024); + self.buffer.reserve(required_capacity); + Decompressor::new()? + .decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?; + + self.reader.advance(block_len); + } else { + self.buffer.resize(block_len, 0u8); + self.reader.read_exact(&mut self.buffer[..])?; + } + Ok(true) } @@ -68,7 +88,7 @@ impl<'a> BlockReader<'a> { } } -impl<'a> io::Read for BlockReader<'a> { +impl io::Read for BlockReader { fn read(&mut self, buf: &mut [u8]) -> io::Result { let len = self.buffer().read(buf)?; self.advance(len); diff --git a/sstable/src/delta.rs b/sstable/src/delta.rs index 6681b551f..2b8a332d1 100644 --- a/sstable/src/delta.rs +++ b/sstable/src/delta.rs @@ -1,14 +1,15 @@ use std::io::{self, BufWriter, Write}; use std::ops::Range; -use common::CountingWriter; +use common::{CountingWriter, OwnedBytes}; +use zstd::bulk::Compressor; use super::value::ValueWriter; use super::{value, vint, BlockReader}; const FOUR_BIT_LIMITS: usize = 1 << 4; const VINT_MODE: u8 = 1u8; -const BLOCK_LEN: usize = 32_000; +const BLOCK_LEN: usize = 4_000; pub struct DeltaWriter where W: io::Write @@ -45,13 +46,41 @@ where return Ok(None); } let start_offset = self.write.written_bytes() as usize; + let buffer: &mut Vec = &mut self.stateless_buffer; self.value_writer.serialize_block(buffer); self.value_writer.clear(); + let block_len = buffer.len() + self.block.len(); - self.write.write_all(&(block_len as u32).to_le_bytes())?; - self.write.write_all(&buffer[..])?; - self.write.write_all(&self.block[..])?; + + if block_len > 2048 { + buffer.extend_from_slice(&self.block); + self.block.clear(); + + let max_len = zstd::zstd_safe::compress_bound(buffer.len()); + self.block.reserve(max_len); + Compressor::new(3)?.compress_to_buffer(buffer, &mut self.block)?; + + // verify compression had a positive impact + if self.block.len() < buffer.len() { + self.write + .write_all(&(self.block.len() as u32 + 1).to_le_bytes())?; + self.write.write_all(&[1])?; + self.write.write_all(&self.block[..])?; + } else { + self.write + .write_all(&(block_len as u32 + 1).to_le_bytes())?; + self.write.write_all(&[0])?; + self.write.write_all(&buffer[..])?; + } + } else { + self.write + .write_all(&(block_len as u32 + 1).to_le_bytes())?; + self.write.write_all(&[0])?; + self.write.write_all(&buffer[..])?; + self.write.write_all(&self.block[..])?; + } + let end_offset = self.write.written_bytes() as usize; self.block.clear(); buffer.clear(); @@ -93,29 +122,29 @@ where } } -pub struct DeltaReader<'a, TValueReader> { +pub struct DeltaReader { common_prefix_len: usize, suffix_range: Range, value_reader: TValueReader, - block_reader: BlockReader<'a>, + block_reader: BlockReader, idx: usize, } -impl<'a, TValueReader> DeltaReader<'a, TValueReader> +impl DeltaReader where TValueReader: value::ValueReader { - pub fn new(reader: R) -> Self { + pub fn new(reader: OwnedBytes) -> Self { DeltaReader { idx: 0, common_prefix_len: 0, suffix_range: 0..0, value_reader: TValueReader::default(), - block_reader: BlockReader::new(Box::new(reader)), + block_reader: BlockReader::new(reader), } } pub fn empty() -> Self { - DeltaReader::new(&b""[..]) + DeltaReader::new(OwnedBytes::empty()) } fn deserialize_vint(&mut self) -> u64 { diff --git a/sstable/src/dictionary.rs b/sstable/src/dictionary.rs index 3d1c80d60..ef4ab25ee 100644 --- a/sstable/src/dictionary.rs +++ b/sstable/src/dictionary.rs @@ -61,7 +61,7 @@ impl Dictionary { pub(crate) fn sstable_reader_block( &self, block_addr: BlockAddr, - ) -> io::Result> { + ) -> io::Result> { let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?; Ok(TSSTable::reader(data)) } @@ -69,7 +69,7 @@ impl Dictionary { pub(crate) async fn sstable_reader_block_async( &self, block_addr: BlockAddr, - ) -> io::Result> { + ) -> io::Result> { let data = self .sstable_slice .read_bytes_slice_async(block_addr.byte_range) @@ -81,7 +81,7 @@ impl Dictionary { &self, key_range: impl RangeBounds<[u8]>, limit: Option, - ) -> io::Result> { + ) -> io::Result> { let slice = self.file_slice_for_range(key_range, limit); let data = slice.read_bytes_async().await?; Ok(TSSTable::delta_reader(data)) @@ -91,7 +91,7 @@ impl Dictionary { &self, key_range: impl RangeBounds<[u8]>, limit: Option, - ) -> io::Result> { + ) -> io::Result> { let slice = self.file_slice_for_range(key_range, limit); let data = slice.read_bytes()?; Ok(TSSTable::delta_reader(data)) @@ -100,7 +100,7 @@ impl Dictionary { pub(crate) fn sstable_delta_reader_block( &self, block_addr: BlockAddr, - ) -> io::Result> { + ) -> io::Result> { let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?; Ok(TSSTable::delta_reader(data)) } @@ -197,7 +197,7 @@ impl Dictionary { let (sstable_slice, index_slice) = main_slice.split(index_offset as usize); let sstable_index_bytes = index_slice.read_bytes()?; - let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice()) + let sstable_index = SSTableIndex::load(sstable_index_bytes) .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?; Ok(Dictionary { sstable_slice, @@ -351,12 +351,12 @@ impl Dictionary { /// Returns a range builder, to stream all of the terms /// within an interval. - pub fn range(&self) -> StreamerBuilder<'_, TSSTable> { + pub fn range(&self) -> StreamerBuilder { StreamerBuilder::new(self, AlwaysMatch) } /// A stream of all the sorted terms. - pub fn stream(&self) -> io::Result> { + pub fn stream(&self) -> io::Result> { self.range().into_stream() } diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs index 00a88793c..28b6312fe 100644 --- a/sstable/src/lib.rs +++ b/sstable/src/lib.rs @@ -17,7 +17,7 @@ pub use dictionary::Dictionary; pub use streamer::{Streamer, StreamerBuilder}; mod block_reader; -use common::BinarySerializable; +use common::{BinarySerializable, OwnedBytes}; pub use self::block_reader::BlockReader; pub use self::delta::{DeltaReader, DeltaWriter}; @@ -28,7 +28,7 @@ use crate::value::{RangeValueReader, RangeValueWriter}; pub type TermOrdinal = u64; const DEFAULT_KEY_CAPACITY: usize = 50; -const SSTABLE_VERSION: u32 = 1; +const SSTABLE_VERSION: u32 = 2; /// Given two byte string returns the length of /// the longest common prefix. @@ -58,11 +58,11 @@ pub trait SSTable: Sized { Writer::new(wrt) } - fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::ValueReader> { + fn delta_reader(reader: OwnedBytes) -> DeltaReader { DeltaReader::new(reader) } - fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::ValueReader> { + fn reader(reader: OwnedBytes) -> Reader { Reader { key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), delta_reader: Self::delta_reader(reader), @@ -70,12 +70,12 @@ pub trait SSTable: Sized { } /// Returns an empty static reader. - fn create_empty_reader() -> Reader<'static, Self::ValueReader> { - Self::reader(&b""[..]) + fn create_empty_reader() -> Reader { + Self::reader(OwnedBytes::empty()) } - fn merge>( - io_readers: Vec, + fn merge>( + io_readers: Vec, w: W, merger: M, ) -> io::Result<()> { @@ -132,12 +132,12 @@ impl SSTable for RangeSSTable { } /// SSTable reader. -pub struct Reader<'a, TValueReader> { +pub struct Reader { key: Vec, - delta_reader: DeltaReader<'a, TValueReader>, + delta_reader: DeltaReader, } -impl<'a, TValueReader> Reader<'a, TValueReader> +impl Reader where TValueReader: ValueReader { pub fn advance(&mut self) -> io::Result { @@ -163,7 +163,7 @@ where TValueReader: ValueReader } } -impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> { +impl AsRef<[u8]> for Reader { #[inline(always)] fn as_ref(&self) -> &[u8] { &self.key @@ -320,6 +320,8 @@ mod test { use std::io; use std::ops::Bound; + use common::OwnedBytes; + use super::{common_prefix_len, MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable}; fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) { @@ -353,7 +355,8 @@ mod test { assert!(sstable_writer.insert(&long_key2[..], &()).is_ok()); assert!(sstable_writer.finish().is_ok()); } - let mut sstable_reader = VoidSSTable::reader(&buffer[..]); + let buffer = OwnedBytes::new(buffer); + let mut sstable_reader = VoidSSTable::reader(buffer); assert!(sstable_reader.advance().unwrap()); assert_eq!(sstable_reader.key(), &long_key[..]); assert!(sstable_reader.advance().unwrap()); @@ -377,27 +380,22 @@ mod test { &buffer, &[ // block - 7u8, 0u8, 0u8, 0u8, // block len - 16u8, 17u8, // keep 0 push 1 | 17 - 33u8, 18u8, 19u8, // keep 1 push 2 | 18 19 - 17u8, 20u8, // keep 1 push 1 | 20 - // end of block - 0u8, 0u8, 0u8, 0u8, // no more blocks + 8, 0, 0, 0, // size of block + 0, // compression + 16, 17, 33, 18, 19, 17, 20, // data block + 0, 0, 0, 0, // no more block // index - 7u8, 0u8, 0u8, 0u8, // block len - 1, // num blocks - 0, // offset - 11, // len of 1st block - 0, // first ord of 1st block - 32, 17, 20, // keep 0 push 2 | 17 20 - // end of block - 0, 0, 0, 0, // no more blocks - 15, 0, 0, 0, 0, 0, 0, 0, // index start offset - 3, 0, 0, 0, 0, 0, 0, 0, // num_term - 1, 0, 0, 0, // version + 8, 0, 0, 0, // size of index block + 0, // compression + 1, 0, 12, 0, 32, 17, 20, // index block + 0, 0, 0, 0, // no more index block + 16, 0, 0, 0, 0, 0, 0, 0, // index start offset + 3, 0, 0, 0, 0, 0, 0, 0, // num term + 2, 0, 0, 0, // version ] ); - let mut sstable_reader = VoidSSTable::reader(&buffer[..]); + let buffer = OwnedBytes::new(buffer); + let mut sstable_reader = VoidSSTable::reader(buffer); assert!(sstable_reader.advance().unwrap()); assert_eq!(sstable_reader.key(), &[17u8]); assert!(sstable_reader.advance().unwrap()); @@ -425,8 +423,12 @@ mod test { writer.insert(b"abe", &()).unwrap(); writer.finish().unwrap(); } + let buffer = OwnedBytes::new(buffer); let mut output = Vec::new(); - assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok()); + assert!( + VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge) + .is_ok() + ); assert_eq!(&output[..], &buffer[..]); } @@ -442,8 +444,12 @@ mod test { assert_eq!(writer.last_inserted_key(), b"abe"); writer.finish().unwrap(); } + let buffer = OwnedBytes::new(buffer); let mut output = Vec::new(); - assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok()); + assert!( + VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge) + .is_ok() + ); assert_eq!(&output[..], &buffer[..]); } @@ -455,7 +461,8 @@ mod test { writer.insert(b"abe", &4u64)?; writer.insert(b"gogo", &4324234234234234u64)?; writer.finish()?; - let mut reader = MonotonicU64SSTable::reader(&buffer[..]); + let buffer = OwnedBytes::new(buffer); + let mut reader = MonotonicU64SSTable::reader(buffer); assert!(reader.advance()?); assert_eq!(reader.key(), b"abcd"); assert_eq!(reader.value(), &1u64); diff --git a/sstable/src/merge/mod.rs b/sstable/src/merge/mod.rs index 083efb9ad..6661db0bf 100644 --- a/sstable/src/merge/mod.rs +++ b/sstable/src/merge/mod.rs @@ -71,10 +71,12 @@ mod tests { use std::collections::{BTreeMap, BTreeSet}; use std::str; + use common::OwnedBytes; + use super::super::{MonotonicU64SSTable, SSTable, VoidSSTable}; use super::{U64Merge, VoidMerge}; - fn write_sstable(keys: &[&'static str]) -> Vec { + fn write_sstable(keys: &[&'static str]) -> OwnedBytes { let mut buffer: Vec = vec![]; { let mut sstable_writer = VoidSSTable::writer(&mut buffer); @@ -83,10 +85,10 @@ mod tests { } assert!(sstable_writer.finish().is_ok()); } - buffer + OwnedBytes::new(buffer) } - fn write_sstable_u64(keys: &[(&'static str, u64)]) -> Vec { + fn write_sstable_u64(keys: &[(&'static str, u64)]) -> OwnedBytes { let mut buffer: Vec = vec![]; { let mut sstable_writer = MonotonicU64SSTable::writer(&mut buffer); @@ -95,12 +97,11 @@ mod tests { } assert!(sstable_writer.finish().is_ok()); } - buffer + OwnedBytes::new(buffer) } fn merge_test_aux(arrs: &[&[&'static str]]) { let sstables = arrs.iter().cloned().map(write_sstable).collect::>(); - let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect(); let mut merged = BTreeSet::new(); for &arr in arrs.iter() { for &s in arr { @@ -108,8 +109,9 @@ mod tests { } } let mut w = Vec::new(); - assert!(VoidSSTable::merge(sstables_ref, &mut w, VoidMerge).is_ok()); - let mut reader = VoidSSTable::reader(&w[..]); + assert!(VoidSSTable::merge(sstables, &mut w, VoidMerge).is_ok()); + let w = OwnedBytes::new(w); + let mut reader = VoidSSTable::reader(w); for k in merged { assert!(reader.advance().unwrap()); assert_eq!(reader.key(), k.as_bytes()); @@ -123,7 +125,6 @@ mod tests { .cloned() .map(write_sstable_u64) .collect::>(); - let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect(); let mut merged = BTreeMap::new(); for &arr in arrs.iter() { for (key, val) in arr { @@ -132,8 +133,9 @@ mod tests { } } let mut w = Vec::new(); - assert!(MonotonicU64SSTable::merge(sstables_ref, &mut w, U64Merge).is_ok()); - let mut reader = MonotonicU64SSTable::reader(&w[..]); + assert!(MonotonicU64SSTable::merge(sstables, &mut w, U64Merge).is_ok()); + let w = OwnedBytes::new(w); + let mut reader = MonotonicU64SSTable::reader(w); for (k, v) in merged { assert!(reader.advance().unwrap()); assert_eq!(reader.key(), k.as_bytes()); @@ -145,7 +147,7 @@ mod tests { #[test] fn test_merge_simple_reproduce() { let sstable_data = write_sstable(&["a"]); - let mut reader = VoidSSTable::reader(&sstable_data[..]); + let mut reader = VoidSSTable::reader(sstable_data); assert!(reader.advance().unwrap()); assert_eq!(reader.key(), b"a"); assert!(!reader.advance().unwrap()); diff --git a/sstable/src/sstable_index.rs b/sstable/src/sstable_index.rs index b27f6e184..1ce85305c 100644 --- a/sstable/src/sstable_index.rs +++ b/sstable/src/sstable_index.rs @@ -1,6 +1,8 @@ use std::io::{self, Write}; use std::ops::Range; +use common::OwnedBytes; + use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal}; #[derive(Default, Debug, Clone)] @@ -10,7 +12,7 @@ pub struct SSTableIndex { impl SSTableIndex { /// Load an index from its binary representation - pub fn load(data: &[u8]) -> Result { + pub fn load(data: OwnedBytes) -> Result { let mut reader = IndexSSTable::reader(data); let mut blocks = Vec::new(); @@ -179,6 +181,8 @@ impl SSTable for IndexSSTable { #[cfg(test)] mod tests { + use common::OwnedBytes; + use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder}; use crate::SSTableDataCorruption; @@ -191,7 +195,8 @@ mod tests { sstable_builder.add_block(b"dddd", 40..50, 15u64); let mut buffer: Vec = Vec::new(); sstable_builder.serialize(&mut buffer).unwrap(); - let sstable_index = SSTableIndex::load(&buffer[..]).unwrap(); + let buffer = OwnedBytes::new(buffer); + let sstable_index = SSTableIndex::load(buffer).unwrap(); assert_eq!( sstable_index.get_block_with_key(b"bbbde"), Some(BlockAddr { @@ -222,8 +227,9 @@ mod tests { sstable_builder.add_block(b"dddd", 40..50, 15u64); let mut buffer: Vec = Vec::new(); sstable_builder.serialize(&mut buffer).unwrap(); - buffer[1] = 9u8; - let data_corruption_err = SSTableIndex::load(&buffer[..]).err().unwrap(); + buffer[2] = 9u8; + let buffer = OwnedBytes::new(buffer); + let data_corruption_err = SSTableIndex::load(buffer).err().unwrap(); assert!(matches!(data_corruption_err, SSTableDataCorruption)); } diff --git a/sstable/src/streamer.rs b/sstable/src/streamer.rs index 6a6fdd948..e8455c7a5 100644 --- a/sstable/src/streamer.rs +++ b/sstable/src/streamer.rs @@ -80,7 +80,7 @@ where self } - fn delta_reader(&self) -> io::Result> { + fn delta_reader(&self) -> io::Result> { let key_range = ( bound_as_byte_slice(&self.lower), bound_as_byte_slice(&self.upper), @@ -89,7 +89,7 @@ where .sstable_delta_reader_for_key_range(key_range, self.limit) } - async fn delta_reader_async(&self) -> io::Result> { + async fn delta_reader_async(&self) -> io::Result> { let key_range = ( bound_as_byte_slice(&self.lower), bound_as_byte_slice(&self.upper), @@ -101,7 +101,7 @@ where fn into_stream_given_delta_reader( self, - delta_reader: DeltaReader<'a, ::ValueReader>, + delta_reader: DeltaReader<::ValueReader>, ) -> io::Result> { let start_state = self.automaton.start(); let start_key = bound_as_byte_slice(&self.lower); @@ -124,6 +124,7 @@ where term_ord: first_term.checked_sub(1), lower_bound: self.lower, upper_bound: self.upper, + _lifetime: std::marker::PhantomData, }) } @@ -151,11 +152,13 @@ where { automaton: A, states: Vec, - delta_reader: crate::DeltaReader<'a, TSSTable::ValueReader>, + delta_reader: crate::DeltaReader, key: Vec, term_ord: Option, lower_bound: Bound>, upper_bound: Bound>, + // this field is used to please the type-interface of a dictionary in tantivy + _lifetime: std::marker::PhantomData<&'a ()>, } impl<'a, TSSTable> Streamer<'a, TSSTable, AlwaysMatch> @@ -170,6 +173,7 @@ where TSSTable: SSTable term_ord: None, lower_bound: Bound::Unbounded, upper_bound: Bound::Unbounded, + _lifetime: std::marker::PhantomData, } } }