mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-06 01:02:55 +00:00
* compress sstable with zstd * add some details to sstable readme * compress only blocks which benefit from it * multiple changes to sstable: make compression optional; use OwnedBytes instead of impl Read in sstable (required for the next point); use zstd bulk api, which is much faster on small records * cleanup and use bulk api for compression * use dedicated byte for compression * switch block len and compression flag * change default zstd level in sstable
111 lines
2.8 KiB
Rust
111 lines
2.8 KiB
Rust
use std::io::{self, Read};
|
|
use std::ops::Range;
|
|
|
|
use common::OwnedBytes;
|
|
use zstd::bulk::Decompressor;
|
|
|
|
pub struct BlockReader {
|
|
buffer: Vec<u8>,
|
|
reader: OwnedBytes,
|
|
offset: usize,
|
|
}
|
|
|
|
impl BlockReader {
|
|
pub fn new(reader: OwnedBytes) -> BlockReader {
|
|
BlockReader {
|
|
buffer: Vec::new(),
|
|
reader,
|
|
offset: 0,
|
|
}
|
|
}
|
|
|
|
pub fn deserialize_u64(&mut self) -> u64 {
|
|
let (num_bytes, val) = super::vint::deserialize_read(self.buffer());
|
|
self.advance(num_bytes);
|
|
val
|
|
}
|
|
|
|
#[inline(always)]
|
|
pub fn buffer_from_to(&self, range: Range<usize>) -> &[u8] {
|
|
&self.buffer[range]
|
|
}
|
|
|
|
pub fn read_block(&mut self) -> io::Result<bool> {
|
|
self.offset = 0;
|
|
self.buffer.clear();
|
|
|
|
let block_len = match self.reader.len() {
|
|
0 => return Ok(false),
|
|
1..=3 => {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::UnexpectedEof,
|
|
"failed to read block_len",
|
|
))
|
|
}
|
|
_ => self.reader.read_u32() as usize,
|
|
};
|
|
if block_len <= 1 {
|
|
return Ok(false);
|
|
}
|
|
let compress = self.reader.read_u8();
|
|
let block_len = block_len - 1;
|
|
|
|
if self.reader.len() < block_len {
|
|
return Err(io::Error::new(
|
|
io::ErrorKind::UnexpectedEof,
|
|
"failed to read block content",
|
|
));
|
|
}
|
|
if compress == 1 {
|
|
let required_capacity =
|
|
Decompressor::upper_bound(&self.reader[..block_len]).unwrap_or(1024 * 1024);
|
|
self.buffer.reserve(required_capacity);
|
|
Decompressor::new()?
|
|
.decompress_to_buffer(&self.reader[..block_len], &mut self.buffer)?;
|
|
|
|
self.reader.advance(block_len);
|
|
} else {
|
|
self.buffer.resize(block_len, 0u8);
|
|
self.reader.read_exact(&mut self.buffer[..])?;
|
|
}
|
|
|
|
Ok(true)
|
|
}
|
|
|
|
#[inline(always)]
|
|
pub fn offset(&self) -> usize {
|
|
self.offset
|
|
}
|
|
|
|
#[inline(always)]
|
|
pub fn advance(&mut self, num_bytes: usize) {
|
|
self.offset += num_bytes;
|
|
}
|
|
|
|
#[inline(always)]
|
|
pub fn buffer(&self) -> &[u8] {
|
|
&self.buffer[self.offset..]
|
|
}
|
|
}
|
|
|
|
impl io::Read for BlockReader {
|
|
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
|
|
let len = self.buffer().read(buf)?;
|
|
self.advance(len);
|
|
Ok(len)
|
|
}
|
|
|
|
fn read_to_end(&mut self, buf: &mut Vec<u8>) -> io::Result<usize> {
|
|
let len = self.buffer.len();
|
|
buf.extend_from_slice(self.buffer());
|
|
self.advance(len);
|
|
Ok(len)
|
|
}
|
|
|
|
fn read_exact(&mut self, buf: &mut [u8]) -> io::Result<()> {
|
|
self.buffer().read_exact(buf)?;
|
|
self.advance(buf.len());
|
|
Ok(())
|
|
}
|
|
}
|