mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-01-09 02:22:54 +00:00
* document a new sstable format * add support for changing target block size * use new format for sstable index * handle sstable version errror * use very small blocks for proptests * add a footer structure
203 lines
5.8 KiB
Rust
203 lines
5.8 KiB
Rust
use std::io::{self, BufWriter, Write};
|
|
use std::ops::Range;
|
|
|
|
use common::CountingWriter;
|
|
|
|
use super::value::ValueWriter;
|
|
use super::{value, vint, BlockReader};
|
|
|
|
const FOUR_BIT_LIMITS: usize = 1 << 4;
|
|
const VINT_MODE: u8 = 1u8;
|
|
const BLOCK_LEN: usize = 32_000;
|
|
|
|
pub struct DeltaWriter<W, TValueWriter>
|
|
where W: io::Write
|
|
{
|
|
block: Vec<u8>,
|
|
write: CountingWriter<BufWriter<W>>,
|
|
value_writer: TValueWriter,
|
|
// Only here to avoid allocations.
|
|
stateless_buffer: Vec<u8>,
|
|
block_len: usize,
|
|
}
|
|
|
|
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
|
|
where
|
|
W: io::Write,
|
|
TValueWriter: ValueWriter,
|
|
{
|
|
pub fn new(wrt: W) -> Self {
|
|
DeltaWriter {
|
|
block: Vec::with_capacity(BLOCK_LEN * 2),
|
|
write: CountingWriter::wrap(BufWriter::new(wrt)),
|
|
value_writer: TValueWriter::default(),
|
|
stateless_buffer: Vec::new(),
|
|
block_len: BLOCK_LEN,
|
|
}
|
|
}
|
|
|
|
pub fn set_block_len(&mut self, block_len: usize) {
|
|
self.block_len = block_len
|
|
}
|
|
|
|
pub fn flush_block(&mut self) -> io::Result<Option<Range<usize>>> {
|
|
if self.block.is_empty() {
|
|
return Ok(None);
|
|
}
|
|
let start_offset = self.write.written_bytes() as usize;
|
|
let buffer: &mut Vec<u8> = &mut self.stateless_buffer;
|
|
self.value_writer.serialize_block(buffer);
|
|
self.value_writer.clear();
|
|
let block_len = buffer.len() + self.block.len();
|
|
self.write.write_all(&(block_len as u32).to_le_bytes())?;
|
|
self.write.write_all(&buffer[..])?;
|
|
self.write.write_all(&self.block[..])?;
|
|
let end_offset = self.write.written_bytes() as usize;
|
|
self.block.clear();
|
|
buffer.clear();
|
|
Ok(Some(start_offset..end_offset))
|
|
}
|
|
|
|
fn encode_keep_add(&mut self, keep_len: usize, add_len: usize) {
|
|
if keep_len < FOUR_BIT_LIMITS && add_len < FOUR_BIT_LIMITS {
|
|
let b = (keep_len | add_len << 4) as u8;
|
|
self.block.extend_from_slice(&[b])
|
|
} else {
|
|
let mut buf = [VINT_MODE; 20];
|
|
let mut len = 1 + vint::serialize(keep_len as u64, &mut buf[1..]);
|
|
len += vint::serialize(add_len as u64, &mut buf[len..]);
|
|
self.block.extend_from_slice(&buf[..len])
|
|
}
|
|
}
|
|
|
|
pub(crate) fn write_suffix(&mut self, common_prefix_len: usize, suffix: &[u8]) {
|
|
let keep_len = common_prefix_len;
|
|
let add_len = suffix.len();
|
|
self.encode_keep_add(keep_len, add_len);
|
|
self.block.extend_from_slice(suffix);
|
|
}
|
|
|
|
pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) {
|
|
self.value_writer.write(value);
|
|
}
|
|
|
|
pub fn flush_block_if_required(&mut self) -> io::Result<Option<Range<usize>>> {
|
|
if self.block.len() > self.block_len {
|
|
return self.flush_block();
|
|
}
|
|
Ok(None)
|
|
}
|
|
|
|
pub fn finish(self) -> CountingWriter<BufWriter<W>> {
|
|
self.write
|
|
}
|
|
}
|
|
|
|
pub struct DeltaReader<'a, TValueReader> {
|
|
common_prefix_len: usize,
|
|
suffix_range: Range<usize>,
|
|
value_reader: TValueReader,
|
|
block_reader: BlockReader<'a>,
|
|
idx: usize,
|
|
}
|
|
|
|
impl<'a, TValueReader> DeltaReader<'a, TValueReader>
|
|
where TValueReader: value::ValueReader
|
|
{
|
|
pub fn new<R: io::Read + 'a>(reader: R) -> Self {
|
|
DeltaReader {
|
|
idx: 0,
|
|
common_prefix_len: 0,
|
|
suffix_range: 0..0,
|
|
value_reader: TValueReader::default(),
|
|
block_reader: BlockReader::new(Box::new(reader)),
|
|
}
|
|
}
|
|
|
|
pub fn empty() -> Self {
|
|
DeltaReader::new(&b""[..])
|
|
}
|
|
|
|
fn deserialize_vint(&mut self) -> u64 {
|
|
self.block_reader.deserialize_u64()
|
|
}
|
|
|
|
fn read_keep_add(&mut self) -> Option<(usize, usize)> {
|
|
let b = {
|
|
let buf = &self.block_reader.buffer();
|
|
if buf.is_empty() {
|
|
return None;
|
|
}
|
|
buf[0]
|
|
};
|
|
self.block_reader.advance(1);
|
|
match b {
|
|
VINT_MODE => {
|
|
let keep = self.deserialize_vint() as usize;
|
|
let add = self.deserialize_vint() as usize;
|
|
Some((keep, add))
|
|
}
|
|
b => {
|
|
let keep = (b & 0b1111) as usize;
|
|
let add = (b >> 4) as usize;
|
|
Some((keep, add))
|
|
}
|
|
}
|
|
}
|
|
|
|
fn read_delta_key(&mut self) -> bool {
|
|
let Some((keep, add)) = self.read_keep_add() else {
|
|
return false;
|
|
};
|
|
self.common_prefix_len = keep;
|
|
let suffix_start = self.block_reader.offset();
|
|
self.suffix_range = suffix_start..(suffix_start + add);
|
|
self.block_reader.advance(add);
|
|
true
|
|
}
|
|
|
|
pub fn advance(&mut self) -> io::Result<bool> {
|
|
if self.block_reader.buffer().is_empty() {
|
|
if !self.block_reader.read_block()? {
|
|
return Ok(false);
|
|
}
|
|
let consumed_len = self.value_reader.load(self.block_reader.buffer())?;
|
|
self.block_reader.advance(consumed_len);
|
|
self.idx = 0;
|
|
} else {
|
|
self.idx += 1;
|
|
}
|
|
if !self.read_delta_key() {
|
|
return Ok(false);
|
|
}
|
|
Ok(true)
|
|
}
|
|
|
|
#[inline(always)]
|
|
pub fn common_prefix_len(&self) -> usize {
|
|
self.common_prefix_len
|
|
}
|
|
|
|
#[inline(always)]
|
|
pub fn suffix(&self) -> &[u8] {
|
|
self.block_reader.buffer_from_to(self.suffix_range.clone())
|
|
}
|
|
|
|
#[inline(always)]
|
|
pub fn value(&self) -> &TValueReader::Value {
|
|
self.value_reader.value(self.idx)
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::DeltaReader;
|
|
use crate::value::U64MonotonicValueReader;
|
|
|
|
#[test]
|
|
fn test_empty() {
|
|
let mut delta_reader: DeltaReader<U64MonotonicValueReader> = DeltaReader::empty();
|
|
assert!(!delta_reader.advance().unwrap());
|
|
}
|
|
}
|