diff --git a/Cargo.toml b/Cargo.toml index cfa706a4f..66f707d52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ uuid = { version = "0.8", features = ["v4", "serde"] } crossbeam = "0.8" futures = {version = "0.3", features=["thread-pool"] } tantivy-query-grammar = { version="0.14.0-dev", path="./query-grammar" } +tantivy-sstable = {version="0.14.0-dev", path="./sstable"} stable_deref_trait = "1" rust-stemmers = "1" downcast-rs = "1" @@ -81,7 +82,7 @@ unstable = [] # useful for benches. wasm-bindgen = ["uuid/wasm-bindgen"] [workspace] -members = ["query-grammar"] +members = ["query-grammar", "sstable"] [badges] travis-ci = { repository = "tantivy-search/tantivy" } diff --git a/src/core/index.rs b/src/core/index.rs index ad2d29969..12ef5e37f 100644 --- a/src/core/index.rs +++ b/src/core/index.rs @@ -516,7 +516,7 @@ mod tests { let field = schema.get_field("num_likes").unwrap(); let mut index = Index::create_from_tempdir(schema)?; let mut writer = index.writer_for_tests()?; - writer.commit().unwrap(); + writer.commit()?; let reader = index .reader_builder() .reload_policy(ReloadPolicy::Manual) diff --git a/src/termdict/sstable_termdict/term_dict.rs b/src/termdict/sstable_termdict/term_dict.rs new file mode 100644 index 000000000..5fceacfeb --- /dev/null +++ b/src/termdict/sstable_termdict/term_dict.rs @@ -0,0 +1,5 @@ +usse + +pub struct TermDictionaryBuilder { + sstable: sstable +} \ No newline at end of file diff --git a/sstable/.gitignore b/sstable/.gitignore new file mode 100644 index 000000000..f8100ca94 --- /dev/null +++ b/sstable/.gitignore @@ -0,0 +1,5 @@ +.idea +Cargo.lock +sstable.iml +/target +**/*.rs.bk diff --git a/sstable/Cargo.toml b/sstable/Cargo.toml new file mode 100644 index 000000000..d9f2ae61f --- /dev/null +++ b/sstable/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "tantivy-sstable" +version = "0.14.0-dev" +authors = ["Paul Masurel "] + +[dependencies] +slice-deque="0.1" +byteorder = "1.2" +jemallocator = "*" + +[[bench]] +name = "merge_benchmark" +harness = false + +[profile.bench] +opt-level = 3 +debug = false +overflow-checks = false +lto = true +rpath = false +debug-assertions = false +codegen-units = 16 +incremental = false + +[dev-dependencies] +criterion = "0.2" +rand = "0.6" \ No newline at end of file diff --git a/sstable/benches/merge_benchmark.rs b/sstable/benches/merge_benchmark.rs new file mode 100644 index 000000000..0b8974ebc --- /dev/null +++ b/sstable/benches/merge_benchmark.rs @@ -0,0 +1,61 @@ +#[macro_use] +extern crate criterion; +extern crate rand; +extern crate sstable; + +use sstable::{SSTable, VoidSSTable}; +use criterion::Criterion; +use rand::prelude::*; + +use std::collections::BTreeSet; +use sstable::VoidMerge; + +const NUM_SSTABLE: usize = 18; + +fn generate_key(rng: &mut StdRng) -> String { + let len = rng.gen_range(5, 10); + (0..len) + .map(|_| { + let b = rng.gen_range(96u8, 96 + 16u8); + char::from(b) + }) + .collect::() +} + +fn create_sstables() -> Vec> { + let mut keyset = BTreeSet::new(); + let seed = [1u8; 32]; + let mut rnd = StdRng::from_seed(seed); + while keyset.len() < 10_000 { + keyset.insert(generate_key(&mut rnd)); + } + let mut buffers = (0..NUM_SSTABLE).map(|_| Vec::new()).collect::>>(); + { + let mut writers: Vec<_> = buffers.iter_mut().map(VoidSSTable::writer).collect(); + for key in keyset { + for writer in &mut writers { + if rnd.gen_bool(0.2) { + writer.write(key.as_bytes(), &()).unwrap(); + } + } + } + for writer in writers { + writer.finalize().unwrap(); + } + } + buffers +} + +fn merge_fast(buffers: &[Vec]) { + let readers: Vec<&[u8]> = buffers.iter().map(|buf| &buf[..]).collect::>(); + let mut buffer = Vec::with_capacity(10_000_000); + assert!(VoidSSTable::merge(readers, &mut buffer, VoidMerge).is_ok()); +} + +fn criterion_benchmark(c: &mut Criterion) { + let buffers = create_sstables(); + c.bench_function("Merge fast", move |b| b.iter(|| merge_fast(&buffers))); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); \ No newline at end of file diff --git a/sstable/src/block_reader.rs b/sstable/src/block_reader.rs new file mode 100644 index 000000000..a26b959e9 --- /dev/null +++ b/sstable/src/block_reader.rs @@ -0,0 +1,34 @@ +use std::io; +use super::BLOCK_LEN; +use byteorder::{LittleEndian, ReadBytesExt}; + +pub struct BlockReader<'a> { + buffer: Vec, + reader: Box +} + +impl<'a> BlockReader<'a> { + + pub fn new(reader: Box) -> BlockReader<'a> { + BlockReader { + buffer: Vec::with_capacity(BLOCK_LEN), + reader + } + } + + pub fn read_block(&mut self) -> io::Result { + let block_len = self.reader.read_u32::()?; + if block_len == 0u32 { + self.buffer.clear(); + Ok(false) + } else { + self.buffer.resize(block_len as usize, 0u8); + self.reader.read_exact(&mut self.buffer[..])?; + Ok(true) + } + } + + pub fn buffer(&self) -> &[u8] { + &self.buffer + } +} diff --git a/sstable/src/lib.rs b/sstable/src/lib.rs new file mode 100644 index 000000000..5d3ff17e3 --- /dev/null +++ b/sstable/src/lib.rs @@ -0,0 +1,435 @@ +extern crate jemallocator; + +#[global_allocator] +static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; + +extern crate slice_deque; +extern crate core; +extern crate byteorder; + +use std::io::{self, Write, BufWriter}; +use merge::ValueMerger; +use byteorder::{ByteOrder, LittleEndian}; +use std::usize; + +pub(crate) mod vint; +pub mod value; +pub mod merge; +mod block_reader; + +pub use self::block_reader::BlockReader; + +pub use self::merge::VoidMerge; + +const BLOCK_LEN: usize = 256_000; +const END_CODE: u8 = 0u8; +const VINT_MODE: u8 = 1u8; + +const DEFAULT_KEY_CAPACITY: usize = 50; +const FOUR_BIT_LIMITS: usize = 1 << 4; + +pub(crate) fn common_prefix_len(left: &[u8], right: &[u8]) -> usize { + left.iter().cloned() + .zip(right.iter().cloned()) + .take_while(|(left, right)| left==right) + .count() +} + +pub trait SSTable: Sized { + + type Value; + type Reader: value::ValueReader; + type Writer: value::ValueWriter; + + fn delta_writer(write: W) -> DeltaWriter { + DeltaWriter { + block: vec![0u8; 4], + write: BufWriter::new(write), + value_writer: Self::Writer::default() + } + } + + fn writer(write: W) -> Writer { + Writer { + previous_key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), + delta_writer: Self::delta_writer(write) + } + } + + fn delta_reader<'a, R: io::Read + 'a>(reader: R) -> DeltaReader<'a, Self::Reader> { + DeltaReader { + common_prefix_len: 0, + suffix_start: 0, + suffix_end: 0, + offset: 0, + value_reader: Self::Reader::default(), + block_reader: BlockReader::new(Box::new(reader)), + } + } + + fn reader<'a, R: io::Read + 'a>(reader: R) -> Reader<'a, Self::Reader> { + Reader { + key: Vec::with_capacity(DEFAULT_KEY_CAPACITY), + delta_reader: Self::delta_reader(reader) + } + } + + fn merge>(io_readers: Vec, w: W, merger: M) -> io::Result<()> { + let mut readers = vec![]; + for io_reader in io_readers.into_iter() { + let reader = Self::reader(io_reader); + readers.push(reader) + } + let writer = Self::writer(w); + merge::merge_sstable::(readers, writer, merger) + } +} + +pub struct VoidSSTable; + +impl SSTable for VoidSSTable { + type Value = (); + type Reader = value::VoidReader; + type Writer = value::VoidWriter; +} + + +pub struct Reader<'a, TValueReader> { + key: Vec, + delta_reader: DeltaReader<'a, TValueReader>, +} + +impl<'a, TValueReader> Reader<'a, TValueReader> + where TValueReader: value::ValueReader { + + pub fn advance(&mut self) -> io::Result { + if self.delta_reader.advance()? { + let common_prefix_len = self.delta_reader.common_prefix_len(); + let suffix = self.delta_reader.suffix(); + let new_len = self.delta_reader.common_prefix_len() + suffix.len(); + self.key.resize(new_len, 0u8); + self.key[common_prefix_len..].copy_from_slice(suffix); + Ok(true) + } else { + Ok(false) + } + + } + + pub fn key(&self) -> &[u8] { + &self.key + } + + pub fn value(&self) -> &TValueReader::Value { + self.delta_reader.value() + } + + pub(crate) fn into_delta_reader(self) -> DeltaReader<'a, TValueReader> { + assert!(self.key.is_empty()); + self.delta_reader + } +} + +impl<'a, TValueReader> AsRef<[u8]> for Reader<'a, TValueReader> { + fn as_ref(&self) -> &[u8] { + &self.key + } +} + + +pub struct Writer + where W: io::Write { + previous_key: Vec, + delta_writer: DeltaWriter, +} + +impl Writer + where W: io::Write, TValueWriter: value::ValueWriter { + + pub(crate) fn current_key(&self) -> &[u8] { + &self.previous_key[..] + } + + pub(crate) fn write_key(&mut self, key: &[u8]) { + let keep_len = common_prefix_len(&self.previous_key, key); + let add_len = key.len() - keep_len; + let increasing_keys = + add_len > 0 && + (self.previous_key.len() == keep_len || + self.previous_key[keep_len] < key[keep_len]); + assert!(increasing_keys, "Keys should be increasing. ({:?} > {:?})", self.previous_key, key); + self.previous_key.resize(key.len(), 0u8); + self.previous_key[keep_len..].copy_from_slice(&key[keep_len..]); + self.delta_writer.write_suffix( + keep_len, + &key[keep_len..]); + } + + pub(crate) fn into_delta_writer(self) -> DeltaWriter { + self.delta_writer + } + + pub fn write(&mut self, key: &[u8], value: &TValueWriter::Value) -> io::Result<()> { + self.write_key(key); + self.write_value(value); + self.delta_writer.flush_block_if_required()?; + Ok(()) + } + + pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) { + self.delta_writer.write_value(value) + } + + pub fn finalize(self) -> io::Result<()> { + self.delta_writer.finalize() + } +} + + +pub struct DeltaWriter + where W: io::Write { + block: Vec, + write: BufWriter, + value_writer: TValueWriter, +} + +impl DeltaWriter + where W: io::Write, TValueWriter: value::ValueWriter { + + fn flush_block(&mut self) -> io::Result<()> { + let block_len = self.block.len() as u32; + LittleEndian::write_u32(&mut self.block[..4], block_len - 4u32); + self.write.write_all(&mut self.block[..])?; + self.block.resize(4, 0u8); + Ok(()) + } + + fn encode_keep_add(&mut self, keep_len: usize, add_len: usize) { + if keep_len < FOUR_BIT_LIMITS && add_len < FOUR_BIT_LIMITS { + let b = (keep_len | add_len << 4) as u8; + self.block.extend_from_slice(&[b]) + } else { + let mut buf = [1u8; 20]; + let mut len = 1 + vint::serialize(keep_len as u64, &mut buf[1..]); + len += vint::serialize(add_len as u64, &mut buf[len..]); + self.block.extend_from_slice(&mut buf[..len]) + } + } + + pub(crate) fn write_suffix(&mut self, common_prefix_len: usize, suffix: &[u8]) { + let keep_len = common_prefix_len; + let add_len = suffix.len(); + self.encode_keep_add(keep_len, add_len); + self.block.extend_from_slice(suffix); + } + + pub(crate) fn write_value(&mut self, value: &TValueWriter::Value) { + self.value_writer.write(value, &mut self.block); + } + + pub fn write_delta(&mut self, common_prefix_len: usize, suffix: &[u8], value: &TValueWriter::Value) -> io::Result<()> { + self.write_suffix(common_prefix_len, suffix); + self.write_value(value); + self.flush_block_if_required() + } + + pub fn flush_block_if_required(&mut self) -> io::Result<()> { + if self.block.len() > BLOCK_LEN { + self.flush_block()?; + } + Ok(()) + } + + pub fn finalize(mut self) -> io::Result<()> { + if self.block.len() > 4 { + self.flush_block()?; + } + self.flush_block()?; + Ok(()) + } +} + + +pub struct DeltaReader<'a, TValueReader> { + common_prefix_len: usize, + suffix_start: usize, + suffix_end: usize, + offset: usize, + value_reader: TValueReader, + block_reader: BlockReader<'a>, +} + +impl<'a, TValueReader> DeltaReader<'a, TValueReader> + where TValueReader: value::ValueReader { + + fn deserialize_vint(&mut self) -> u64 { + let (consumed, result) = + vint::deserialize_read(&self.block_reader.buffer()[self.offset..]); + self.offset += consumed; + result + } + + fn read_keep_add(&mut self) -> Option<(usize, usize)> { + let b = { + let buf = &self.block_reader.buffer()[self.offset..]; + if buf.is_empty() { + return None; + } + buf[0] + }; + self.offset += 1; + match b { + END_CODE => { + None + } + VINT_MODE => { + let keep = self.deserialize_vint() as usize; + let add = self.deserialize_vint() as usize; + Some((keep, add)) + } + b => { + let keep = (b & 0b1111) as usize; + let add = (b >> 4) as usize; + Some((keep, add)) + } + } + } + + fn read_delta_key(&mut self) -> bool { + if let Some((keep, add)) = self.read_keep_add() { + self.common_prefix_len = keep; + self.suffix_start = self.offset; + self.suffix_end = self.suffix_start + add; + self.offset += add; + true + } else { + false + } + } + + + pub fn advance(&mut self) -> io::Result { + if self.block_reader.buffer().is_empty() { + if !self.block_reader.read_block()? { + return Ok(false); + } + } + if !self.read_delta_key() { + return Ok(false); + } + self.value_reader.read(&mut self.block_reader)?; + Ok(true) + } + + pub fn common_prefix_len(&self) -> usize { + self.common_prefix_len + } + + pub fn suffix(&self) -> &[u8] { + &self.block_reader.buffer()[self.suffix_start..self.suffix_end] + } + + pub fn suffix_from(&self, offset: usize) -> &[u8] { + &self.block_reader.buffer()[self.suffix_start.wrapping_add(offset).wrapping_sub(self.common_prefix_len)..self.suffix_end] + } + + pub fn value(&self) -> &TValueReader::Value { + self.value_reader.value() + } +} + + +#[cfg(test)] +mod test { + use common_prefix_len; + use super::VoidSSTable; + use super::SSTable; + use VoidMerge; + + fn aux_test_common_prefix_len(left: &str, right: &str, expect_len: usize) { + assert_eq!(common_prefix_len(left.as_bytes(), right.as_bytes()), expect_len); + assert_eq!(common_prefix_len(right.as_bytes(), left.as_bytes()), expect_len); + } + + #[test] + fn test_common_prefix_len() { + aux_test_common_prefix_len("a", "ab", 1); + aux_test_common_prefix_len("", "ab", 0); + aux_test_common_prefix_len("ab", "abc", 2); + aux_test_common_prefix_len("abde", "abce", 2); + } + + + #[test] + fn test_long_key_diff() { + let long_key = (0..1_024).map(|x| (x % 255) as u8).collect::>(); + let long_key2 = (1..300).map(|x| (x % 255) as u8).collect::>(); + let mut buffer = vec![]; + { + let mut sstable_writer = VoidSSTable::writer(&mut buffer); + assert!(sstable_writer.write(&long_key[..], &()).is_ok()); + assert!(sstable_writer.write(&[0,3,4], &()).is_ok()); + assert!(sstable_writer.write(&long_key2[..], &()).is_ok()); + assert!(sstable_writer.finalize().is_ok()); + } + let mut sstable_reader = VoidSSTable::reader(&buffer[..]); + assert!(sstable_reader.advance().unwrap()); + assert_eq!(sstable_reader.key(), &long_key[..]); + assert!(sstable_reader.advance().unwrap()); + assert_eq!(sstable_reader.key(), &[0,3,4]); + assert!(sstable_reader.advance().unwrap()); + assert_eq!(sstable_reader.key(), &long_key2[..]); + assert!(!sstable_reader.advance().unwrap()); + } + + #[test] + fn test_simple_sstable() { + let mut buffer = vec![]; + { + let mut sstable_writer = VoidSSTable::writer(&mut buffer); + assert!(sstable_writer.write(&[17u8], &()).is_ok()); + assert!(sstable_writer.write(&[17u8, 18u8, 19u8], &()).is_ok()); + assert!(sstable_writer.write(&[17u8, 20u8], &()).is_ok()); + assert!(sstable_writer.finalize().is_ok()); + } + assert_eq!(&buffer, &[ + 7,0,0,0, + 16u8, 17u8, + 33u8, 18u8, 19u8, + 17u8, 20u8, + 0u8, 0u8, 0u8, 0u8]); + let mut sstable_reader = VoidSSTable::reader(&buffer[..]); + assert!(sstable_reader.advance().unwrap()); + assert_eq!(sstable_reader.key(), &[17u8]); + assert!(sstable_reader.advance().unwrap()); + assert_eq!(sstable_reader.key(), &[17u8, 18u8, 19u8]); + assert!(sstable_reader.advance().unwrap()); + assert_eq!(sstable_reader.key(), &[17u8, 20u8]); + assert!(!sstable_reader.advance().unwrap()); + } + + + #[test] + #[should_panic] + fn test_simple_sstable_non_increasing_key() { + let mut buffer = vec![]; + let mut sstable_writer = VoidSSTable::writer(&mut buffer); + assert!(sstable_writer.write(&[17u8], &()).is_ok()); + assert!(sstable_writer.write(&[16u8], &()).is_ok()); + } + + #[test] + fn test_merge_abcd_abe() { + let mut buffer = Vec::new(); + { + let mut writer = VoidSSTable::writer(&mut buffer); + writer.write(b"abcd", &()).unwrap(); + writer.write(b"abe", &()).unwrap(); + writer.finalize().unwrap(); + } + let mut output = Vec::new(); + assert!(VoidSSTable::merge(vec![&buffer[..], &buffer[..]], &mut output, VoidMerge).is_ok()); + assert_eq!(&output[..], &buffer[..]); + } + +} \ No newline at end of file diff --git a/sstable/src/merge/fast_merge.rs b/sstable/src/merge/fast_merge.rs new file mode 100644 index 000000000..40fbd3ad7 --- /dev/null +++ b/sstable/src/merge/fast_merge.rs @@ -0,0 +1,209 @@ +use {SSTable, Reader}; +use std::io; +use merge::{ValueMerger, SingleValueMerger}; +use Writer; +use std::collections::BinaryHeap; +use std::cmp::Ordering; +use std::cmp::Ord; +use std::option::Option::None; +use std::mem; +use std::fmt::Debug; +use common_prefix_len; + +fn pick_lowest_with_ties<'a, 'b, T, FnKey: Fn(&'b T)->K, K>(elements: &'b [T], key: FnKey, ids: &'a mut [usize]) -> (&'a [usize], &'a [usize]) + where + FnKey: Fn(&'b T)->K, + K: Ord + Debug + 'b { + debug_assert!(!ids.is_empty()); + if ids.len() <= 1 { + return (ids, &[]); + } + let mut smallest_key = key(&elements[ids[0]]); + let mut num_ties = 1; + for i in 1..ids.len() { + let cur = ids[i]; + let cur_key = key(&elements[cur]); + match cur_key.cmp(&smallest_key) { + Ordering::Less => { + ids.swap(i, 0); + smallest_key = cur_key; + num_ties = 1; + } + Ordering::Equal => { + ids.swap(i, num_ties); + num_ties += 1; + } + Ordering::Greater => {} + } + } + (&ids[..num_ties], &ids[num_ties..]) +} + + +#[derive(Clone, Copy, Debug)] +struct HeapItem(pub u32); + +impl HeapItem { + fn new(common_prefix_len: u32, next_byte: u8) -> Self { + HeapItem(common_prefix_len << 8 | (next_byte as u32)) + } + + fn common_prefix_len(&self) -> usize { + self.0 as usize >> 8 + } +} + +struct Queue { + queue: BinaryHeap, + map: Vec>, + spares: Vec>, +} + + +fn heap_item_to_id(heap_item: &HeapItem) -> usize { + heap_item.0 as usize +} + +impl Queue { + + // helper to trick the borrow checker. + fn push_to_queue(heap_item: HeapItem, + idx: usize, + queue: &mut BinaryHeap, + map: &mut Vec>, + spares: &mut Vec>) { + let heap_id = heap_item_to_id(&heap_item); + let ids = &mut map[heap_id]; + if ids.is_empty() { + queue.push(heap_item.0); + *ids = spares.pop().unwrap_or_else(Vec::new); + } + ids.push(idx); + } + + pub fn with_capacity(capacity: usize) -> Self { + Queue { + queue: BinaryHeap::with_capacity(capacity), + map: (0..256 * 100).map(|_| Vec::new()).collect::>>(), + spares: (0..capacity).map(|_| Vec::with_capacity(capacity)).collect() + } + } + + pub fn register(&mut self, common_prefix_len: u32, next_byte: u8, idx: usize) { + let heap_item = HeapItem::new(common_prefix_len, next_byte); + Queue::push_to_queue(heap_item, idx, &mut self.queue, &mut self.map, &mut self.spares); + } + + pub fn pop(&mut self, dest: &mut Vec) -> Option { + dest.clear(); + self.queue + .pop() + .map(|heap_item| { + dest.clear(); + let idx = mem::replace(&mut self.map[heap_item as usize], Vec::new()); + self.spares.push(mem::replace(dest,idx)); + HeapItem(heap_item) + }) + } +} + +pub fn merge_sstable>( + unstarted_readers: Vec>, + writer: Writer, + mut merger: M +) -> io::Result<()> { + let mut delta_writer = writer.into_delta_writer(); + let mut readers = vec![]; + let mut empty_key_values: Option = None; + for reader in unstarted_readers { + let mut delta_reader = reader.into_delta_reader(); + if delta_reader.advance()? { + if delta_reader.suffix().is_empty() { + if let Some(value_merger) = empty_key_values.as_mut() { + value_merger.add(delta_reader.value()); + } // the borrow checker does not allow an else here... that's a bit lame. + if empty_key_values.is_none() { + empty_key_values = Some(merger.new_value(delta_reader.value())); + } + if delta_reader.advance()? { + // duplicate keys are forbidden. + assert!(!delta_reader.suffix().is_empty()); + readers.push(delta_reader); + } + } else { + readers.push(delta_reader); + } + } + } + if let Some(value_merger) = empty_key_values { + delta_writer.write_delta(0, &[], &value_merger.finish())?; + } + + let mut queue = Queue::with_capacity(readers.len()); + + for (idx, delta_reader) in readers.iter().enumerate() { + queue.register(0u32, delta_reader.suffix()[0], idx); + } + + let mut current_ids = Vec::with_capacity(readers.len()); + while let Some(heap_item) = queue.pop(&mut current_ids) { + debug_assert!(!current_ids.is_empty()); + let (tie_ids, others) = pick_lowest_with_ties( + &readers[..], + |reader| reader.suffix_from(heap_item.common_prefix_len()), + &mut current_ids[..]); + { + let first_reader = &readers[tie_ids[0]]; + let suffix = first_reader.suffix_from(heap_item.common_prefix_len()); + if tie_ids.len() > 1 { + let mut single_value_merger = merger.new_value(first_reader.value()); + for &min_tie_id in &tie_ids[1..] { + single_value_merger.add(readers[min_tie_id].value()); + } + delta_writer.write_delta(heap_item.common_prefix_len(), + suffix, + &single_value_merger.finish())?; + } else { + delta_writer.write_delta(heap_item.common_prefix_len(), + suffix, + first_reader.value())?; + } + for &reader_id in others { + let reader = &readers[reader_id]; + let reader_suffix = reader.suffix_from(heap_item.common_prefix_len()); + let extra_common_prefix_len = common_prefix_len(reader_suffix, suffix); + let next_byte = reader_suffix[extra_common_prefix_len]; + queue.register(heap_item.common_prefix_len() as u32 + extra_common_prefix_len as u32, next_byte, reader_id) + } + } + for &tie_id in tie_ids { + let reader = &mut readers[tie_id]; + if reader.advance()? { + queue.register(reader.common_prefix_len() as u32, reader.suffix()[0], tie_id); + } + } + } + delta_writer.finalize()?; + Ok(()) +} + + + +#[cfg(test)] +mod tests { + use super::pick_lowest_with_ties; + + #[test] + fn test_pick_lowest_with_ties() { + { + let mut ids = [0,1,3,2,5,4]; + assert_eq!(pick_lowest_with_ties(&[1,4,3,7,1,3,5], |el| *el, &mut ids), + (&[0,4][..], &[3,2,5,1][..])); + } + { + let mut ids = [5,3,2,1,4]; + assert_eq!(pick_lowest_with_ties(&[1,4,3,7,1,3,5], |el| *el, &mut ids), + (&[4][..], &[2,3,1,5][..])); + } + } +} \ No newline at end of file diff --git a/sstable/src/merge/heap_merge.rs b/sstable/src/merge/heap_merge.rs new file mode 100644 index 000000000..1af307ed9 --- /dev/null +++ b/sstable/src/merge/heap_merge.rs @@ -0,0 +1,70 @@ + +use {SSTable, Reader, Writer}; + +use super::SingleValueMerger; +use super::ValueMerger; +use std::io; +use std::collections::BinaryHeap; +use std::cmp::Ordering; +use std::collections::binary_heap::PeekMut; + +struct HeapItem>(B); + +impl> Ord for HeapItem { + fn cmp(&self, other: &Self) -> Ordering { + other.0.as_ref().cmp(self.0.as_ref()) + } +} +impl> PartialOrd for HeapItem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(other.0.as_ref().cmp(self.0.as_ref())) + } +} + +impl> Eq for HeapItem {} +impl> PartialEq for HeapItem { + fn eq(&self, other: &Self) -> bool { + self.0.as_ref() == other.0.as_ref() + } +} + +pub fn merge_sstable>( + readers: Vec>, + mut writer: Writer, + mut merger: M) -> io::Result<()> { + let mut heap: BinaryHeap>> = BinaryHeap::with_capacity(readers.len()); + for mut reader in readers { + if reader.advance()? { + heap.push(HeapItem(reader)); + } + } + loop { + let len = heap.len(); + let mut value_merger; + if let Some(mut head) = heap.peek_mut() { + writer.write_key(head.0.key()); + value_merger = merger.new_value(head.0.value()); + if !head.0.advance()? { + PeekMut::pop(head); + } + } else { + break; + } + for _ in 0..len - 1 { + if let Some(mut head) = heap.peek_mut() { + if head.0.key() == writer.current_key() { + value_merger.add(head.0.value()); + if !head.0.advance()? { + PeekMut::pop(head) ; + } + continue; + } + } + break; + } + let value = value_merger.finish(); + writer.write_value(&value); + } + writer.finalize()?; + Ok(()) +} \ No newline at end of file diff --git a/sstable/src/merge/mod.rs b/sstable/src/merge/mod.rs new file mode 100644 index 000000000..6a8bda876 --- /dev/null +++ b/sstable/src/merge/mod.rs @@ -0,0 +1,112 @@ +mod fast_merge; +mod heap_merge; + +pub use self::fast_merge::merge_sstable; +pub use self::heap_merge::merge_sstable as merge_sstable_heap; + + +pub trait SingleValueMerger { + fn add(&mut self, v: &V); + fn finish(self) -> V; +} + +pub trait ValueMerger { + type TSingleValueMerger: SingleValueMerger; + fn new_value(&mut self, v: &V) -> Self::TSingleValueMerger; +} + +#[derive(Default)] +pub struct KeepFirst; + +pub struct FirstVal(V); + +impl ValueMerger for KeepFirst { + type TSingleValueMerger = FirstVal; + + fn new_value(&mut self, v: &V) -> FirstVal { + FirstVal(v.clone()) + } +} + +impl SingleValueMerger for FirstVal { + fn add(&mut self, _: &V) {} + + fn finish(self) -> V { + self.0 + } +} + +pub struct VoidMerge; +impl ValueMerger<()> for VoidMerge { + + type TSingleValueMerger = (); + + fn new_value(&mut self, _: &()) -> () { + () + } +} + +impl SingleValueMerger<()> for () { + fn add(&mut self, _: &()) {} + + fn finish(self) -> () { + () + } +} + +#[cfg(test)] +mod tests { + + use VoidSSTable; + use SSTable; + use super::VoidMerge; + use std::str; + use std::collections::BTreeSet; + + fn write_sstable(keys: &[&'static str]) -> Vec { + let mut buffer: Vec = vec![]; + { + let mut sstable_writer = VoidSSTable::writer(&mut buffer); + for &key in keys { + assert!(sstable_writer.write(key.as_bytes(), &()).is_ok()); + } + assert!(sstable_writer.finalize().is_ok()); + } + buffer + } + + fn merge_test_aux(arrs: &[&[&'static str]]) { + let sstables = arrs.iter() + .cloned() + .map(write_sstable) + .collect::>(); + let sstables_ref: Vec<&[u8]> = sstables.iter() + .map(|s| s.as_ref()) + .collect(); + let mut merged = BTreeSet::new(); + for &arr in arrs.iter() { + for &s in arr { + merged.insert(s.to_string()); + } + } + let mut w = Vec::new(); + assert!(VoidSSTable::merge(sstables_ref, &mut w, VoidMerge).is_ok()); + } + + #[test] + fn test_merge() { + merge_test_aux(&[]); + merge_test_aux(&[&["a"]]); + merge_test_aux(&[&["a","b"], &["ab"]]); // a, ab, b + merge_test_aux(&[&["a","b"], &["a", "b"]]); + merge_test_aux(&[ + &["happy", "hello", "payer", "tax"], + &["habitat", "hello", "zoo"], + &[], + &["a"], + ]); + merge_test_aux(&[&["a"]]); + merge_test_aux(&[&["a","b"], &["ab"]]); + merge_test_aux(&[&["a","b"], &["a", "b"]]); + } +} \ No newline at end of file diff --git a/sstable/src/value.rs b/sstable/src/value.rs new file mode 100644 index 000000000..0d3168385 --- /dev/null +++ b/sstable/src/value.rs @@ -0,0 +1,43 @@ +use std::io; +use BlockReader; + +pub trait ValueReader: Default { + + type Value; + + fn value(&self) -> &Self::Value; + + fn read(&mut self, reader: &mut BlockReader) -> io::Result<()>; +} + +pub trait ValueWriter: Default { + + type Value; + + fn write(&mut self, val: &Self::Value, writer: &mut Vec); +} + + +#[derive(Default)] +pub struct VoidReader; + +impl ValueReader for VoidReader { + type Value = (); + + fn value(&self) -> &Self::Value { + &() + } + + fn read(&mut self, _reader: &mut BlockReader) -> io::Result<()> { + Ok(()) + } +} + +#[derive(Default)] +pub struct VoidWriter; + +impl ValueWriter for VoidWriter { + type Value = (); + + fn write(&mut self, _: &Self::Value, _: &mut Vec) {} +} \ No newline at end of file diff --git a/sstable/src/vint.rs b/sstable/src/vint.rs new file mode 100644 index 000000000..4d398c7f3 --- /dev/null +++ b/sstable/src/vint.rs @@ -0,0 +1,63 @@ +const CONTINUE_BIT: u8 = 128u8; + +pub fn serialize(mut val: u64, buffer: &mut [u8]) -> usize { + for (i, b) in buffer.iter_mut().enumerate() { + let next_byte: u8 = (val & 127u64) as u8; + val = val >> 7; + if val == 0u64 { + *b = next_byte; + return i + 1; + } else { + *b = next_byte | CONTINUE_BIT; + } + } + 10 //< actually unreachable +} + +// super slow but we don't care +pub fn deserialize_read(buf: &[u8]) -> (usize, u64) { + let mut result = 0u64; + let mut shift = 0u64; + let mut consumed = 0; + + for &b in buf { + consumed += 1; + result |= u64::from(b % 128u8) << shift; + if b < CONTINUE_BIT { + break; + } + shift += 7; + } + (consumed, result) +} + + +#[cfg(test)] +mod tests { + use vint::serialize; + use vint::deserialize_read; + use std::u64; + + fn aux_test_int(val: u64, expect_len: usize) { + let mut buffer = [0u8; 14]; + assert_eq!(serialize(val, &mut buffer[..]), expect_len); + assert_eq!(deserialize_read(&buffer), (expect_len, val)); + } + + #[test] + fn test_vint() { + aux_test_int(0u64, 1); + aux_test_int(17u64, 1); + aux_test_int(127u64, 1); + aux_test_int(128u64, 2); + aux_test_int(123423418u64, 4); + for i in 1..63 { + let power_of_two = 1u64 << i; + aux_test_int(power_of_two + 1, (i / 7) + 1); + aux_test_int(power_of_two, (i / 7) + 1 ); + aux_test_int(power_of_two - 1, ((i-1) / 7) + 1); + } + aux_test_int(u64::MAX, 10); + } +} +