sstable compression (#1946)

* compress sstable with zstd

* add some details to sstable readme

* compress only blocks which benefit from it

* multiple changes to sstable

make compression optional
use OwnedBytes instead of impl Read in sstable, required for next point
use zstd bulk api, which is much faster on small records

* cleanup and use bulk api for compression

* use a dedicated byte for the compression flag

* swap the order of block len and compression flag

* change default zstd level in sstable
trinity-1686a
2023-04-14 16:25:50 +02:00
committed by GitHub
parent 0286ecea09
commit 780e26331d
12 changed files with 194 additions and 112 deletions
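The core idea, per the commit message: each sstable block is now compressed with the zstd bulk API, and the compressed bytes are kept only when they are actually smaller than the raw block. A minimal standalone sketch of that framing (illustrative only, not the tantivy-sstable code: `write_block` and the sample inputs are made up here, and the real writer additionally skips compression entirely for small blocks):

```
use std::io::{self, Write};

/// Write one block as [BlockLen: u32 LE][Compress: u8][payload],
/// compressing with zstd level 3 and keeping the result only if it helps.
fn write_block<W: Write>(wrt: &mut W, raw: &[u8]) -> io::Result<()> {
    let compressed = zstd::bulk::compress(raw, 3)?;
    let (flag, payload) = if compressed.len() < raw.len() {
        (1u8, compressed.as_slice()) // compression had a positive impact: keep it
    } else {
        (0u8, raw) // compression did not help: store the raw bytes
    };
    // BlockLen counts the compression flag byte in addition to the payload.
    wrt.write_all(&(payload.len() as u32 + 1).to_le_bytes())?;
    wrt.write_all(&[flag])?;
    wrt.write_all(payload)?;
    Ok(())
}

fn main() -> io::Result<()> {
    let mut out: Vec<u8> = Vec::new();
    write_block(&mut out, &vec![b'a'; 4_000])?; // highly compressible: flag = 1
    write_block(&mut out, &[0x12, 0x34, 0x56])?; // tiny block: stored raw, flag = 0
    println!("wrote {} bytes", out.len());
    Ok(())
}
```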

View File

@@ -26,7 +26,7 @@ fn test_dataframe_writer_str() {
 assert_eq!(columnar.num_columns(), 1);
 let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
 assert_eq!(cols.len(), 1);
-assert_eq!(cols[0].num_bytes(), 85);
+assert_eq!(cols[0].num_bytes(), 87);
 }
 #[test]
@@ -40,7 +40,7 @@ fn test_dataframe_writer_bytes() {
 assert_eq!(columnar.num_columns(), 1);
 let cols: Vec<DynamicColumnHandle> = columnar.read_columns("my_string").unwrap();
 assert_eq!(cols.len(), 1);
-assert_eq!(cols[0].num_bytes(), 85);
+assert_eq!(cols[0].num_bytes(), 87);
 }
 #[test]

View File

@@ -139,6 +139,16 @@ impl OwnedBytes {
 self.advance(8);
 u64::from_le_bytes(octlet)
 }
+/// Reads an `u32` encoded as little-endian from the `OwnedBytes` and advance by 4 bytes.
+#[inline]
+pub fn read_u32(&mut self) -> u32 {
+assert!(self.len() > 3);
+let quad: [u8; 4] = self.as_slice()[..4].try_into().unwrap();
+self.advance(4);
+u32::from_le_bytes(quad)
+}
 }
 impl fmt::Debug for OwnedBytes {
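A quick illustration of the new helper (a hedged sketch; it assumes the tantivy-common crate is imported as `common`, as in the diff above):

```
use common::OwnedBytes;

fn main() {
    // 0x2a 0x00 0x00 0x00 is 42 in little-endian; the trailing byte is left over.
    let mut bytes = OwnedBytes::new(vec![0x2a, 0x00, 0x00, 0x00, 0xff]);
    assert_eq!(bytes.read_u32(), 42);
    assert_eq!(bytes.len(), 1);
}
```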

View File

@@ -130,7 +130,7 @@ mod tests {
 }
 let file = directory.open_read(path).unwrap();
-assert_eq!(file.len(), 91);
+assert_eq!(file.len(), 93);
 let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
 let column = fast_field_readers
 .u64("field")
@@ -180,7 +180,7 @@ mod tests {
 write.terminate().unwrap();
 }
 let file = directory.open_read(path).unwrap();
-assert_eq!(file.len(), 119);
+assert_eq!(file.len(), 121);
 let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
 let col = fast_field_readers
 .u64("field")
@@ -213,7 +213,7 @@ mod tests {
 write.terminate().unwrap();
 }
 let file = directory.open_read(path).unwrap();
-assert_eq!(file.len(), 92);
+assert_eq!(file.len(), 94);
 let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
 let fast_field_reader = fast_field_readers
 .u64("field")
@@ -245,7 +245,7 @@ mod tests {
 write.terminate().unwrap();
 }
 let file = directory.open_read(path).unwrap();
-assert_eq!(file.len(), 4487);
+assert_eq!(file.len(), 4489);
 {
 let fast_field_readers = FastFieldReaders::open(file, SCHEMA.clone()).unwrap();
 let col = fast_field_readers
@@ -278,7 +278,7 @@ mod tests {
 write.terminate().unwrap();
 }
 let file = directory.open_read(path).unwrap();
-assert_eq!(file.len(), 263);
+assert_eq!(file.len(), 265);
 {
 let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
@@ -772,7 +772,7 @@ mod tests {
 write.terminate().unwrap();
 }
 let file = directory.open_read(path).unwrap();
-assert_eq!(file.len(), 100);
+assert_eq!(file.len(), 102);
 let fast_field_readers = FastFieldReaders::open(file, schema).unwrap();
 let bool_col = fast_field_readers.bool("field_bool").unwrap();
 assert_eq!(bool_col.first(0), Some(true));
@@ -804,7 +804,7 @@ mod tests {
 write.terminate().unwrap();
 }
 let file = directory.open_read(path).unwrap();
-assert_eq!(file.len(), 112);
+assert_eq!(file.len(), 114);
 let readers = FastFieldReaders::open(file, schema).unwrap();
 let bool_col = readers.bool("field_bool").unwrap();
 for i in 0..25 {
@@ -829,7 +829,7 @@ mod tests {
 write.terminate().unwrap();
 }
 let file = directory.open_read(path).unwrap();
-assert_eq!(file.len(), 102);
+assert_eq!(file.len(), 104);
 let fastfield_readers = FastFieldReaders::open(file, schema).unwrap();
 let col = fastfield_readers.bool("field_bool").unwrap();
 assert_eq!(col.first(0), None);

View File

@@ -7,6 +7,8 @@ license = "MIT"
 [dependencies]
 common = {path="../common", package="tantivy-common"}
 tantivy-fst = "0.4"
+# experimental gives us access to Decompressor::upper_bound
+zstd = { version = "0.12", features = ["experimental"] }
 [dev-dependencies]
 proptest = "1"
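The `experimental` feature is what exposes `Decompressor::upper_bound`, which the new block reader uses to size its output buffer from the zstd frame header before decompressing. A small hedged sketch of that API (the sample data is illustrative):

```
use zstd::bulk::{compress, Decompressor};

fn main() -> std::io::Result<()> {
    let raw = vec![7u8; 10_000];
    let compressed = compress(&raw, 3)?;
    // Upper bound on the decompressed size, taken from the frame header;
    // fall back to a fixed guess if the frame does not record it.
    let capacity = Decompressor::upper_bound(&compressed).unwrap_or(1024 * 1024);
    let decompressed = Decompressor::new()?.decompress(&compressed, capacity)?;
    assert_eq!(decompressed, raw);
    Ok(())
}
```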

View File

@@ -43,12 +43,14 @@ Overview of the SSTable format. Unless noted otherwise, numbers are little-endian
 ### SSTBlock
 ```
-+----------+--------+-------+-------+-----+
-| BlockLen | Values | Delta | Delta | ... |
-+----------+--------+-------+-------+-----+
-           |----( # of deltas)---|
++----------+----------+--------+-------+-------+-----+
+| BlockLen | Compress | Values | Delta | Delta | ... |
++----------+----------+--------+-------+-------+-----+
+                      |----( # of deltas)---|
+           |------(maybe compressed)------|
 ```
-- BlockLen(u32): length of the block
+- BlockLen(u32): length of the block, including the compress byte.
+- Compress(u8): indicate whether block is compressed. 0 if not compressed, 1 if compressed.
 - Values: an application defined format storing a sequence of value, capable of determining it own length
 - Delta
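Read side of the SSTBlock layout above, as a minimal sketch (not the actual tantivy reader): split one block into its compression flag and payload, honouring the fact that BlockLen includes the Compress byte.

```
/// Splits one SSTBlock off the front of `data`.
/// Returns (is_compressed, payload, remaining bytes), or None at the end marker.
fn split_block(data: &[u8]) -> Option<(bool, &[u8], &[u8])> {
    let block_len = u32::from_le_bytes(data.get(..4)?.try_into().ok()?) as usize;
    if block_len == 0 {
        return None; // a zero BlockLen marks the end of the block sequence
    }
    let compress = *data.get(4)? == 1;
    let payload_len = block_len - 1; // BlockLen counts the Compress byte
    let payload = data.get(5..5 + payload_len)?;
    Some((compress, payload, &data[5 + payload_len..]))
}
```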
@@ -83,7 +85,7 @@ Otherwise:
 - Keep(VInt): number of bytes to pop
-Note: there is no ambiguity between both representation as Add is always guarantee to be non-zero, except for the very first key of an SSTable, where Keep is guaranteed to be zero.
+Note: as the SSTable does not support redundant keys, there is no ambiguity between both representation. Add is always guaranteed to be non-zero, except for the very first key of an SSTable, where Keep is guaranteed to be zero.
 ### SSTFooter
 ```
@@ -95,7 +97,7 @@ Note: there is no ambiguity between both representation as Add is always guarant
 - Block(SSTBlock): uses IndexValue for its Values format
 - IndexOffset(u64): Offset to the start of the SSTFooter
 - NumTerm(u64): number of terms in the sstable
-- Version(u32): Currently defined to 0x00\_00\_00\_01
+- Version(u32): Currently equal to 2
 ### IndexValue
 ```

View File

@@ -1,21 +1,17 @@
-use std::io;
+use std::io::{self, Read};
 use std::ops::Range;
+use common::OwnedBytes;
+use zstd::bulk::Decompressor;
-pub struct BlockReader<'a> {
+pub struct BlockReader {
 buffer: Vec<u8>,
-reader: Box<dyn io::Read + 'a>,
+reader: OwnedBytes,
 offset: usize,
 }
-#[inline]
-fn read_u32(read: &mut dyn io::Read) -> io::Result<u32> {
-let mut buf = [0u8; 4];
-read.read_exact(&mut buf)?;
-Ok(u32::from_le_bytes(buf))
-}
-impl<'a> BlockReader<'a> {
-pub fn new(reader: Box<dyn io::Read + 'a>) -> BlockReader<'a> {
+impl BlockReader {
+pub fn new(reader: OwnedBytes) -> BlockReader {
 BlockReader {
 buffer: Vec::new(),
 reader,
@@ -36,19 +32,43 @@ impl<'a> BlockReader<'a> {
 pub fn read_block(&mut self) -> io::Result<bool> {
 self.offset = 0;
-let block_len_res = read_u32(self.reader.as_mut());
-if let Err(err) = &block_len_res {
-if err.kind() == io::ErrorKind::UnexpectedEof {
-return Ok(false);
-}
-}
-let block_len = block_len_res?;
-if block_len == 0u32 {
-self.buffer.clear();
-return Ok(false);
-}
-self.buffer.resize(block_len as usize, 0u8);
-self.reader.read_exact(&mut self.buffer[..])?;
+self.buffer.clear();
+let block_len = match self.reader.len() {
+0 => return Ok(false),
+1..=3 => {
+return Err(io::Error::new(
+io::ErrorKind::UnexpectedEof,
+"failed to read block_len",
+))
+}
+_ => self.reader.read_u32() as usize,
+};
+if block_len <= 1 {
+return Ok(false);
+}
+let compress = self.reader.read_u8();
+let block_len = block_len - 1;
+if self.reader.len() < block_len {
+return Err(io::Error::new(
+io::ErrorKind::UnexpectedEof,
+"failed to read block content",
+));
+}
+if compress == 1 {
+let required_capacity =
+Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
+self.buffer.reserve(required_capacity);
+Decompressor::new()?
+.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
+self.reader.advance(block_len);
+} else {
+self.buffer.resize(block_len, 0u8);
+self.reader.read_exact(&mut self.buffer[..])?;
+}
 Ok(true)
 }
@@ -68,7 +88,7 @@ impl<'a> BlockReader<'a> {
 }
 }
-impl<'a> io::Read for BlockReader<'a> {
+impl io::Read for BlockReader {
 fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
 let len = self.buffer().read(buf)?;
 self.advance(len);
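With the reader now owning its bytes, walking the blocks of a serialized sstable looks roughly like this (a hedged sketch; it assumes the crate is used as `tantivy_sstable` and that `data` comes from a finished writer):

```
use common::OwnedBytes;
use tantivy_sstable::BlockReader;

/// Counts the leading data blocks of a serialized sstable; read_block
/// transparently decompresses a block when its Compress flag is set and
/// stops at the first zero-length block marker.
fn count_blocks(data: OwnedBytes) -> std::io::Result<usize> {
    let mut block_reader = BlockReader::new(data);
    let mut blocks = 0;
    while block_reader.read_block()? {
        blocks += 1;
    }
    Ok(blocks)
}
```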

View File

@@ -1,14 +1,15 @@
 use std::io::{self, BufWriter, Write};
 use std::ops::Range;
-use common::CountingWriter;
+use common::{CountingWriter, OwnedBytes};
+use zstd::bulk::Compressor;
 use super::value::ValueWriter;
 use super::{value, vint, BlockReader};
 const FOUR_BIT_LIMITS: usize = 1 << 4;
 const VINT_MODE: u8 = 1u8;
-const BLOCK_LEN: usize = 32_000;
+const BLOCK_LEN: usize = 4_000;
 pub struct DeltaWriter<W, TValueWriter>
 where W: io::Write
@@ -45,13 +46,41 @@
 return Ok(None);
 }
 let start_offset = self.write.written_bytes() as usize;
 let buffer: &mut Vec<u8> = &mut self.stateless_buffer;
 self.value_writer.serialize_block(buffer);
 self.value_writer.clear();
 let block_len = buffer.len() + self.block.len();
-self.write.write_all(&(block_len as u32).to_le_bytes())?;
-self.write.write_all(&buffer[..])?;
-self.write.write_all(&self.block[..])?;
+if block_len > 2048 {
+buffer.extend_from_slice(&self.block);
+self.block.clear();
+let max_len = zstd::zstd_safe::compress_bound(buffer.len());
+self.block.reserve(max_len);
+Compressor::new(3)?.compress_to_buffer(buffer, &mut self.block)?;
+// verify compression had a positive impact
+if self.block.len() < buffer.len() {
+self.write
+.write_all(&(self.block.len() as u32 + 1).to_le_bytes())?;
+self.write.write_all(&[1])?;
+self.write.write_all(&self.block[..])?;
+} else {
+self.write
+.write_all(&(block_len as u32 + 1).to_le_bytes())?;
+self.write.write_all(&[0])?;
+self.write.write_all(&buffer[..])?;
+}
+} else {
+self.write
+.write_all(&(block_len as u32 + 1).to_le_bytes())?;
+self.write.write_all(&[0])?;
+self.write.write_all(&buffer[..])?;
+self.write.write_all(&self.block[..])?;
+}
 let end_offset = self.write.written_bytes() as usize;
 self.block.clear();
 buffer.clear();
@@ -93,29 +122,29 @@
 }
 }
-pub struct DeltaReader<'a, TValueReader> {
+pub struct DeltaReader<TValueReader> {
 common_prefix_len: usize,
 suffix_range: Range<usize>,
 value_reader: TValueReader,
-block_reader: BlockReader<'a>,
+block_reader: BlockReader,
 idx: usize,
 }
-impl<'a, TValueReader> DeltaReader<'a, TValueReader>
+impl<TValueReader> DeltaReader<TValueReader>
 where TValueReader: value::ValueReader
 {
-pub fn new<R: io::Read + 'a>(reader: R) -> Self {
+pub fn new(reader: OwnedBytes) -> Self {
 DeltaReader {
 idx: 0,
 common_prefix_len: 0,
 suffix_range: 0..0,
 value_reader: TValueReader::default(),
-block_reader: BlockReader::new(Box::new(reader)),
+block_reader: BlockReader::new(reader),
 }
 }
 pub fn empty() -> Self {
-DeltaReader::new(&b""[..])
+DeltaReader::new(OwnedBytes::empty())
 }
 fn deserialize_vint(&mut self) -> u64 {

View File

@@ -61,7 +61,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
 pub(crate) fn sstable_reader_block(
 &self,
 block_addr: BlockAddr,
-) -> io::Result<Reader<'static, TSSTable::ValueReader>> {
+) -> io::Result<Reader<TSSTable::ValueReader>> {
 let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?;
 Ok(TSSTable::reader(data))
 }
@@ -69,7 +69,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
 pub(crate) async fn sstable_reader_block_async(
 &self,
 block_addr: BlockAddr,
-) -> io::Result<Reader<'static, TSSTable::ValueReader>> {
+) -> io::Result<Reader<TSSTable::ValueReader>> {
 let data = self
 .sstable_slice
 .read_bytes_slice_async(block_addr.byte_range)
@@ -81,7 +81,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
 &self,
 key_range: impl RangeBounds<[u8]>,
 limit: Option<u64>,
-) -> io::Result<DeltaReader<'static, TSSTable::ValueReader>> {
+) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
 let slice = self.file_slice_for_range(key_range, limit);
 let data = slice.read_bytes_async().await?;
 Ok(TSSTable::delta_reader(data))
@@ -91,7 +91,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
 &self,
 key_range: impl RangeBounds<[u8]>,
 limit: Option<u64>,
-) -> io::Result<DeltaReader<'static, TSSTable::ValueReader>> {
+) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
 let slice = self.file_slice_for_range(key_range, limit);
 let data = slice.read_bytes()?;
 Ok(TSSTable::delta_reader(data))
@@ -100,7 +100,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
 pub(crate) fn sstable_delta_reader_block(
 &self,
 block_addr: BlockAddr,
-) -> io::Result<DeltaReader<'static, TSSTable::ValueReader>> {
+) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
 let data = self.sstable_slice.read_bytes_slice(block_addr.byte_range)?;
 Ok(TSSTable::delta_reader(data))
 }
@@ -197,7 +197,7 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
 let (sstable_slice, index_slice) = main_slice.split(index_offset as usize);
 let sstable_index_bytes = index_slice.read_bytes()?;
-let sstable_index = SSTableIndex::load(sstable_index_bytes.as_slice())
+let sstable_index = SSTableIndex::load(sstable_index_bytes)
 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "SSTable corruption"))?;
 Ok(Dictionary {
 sstable_slice,
@@ -351,12 +351,12 @@ impl<TSSTable: SSTable> Dictionary<TSSTable> {
 /// Returns a range builder, to stream all of the terms
 /// within an interval.
-pub fn range(&self) -> StreamerBuilder<'_, TSSTable> {
+pub fn range(&self) -> StreamerBuilder<TSSTable> {
 StreamerBuilder::new(self, AlwaysMatch)
 }
 /// A stream of all the sorted terms.
-pub fn stream(&self) -> io::Result<Streamer<'_, TSSTable>> {
+pub fn stream(&self) -> io::Result<Streamer<TSSTable>> {
 self.range().into_stream()
 }

View File

@@ -17,7 +17,7 @@ pub use dictionary::Dictionary;
 pub use streamer::{Streamer, StreamerBuilder};
 mod block_reader;
-use common::BinarySerializable;
+use common::{BinarySerializable, OwnedBytes};
 pub use self::block_reader::BlockReader;
 pub use self::delta::{DeltaReader, DeltaWriter};
@@ -28,7 +28,7 @@ use crate::value::{RangeValueReader, RangeValueWriter};
 pub type TermOrdinal = u64;
 const DEFAULT_KEY_CAPACITY: usize = 50;
-const SSTABLE_VERSION: u32 = 1;
+const SSTABLE_VERSION: u32 = 2;
 /// Given two byte string returns the length of
 /// the longest common prefix.
@@ -58,11 +58,11 @@ pub trait SSTable: Sized {
 Writer::new(wrt)
 }
-fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::ValueReader> {
+fn delta_reader(reader: OwnedBytes) -> DeltaReader<Self::ValueReader> {
 DeltaReader::new(reader)
 }
-fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::ValueReader> {
+fn reader(reader: OwnedBytes) -> Reader<Self::ValueReader> {
 Reader {
 key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
 delta_reader: Self::delta_reader(reader),
@@ -70,12 +70,12 @@ pub trait SSTable: Sized {
 }
 /// Returns an empty static reader.
-fn create_empty_reader() -> Reader<'static, Self::ValueReader> {
+fn create_empty_reader() -> Reader<Self::ValueReader> {
-Self::reader(&b""[..])
+Self::reader(OwnedBytes::empty())
 }
-fn merge<R: io::Read, W: io::Write, M: ValueMerger<Self::Value>>(
-io_readers: Vec<R>,
+fn merge<W: io::Write, M: ValueMerger<Self::Value>>(
+io_readers: Vec<OwnedBytes>,
 w: W,
 merger: M,
 ) -> io::Result<()> {
@@ -132,12 +132,12 @@ impl SSTable for RangeSSTable {
 }
 /// SSTable reader.
-pub struct Reader<'a, TValueReader> {
+pub struct Reader<TValueReader> {
 key: Vec<u8>,
-delta_reader: DeltaReader<'a, TValueReader>,
+delta_reader: DeltaReader<TValueReader>,
 }
-impl<'a, TValueReader> Reader<'a, TValueReader>
+impl<TValueReader> Reader<TValueReader>
 where TValueReader: ValueReader
 {
 pub fn advance(&mut self) -> io::Result<bool> {
@@ -163,7 +163,7 @@ where TValueReader: ValueReader
 }
 }
-impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> {
+impl<TValueReader> AsRef<[u8]> for Reader<TValueReader> {
 #[inline(always)]
 fn as_ref(&self) -> &[u8] {
 &self.key
@@ -320,6 +320,8 @@ mod test {
 use std::io;
 use std::ops::Bound;
+use common::OwnedBytes;
 use super::{common_prefix_len, MonotonicU64SSTable, SSTable, VoidMerge, VoidSSTable};
 fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
@@ -353,7 +355,8 @@ mod test {
 assert!(sstable_writer.insert(&long_key2[..], &()).is_ok());
 assert!(sstable_writer.finish().is_ok());
 }
-let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
+let buffer = OwnedBytes::new(buffer);
+let mut sstable_reader = VoidSSTable::reader(buffer);
 assert!(sstable_reader.advance().unwrap());
 assert_eq!(sstable_reader.key(), &long_key[..]);
 assert!(sstable_reader.advance().unwrap());
@@ -377,27 +380,22 @@ mod test {
 &buffer,
 &[
 // block
-7u8, 0u8, 0u8, 0u8, // block len
-16u8, 17u8, // keep 0 push 1 | 17
-33u8, 18u8, 19u8, // keep 1 push 2 | 18 19
-17u8, 20u8, // keep 1 push 1 | 20
-// end of block
-0u8, 0u8, 0u8, 0u8, // no more blocks
+8, 0, 0, 0, // size of block
+0, // compression
+16, 17, 33, 18, 19, 17, 20, // data block
+0, 0, 0, 0, // no more block
 // index
-7u8, 0u8, 0u8, 0u8, // block len
-1, // num blocks
-0, // offset
-11, // len of 1st block
-0, // first ord of 1st block
-32, 17, 20, // keep 0 push 2 | 17 20
-// end of block
-0, 0, 0, 0, // no more blocks
-15, 0, 0, 0, 0, 0, 0, 0, // index start offset
-3, 0, 0, 0, 0, 0, 0, 0, // num_term
-1, 0, 0, 0, // version
+8, 0, 0, 0, // size of index block
+0, // compression
+1, 0, 12, 0, 32, 17, 20, // index block
+0, 0, 0, 0, // no more index block
+16, 0, 0, 0, 0, 0, 0, 0, // index start offset
+3, 0, 0, 0, 0, 0, 0, 0, // num term
+2, 0, 0, 0, // version
 ]
 );
-let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
+let buffer = OwnedBytes::new(buffer);
+let mut sstable_reader = VoidSSTable::reader(buffer);
 assert!(sstable_reader.advance().unwrap());
 assert_eq!(sstable_reader.key(), &[17u8]);
 assert!(sstable_reader.advance().unwrap());
@@ -425,8 +423,12 @@ mod test {
 writer.insert(b"abe", &()).unwrap();
 writer.finish().unwrap();
 }
+let buffer = OwnedBytes::new(buffer);
 let mut output = Vec::new();
-assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
+assert!(
+VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge)
+.is_ok()
+);
 assert_eq!(&output[..], &buffer[..]);
 }
@@ -442,8 +444,12 @@ mod test {
 assert_eq!(writer.last_inserted_key(), b"abe");
 writer.finish().unwrap();
 }
+let buffer = OwnedBytes::new(buffer);
 let mut output = Vec::new();
-assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
+assert!(
+VoidSSTable::merge(vec![buffer.clone(), buffer.clone()], &mut output, VoidMerge)
+.is_ok()
+);
 assert_eq!(&output[..], &buffer[..]);
 }
@@ -455,7 +461,8 @@ mod test {
 writer.insert(b"abe", &4u64)?;
 writer.insert(b"gogo", &4324234234234234u64)?;
 writer.finish()?;
-let mut reader = MonotonicU64SSTable::reader(&buffer[..]);
+let buffer = OwnedBytes::new(buffer);
+let mut reader = MonotonicU64SSTable::reader(buffer);
 assert!(reader.advance()?);
 assert_eq!(reader.key(), b"abcd");
 assert_eq!(reader.value(), &1u64);
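Putting the new OwnedBytes-based API together, a write/read round trip now looks roughly like the tests above (a hedged sketch; it assumes the crate is used as `tantivy_sstable` and tantivy-common as `common`):

```
use common::OwnedBytes;
use tantivy_sstable::{MonotonicU64SSTable, SSTable};

fn main() -> std::io::Result<()> {
    let mut buffer: Vec<u8> = Vec::new();
    {
        // keys must be inserted in increasing byte order
        let mut writer = MonotonicU64SSTable::writer(&mut buffer);
        writer.insert(b"abcd", &1u64)?;
        writer.insert(b"abe", &4u64)?;
        writer.finish()?;
    }
    // the reader now consumes owned bytes rather than an impl Read
    let mut reader = MonotonicU64SSTable::reader(OwnedBytes::new(buffer));
    while reader.advance()? {
        println!("{:?} => {}", reader.key(), reader.value());
    }
    Ok(())
}
```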

View File

@@ -71,10 +71,12 @@ mod tests {
 use std::collections::{BTreeMap, BTreeSet};
 use std::str;
+use common::OwnedBytes;
 use super::super::{MonotonicU64SSTable, SSTable, VoidSSTable};
 use super::{U64Merge, VoidMerge};
-fn write_sstable(keys: &[&'static str]) -> Vec<u8> {
+fn write_sstable(keys: &[&'static str]) -> OwnedBytes {
 let mut buffer: Vec<u8> = vec![];
 {
 let mut sstable_writer = VoidSSTable::writer(&mut buffer);
@@ -83,10 +85,10 @@ mod tests {
 }
 assert!(sstable_writer.finish().is_ok());
 }
-buffer
+OwnedBytes::new(buffer)
 }
-fn write_sstable_u64(keys: &[(&'static str, u64)]) -> Vec<u8> {
+fn write_sstable_u64(keys: &[(&'static str, u64)]) -> OwnedBytes {
 let mut buffer: Vec<u8> = vec![];
 {
 let mut sstable_writer = MonotonicU64SSTable::writer(&mut buffer);
@@ -95,12 +97,11 @@ mod tests {
 }
 assert!(sstable_writer.finish().is_ok());
 }
-buffer
+OwnedBytes::new(buffer)
 }
 fn merge_test_aux(arrs: &[&[&'static str]]) {
 let sstables = arrs.iter().cloned().map(write_sstable).collect::<Vec<_>>();
-let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect();
 let mut merged = BTreeSet::new();
 for &arr in arrs.iter() {
 for &s in arr {
@@ -108,8 +109,9 @@ mod tests {
 }
 }
 let mut w = Vec::new();
-assert!(VoidSSTable::merge(sstables_ref, &mut w, VoidMerge).is_ok());
-let mut reader = VoidSSTable::reader(&w[..]);
+assert!(VoidSSTable::merge(sstables, &mut w, VoidMerge).is_ok());
+let w = OwnedBytes::new(w);
+let mut reader = VoidSSTable::reader(w);
 for k in merged {
 assert!(reader.advance().unwrap());
 assert_eq!(reader.key(), k.as_bytes());
@@ -123,7 +125,6 @@ mod tests {
 .cloned()
 .map(write_sstable_u64)
 .collect::<Vec<_>>();
-let sstables_ref: Vec<&[u8]> = sstables.iter().map(|s| s.as_ref()).collect();
 let mut merged = BTreeMap::new();
 for &arr in arrs.iter() {
 for (key, val) in arr {
@@ -132,8 +133,9 @@ mod tests {
 }
 }
 let mut w = Vec::new();
-assert!(MonotonicU64SSTable::merge(sstables_ref, &mut w, U64Merge).is_ok());
-let mut reader = MonotonicU64SSTable::reader(&w[..]);
+assert!(MonotonicU64SSTable::merge(sstables, &mut w, U64Merge).is_ok());
+let w = OwnedBytes::new(w);
+let mut reader = MonotonicU64SSTable::reader(w);
 for (k, v) in merged {
 assert!(reader.advance().unwrap());
 assert_eq!(reader.key(), k.as_bytes());
@@ -145,7 +147,7 @@ mod tests {
 #[test]
 fn test_merge_simple_reproduce() {
 let sstable_data = write_sstable(&["a"]);
-let mut reader = VoidSSTable::reader(&sstable_data[..]);
+let mut reader = VoidSSTable::reader(sstable_data);
 assert!(reader.advance().unwrap());
 assert_eq!(reader.key(), b"a");
 assert!(!reader.advance().unwrap());

View File

@@ -1,6 +1,8 @@
 use std::io::{self, Write};
 use std::ops::Range;
+use common::OwnedBytes;
 use crate::{common_prefix_len, SSTable, SSTableDataCorruption, TermOrdinal};
 #[derive(Default, Debug, Clone)]
@@ -10,7 +12,7 @@ pub struct SSTableIndex {
 impl SSTableIndex {
 /// Load an index from its binary representation
-pub fn load(data: &[u8]) -> Result<SSTableIndex, SSTableDataCorruption> {
+pub fn load(data: OwnedBytes) -> Result<SSTableIndex, SSTableDataCorruption> {
 let mut reader = IndexSSTable::reader(data);
 let mut blocks = Vec::new();
@@ -179,6 +181,8 @@ impl SSTable for IndexSSTable {
 #[cfg(test)]
 mod tests {
+use common::OwnedBytes;
 use super::{BlockAddr, SSTableIndex, SSTableIndexBuilder};
 use crate::SSTableDataCorruption;
@@ -191,7 +195,8 @@ mod tests {
 sstable_builder.add_block(b"dddd", 40..50, 15u64);
 let mut buffer: Vec<u8> = Vec::new();
 sstable_builder.serialize(&mut buffer).unwrap();
-let sstable_index = SSTableIndex::load(&buffer[..]).unwrap();
+let buffer = OwnedBytes::new(buffer);
+let sstable_index = SSTableIndex::load(buffer).unwrap();
 assert_eq!(
 sstable_index.get_block_with_key(b"bbbde"),
 Some(BlockAddr {
@@ -222,8 +227,9 @@ mod tests {
 sstable_builder.add_block(b"dddd", 40..50, 15u64);
 let mut buffer: Vec<u8> = Vec::new();
 sstable_builder.serialize(&mut buffer).unwrap();
-buffer[1] = 9u8;
+buffer[2] = 9u8;
-let data_corruption_err = SSTableIndex::load(&buffer[..]).err().unwrap();
+let buffer = OwnedBytes::new(buffer);
+let data_corruption_err = SSTableIndex::load(buffer).err().unwrap();
 assert!(matches!(data_corruption_err, SSTableDataCorruption));
 }

View File

@@ -80,7 +80,7 @@ where
 self
 }
-fn delta_reader(&self) -> io::Result<DeltaReader<'a, TSSTable::ValueReader>> {
+fn delta_reader(&self) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
 let key_range = (
 bound_as_byte_slice(&self.lower),
 bound_as_byte_slice(&self.upper),
@@ -89,7 +89,7 @@ where
 .sstable_delta_reader_for_key_range(key_range, self.limit)
 }
-async fn delta_reader_async(&self) -> io::Result<DeltaReader<'a, TSSTable::ValueReader>> {
+async fn delta_reader_async(&self) -> io::Result<DeltaReader<TSSTable::ValueReader>> {
 let key_range = (
 bound_as_byte_slice(&self.lower),
 bound_as_byte_slice(&self.upper),
@@ -101,7 +101,7 @@ where
 fn into_stream_given_delta_reader(
 self,
-delta_reader: DeltaReader<'a, <TSSTable as SSTable>::ValueReader>,
+delta_reader: DeltaReader<<TSSTable as SSTable>::ValueReader>,
 ) -> io::Result<Streamer<'a, TSSTable, A>> {
 let start_state = self.automaton.start();
 let start_key = bound_as_byte_slice(&self.lower);
@@ -124,6 +124,7 @@ where
 term_ord: first_term.checked_sub(1),
 lower_bound: self.lower,
 upper_bound: self.upper,
+_lifetime: std::marker::PhantomData,
 })
 }
@@ -151,11 +152,13 @@ where
 {
 automaton: A,
 states: Vec<A::State>,
-delta_reader: crate::DeltaReader<'a, TSSTable::ValueReader>,
+delta_reader: crate::DeltaReader<TSSTable::ValueReader>,
 key: Vec<u8>,
 term_ord: Option<TermOrdinal>,
 lower_bound: Bound<Vec<u8>>,
 upper_bound: Bound<Vec<u8>>,
+// this field is used to please the type-interface of a dictionary in tantivy
+_lifetime: std::marker::PhantomData<&'a ()>,
 }
 impl<'a, TSSTable> Streamer<'a, TSSTable, AlwaysMatch>
@@ -170,6 +173,7 @@ where TSSTable: SSTable
 term_ord: None,
 lower_bound: Bound::Unbounded,
 upper_bound: Bound::Unbounded,
+_lifetime: std::marker::PhantomData,
 }
 }
} }