Added sstable sub-crate

This commit is contained in:
Paul Masurel
2020-11-16 15:27:41 +09:00
parent b7c8b07ca4
commit d5145c3a2d
13 changed files with 1067 additions and 2 deletions

View File

@@ -34,6 +34,7 @@ uuid = { version = "0.8", features = ["v4", "serde"] }
crossbeam = "0.8"
futures = {version = "0.3", features=["thread-pool"] }
tantivy-query-grammar = { version="0.14.0-dev", path="./query-grammar" }
tantivy-sstable = {version="0.14.0-dev", path="./sstable"}
stable_deref_trait = "1"
rust-stemmers = "1"
downcast-rs = "1"
@@ -81,7 +82,7 @@ unstable = [] # useful for benches.
wasm-bindgen = ["uuid/wasm-bindgen"]
[workspace]
members = ["query-grammar"]
members = ["query-grammar", "sstable"]
[badges]
travis-ci = { repository = "tantivy-search/tantivy" }

View File

@@ -516,7 +516,7 @@ mod tests {
let field = schema.get_field("num_likes").unwrap();
let mut index = Index::create_from_tempdir(schema)?;
let mut writer = index.writer_for_tests()?;
writer.commit().unwrap();
writer.commit()?;
let reader = index
.reader_builder()
.reload_policy(ReloadPolicy::Manual)

View File

@@ -0,0 +1,5 @@
usse
pub struct TermDictionaryBuilder<W> {
sstable: sstable
}

5
sstable/.gitignore vendored Normal file
View File

@@ -0,0 +1,5 @@
.idea
Cargo.lock
sstable.iml
/target
**/*.rs.bk

27
sstable/Cargo.toml Normal file
View File

@@ -0,0 +1,27 @@
[package]
name = "tantivy-sstable"
version = "0.14.0-dev"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
[dependencies]
slice-deque="0.1"
byteorder = "1.2"
jemallocator = "*"
[[bench]]
name = "merge_benchmark"
harness = false
[profile.bench]
opt-level = 3
debug = false
overflow-checks = false
lto = true
rpath = false
debug-assertions = false
codegen-units = 16
incremental = false
[dev-dependencies]
criterion = "0.2"
rand = "0.6"

View File

@@ -0,0 +1,61 @@
#[macro_use]
extern crate criterion;
extern crate rand;
extern crate sstable;
use sstable::{SSTable, VoidSSTable};
use criterion::Criterion;
use rand::prelude::*;
use std::collections::BTreeSet;
use sstable::VoidMerge;
const NUM_SSTABLE: usize = 18;
fn generate_key(rng: &mut StdRng) -> String {
let len = rng.gen_range(5, 10);
(0..len)
.map(|_| {
let b = rng.gen_range(96u8, 96 + 16u8);
char::from(b)
})
.collect::<String>()
}
fn create_sstables() -> Vec<Vec<u8>> {
let mut keyset = BTreeSet::new();
let seed = [1u8; 32];
let mut rnd = StdRng::from_seed(seed);
while keyset.len() < 10_000 {
keyset.insert(generate_key(&mut rnd));
}
let mut buffers = (0..NUM_SSTABLE).map(|_| Vec::new()).collect::<Vec<Vec<u8>>>();
{
let mut writers: Vec<_> = buffers.iter_mut().map(VoidSSTable::writer).collect();
for key in keyset {
for writer in &mut writers {
if rnd.gen_bool(0.2) {
writer.write(key.as_bytes(), &()).unwrap();
}
}
}
for writer in writers {
writer.finalize().unwrap();
}
}
buffers
}
fn merge_fast(buffers: &[Vec<u8>]) {
let readers: Vec<&[u8]> = buffers.iter().map(|buf| &buf[..]).collect::<Vec<_>>();
let mut buffer = Vec::with_capacity(10_000_000);
assert!(VoidSSTable::merge(readers, &mut buffer, VoidMerge).is_ok());
}
fn criterion_benchmark(c: &mut Criterion) {
let buffers = create_sstables();
c.bench_function("Merge fast", move |b| b.iter(|| merge_fast(&buffers)));
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

View File

@@ -0,0 +1,34 @@
use std::io;
use super::BLOCK_LEN;
use byteorder::{LittleEndian, ReadBytesExt};
pub struct BlockReader<'a> {
buffer: Vec<u8>,
reader: Box<dyn io::Read + 'a>
}
impl<'a> BlockReader<'a> {
pub fn new(reader: Box<dyn io::Read + 'a>) -> BlockReader<'a> {
BlockReader {
buffer: Vec::with_capacity(BLOCK_LEN),
reader
}
}
pub fn read_block(&mut self) -> io::Result<bool> {
let block_len = self.reader.read_u32::<LittleEndian>()?;
if block_len == 0u32 {
self.buffer.clear();
Ok(false)
} else {
self.buffer.resize(block_len as usize, 0u8);
self.reader.read_exact(&mut self.buffer[..])?;
Ok(true)
}
}
pub fn buffer(&self) -> &[u8] {
&self.buffer
}
}

435
sstable/src/lib.rs Normal file
View File

@@ -0,0 +1,435 @@
extern crate jemallocator;
#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
extern crate slice_deque;
extern crate core;
extern crate byteorder;
use std::io::{self, Write, BufWriter};
use merge::ValueMerger;
use byteorder::{ByteOrder, LittleEndian};
use std::usize;
pub(crate) mod vint;
pub mod value;
pub mod merge;
mod block_reader;
pub use self::block_reader::BlockReader;
pub use self::merge::VoidMerge;
const BLOCK_LEN: usize = 256_000;
const END_CODE: u8 = 0u8;
const VINT_MODE: u8 = 1u8;
const DEFAULT_KEY_CAPACITY: usize = 50;
const FOUR_BIT_LIMITS: usize = 1 << 4;
pub(crate) fn common_prefix_len(left: &[u8], right: &[u8]) -> usize {
left.iter().cloned()
.zip(right.iter().cloned())
.take_while(|(left, right)| left==right)
.count()
}
pub trait SSTable: Sized {
type Value;
type Reader: value::ValueReader<Value=Self::Value>;
type Writer: value::ValueWriter<Value=Self::Value>;
fn delta_writer<W: io::Write>(write: W) -> DeltaWriter<W, Self::Writer> {
DeltaWriter {
block: vec![0u8; 4],
write: BufWriter::new(write),
value_writer: Self::Writer::default()
}
}
fn writer<W: io::Write>(write: W) -> Writer<W, Self::Writer> {
Writer {
previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
delta_writer: Self::delta_writer(write)
}
}
fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::Reader> {
DeltaReader {
common_prefix_len: 0,
suffix_start: 0,
suffix_end: 0,
offset: 0,
value_reader: Self::Reader::default(),
block_reader: BlockReader::new(Box::new(reader)),
}
}
fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::Reader> {
Reader {
key: Vec::with_capacity(DEFAULT_KEY_CAPACITY),
delta_reader: Self::delta_reader(reader)
}
}
fn merge<R: io::Read, W: io::Write, M: ValueMerger<Self::Value>>(io_readers: Vec<R>, w: W, merger: M) -> io::Result<()> {
let mut readers = vec![];
for io_reader in io_readers.into_iter() {
let reader = Self::reader(io_reader);
readers.push(reader)
}
let writer = Self::writer(w);
merge::merge_sstable::<Self, _, _>(readers, writer, merger)
}
}
pub struct VoidSSTable;
impl SSTable for VoidSSTable {
type Value = ();
type Reader = value::VoidReader;
type Writer = value::VoidWriter;
}
pub struct Reader<'a, TValueReader> {
key: Vec<u8>,
delta_reader: DeltaReader<'a, TValueReader>,
}
impl<'a, TValueReader> Reader<'a, TValueReader>
where TValueReader: value::ValueReader {
pub fn advance(&mut self) -> io::Result<bool> {
if self.delta_reader.advance()? {
let common_prefix_len = self.delta_reader.common_prefix_len();
let suffix = self.delta_reader.suffix();
let new_len = self.delta_reader.common_prefix_len() + suffix.len();
self.key.resize(new_len, 0u8);
self.key[common_prefix_len..].copy_from_slice(suffix);
Ok(true)
} else {
Ok(false)
}
}
pub fn key(&self) -> &[u8] {
&self.key
}
pub fn value(&self) -> &TValueReader::Value {
self.delta_reader.value()
}
pub(crate) fn into_delta_reader(self) -> DeltaReader<'a, TValueReader> {
assert!(self.key.is_empty());
self.delta_reader
}
}
impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> {
fn as_ref(&self) -> &[u8] {
&self.key
}
}
pub struct Writer<W, TValueWriter>
where W: io::Write {
previous_key: Vec<u8>,
delta_writer: DeltaWriter<W, TValueWriter>,
}
impl<W, TValueWriter> Writer<W, TValueWriter>
where W: io::Write, TValueWriter: value::ValueWriter {
pub(crate) fn current_key(&self) -> &[u8] {
&self.previous_key[..]
}
pub(crate) fn write_key(&mut self, key: &[u8]) {
let keep_len = common_prefix_len(&self.previous_key, key);
let add_len = key.len() - keep_len;
let increasing_keys =
add_len > 0 &&
(self.previous_key.len() == keep_len ||
self.previous_key[keep_len] < key[keep_len]);
assert!(increasing_keys, "Keys should be increasing. ({:?} > {:?})", self.previous_key, key);
self.previous_key.resize(key.len(), 0u8);
self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]);
self.delta_writer.write_suffix(
keep_len,
&key[keep_len..]);
}
pub(crate) fn into_delta_writer(self) -> DeltaWriter<W, TValueWriter> {
self.delta_writer
}
pub fn write(&mut self, key: &[u8], value: &TValueWriter::Value) -> io::Result<()> {
self.write_key(key);
self.write_value(value);
self.delta_writer.flush_block_if_required()?;
Ok(())
}
pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) {
self.delta_writer.write_value(value)
}
pub fn finalize(self) -> io::Result<()> {
self.delta_writer.finalize()
}
}
pub struct DeltaWriter<W, TValueWriter>
where W: io::Write {
block: Vec<u8>,
write: BufWriter<W>,
value_writer: TValueWriter,
}
impl<W, TValueWriter> DeltaWriter<W, TValueWriter>
where W: io::Write, TValueWriter: value::ValueWriter {
fn flush_block(&mut self) -> io::Result<()> {
let block_len = self.block.len() as u32;
LittleEndian::write_u32(&mut self.block[..4], block_len - 4u32);
self.write.write_all(&mut self.block[..])?;
self.block.resize(4, 0u8);
Ok(())
}
fn encode_keep_add(&mut self, keep_len: usize, add_len: usize) {
if keep_len < FOUR_BIT_LIMITS && add_len < FOUR_BIT_LIMITS {
let b = (keep_len | add_len << 4) as u8;
self.block.extend_from_slice(&[b])
} else {
let mut buf = [1u8; 20];
let mut len = 1 + vint::serialize(keep_len as u64, &mut buf[1..]);
len += vint::serialize(add_len as u64, &mut buf[len..]);
self.block.extend_from_slice(&mut buf[..len])
}
}
pub(crate) fn write_suffix(&mut self, common_prefix_len: usize, suffix: &[u8]) {
let keep_len = common_prefix_len;
let add_len = suffix.len();
self.encode_keep_add(keep_len, add_len);
self.block.extend_from_slice(suffix);
}
pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) {
self.value_writer.write(value, &mut self.block);
}
pub fn write_delta(&mut self, common_prefix_len: usize, suffix: &[u8], value: &TValueWriter::Value) -> io::Result<()> {
self.write_suffix(common_prefix_len, suffix);
self.write_value(value);
self.flush_block_if_required()
}
pub fn flush_block_if_required(&mut self) -> io::Result<()> {
if self.block.len() > BLOCK_LEN {
self.flush_block()?;
}
Ok(())
}
pub fn finalize(mut self) -> io::Result<()> {
if self.block.len() > 4 {
self.flush_block()?;
}
self.flush_block()?;
Ok(())
}
}
pub struct DeltaReader<'a, TValueReader> {
common_prefix_len: usize,
suffix_start: usize,
suffix_end: usize,
offset: usize,
value_reader: TValueReader,
block_reader: BlockReader<'a>,
}
impl<'a, TValueReader> DeltaReader<'a, TValueReader>
where TValueReader: value::ValueReader {
fn deserialize_vint(&mut self) -> u64 {
let (consumed, result) =
vint::deserialize_read(&self.block_reader.buffer()[self.offset..]);
self.offset += consumed;
result
}
fn read_keep_add(&mut self) -> Option<(usize, usize)> {
let b = {
let buf = &self.block_reader.buffer()[self.offset..];
if buf.is_empty() {
return None;
}
buf[0]
};
self.offset += 1;
match b {
END_CODE => {
None
}
VINT_MODE => {
let keep = self.deserialize_vint() as usize;
let add = self.deserialize_vint() as usize;
Some((keep, add))
}
b => {
let keep = (b & 0b1111) as usize;
let add = (b >> 4) as usize;
Some((keep, add))
}
}
}
fn read_delta_key(&mut self) -> bool {
if let Some((keep, add)) = self.read_keep_add() {
self.common_prefix_len = keep;
self.suffix_start = self.offset;
self.suffix_end = self.suffix_start + add;
self.offset += add;
true
} else {
false
}
}
pub fn advance(&mut self) -> io::Result<bool> {
if self.block_reader.buffer().is_empty() {
if !self.block_reader.read_block()? {
return Ok(false);
}
}
if !self.read_delta_key() {
return Ok(false);
}
self.value_reader.read(&mut self.block_reader)?;
Ok(true)
}
pub fn common_prefix_len(&self) -> usize {
self.common_prefix_len
}
pub fn suffix(&self) -> &[u8] {
&self.block_reader.buffer()[self.suffix_start..self.suffix_end]
}
pub fn suffix_from(&self, offset: usize) -> &[u8] {
&self.block_reader.buffer()[self.suffix_start.wrapping_add(offset).wrapping_sub(self.common_prefix_len)..self.suffix_end]
}
pub fn value(&self) -> &TValueReader::Value {
self.value_reader.value()
}
}
#[cfg(test)]
mod test {
use common_prefix_len;
use super::VoidSSTable;
use super::SSTable;
use VoidMerge;
fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) {
assert_eq!(common_prefix_len(left.as_bytes(), right.as_bytes()), expect_len);
assert_eq!(common_prefix_len(right.as_bytes(), left.as_bytes()), expect_len);
}
#[test]
fn test_common_prefix_len() {
aux_test_common_prefix_len("a", "ab", 1);
aux_test_common_prefix_len("", "ab", 0);
aux_test_common_prefix_len("ab", "abc", 2);
aux_test_common_prefix_len("abde", "abce", 2);
}
#[test]
fn test_long_key_diff() {
let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::<Vec<_>>();
let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::<Vec<_>>();
let mut buffer = vec![];
{
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
assert!(sstable_writer.write(&long_key[..], &()).is_ok());
assert!(sstable_writer.write(&[0,3,4], &()).is_ok());
assert!(sstable_writer.write(&long_key2[..], &()).is_ok());
assert!(sstable_writer.finalize().is_ok());
}
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &long_key[..]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &[0,3,4]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &long_key2[..]);
assert!(!sstable_reader.advance().unwrap());
}
#[test]
fn test_simple_sstable() {
let mut buffer = vec![];
{
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
assert!(sstable_writer.write(&[17u8], &()).is_ok());
assert!(sstable_writer.write(&[17u8, 18u8, 19u8], &()).is_ok());
assert!(sstable_writer.write(&[17u8, 20u8], &()).is_ok());
assert!(sstable_writer.finalize().is_ok());
}
assert_eq!(&buffer, &[
7,0,0,0,
16u8, 17u8,
33u8, 18u8, 19u8,
17u8, 20u8,
0u8, 0u8, 0u8, 0u8]);
let mut sstable_reader = VoidSSTable::reader(&buffer[..]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &[17u8]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]);
assert!(sstable_reader.advance().unwrap());
assert_eq!(sstable_reader.key(), &[17u8, 20u8]);
assert!(!sstable_reader.advance().unwrap());
}
#[test]
#[should_panic]
fn test_simple_sstable_non_increasing_key() {
let mut buffer = vec![];
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
assert!(sstable_writer.write(&[17u8], &()).is_ok());
assert!(sstable_writer.write(&[16u8], &()).is_ok());
}
#[test]
fn test_merge_abcd_abe() {
let mut buffer = Vec::new();
{
let mut writer = VoidSSTable::writer(&mut buffer);
writer.write(b"abcd", &()).unwrap();
writer.write(b"abe", &()).unwrap();
writer.finalize().unwrap();
}
let mut output = Vec::new();
assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok());
assert_eq!(&output[..], &buffer[..]);
}
}

View File

@@ -0,0 +1,209 @@
use {SSTable, Reader};
use std::io;
use merge::{ValueMerger, SingleValueMerger};
use Writer;
use std::collections::BinaryHeap;
use std::cmp::Ordering;
use std::cmp::Ord;
use std::option::Option::None;
use std::mem;
use std::fmt::Debug;
use common_prefix_len;
fn pick_lowest_with_ties<'a, 'b, T, FnKey: Fn(&'b T)->K, K>(elements: &'b [T], key: FnKey, ids: &'a mut [usize]) -> (&'a [usize], &'a [usize])
where
FnKey: Fn(&'b T)->K,
K: Ord + Debug + 'b {
debug_assert!(!ids.is_empty());
if ids.len() <= 1 {
return (ids, &[]);
}
let mut smallest_key = key(&elements[ids[0]]);
let mut num_ties = 1;
for i in 1..ids.len() {
let cur = ids[i];
let cur_key = key(&elements[cur]);
match cur_key.cmp(&smallest_key) {
Ordering::Less => {
ids.swap(i, 0);
smallest_key = cur_key;
num_ties = 1;
}
Ordering::Equal => {
ids.swap(i, num_ties);
num_ties += 1;
}
Ordering::Greater => {}
}
}
(&ids[..num_ties], &ids[num_ties..])
}
#[derive(Clone, Copy, Debug)]
struct HeapItem(pub u32);
impl HeapItem {
fn new(common_prefix_len: u32, next_byte: u8) -> Self {
HeapItem(common_prefix_len << 8 | (next_byte as u32))
}
fn common_prefix_len(&self) -> usize {
self.0 as usize >> 8
}
}
struct Queue {
queue: BinaryHeap<u32>,
map: Vec<Vec<usize>>,
spares: Vec<Vec<usize>>,
}
fn heap_item_to_id(heap_item: &HeapItem) -> usize {
heap_item.0 as usize
}
impl Queue {
// helper to trick the borrow checker.
fn push_to_queue(heap_item: HeapItem,
idx: usize,
queue: &mut BinaryHeap<u32>,
map: &mut Vec<Vec<usize>>,
spares: &mut Vec<Vec<usize>>) {
let heap_id = heap_item_to_id(&heap_item);
let ids = &mut map[heap_id];
if ids.is_empty() {
queue.push(heap_item.0);
*ids = spares.pop().unwrap_or_else(Vec::new);
}
ids.push(idx);
}
pub fn with_capacity(capacity: usize) -> Self {
Queue {
queue: BinaryHeap::with_capacity(capacity),
map: (0..256 * 100).map(|_| Vec::new()).collect::<Vec<Vec<usize>>>(),
spares: (0..capacity).map(|_| Vec::with_capacity(capacity)).collect()
}
}
pub fn register(&mut self, common_prefix_len: u32, next_byte: u8, idx: usize) {
let heap_item = HeapItem::new(common_prefix_len, next_byte);
Queue::push_to_queue(heap_item, idx, &mut self.queue, &mut self.map, &mut self.spares);
}
pub fn pop(&mut self, dest: &mut Vec<usize>) -> Option<HeapItem> {
dest.clear();
self.queue
.pop()
.map(|heap_item| {
dest.clear();
let idx = mem::replace(&mut self.map[heap_item as usize], Vec::new());
self.spares.push(mem::replace(dest,idx));
HeapItem(heap_item)
})
}
}
pub fn merge_sstable<SST: SSTable, W: io::Write, M: ValueMerger<SST::Value>>(
unstarted_readers: Vec<Reader<SST::Reader>>,
writer: Writer<W, SST::Writer>,
mut merger: M
) -> io::Result<()> {
let mut delta_writer = writer.into_delta_writer();
let mut readers = vec![];
let mut empty_key_values: Option<M::TSingleValueMerger> = None;
for reader in unstarted_readers {
let mut delta_reader = reader.into_delta_reader();
if delta_reader.advance()? {
if delta_reader.suffix().is_empty() {
if let Some(value_merger) = empty_key_values.as_mut() {
value_merger.add(delta_reader.value());
} // the borrow checker does not allow an else here... that's a bit lame.
if empty_key_values.is_none() {
empty_key_values = Some(merger.new_value(delta_reader.value()));
}
if delta_reader.advance()? {
// duplicate keys are forbidden.
assert!(!delta_reader.suffix().is_empty());
readers.push(delta_reader);
}
} else {
readers.push(delta_reader);
}
}
}
if let Some(value_merger) = empty_key_values {
delta_writer.write_delta(0, &[], &value_merger.finish())?;
}
let mut queue = Queue::with_capacity(readers.len());
for (idx, delta_reader) in readers.iter().enumerate() {
queue.register(0u32, delta_reader.suffix()[0], idx);
}
let mut current_ids = Vec::with_capacity(readers.len());
while let Some(heap_item) = queue.pop(&mut current_ids) {
debug_assert!(!current_ids.is_empty());
let (tie_ids, others) = pick_lowest_with_ties(
&readers[..],
|reader| reader.suffix_from(heap_item.common_prefix_len()),
&mut current_ids[..]);
{
let first_reader = &readers[tie_ids[0]];
let suffix = first_reader.suffix_from(heap_item.common_prefix_len());
if tie_ids.len() > 1 {
let mut single_value_merger = merger.new_value(first_reader.value());
for &min_tie_id in &tie_ids[1..] {
single_value_merger.add(readers[min_tie_id].value());
}
delta_writer.write_delta(heap_item.common_prefix_len(),
suffix,
&single_value_merger.finish())?;
} else {
delta_writer.write_delta(heap_item.common_prefix_len(),
suffix,
first_reader.value())?;
}
for &reader_id in others {
let reader = &readers[reader_id];
let reader_suffix = reader.suffix_from(heap_item.common_prefix_len());
let extra_common_prefix_len = common_prefix_len(reader_suffix, suffix);
let next_byte = reader_suffix[extra_common_prefix_len];
queue.register(heap_item.common_prefix_len() as u32 + extra_common_prefix_len as u32, next_byte, reader_id)
}
}
for &tie_id in tie_ids {
let reader = &mut readers[tie_id];
if reader.advance()? {
queue.register(reader.common_prefix_len() as u32, reader.suffix()[0], tie_id);
}
}
}
delta_writer.finalize()?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::pick_lowest_with_ties;
#[test]
fn test_pick_lowest_with_ties() {
{
let mut ids = [0,1,3,2,5,4];
assert_eq!(pick_lowest_with_ties(&[1,4,3,7,1,3,5], |el| *el, &mut ids),
(&[0,4][..], &[3,2,5,1][..]));
}
{
let mut ids = [5,3,2,1,4];
assert_eq!(pick_lowest_with_ties(&[1,4,3,7,1,3,5], |el| *el, &mut ids),
(&[4][..], &[2,3,1,5][..]));
}
}
}

View File

@@ -0,0 +1,70 @@
use {SSTable, Reader, Writer};
use super::SingleValueMerger;
use super::ValueMerger;
use std::io;
use std::collections::BinaryHeap;
use std::cmp::Ordering;
use std::collections::binary_heap::PeekMut;
struct HeapItem<B: AsRef<[u8]>>(B);
impl<B: AsRef<[u8]>> Ord for HeapItem<B> {
fn cmp(&self, other: &Self) -> Ordering {
other.0.as_ref().cmp(self.0.as_ref())
}
}
impl<B: AsRef<[u8]>> PartialOrd for HeapItem<B> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(other.0.as_ref().cmp(self.0.as_ref()))
}
}
impl<B: AsRef<[u8]>> Eq for HeapItem<B> {}
impl<B: AsRef<[u8]>> PartialEq for HeapItem<B> {
fn eq(&self, other: &Self) -> bool {
self.0.as_ref() == other.0.as_ref()
}
}
pub fn merge_sstable<SST: SSTable, W: io::Write, M: ValueMerger<SST::Value>>(
readers: Vec<Reader<SST::Reader>>,
mut writer: Writer<W, SST::Writer>,
mut merger: M) -> io::Result<()> {
let mut heap: BinaryHeap<HeapItem<Reader<SST::Reader>>> = BinaryHeap::with_capacity(readers.len());
for mut reader in readers {
if reader.advance()? {
heap.push(HeapItem(reader));
}
}
loop {
let len = heap.len();
let mut value_merger;
if let Some(mut head) = heap.peek_mut() {
writer.write_key(head.0.key());
value_merger = merger.new_value(head.0.value());
if !head.0.advance()? {
PeekMut::pop(head);
}
} else {
break;
}
for _ in 0..len - 1 {
if let Some(mut head) = heap.peek_mut() {
if head.0.key() == writer.current_key() {
value_merger.add(head.0.value());
if !head.0.advance()? {
PeekMut::pop(head) ;
}
continue;
}
}
break;
}
let value = value_merger.finish();
writer.write_value(&value);
}
writer.finalize()?;
Ok(())
}

112
sstable/src/merge/mod.rs Normal file
View File

@@ -0,0 +1,112 @@
mod fast_merge;
mod heap_merge;
pub use self::fast_merge::merge_sstable;
pub use self::heap_merge::merge_sstable as merge_sstable_heap;
pub trait SingleValueMerger<V> {
fn add(&mut self, v: &V);
fn finish(self) -> V;
}
pub trait ValueMerger<V> {
type TSingleValueMerger: SingleValueMerger<V>;
fn new_value(&mut self, v: &V) -> Self::TSingleValueMerger;
}
#[derive(Default)]
pub struct KeepFirst;
pub struct FirstVal<V>(V);
impl<V: Clone> ValueMerger<V> for KeepFirst {
type TSingleValueMerger = FirstVal<V>;
fn new_value(&mut self, v: &V) -> FirstVal<V> {
FirstVal(v.clone())
}
}
impl<V> SingleValueMerger<V> for FirstVal<V> {
fn add(&mut self, _: &V) {}
fn finish(self) -> V {
self.0
}
}
pub struct VoidMerge;
impl ValueMerger<()> for VoidMerge {
type TSingleValueMerger = ();
fn new_value(&mut self, _: &()) -> () {
()
}
}
impl SingleValueMerger<()> for () {
fn add(&mut self, _: &()) {}
fn finish(self) -> () {
()
}
}
#[cfg(test)]
mod tests {
use VoidSSTable;
use SSTable;
use super::VoidMerge;
use std::str;
use std::collections::BTreeSet;
fn write_sstable(keys: &[&'static str]) -> Vec<u8> {
let mut buffer: Vec<u8> = vec![];
{
let mut sstable_writer = VoidSSTable::writer(&mut buffer);
for &key in keys {
assert!(sstable_writer.write(key.as_bytes(), &()).is_ok());
}
assert!(sstable_writer.finalize().is_ok());
}
buffer
}
fn merge_test_aux(arrs: &[&[&'static str]]) {
let sstables = arrs.iter()
.cloned()
.map(write_sstable)
.collect::<Vec<_>>();
let sstables_ref: Vec<&[u8]> = sstables.iter()
.map(|s| s.as_ref())
.collect();
let mut merged = BTreeSet::new();
for &arr in arrs.iter() {
for &s in arr {
merged.insert(s.to_string());
}
}
let mut w = Vec::new();
assert!(VoidSSTable::merge(sstables_ref, &mut w, VoidMerge).is_ok());
}
#[test]
fn test_merge() {
merge_test_aux(&[]);
merge_test_aux(&[&["a"]]);
merge_test_aux(&[&["a","b"], &["ab"]]); // a, ab, b
merge_test_aux(&[&["a","b"], &["a", "b"]]);
merge_test_aux(&[
&["happy", "hello", "payer", "tax"],
&["habitat", "hello", "zoo"],
&[],
&["a"],
]);
merge_test_aux(&[&["a"]]);
merge_test_aux(&[&["a","b"], &["ab"]]);
merge_test_aux(&[&["a","b"], &["a", "b"]]);
}
}

43
sstable/src/value.rs Normal file
View File

@@ -0,0 +1,43 @@
use std::io;
use BlockReader;
pub trait ValueReader: Default {
type Value;
fn value(&self) -> &Self::Value;
fn read(&mut self, reader: &mut BlockReader) -> io::Result<()>;
}
pub trait ValueWriter: Default {
type Value;
fn write(&mut self, val: &Self::Value, writer: &mut Vec<u8>);
}
#[derive(Default)]
pub struct VoidReader;
impl ValueReader for VoidReader {
type Value = ();
fn value(&self) -> &Self::Value {
&()
}
fn read(&mut self, _reader: &mut BlockReader) -> io::Result<()> {
Ok(())
}
}
#[derive(Default)]
pub struct VoidWriter;
impl ValueWriter for VoidWriter {
type Value = ();
fn write(&mut self, _: &Self::Value, _: &mut Vec<u8>) {}
}

63
sstable/src/vint.rs Normal file
View File

@@ -0,0 +1,63 @@
const CONTINUE_BIT: u8 = 128u8;
pub fn serialize(mut val: u64, buffer: &mut [u8]) -> usize {
for (i, b) in buffer.iter_mut().enumerate() {
let next_byte: u8 = (val & 127u64) as u8;
val = val >> 7;
if val == 0u64 {
*b = next_byte;
return i + 1;
} else {
*b = next_byte | CONTINUE_BIT;
}
}
10 //< actually unreachable
}
// super slow but we don't care
pub fn deserialize_read(buf: &[u8]) -> (usize, u64) {
let mut result = 0u64;
let mut shift = 0u64;
let mut consumed = 0;
for &b in buf {
consumed += 1;
result |= u64::from(b % 128u8) << shift;
if b < CONTINUE_BIT {
break;
}
shift += 7;
}
(consumed, result)
}
#[cfg(test)]
mod tests {
use vint::serialize;
use vint::deserialize_read;
use std::u64;
fn aux_test_int(val: u64, expect_len: usize) {
let mut buffer = [0u8; 14];
assert_eq!(serialize(val, &mut buffer[..]), expect_len);
assert_eq!(deserialize_read(&buffer), (expect_len, val));
}
#[test]
fn test_vint() {
aux_test_int(0u64, 1);
aux_test_int(17u64, 1);
aux_test_int(127u64, 1);
aux_test_int(128u64, 2);
aux_test_int(123423418u64, 4);
for i in 1..63 {
let power_of_two = 1u64 << i;
aux_test_int(power_of_two + 1, (i / 7) + 1);
aux_test_int(power_of_two, (i / 7) + 1 );
aux_test_int(power_of_two - 1, ((i-1) / 7) + 1);
}
aux_test_int(u64::MAX, 10);
}
}