From 522ebdc674432bb8800b589aac2c98ad9dfddb62 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Tue, 30 May 2017 08:22:17 +0900
Subject: [PATCH 01/16] made ResultExt public

---
 src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/lib.rs b/src/lib.rs
index 6ec7b33f2..5d13c8299 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -88,7 +88,7 @@ mod functional_test;
 #[macro_use]
 mod macros;
 
-pub use error::{Error, ErrorKind};
+pub use error::{Error, ErrorKind, ResultExt};
 
 /// Tantivy result.
 pub type Result<T> = std::result::Result<T, Error>;

From 1d5464351ddc6dcdd41e815c36d8779ed1ed60de Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Tue, 30 May 2017 16:03:09 +0900
Subject: [PATCH 02/16] generic read

---
 src/common/serialize.rs   | 18 +++++++++---------
 src/common/vint.rs        |  2 +-
 src/postings/term_info.rs |  2 +-
 src/schema/field.rs       |  2 +-
 src/schema/field_value.rs |  2 +-
 src/schema/value.rs       |  2 +-
 6 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/src/common/serialize.rs b/src/common/serialize.rs
index 471ac3a9c..ffd6982aa 100644
--- a/src/common/serialize.rs
+++ b/src/common/serialize.rs
@@ -8,14 +8,14 @@ use common::VInt;
 
 pub trait BinarySerializable: fmt::Debug + Sized {
     fn serialize(&self, writer: &mut Write) -> io::Result<usize>;
-    fn deserialize(reader: &mut Read) -> io::Result<Self>;
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
 }
 
 impl BinarySerializable for () {
     fn serialize(&self, _: &mut Write) -> io::Result<usize> {
         Ok(0)
     }
-    fn deserialize(_: &mut Read) -> io::Result<Self> {
+    fn deserialize<R: Read>(_: &mut R) -> io::Result<Self> {
         Ok(())
     }
 }
 
 impl<T: BinarySerializable> BinarySerializable for Vec<T> {
     fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
         let mut total_size = try!(VInt(self.len() as u64).serialize(writer));
         for it in self {
             total_size += try!(it.serialize(writer));
         }
         Ok(total_size)
     }
-    fn deserialize(reader: &mut Read) -> io::Result<Vec<T>> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Vec<T>> {
         let num_items = try!(VInt::deserialize(reader)).val();
         let mut items: Vec<T> = Vec::with_capacity(num_items as usize);
         for _ in 0..num_items {
             let item = try!(T::deserialize(reader));
             items.push(item);
         }
         Ok(items)
     }
 }
 
 impl<Left: BinarySerializable, Right: BinarySerializable> BinarySerializable for (Left, Right) {
     fn serialize(&self, write: &mut Write) -> io::Result<usize> {
         Ok(try!(self.0.serialize(write)) + try!(self.1.serialize(write)))
     }
-    fn deserialize(reader: &mut Read) -> io::Result<Self> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
         Ok((try!(Left::deserialize(reader)), try!(Right::deserialize(reader))))
     }
 }
 
 impl BinarySerializable for u32 {
     fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
         writer.write_u32::<Endianness>(*self).map(|_| 4)
     }
 
-    fn deserialize(reader: &mut Read) -> io::Result<u32> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<u32> {
         reader.read_u32::<Endianness>()
     }
 }
 
 impl BinarySerializable for u64 {
     fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
         writer.write_u64::<Endianness>(*self).map(|_| 8)
     }
-    fn deserialize(reader: &mut Read) -> io::Result<u64> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<u64> {
         reader.read_u64::<Endianness>()
     }
 }
 
 impl BinarySerializable for i64 {
     fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
         writer.write_i64::<Endianness>(*self).map(|_| 8)
     }
-    fn deserialize(reader: &mut Read) -> io::Result<i64> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<i64> {
         reader.read_i64::<Endianness>()
     }
 }
 
 impl BinarySerializable for u8 {
     fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
         try!(writer.write_u8(*self));
         Ok(1)
     }
-    fn deserialize(reader: &mut Read) -> io::Result<u8> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<u8> {
         reader.read_u8()
     }
 }
 
 impl BinarySerializable for String {
     fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
         let data: &[u8] = self.as_bytes();
         let mut size = try!(VInt(data.len() as u64).serialize(writer));
         size += data.len();
         try!(writer.write_all(data));
         Ok(size)
     }
 
-    fn deserialize(reader: &mut Read) -> io::Result<String> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<String> {
         let string_length = try!(VInt::deserialize(reader)).val() as usize;
         let mut result = String::with_capacity(string_length);
         try!(reader
                  .take(string_length as u64)
                  .read_to_string(&mut result));
         Ok(result)
     }
 }
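[Editor's note] The hunks above make `deserialize` generic over the reader
type (`R: Read`) instead of taking a `&mut Read` trait object; `serialize`
gets the same treatment in PATCH 03. A minimal round-trip sketch of the
resulting API (hypothetical usage written for this note, not part of the
series):

    use std::io::Cursor;

    let mut buffer: Vec<u8> = Vec::new();  // Vec<u8> implements Write
    42u32.serialize(&mut buffer).unwrap();
    let mut cursor = Cursor::new(buffer);  // Cursor<Vec<u8>> implements Read
    assert_eq!(u32::deserialize(&mut cursor).unwrap(), 42);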
diff --git a/src/common/vint.rs b/src/common/vint.rs
index 0563d8f8e..3201b8541 100644
--- a/src/common/vint.rs
+++ b/src/common/vint.rs
@@ -36,7 +36,7 @@ impl BinarySerializable for VInt {
         Ok(written)
     }
 
-    fn deserialize(reader: &mut Read) -> io::Result<VInt> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<VInt> {
         let mut bytes = reader.bytes();
         let mut result = 0u64;
         let mut shift = 0u64;
diff --git a/src/postings/term_info.rs b/src/postings/term_info.rs
index fbcf9e05a..90d9d02f4 100644
--- a/src/postings/term_info.rs
+++ b/src/postings/term_info.rs
@@ -28,7 +28,7 @@ impl BinarySerializable for TermInfo {
         Ok(try!(self.doc_freq.serialize(writer)) + try!(self.postings_offset.serialize(writer)) +
            try!(self.positions_offset.serialize(writer)))
     }
-    fn deserialize(reader: &mut io::Read) -> io::Result<TermInfo> {
+    fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<TermInfo> {
         let doc_freq = try!(u32::deserialize(reader));
         let postings_offset = try!(u32::deserialize(reader));
         let positions_offset = try!(u32::deserialize(reader));
diff --git a/src/schema/field.rs b/src/schema/field.rs
index d73a66b34..e5489adf6 100644
--- a/src/schema/field.rs
+++ b/src/schema/field.rs
@@ -18,7 +18,7 @@ impl BinarySerializable for Field {
         self.0.serialize(writer)
     }
 
-    fn deserialize(reader: &mut Read) -> io::Result<Field> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Field> {
         u32::deserialize(reader).map(Field)
     }
 }
diff --git a/src/schema/field_value.rs b/src/schema/field_value.rs
index 8202cc5ca..b6cd5469b 100644
--- a/src/schema/field_value.rs
+++ b/src/schema/field_value.rs
@@ -38,7 +38,7 @@ impl BinarySerializable for FieldValue {
         Ok(self.field.serialize(writer)? + self.value.serialize(writer)?)
     }
 
-    fn deserialize(reader: &mut Read) -> io::Result<FieldValue> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<FieldValue> {
         let field = Field::deserialize(reader)?;
         let value = Value::deserialize(reader)?;
         Ok(FieldValue::new(field, value))
diff --git a/src/schema/value.rs b/src/schema/value.rs
index 8e5ee4153..139e23f13 100644
--- a/src/schema/value.rs
+++ b/src/schema/value.rs
@@ -148,7 +148,7 @@ mod binary_serialize {
         }
         Ok(written_size)
     }
-    fn deserialize(reader: &mut Read) -> io::Result<Value> {
+    fn deserialize<R: Read>(reader: &mut R) -> io::Result<Value> {
         let type_code = try!(u8::deserialize(reader));
         match type_code {
             TEXT_CODE => {

From 8d4778f94d67e0d71bc93ded4048d2a4f644de3c Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Wed, 31 May 2017 00:05:52 +0900
Subject: [PATCH 03/16] issue/181 BinarySerializable does not return the len +
 Generics over Read+Write

---
 src/common/bitpacker.rs                      | 17 ++---
 .../streamdict => common}/counting_writer.rs |  0
 src/common/mod.rs                            |  2 +
 src/common/serialize.rs                      | 67 +++++++++----------
 src/common/vint.rs                           |  7 +-
 src/datastruct/skip/skiplist_builder.rs      | 14 ++--
 src/fastfield/reader.rs                      | 14 +++-
 src/fastfield/serializer.rs                  | 38 +++++------
 src/postings/serializer.rs                   | 51 ++++++--------
 src/postings/term_info.rs                    |  8 ++-
 src/schema/field.rs                          |  2 +-
 src/schema/field_value.rs                    |  5 +-
 src/schema/value.rs                          | 16 ++---
 src/store/writer.rs                          | 28 ++++----
 src/termdict/mod.rs                          |  2 -
 src/termdict/streamdict/mod.rs               |  2 -
 src/termdict/streamdict/streamer.rs          |  1 -
 src/termdict/streamdict/termdict.rs          |  2 +-
 18 files changed, 131 insertions(+), 145 deletions(-)
 rename src/{termdict/streamdict => common}/counting_writer.rs (100%)
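[Editor's note] `counting_writer.rs` moves to `src/common/` with 100%
similarity, so its body never appears in this series. Below is a sketch of
the `CountingWriter` API the following hunks rely on (`wrap`,
`written_bytes`, `finish`), reconstructed from its call sites; field names
are guesses:

    use std::io::{self, Write};

    pub struct CountingWriter<W> {
        underlying: W,
        written_bytes: usize,
    }

    impl<W: Write> CountingWriter<W> {
        pub fn wrap(underlying: W) -> CountingWriter<W> {
            CountingWriter {
                underlying: underlying,
                written_bytes: 0,
            }
        }

        pub fn written_bytes(&self) -> usize {
            self.written_bytes
        }

        /// Returns the underlying writer and the number of bytes written.
        pub fn finish(mut self) -> io::Result<(W, usize)> {
            self.underlying.flush()?;
            Ok((self.underlying, self.written_bytes))
        }
    }

    impl<W: Write> Write for CountingWriter<W> {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            let written = self.underlying.write(buf)?;
            self.written_bytes += written;
            Ok(written)
        }

        fn flush(&mut self) -> io::Result<()> {
            self.underlying.flush()
        }
    }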
diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs
index b259bd335..f625e07fe 100644
--- a/src/common/bitpacker.rs
+++ b/src/common/bitpacker.rs
@@ -37,7 +37,6 @@ pub struct BitPacker {
     mini_buffer: u64,
     mini_buffer_written: usize,
     num_bits: usize,
-    written_size: usize,
 }
 
 impl BitPacker {
@@ -46,7 +45,6 @@ impl BitPacker {
             mini_buffer: 0u64,
             mini_buffer_written: 0,
             num_bits: num_bits,
-            written_size: 0,
         }
     }
 
@@ -54,14 +52,14 @@ impl BitPacker {
         let val_u64 = val as u64;
         if self.mini_buffer_written + self.num_bits > 64 {
             self.mini_buffer |= val_u64.wrapping_shl(self.mini_buffer_written as u32);
-            self.written_size += self.mini_buffer.serialize(output)?;
+            self.mini_buffer.serialize(output)?;
             self.mini_buffer = val_u64.wrapping_shr((64 - self.mini_buffer_written) as u32);
             self.mini_buffer_written = self.mini_buffer_written + (self.num_bits as usize) - 64;
         } else {
             self.mini_buffer |= val_u64 << self.mini_buffer_written;
             self.mini_buffer_written += self.num_bits;
             if self.mini_buffer_written == 64 {
-                self.written_size += self.mini_buffer.serialize(output)?;
+                self.mini_buffer.serialize(output)?;
                 self.mini_buffer_written = 0;
                 self.mini_buffer = 0u64;
             }
@@ -74,18 +72,16 @@ impl BitPacker {
             let num_bytes = (self.mini_buffer_written + 7) / 8;
             let arr: [u8; 8] = unsafe { mem::transmute::<u64, [u8; 8]>(self.mini_buffer) };
             output.write_all(&arr[..num_bytes])?;
-            self.written_size += num_bytes;
             self.mini_buffer_written = 0;
         }
         Ok(())
     }
 
-    pub fn close<TWrite: Write>(&mut self, output: &mut TWrite) -> io::Result<usize> {
+    pub fn close<TWrite: Write>(&mut self, output: &mut TWrite) -> io::Result<()> {
         self.flush(output)?;
         // Padding the write file to simplify reads.
         output.write_all(&[0u8; 7])?;
-        self.written_size += 7;
-        Ok(self.written_size)
+        Ok(())
     }
 }
 
@@ -163,9 +159,8 @@ mod test {
         for &val in &vals {
             bitpacker.write(val, &mut data).unwrap();
         }
-        let num_bytes = bitpacker.close(&mut data).unwrap();
-        assert_eq!(num_bytes, (num_bits * len + 7) / 8 + 7);
-        assert_eq!(data.len(), num_bytes);
+        bitpacker.close(&mut data).unwrap();
+        assert_eq!(data.len(), (num_bits * len + 7) / 8 + 7);
         let bitunpacker = BitUnpacker::new(data, num_bits);
         for (i, val) in vals.iter().enumerate() {
             assert_eq!(bitunpacker.get(i), *val);
diff --git a/src/termdict/streamdict/counting_writer.rs b/src/common/counting_writer.rs
similarity index 100%
rename from src/termdict/streamdict/counting_writer.rs
rename to src/common/counting_writer.rs
diff --git a/src/common/mod.rs b/src/common/mod.rs
index e8e9facdd..0af9d2417 100644
--- a/src/common/mod.rs
+++ b/src/common/mod.rs
@@ -1,6 +1,7 @@
 mod serialize;
 mod timer;
 mod vint;
+mod counting_writer;
 pub mod bitpacker;
 
 pub use self::serialize::BinarySerializable;
@@ -8,6 +9,7 @@ pub use self::timer::Timing;
 pub use self::timer::TimerTree;
 pub use self::timer::OpenTimer;
 pub use self::vint::VInt;
+pub use self::counting_writer::CountingWriter;
 
 use std::io;
diff --git a/src/common/serialize.rs b/src/common/serialize.rs
index ffd6982aa..ee86247c5 100644
--- a/src/common/serialize.rs
+++ b/src/common/serialize.rs
@@ -6,14 +6,16 @@ use std::io::Read;
 use std::io;
 use common::VInt;
 
+
+
 pub trait BinarySerializable: fmt::Debug + Sized {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize>;
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()>;
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self>;
 }
 
 impl BinarySerializable for () {
-    fn serialize(&self, _: &mut Write) -> io::Result<usize> {
-        Ok(0)
+    fn serialize<W: Write>(&self, _: &mut W) -> io::Result<()> {
+        Ok(())
     }
     fn deserialize<R: Read>(_: &mut R) -> io::Result<Self> {
         Ok(())
     }
 }
 
 impl<T: BinarySerializable> BinarySerializable for Vec<T> {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
-        let mut total_size = try!(VInt(self.len() as u64).serialize(writer));
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        VInt(self.len() as u64).serialize(writer)?;
         for it in self {
-            total_size += try!(it.serialize(writer));
+            it.serialize(writer)?;
         }
-        Ok(total_size)
+        Ok(())
     }
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Vec<T>> {
-        let num_items = try!(VInt::deserialize(reader)).val();
+        let num_items = VInt::deserialize(reader)?.val();
         let mut items: Vec<T> = Vec::with_capacity(num_items as usize);
         for _ in 0..num_items {
-            let item = try!(T::deserialize(reader));
+            let item = T::deserialize(reader)?;
             items.push(item);
         }
         Ok(items)
     }
 }
 
 impl<Left: BinarySerializable, Right: BinarySerializable> BinarySerializable for (Left, Right) {
-    fn serialize(&self, write: &mut Write) -> io::Result<usize> {
-        Ok(try!(self.0.serialize(write)) + try!(self.1.serialize(write)))
+    fn serialize<W: Write>(&self, write: &mut W) -> io::Result<()> {
+        self.0.serialize(write)?;
+        self.1.serialize(write)
     }
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
         Ok((Left::deserialize(reader)?, Right::deserialize(reader)?))
     }
 }
 
 impl BinarySerializable for u32 {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
-        writer.write_u32::<Endianness>(*self).map(|_| 4)
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_u32::<Endianness>(*self)
     }
 
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<u32> {
         reader.read_u32::<Endianness>()
     }
 }
 
 impl BinarySerializable for u64 {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
-        writer.write_u64::<Endianness>(*self).map(|_| 8)
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_u64::<Endianness>(*self)
     }
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<u64> {
         reader.read_u64::<Endianness>()
     }
 }
 
 impl BinarySerializable for i64 {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
-        writer.write_i64::<Endianness>(*self).map(|_| 8)
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_i64::<Endianness>(*self)
     }
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<i64> {
         reader.read_i64::<Endianness>()
     }
 }
 
 impl BinarySerializable for u8 {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
-        try!(writer.write_u8(*self));
-        Ok(1)
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        writer.write_u8(*self)
     }
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<u8> {
         reader.read_u8()
     }
 }
 
 impl BinarySerializable for String {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
         let data: &[u8] = self.as_bytes();
-        let mut size = try!(VInt(data.len() as u64).serialize(writer));
-        size += data.len();
-        try!(writer.write_all(data));
-        Ok(size)
+        VInt(data.len() as u64).serialize(writer)?;
+        writer.write_all(data)
     }
 
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<String> {
-        let string_length = try!(VInt::deserialize(reader)).val() as usize;
+        let string_length = VInt::deserialize(reader)?.val() as usize;
         let mut result = String::with_capacity(string_length);
-        try!(reader
-                 .take(string_length as u64)
-                 .read_to_string(&mut result));
+        reader
+            .take(string_length as u64)
+            .read_to_string(&mut result)?;
         Ok(result)
     }
 }
@@ -117,9 +117,8 @@ mod test {
 
     fn serialize_test<T: BinarySerializable>(v: T, num_bytes: usize) {
         let mut buffer: Vec<u8> = Vec::new();
         if num_bytes != 0 {
-            assert_eq!(v.serialize(&mut buffer).unwrap(), num_bytes);
+            v.serialize(&mut buffer).unwrap();
             assert_eq!(buffer.len(), num_bytes);
         } else {
             v.serialize(&mut buffer).unwrap();
diff --git a/src/common/vint.rs b/src/common/vint.rs
index 3201b8541..c1a014599 100644
--- a/src/common/vint.rs
+++ b/src/common/vint.rs
@@ -16,10 +16,10 @@ impl VInt {
 }
 
 impl BinarySerializable for VInt {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
         let mut remaining = self.0;
-        let mut written: usize = 0;
         let mut buffer = [0u8; 10];
+        let mut written = 0;
         loop {
             let next_byte: u8 = (remaining % 128u64) as u8;
             remaining /= 128u64;
@@ -32,8 +32,7 @@ impl BinarySerializable for VInt {
                 written += 1;
             }
         }
-        try!(writer.write_all(&buffer[0..written]));
-        Ok(written)
+        writer.write_all(&buffer[0..written])
     }
 
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<VInt> {
diff --git a/src/datastruct/skip/skiplist_builder.rs b/src/datastruct/skip/skiplist_builder.rs
index 34c5d8a48..eaa439d08 100644
--- a/src/datastruct/skip/skiplist_builder.rs
+++ b/src/datastruct/skip/skiplist_builder.rs
@@ -18,7 +18,7 @@ impl<T: BinarySerializable> LayerBuilder<T> {
     }
 
     fn write(&self, output: &mut Write) -> Result<(), io::Error> {
-        try!(output.write_all(&self.buffer));
+        output.write_all(&self.buffer)?;
         Ok(())
     }
 
@@ -36,8 +36,8 @@ impl<T: BinarySerializable> LayerBuilder<T> {
         self.remaining -= 1;
         self.len += 1;
         let offset = self.written_size() as u32;
-        try!(doc_id.serialize(&mut self.buffer));
-        try!(value.serialize(&mut self.buffer));
+        doc_id.serialize(&mut self.buffer)?;
+        value.serialize(&mut self.buffer)?;
         Ok(if self.remaining == 0 {
                self.remaining = self.period;
                Some((doc_id, offset))
@@ -89,7 +89,7 @@ impl<T: BinarySerializable> SkipListBuilder<T> {
         }
     }
 
-    pub fn write<W: Write>(self, output: &mut Write) -> io::Result<()> {
+    pub fn write<W: Write>(self, output: &mut W) -> io::Result<()> {
         let mut size: u32 = 0;
         let mut layer_sizes: Vec<u32> = Vec::new();
         size += self.data_layer.buffer.len() as u32;
@@ -98,10 +98,10 @@ impl<T: BinarySerializable> SkipListBuilder<T> {
             size += layer.buffer.len() as u32;
             layer_sizes.push(size);
         }
-        try!(layer_sizes.serialize(output));
-        try!(self.data_layer.write(output));
+        layer_sizes.serialize(output)?;
+        self.data_layer.write(output)?;
         for layer in self.skip_layers.iter().rev() {
-            try!(layer.write(output));
+            layer.write(output)?;
         }
         Ok(())
     }
diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs
index ee1fc1cd4..94f187344 100644
--- a/src/fastfield/reader.rs
+++ b/src/fastfield/reader.rs
@@ -12,6 +12,7 @@ use fastfield::FastFieldsWriter;
 use common::bitpacker::compute_num_bits;
 use common::bitpacker::BitUnpacker;
 use schema::FieldType;
+use error::ResultExt;
 use common;
 use owning_ref::OwningRef;
 
@@ -125,9 +126,16 @@ impl From<Vec<u64>> for U64FastFieldReader {
             fast_field_writers.serialize(&mut serializer).unwrap();
             serializer.close().unwrap();
         }
-        let source = directory.open_read(path).unwrap();
-        let fast_field_readers = FastFieldsReader::from_source(source).unwrap();
-        fast_field_readers.open_reader(field).unwrap()
+        directory
+            .open_read(path)
+            .chain_err(|| "Failed to open the file")
+            .and_then(|source| FastFieldsReader::from_source(source)
+                .chain_err(|| "Failed to read the file."))
+            .and_then(|ff_readers| ff_readers
+                .open_reader(field)
+                .ok_or_else(|| {"Failed to find the requested field".into() }))
+            .expect("This should never happen, please report.")
+
     }
 }
diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs
index 7f97b3b28..ef6ffedf9 100644
--- a/src/fastfield/serializer.rs
+++ b/src/fastfield/serializer.rs
@@ -2,9 +2,9 @@ use common::BinarySerializable;
 use directory::WritePtr;
 use schema::Field;
 use common::bitpacker::{compute_num_bits, BitPacker};
+use common::CountingWriter;
 use std::io::{self, Write, Seek, SeekFrom};
 
-
 /// `FastFieldSerializer` is in charge of serializing
 /// fastfields on disk.
 ///
@@ -26,8 +26,7 @@ use std::io::{self, Write, Seek, SeekFrom};
 /// * `close_field()`
 /// * `close()`
 pub struct FastFieldSerializer {
-    write: WritePtr,
-    written_size: usize,
+    write: CountingWriter<WritePtr>,
     fields: Vec<(Field, u32)>,
     min_value: u64,
     field_open: bool,
@@ -37,12 +36,12 @@ pub struct FastFieldSerializer {
 
 impl FastFieldSerializer {
     /// Constructor
-    pub fn new(mut write: WritePtr) -> io::Result<FastFieldSerializer> {
+    pub fn new(write: WritePtr) -> io::Result<FastFieldSerializer> {
         // just making room for the pointer to header.
-        let written_size: usize = try!(0u32.serialize(&mut write));
+        let mut counting_writer = CountingWriter::wrap(write);
+        0u32.serialize(&mut counting_writer)?;
         Ok(FastFieldSerializer {
-               write: write,
-               written_size: written_size,
+               write: counting_writer,
                fields: Vec::new(),
                min_value: 0,
                field_open: false,
@@ -61,11 +60,11 @@ impl FastFieldSerializer {
         }
         self.min_value = min_value;
         self.field_open = true;
-        self.fields.push((field, self.written_size as u32));
-        let write: &mut Write = &mut self.write;
-        self.written_size += try!(min_value.serialize(write));
+        self.fields.push((field, self.write.written_bytes() as u32));
+        let write = &mut self.write;
+        min_value.serialize(write)?;
         let amplitude = max_value - min_value;
-        self.written_size += try!(amplitude.serialize(write));
+        amplitude.serialize(write)?;
         let num_bits = compute_num_bits(amplitude);
         self.bit_packer = BitPacker::new(num_bits as usize);
         Ok(())
@@ -88,7 +87,7 @@ impl FastFieldSerializer {
         // adding some padding to make sure we
         // can read the last elements with our u64
         // cursor
-        self.written_size += self.bit_packer.close(&mut self.write)?;
+        self.bit_packer.close(&mut self.write)?;
         Ok(())
     }
 
@@ -96,15 +95,16 @@ impl FastFieldSerializer {
     /// Closes the serializer
     ///
     /// After this call the data must be persistently saved on disk.
-    pub fn close(mut self) -> io::Result<usize> {
+    pub fn close(self) -> io::Result<usize> {
         if self.field_open {
             return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed"));
         }
-        let header_offset: usize = self.written_size;
-        self.written_size += try!(self.fields.serialize(&mut self.write));
-        try!(self.write.seek(SeekFrom::Start(0)));
-        try!((header_offset as u32).serialize(&mut self.write));
-        try!(self.write.flush());
-        Ok(self.written_size)
+        let header_offset: usize = self.write.written_bytes() as usize;
+        let (mut write, written_size) = self.write.finish()?;
+        self.fields.serialize(&mut write)?;
+        write.seek(SeekFrom::Start(0))?;
+        (header_offset as u32).serialize(&mut write)?;
+        write.flush()?;
+        Ok(written_size)
     }
 }
diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs
index d5e72fa68..4a3078a2f 100644
--- a/src/postings/serializer.rs
+++ b/src/postings/serializer.rs
@@ -10,12 +10,11 @@ use directory::WritePtr;
 use compression::{NUM_DOCS_PER_BLOCK, BlockEncoder, CompositeEncoder};
 use DocId;
 use core::Segment;
-use std::io;
-use core::SegmentComponent;
-use std::io::Write;
+use std::io::{self, Write};
 use compression::VIntEncoder;
 use common::VInt;
 use common::BinarySerializable;
+use common::CountingWriter;
 use termdict::TermDictionaryBuilder;
 
@@ -52,10 +51,8 @@ use termdict::TermDictionaryBuilder;
 /// [available here](https://fulmicoton.gitbooks.io/tantivy-doc/content/inverted-index.html).
 pub struct PostingsSerializer {
     terms_fst_builder: TermDictionaryBuilderImpl<WritePtr>,
-    postings_write: WritePtr,
-    positions_write: WritePtr,
-    written_bytes_postings: usize,
-    written_bytes_positions: usize,
+    postings_write: CountingWriter<WritePtr>,
+    positions_write: CountingWriter<WritePtr>,
     last_doc_id_encoded: u32,
     positions_encoder: CompositeEncoder,
     block_encoder: BlockEncoder,
@@ -78,10 +75,8 @@ impl PostingsSerializer {
         let terms_fst_builder = try!(TermDictionaryBuilderImpl::new(terms_write));
         Ok(PostingsSerializer {
                terms_fst_builder: terms_fst_builder,
-               postings_write: postings_write,
-               positions_write: positions_write,
-               written_bytes_postings: 0,
-               written_bytes_positions: 0,
+               postings_write: CountingWriter::wrap(postings_write),
+               positions_write: CountingWriter::wrap(positions_write),
                last_doc_id_encoded: 0u32,
                positions_encoder: CompositeEncoder::new(),
                block_encoder: BlockEncoder::new(),
@@ -98,12 +93,10 @@ impl PostingsSerializer {
 
     /// Open a new `PostingsSerializer` for the given segment
     pub fn open(segment: &mut Segment) -> Result<PostingsSerializer> {
-        let terms_write = try!(segment.open_write(SegmentComponent::TERMS));
-        let postings_write = try!(segment.open_write(SegmentComponent::POSTINGS));
-        let positions_write = try!(segment.open_write(SegmentComponent::POSITIONS));
-        PostingsSerializer::new(terms_write,
-                                postings_write,
-                                positions_write,
+        use SegmentComponent::{TERMS, POSTINGS, POSITIONS};
+        PostingsSerializer::new(segment.open_write(TERMS)?,
+                                segment.open_write(POSTINGS)?,
+                                segment.open_write(POSITIONS)?,
                                 segment.schema())
     }
 
@@ -141,8 +134,8 @@ impl PostingsSerializer {
         self.position_deltas.clear();
         self.current_term_info = TermInfo {
             doc_freq: 0,
-            postings_offset: self.written_bytes_postings as u32,
-            positions_offset: self.written_bytes_positions as u32,
+            postings_offset: self.postings_write.written_bytes() as u32,
+            positions_offset: self.positions_write.written_bytes() as u32,
         };
         self.terms_fst_builder.insert_key(term)
     }
@@ -168,8 +161,7 @@ impl PostingsSerializer {
                 let block_encoded = self.block_encoder
                     .compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded);
-                self.written_bytes_postings += block_encoded.len();
-                try!(self.postings_write.write_all(block_encoded));
+                self.postings_write.write_all(block_encoded)?;
                 self.doc_ids.clear();
             }
             // ... Idem for term frequencies
@@ -177,8 +169,7 @@ impl PostingsSerializer {
                 let block_encoded = self.block_encoder
                     .compress_vint_unsorted(&self.term_freqs[..]);
                 for num in block_encoded {
-                    self.written_bytes_postings +=
-                        try!(num.serialize(&mut self.postings_write));
+                    num.serialize(&mut self.postings_write)?;
                 }
                 self.term_freqs.clear();
             }
             // On the other hand, positions are entirely buffered until the
             // end of the term, at which point they are compressed and written.
             if self.text_indexing_options.is_position_enabled() {
-                self.written_bytes_positions +=
-                    try!(VInt(self.position_deltas.len() as u64)
-                             .serialize(&mut self.positions_write));
+                let posdelta_len = VInt(self.position_deltas.len() as u64);
+                posdelta_len.serialize(&mut self.positions_write)?;
                 let positions_encoded: &[u8] = self.positions_encoder
                     .compress_unsorted(&self.position_deltas[..]);
-                try!(self.positions_write.write_all(positions_encoded));
-                self.written_bytes_positions += positions_encoded.len();
+                self.positions_write.write_all(positions_encoded)?;
                 self.position_deltas.clear();
             }
             self.term_open = false;
@@ -230,15 +219,13 @@ impl PostingsSerializer {
                 self.block_encoder
                     .compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded);
             self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1];
-            try!(self.postings_write.write_all(block_encoded));
-            self.written_bytes_postings += block_encoded.len();
+            self.postings_write.write_all(block_encoded)?;
         }
         if self.text_indexing_options.is_termfreq_enabled() {
             // encode the term_freqs
             let block_encoded: &[u8] = self.block_encoder
                 .compress_block_unsorted(&self.term_freqs);
-            try!(self.postings_write.write_all(block_encoded));
-            self.written_bytes_postings += block_encoded.len();
+            self.postings_write.write_all(block_encoded)?;
             self.term_freqs.clear();
         }
         self.doc_ids.clear();
diff --git a/src/postings/term_info.rs b/src/postings/term_info.rs
index 90d9d02f4..d639e9afb 100644
--- a/src/postings/term_info.rs
+++ b/src/postings/term_info.rs
@@ -24,10 +24,12 @@ pub struct TermInfo {
 
 impl BinarySerializable for TermInfo {
-    fn serialize(&self, writer: &mut io::Write) -> io::Result<usize> {
-        Ok(try!(self.doc_freq.serialize(writer)) + try!(self.postings_offset.serialize(writer)) +
-           try!(self.positions_offset.serialize(writer)))
+    fn serialize<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
+        self.doc_freq.serialize(writer)?;
+        self.postings_offset.serialize(writer)?;
+        self.positions_offset.serialize(writer)
     }
+
     fn deserialize<R: io::Read>(reader: &mut R) -> io::Result<TermInfo> {
         let doc_freq = try!(u32::deserialize(reader));
         let postings_offset = try!(u32::deserialize(reader));
diff --git a/src/schema/field.rs b/src/schema/field.rs
index e5489adf6..9df8e149b 100644
--- a/src/schema/field.rs
+++ b/src/schema/field.rs
@@ -14,7 +14,7 @@ use common::BinarySerializable;
 pub struct Field(pub u32);
 
 impl BinarySerializable for Field {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
         self.0.serialize(writer)
     }
 
diff --git a/src/schema/field_value.rs b/src/schema/field_value.rs
index b6cd5469b..5b7359fd1 100644
--- a/src/schema/field_value.rs
+++ b/src/schema/field_value.rs
@@ -34,8 +34,9 @@ impl FieldValue {
 }
 
 impl BinarySerializable for FieldValue {
-    fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
-        Ok(self.field.serialize(writer)? + self.value.serialize(writer)?)
+    fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
+        self.field.serialize(writer)?;
+        self.value.serialize(writer)
     }
 
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<FieldValue> {
diff --git a/src/schema/value.rs b/src/schema/value.rs
index 139e23f13..ad24688ee 100644
--- a/src/schema/value.rs
+++ b/src/schema/value.rs
@@ -130,23 +130,21 @@ mod binary_serialize {
     const I64_CODE: u8 = 2;
 
     impl BinarySerializable for Value {
-        fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
-            let mut written_size = 0;
+        fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
             match *self {
                 Value::Str(ref text) => {
-                    written_size += try!(TEXT_CODE.serialize(writer));
-                    written_size += try!(text.serialize(writer));
+                    TEXT_CODE.serialize(writer)?;
+                    text.serialize(writer)
                 }
                 Value::U64(ref val) => {
-                    written_size += try!(U64_CODE.serialize(writer));
-                    written_size += try!(val.serialize(writer));
+                    U64_CODE.serialize(writer)?;
+                    val.serialize(writer)
                 }
                 Value::I64(ref val) => {
-                    written_size += try!(I64_CODE.serialize(writer));
-                    written_size += try!(val.serialize(writer));
+                    I64_CODE.serialize(writer)?;
+                    val.serialize(writer)
                 }
             }
-            Ok(written_size)
         }
         fn deserialize<R: Read>(reader: &mut R) -> io::Result<Value> {
             let type_code = try!(u8::deserialize(reader));
diff --git a/src/store/writer.rs b/src/store/writer.rs
index c6f1e492e..28befa7af 100644
--- a/src/store/writer.rs
+++ b/src/store/writer.rs
@@ -5,6 +5,7 @@ use common::BinarySerializable;
 use std::io::{self, Write};
 use lz4;
 use datastruct::SkipListBuilder;
+use common::CountingWriter;
 
 const BLOCK_SIZE: usize = 16_384;
 
@@ -19,9 +20,8 @@ const BLOCK_SIZE: usize = 16_384;
 ///
 pub struct StoreWriter {
     doc: DocId,
-    written: u64,
     offset_index_writer: SkipListBuilder<u64>,
-    writer: WritePtr,
+    writer: CountingWriter<WritePtr>,
     intermediary_buffer: Vec<u8>,
     current_block: Vec<u8>,
 }
@@ -35,9 +35,8 @@ impl StoreWriter {
     pub fn new(writer: WritePtr) -> StoreWriter {
         StoreWriter {
             doc: 0,
-            written: 0,
             offset_index_writer: SkipListBuilder::new(3),
-            writer: writer,
+            writer: CountingWriter::wrap(writer),
             intermediary_buffer: Vec::new(),
             current_block: Vec::new(),
         }
     }
@@ -54,11 +53,12 @@ impl StoreWriter {
         for field_value in field_values {
             try!((*field_value).serialize(&mut self.intermediary_buffer));
         }
-        try!((self.intermediary_buffer.len() as u32).serialize(&mut self.current_block));
-        try!(self.current_block.write_all(&self.intermediary_buffer[..]));
+        (self.intermediary_buffer.len() as u32)
+            .serialize(&mut self.current_block)?;
+        self.current_block.write_all(&self.intermediary_buffer[..])?;
         self.doc += 1;
         if self.current_block.len() > BLOCK_SIZE {
-            try!(self.write_and_compress_block());
+            self.write_and_compress_block()?;
         }
         Ok(())
     }
@@ -71,11 +71,11 @@ impl StoreWriter {
             let (_, encoder_result) = encoder.finish();
             try!(encoder_result);
         }
-        let compressed_block_size = self.intermediary_buffer.len() as u64;
-        self.written += try!((compressed_block_size as u32).serialize(&mut self.writer)) as u64;
-        try!(self.writer.write_all(&self.intermediary_buffer));
-        self.written += compressed_block_size;
-        try!(self.offset_index_writer.insert(self.doc, &self.written));
+        (self.intermediary_buffer.len() as u32)
+            .serialize(&mut self.writer)?;
+        self.writer.write_all(&self.intermediary_buffer)?;
+        self.offset_index_writer
+            .insert(self.doc, &(self.writer.written_bytes() as u64))?;
         self.current_block.clear();
         Ok(())
     }
@@ -89,9 +89,9 @@ impl StoreWriter {
         if !self.current_block.is_empty() {
             try!(self.write_and_compress_block());
         }
-        let header_offset: u64 = self.written;
+        let header_offset: u64 = self.writer.written_bytes() as u64;
         try!(self.offset_index_writer
-                 .write::<Box<Write>>(&mut self.writer));
+                 .write(&mut self.writer));
         try!(header_offset.serialize(&mut self.writer));
         try!(self.doc.serialize(&mut self.writer));
         self.writer.flush()
diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs
index 9ec324776..7e27216e9 100644
--- a/src/termdict/mod.rs
+++ b/src/termdict/mod.rs
@@ -411,7 +411,6 @@ mod tests {
         {
             for i in (0..20).chain((BLOCK_SIZE - 10..BLOCK_SIZE + 10)) {
                 for j in 0..3 {
-                    println!("i {} j {}", i, j);
                     let &(ref fst_key, _) = &ids[i];
                     let &(ref last_key, _) = &ids[i + j];
                     let mut streamer = term_dictionary
@@ -420,7 +419,6 @@ mod tests {
                         .lt(last_key.as_bytes())
                         .into_stream();
                     for _ in 0..j {
-                        println!("ij");
                         assert!(streamer.next().is_some());
                     }
                     assert!(streamer.next().is_none());
diff --git a/src/termdict/streamdict/mod.rs b/src/termdict/streamdict/mod.rs
index 66c3eb97c..90b719dda 100644
--- a/src/termdict/streamdict/mod.rs
+++ b/src/termdict/streamdict/mod.rs
@@ -1,9 +1,7 @@
 mod termdict;
 mod streamer;
-mod counting_writer;
 
-use self::counting_writer::CountingWriter;
 pub use self::termdict::TermDictionaryImpl;
 pub use self::termdict::TermDictionaryBuilderImpl;
 pub use self::streamer::TermStreamerImpl;
diff --git a/src/termdict/streamdict/streamer.rs b/src/termdict/streamdict/streamer.rs
index dd27a2bcf..55167d468 100644
--- a/src/termdict/streamdict/streamer.rs
+++ b/src/termdict/streamdict/streamer.rs
@@ -4,7 +4,6 @@ use std::cmp::max;
 use common::BinarySerializable;
 use super::TermDictionaryImpl;
 use termdict::{TermStreamerBuilder, TermStreamer};
-use std::io::Read;
 
 pub(crate) fn stream_before<'a, V>(term_dictionary: &'a TermDictionaryImpl<V>,
                                    target_key: &[u8])
diff --git a/src/termdict/streamdict/termdict.rs b/src/termdict/streamdict/termdict.rs
index a677854f3..5759ce1e2 100644
--- a/src/termdict/streamdict/termdict.rs
+++ b/src/termdict/streamdict/termdict.rs
@@ -7,7 +7,7 @@ use common::VInt;
 use directory::ReadOnlySource;
 use common::BinarySerializable;
 use std::marker::PhantomData;
-use super::CountingWriter;
+use common::CountingWriter;
 use std::cmp::Ordering;
 use postings::TermInfo;
 use fst::raw::Node;

From 0521844e5689ef90a92b229fcd645e83ad1e7c3f Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Wed, 31 May 2017 08:22:53 +0900
Subject: [PATCH 04/16] Format, small changes in VInt

---
 src/common/vint.rs      | 13 ++++++-------
 src/fastfield/reader.rs | 18 +++++++++++-------
 src/termdict/merger.rs  |  2 +-
 3 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/src/common/vint.rs b/src/common/vint.rs
index c1a014599..39653e8a7 100644
--- a/src/common/vint.rs
+++ b/src/common/vint.rs
@@ -19,20 +19,19 @@ impl BinarySerializable for VInt {
     fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
         let mut remaining = self.0;
         let mut buffer = [0u8; 10];
-        let mut written = 0;
+        let mut i = 0;
         loop {
             let next_byte: u8 = (remaining % 128u64) as u8;
             remaining /= 128u64;
             if remaining == 0u64 {
-                buffer[written] = next_byte | 128u8;
-                written += 1;
-                break;
+                buffer[i] = next_byte | 128u8;
+                return writer.write_all(&buffer[0..i + 1]);
             } else {
-                buffer[written] = next_byte;
-                written += 1;
+                buffer[i] = next_byte;
             }
+            i += 1;
         }
-        writer.write_all(&buffer[0..written])
+
     }
 
     fn deserialize<R: Read>(reader: &mut R) -> io::Result<VInt> {
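[Editor's note] Worked example of the encoding above. Unlike the usual
varint convention, the high bit marks the *last* byte of a value rather
than a continuation:

    // VInt(5)   -> [0b1000_0101]               // 5 | 128, a single byte
    // VInt(300) -> [0b0010_1100, 0b1000_0010]  // 300 = 44 + 2 * 128
    //
    // deserialize() accumulates 7 bits per byte (least-significant group
    // first) and stops at the first byte whose high bit is set.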
"Failed to open the file") - .and_then(|source| FastFieldsReader::from_source(source) - .chain_err(|| "Failed to read the file.")) - .and_then(|ff_readers| ff_readers - .open_reader(field) - .ok_or_else(|| {"Failed to find the requested field".into() })) + .chain_err(|| "Failed to open the file") + .and_then(|source| { + FastFieldsReader::from_source(source) + .chain_err(|| "Failed to read the file.") + }) + .and_then(|ff_readers| { + ff_readers + .open_reader(field) + .ok_or_else(|| "Failed to find the requested field".into()) + }) .expect("This should never happen, please report.") - + } } diff --git a/src/termdict/merger.rs b/src/termdict/merger.rs index 6967be9c7..4689e0673 100644 --- a/src/termdict/merger.rs +++ b/src/termdict/merger.rs @@ -48,7 +48,6 @@ impl<'a, V> Ord for HeapItem<'a, V> /// - the term /// - a slice with the ordinal of the segments containing /// the terms. -#[allow(should_implement_trait)] pub struct TermMerger<'a, V> where V: 'a + BinarySerializable + Default { @@ -131,6 +130,7 @@ impl<'a, V> TermMerger<'a, V> } /// Iterates through terms + #[allow(should_implement_trait)] pub fn next(&mut self) -> Option> { if self.advance() { Some(Term::wrap(self.current_streamers[0].streamer.key())) From 19c073385affaa67fc62d5fa9e935d55724f45c3 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 22 May 2017 22:16:09 +0900 Subject: [PATCH 05/16] Better intersection and added size_hint --- src/postings/docset.rs | 14 +++++ src/postings/intersection.rs | 69 +++++++++++++++-------- src/postings/segment_postings.rs | 4 ++ src/postings/vec_postings.rs | 4 ++ src/query/boolean_query/boolean_scorer.rs | 12 ++++ src/query/phrase_query/phrase_scorer.rs | 4 ++ src/query/scorer.rs | 4 ++ src/query/term_query/term_scorer.rs | 5 ++ 8 files changed, 91 insertions(+), 25 deletions(-) diff --git a/src/postings/docset.rs b/src/postings/docset.rs index e28319f42..ea4211a5f 100644 --- a/src/postings/docset.rs +++ b/src/postings/docset.rs @@ -65,6 +65,10 @@ pub trait DocSet { None } } + + /// Returns a best-effort hint of the + /// length of the docset. + fn size_hint(&self) -> usize; } @@ -83,6 +87,11 @@ impl DocSet for Box { let unboxed: &TDocSet = self.borrow(); unboxed.doc() } + + fn size_hint(&self) -> usize { + let unboxed: &TDocSet = self.borrow(); + unboxed.size_hint() + } } impl<'a, TDocSet: DocSet> DocSet for &'a mut TDocSet { @@ -100,4 +109,9 @@ impl<'a, TDocSet: DocSet> DocSet for &'a mut TDocSet { let unref: &TDocSet = *self; unref.doc() } + + fn size_hint(&self) -> usize { + let unref: &TDocSet = *self; + unref.size_hint() + } } diff --git a/src/postings/intersection.rs b/src/postings/intersection.rs index e4e4c2308..3f30a54c7 100644 --- a/src/postings/intersection.rs +++ b/src/postings/intersection.rs @@ -10,8 +10,9 @@ pub struct IntersectionDocSet { } impl From> for IntersectionDocSet { - fn from(docsets: Vec) -> IntersectionDocSet { + fn from(mut docsets: Vec) -> IntersectionDocSet { assert!(docsets.len() >= 2); + docsets.sort_by_key(|docset| docset.size_hint()); IntersectionDocSet { docsets: docsets, finished: false, @@ -31,37 +32,55 @@ impl IntersectionDocSet { impl DocSet for IntersectionDocSet { + fn size_hint(&self) -> usize { + self.docsets + .iter() + .map(|docset| docset.size_hint()) + .min() + .unwrap() // safe as docsets cannot be empty. 
+ } + + #[allow(never_loop)] fn advance(&mut self) -> bool { if self.finished { return false; } - let num_docsets = self.docsets.len(); - let mut count_matching = 0; - let mut doc_candidate = 0; - let mut ord = 0; - loop { - let mut doc_set = &mut self.docsets[ord]; - match doc_set.skip_next(doc_candidate) { - SkipResult::Reached => { - count_matching += 1; - if count_matching == num_docsets { - self.doc = doc_candidate; - return true; + let (head_arr, tail) = self.docsets.split_at_mut(1); + let head: &mut TDocSet = &mut head_arr[0]; + if !head.advance() { + self.finished = true; + return false; + } + let mut doc_candidate = head.doc(); + + 'outer: loop { + + for docset in tail.iter_mut() { + match docset.skip_next(doc_candidate) { + SkipResult::Reached => {} + SkipResult::OverStep => { + doc_candidate = docset.doc(); + match head.skip_next(doc_candidate) { + SkipResult::Reached => {} + SkipResult::End => { + self.finished = true; + return false; + } + SkipResult::OverStep => { + doc_candidate = head.doc(); + continue 'outer; + } + } + } + SkipResult::End => { + self.finished = true; + return false; } } - SkipResult::End => { - self.finished = true; - return false; - } - SkipResult::OverStep => { - count_matching = 1; - doc_candidate = doc_set.doc(); - } - } - ord += 1; - if ord == num_docsets { - ord = 0; } + + self.doc = doc_candidate; + return true; } } diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index d6386d138..f42922629 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -152,6 +152,10 @@ impl<'a> DocSet for SegmentPostings<'a> { } } + fn size_hint(&self) -> usize { + self.len() + } + #[inline] fn doc(&self) -> DocId { let docs = self.block_cursor.docs(); diff --git a/src/postings/vec_postings.rs b/src/postings/vec_postings.rs index 399307cff..8c9512fb1 100644 --- a/src/postings/vec_postings.rs +++ b/src/postings/vec_postings.rs @@ -34,6 +34,10 @@ impl DocSet for VecPostings { fn doc(&self) -> DocId { self.doc_ids[self.cursor.0] } + + fn size_hint(&self) -> usize { + self.len() + } } impl HasLen for VecPostings { diff --git a/src/query/boolean_query/boolean_scorer.rs b/src/query/boolean_query/boolean_scorer.rs index 8e1bf5950..595f54219 100644 --- a/src/query/boolean_query/boolean_scorer.rs +++ b/src/query/boolean_query/boolean_scorer.rs @@ -93,6 +93,18 @@ impl BooleanScorer { } impl DocSet for BooleanScorer { + fn size_hint(&self) -> usize { + // TODO fix this. it should be the min + // of the MUST scorer + // and the max of the SHOULD scorers. 
diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs
index d6386d138..f42922629 100644
--- a/src/postings/segment_postings.rs
+++ b/src/postings/segment_postings.rs
@@ -152,6 +152,10 @@ impl<'a> DocSet for SegmentPostings<'a> {
         }
     }
 
+    fn size_hint(&self) -> usize {
+        self.len()
+    }
+
     #[inline]
     fn doc(&self) -> DocId {
         let docs = self.block_cursor.docs();
diff --git a/src/postings/vec_postings.rs b/src/postings/vec_postings.rs
index 399307cff..8c9512fb1 100644
--- a/src/postings/vec_postings.rs
+++ b/src/postings/vec_postings.rs
@@ -34,6 +34,10 @@ impl DocSet for VecPostings {
     fn doc(&self) -> DocId {
         self.doc_ids[self.cursor.0]
     }
+
+    fn size_hint(&self) -> usize {
+        self.len()
+    }
 }
 
 impl HasLen for VecPostings {
diff --git a/src/query/boolean_query/boolean_scorer.rs b/src/query/boolean_query/boolean_scorer.rs
index 8e1bf5950..595f54219 100644
--- a/src/query/boolean_query/boolean_scorer.rs
+++ b/src/query/boolean_query/boolean_scorer.rs
@@ -93,6 +93,18 @@ impl<TScorer: Scorer> BooleanScorer<TScorer> {
 }
 
 impl<TScorer: Scorer> DocSet for BooleanScorer<TScorer> {
+    fn size_hint(&self) -> usize {
+        // TODO fix this. it should be the min
+        // of the MUST scorer
+        // and the max of the SHOULD scorers.
+        self.scorers
+            .iter()
+            .map(|scorer| scorer.size_hint())
+            .max()
+            .unwrap()
+    }
+
+
     fn advance(&mut self) -> bool {
         loop {
             self.score_combiner.clear();
diff --git a/src/query/phrase_query/phrase_scorer.rs b/src/query/phrase_query/phrase_scorer.rs
index 23721037b..1726340d1 100644
--- a/src/query/phrase_query/phrase_scorer.rs
+++ b/src/query/phrase_query/phrase_scorer.rs
@@ -67,6 +67,10 @@ impl<'a> DocSet for PhraseScorer<'a> {
     fn doc(&self) -> DocId {
         self.intersection_docset.doc()
     }
+
+    fn size_hint(&self) -> usize {
+        self.intersection_docset.size_hint()
+    }
 }
 
diff --git a/src/query/scorer.rs b/src/query/scorer.rs
index 027af82de..e3f677edf 100644
--- a/src/query/scorer.rs
+++ b/src/query/scorer.rs
@@ -49,6 +49,10 @@ impl DocSet for EmptyScorer {
     fn doc(&self) -> DocId {
         DocId::max_value()
     }
+
+    fn size_hint(&self) -> usize {
+        0
+    }
 }
 
 impl Scorer for EmptyScorer {
diff --git a/src/query/term_query/term_scorer.rs b/src/query/term_query/term_scorer.rs
index 0819aeb58..73ea46b4b 100644
--- a/src/query/term_query/term_scorer.rs
+++ b/src/query/term_query/term_scorer.rs
@@ -32,6 +32,11 @@ impl<TPostings: Postings> DocSet for TermScorer<TPostings>
     fn doc(&self) -> DocId {
         self.postings.doc()
     }
+
+
+    fn size_hint(&self) -> usize {
+        self.postings.size_hint()
+    }
 }
 
 impl<TPostings: Postings> Scorer for TermScorer<TPostings>

From 63867a7150cdf70870481ed2c24293e18ecdcb95 Mon Sep 17 00:00:00 2001
From: Laurentiu Nicola
Date: Tue, 23 May 2017 21:07:19 +0300
Subject: [PATCH 06/16] Fix document generation for posting benchmarks

---
 src/postings/mod.rs | 16 ++++------------
 1 file changed, 4 insertions(+), 12 deletions(-)

diff --git a/src/postings/mod.rs b/src/postings/mod.rs
index 647a7ae37..c1e83dd02 100644
--- a/src/postings/mod.rs
+++ b/src/postings/mod.rs
@@ -415,23 +415,15 @@ mod tests {
 
             let mut rng: XorShiftRng = XorShiftRng::from_seed(*seed);
             let index = Index::create_in_ram(schema);
-            let mut count_a = 0;
-            let mut count_b = 0;
-            let posting_list_size = 100_000;
+            let posting_list_size = 1_000_000;
             {
                 let mut index_writer = index.writer_with_num_threads(1, 40_000_000).unwrap();
-                for _ in 0 .. {
-                    if count_a >= posting_list_size &&
-                       count_b >= posting_list_size {
-                        break;
-                    }
+                for _ in 0 .. posting_list_size {
                     let mut doc = Document::default();
-                    if count_a < posting_list_size && rng.gen_weighted_bool(15) {
-                        count_a += 1;
+                    if rng.gen_weighted_bool(15) {
                         doc.add_text(text_field, "a");
                     }
-                    if count_b < posting_list_size && rng.gen_weighted_bool(10) {
-                        count_b += 1;
+                    if rng.gen_weighted_bool(10) {
                         doc.add_text(text_field, "b");
                     }
                     index_writer.add_document(doc);

From 69525cb3c7890a92bc5ce9a3becf8de3f4be4d00 Mon Sep 17 00:00:00 2001
From: Laurentiu Nicola
Date: Tue, 23 May 2017 21:30:18 +0300
Subject: [PATCH 07/16] Add extra intersection test

---
 src/postings/mod.rs | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/src/postings/mod.rs b/src/postings/mod.rs
index c1e83dd02..ca32219fd 100644
--- a/src/postings/mod.rs
+++ b/src/postings/mod.rs
@@ -394,6 +394,13 @@ mod tests {
             assert_eq!(intersection.doc(), 9);
             assert!(!intersection.advance());
         }
+        {
+            let a = VecPostings::from(vec![1, 3]);
+            let b = VecPostings::from(vec![1, 4]);
+            let c = VecPostings::from(vec![3, 9]);
+            let mut intersection = IntersectionDocSet::from(vec![a, b, c]);
+            assert!(!intersection.advance());
+        }
     }
 

From 97a051996f87d6758988bec7efe96ce01ce11d28 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Wed, 24 May 2017 13:26:05 +0900
Subject: [PATCH 08/16] issue 171. Hopefully bugfix?
---
 src/postings/intersection.rs | 100 +++++++++++++++++++++++++----------
 src/postings/mod.rs          |  30 -----------
 2 files changed, 72 insertions(+), 58 deletions(-)

diff --git a/src/postings/intersection.rs b/src/postings/intersection.rs
index 3f30a54c7..28decb040 100644
--- a/src/postings/intersection.rs
+++ b/src/postings/intersection.rs
@@ -16,7 +16,7 @@ impl<TDocSet: DocSet> From<Vec<TDocSet>> for IntersectionDocSet<TDocSet> {
         IntersectionDocSet {
             docsets: docsets,
             finished: false,
-            doc: DocId::max_value(),
+            doc: 0u32,
         }
     }
 }
     fn advance(&mut self) -> bool {
         if self.finished {
             return false;
         }
-        let (head_arr, tail) = self.docsets.split_at_mut(1);
-        let head: &mut TDocSet = &mut head_arr[0];
-        if !head.advance() {
-            self.finished = true;
-            return false;
-        }
-        let mut doc_candidate = head.doc();
+
+        let mut candidate_doc = self.doc;
+        let mut candidate_ord = self.docsets.len();
 
         'outer: loop {
 
-            for docset in tail.iter_mut() {
-                match docset.skip_next(doc_candidate) {
-                    SkipResult::Reached => {}
-                    SkipResult::OverStep => {
-                        doc_candidate = docset.doc();
-                        match head.skip_next(doc_candidate) {
-                            SkipResult::Reached => {}
-                            SkipResult::End => {
-                                self.finished = true;
-                                return false;
-                            }
-                            SkipResult::OverStep => {
-                                doc_candidate = head.doc();
-                                continue 'outer;
-                            }
+            for (ord, docset) in self.docsets.iter_mut().enumerate() {
+                if ord != candidate_ord {
+                    // Candidate_ord is already at the
+                    // right position.
+                    //
+                    // Calling `skip_next` would advance this docset
+                    // and miss it.
+                    match docset.skip_next(candidate_doc) {
+                        SkipResult::Reached => {}
+                        SkipResult::OverStep => {
+                            // this is not in the intersection,
+                            // let's update our candidate.
+                            candidate_doc = docset.doc();
+                            candidate_ord = ord;
+                            continue 'outer;
+                        }
+                        SkipResult::End => {
+                            self.finished = true;
+                            return false;
                         }
                     }
-                    SkipResult::End => {
-                        self.finished = true;
-                        return false;
-                    }
                 }
             }
 
-            self.doc = doc_candidate;
+            self.doc = candidate_doc;
             return true;
         }
     }
 
     fn doc(&self) -> DocId {
         self.doc
     }
 }
+
+
+#[cfg(test)]
+mod tests {
+
+    use postings::{DocSet, VecPostings, IntersectionDocSet};
+
+    #[test]
+    fn test_intersection() {
+        {
+            let left = VecPostings::from(vec![1, 3, 9]);
+            let right = VecPostings::from(vec![3, 4, 9, 18]);
+            let mut intersection = IntersectionDocSet::from(vec![left, right]);
+            assert!(intersection.advance());
+            assert_eq!(intersection.doc(), 3);
+            assert!(intersection.advance());
+            assert_eq!(intersection.doc(), 9);
+            assert!(!intersection.advance());
+        }
+        {
+            let a = VecPostings::from(vec![1, 3, 9]);
+            let b = VecPostings::from(vec![3, 4, 9, 18]);
+            let c = VecPostings::from(vec![1, 5, 9, 111]);
+            let mut intersection = IntersectionDocSet::from(vec![a, b, c]);
+            assert!(intersection.advance());
+            assert_eq!(intersection.doc(), 9);
+            assert!(!intersection.advance());
+        }
+    }
+
+    #[test]
+    fn test_intersection_zero() {
+        let left = VecPostings::from(vec![0]);
+        let right = VecPostings::from(vec![0]);
+        let mut intersection = IntersectionDocSet::from(vec![left, right]);
+        assert!(intersection.advance());
+        assert_eq!(intersection.doc(), 0);
+    }
+
+    #[test]
+    fn test_intersection_empty() {
+        let a = VecPostings::from(vec![1, 3]);
+        let b = VecPostings::from(vec![1, 4]);
+        let c = VecPostings::from(vec![3, 9]);
+        let mut intersection = IntersectionDocSet::from(vec![a, b, c]);
+        assert!(!intersection.advance());
+    }
+}
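[Editor's note] The rewrite drops the head/tail split of PATCH 05:
`candidate_ord` records which docset is already positioned on
`candidate_doc`, so a second `skip_next` on that docset -- which could
jump past the candidate and lose a match, the bug behind issue 171 -- is
avoided. The loop only returns once every docset has reported `Reached`
for the same candidate.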
diff --git a/src/postings/mod.rs b/src/postings/mod.rs
index ca32219fd..fef99d8a2 100644
--- a/src/postings/mod.rs
+++ b/src/postings/mod.rs
@@ -373,36 +373,6 @@ mod tests {
         }
     }
 
-    #[test]
-    fn test_intersection() {
-        {
-            let left = VecPostings::from(vec![1, 3, 9]);
-            let right = VecPostings::from(vec![3, 4, 9, 18]);
-            let mut intersection = IntersectionDocSet::from(vec![left, right]);
-            assert!(intersection.advance());
-            assert_eq!(intersection.doc(), 3);
-            assert!(intersection.advance());
-            assert_eq!(intersection.doc(), 9);
-            assert!(!intersection.advance());
-        }
-        {
-            let a = VecPostings::from(vec![1, 3, 9]);
-            let b = VecPostings::from(vec![3, 4, 9, 18]);
-            let c = VecPostings::from(vec![1, 5, 9, 111]);
-            let mut intersection = IntersectionDocSet::from(vec![a, b, c]);
-            assert!(intersection.advance());
-            assert_eq!(intersection.doc(), 9);
-            assert!(!intersection.advance());
-        }
-        {
-            let a = VecPostings::from(vec![1, 3]);
-            let b = VecPostings::from(vec![1, 4]);
-            let c = VecPostings::from(vec![3, 9]);
-            let mut intersection = IntersectionDocSet::from(vec![a, b, c]);
-            assert!(!intersection.advance());
-        }
-    }
-
     lazy_static! {
         static ref TERM_A: Term = {

From a35a8638cc5ca39ebec7f988499d814004dc3ebe Mon Sep 17 00:00:00 2001
From: Laurentiu Nicola
Date: Wed, 24 May 2017 13:24:34 +0300
Subject: [PATCH 09/16] Comment nit

---
 src/postings/intersection.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/postings/intersection.rs b/src/postings/intersection.rs
index 28decb040..06bc0b94e 100644
--- a/src/postings/intersection.rs
+++ b/src/postings/intersection.rs
@@ -53,7 +53,7 @@ impl<TDocSet: DocSet> DocSet for IntersectionDocSet<TDocSet> {
 
             for (ord, docset) in self.docsets.iter_mut().enumerate() {
                 if ord != candidate_ord {
-                    // Candidate_ord is already at the
+                    // `candidate_ord` is already at the
                     // right position.
                     //
                     // Calling `skip_next` would advance this docset

From a94679d74d6847f827af4932b540038cf7956f6c Mon Sep 17 00:00:00 2001
From: Laurentiu Nicola
Date: Tue, 30 May 2017 15:41:11 +0300
Subject: [PATCH 10/16] Use four terms in the intersection bench

---
 src/postings/mod.rs | 24 +++++++++++++++++++++++-
 1 file changed, 23 insertions(+), 1 deletion(-)

diff --git a/src/postings/mod.rs b/src/postings/mod.rs
index fef99d8a2..5de1f7a28 100644
--- a/src/postings/mod.rs
+++ b/src/postings/mod.rs
@@ -383,6 +383,14 @@ mod tests {
             let field = Field(0);
             Term::from_field_text(field, "b")
         };
+        static ref TERM_C: Term = {
+            let field = Field(0);
+            Term::from_field_text(field, "c")
+        };
+        static ref TERM_D: Term = {
+            let field = Field(0);
+            Term::from_field_text(field, "d")
+        };
         static ref INDEX: Index = {
             let mut schema_builder = SchemaBuilder::default();
             let text_field = schema_builder.add_text_field("text", STRING);
@@ -403,6 +411,12 @@ mod tests {
                     if rng.gen_weighted_bool(10) {
                         doc.add_text(text_field, "b");
                     }
+                    if rng.gen_weighted_bool(5) {
+                        doc.add_text(text_field, "c");
+                    }
+                    if rng.gen_weighted_bool(1) {
+                        doc.add_text(text_field, "d");
+                    }
                     index_writer.add_document(doc);
                 }
                 assert!(index_writer.commit().is_ok());
@@ -436,8 +450,16 @@ mod tests {
             let segment_postings_b = segment_reader
                 .read_postings(&*TERM_B, SegmentPostingsOption::NoFreq)
                 .unwrap();
+            let segment_postings_c = segment_reader
+                .read_postings(&*TERM_C, SegmentPostingsOption::NoFreq)
+                .unwrap();
+            let segment_postings_d = segment_reader
+                .read_postings(&*TERM_D, SegmentPostingsOption::NoFreq)
+                .unwrap();
             let mut intersection = IntersectionDocSet::from(vec![segment_postings_a,
-                                                                 segment_postings_b]);
+                                                                 segment_postings_b,
+                                                                 segment_postings_c,
+                                                                 segment_postings_d]);
             while intersection.advance() {}
         });
     }

From 5aa45654241172ee2c5e082d2e632fed52479392 Mon Sep 17 00:00:00 2001
From: Paul
 Masurel
Date: Mon, 5 Jun 2017 22:49:53 +0900
Subject: [PATCH 11/16] Tiny cleaning

---
 src/common/bitpacker.rs          | 5 +++--
 src/datastruct/stacker/expull.rs | 8 +++++++-
 src/directory/mmap_directory.rs  | 1 -
 src/fastfield/mod.rs             | 4 ----
 src/postings/postings_writer.rs  | 3 ++-
 src/schema/schema.rs             | 2 --
 6 files changed, 12 insertions(+), 11 deletions(-)

diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs
index f625e07fe..7306413db 100644
--- a/src/common/bitpacker.rs
+++ b/src/common/bitpacker.rs
@@ -123,8 +123,9 @@ impl<Data> BitUnpacker<Data>
         let bit_shift = addr_in_bits & 7;
         debug_assert!(addr + 8 <= data.len(),
                       "The fast field field should have been padded with 7 bytes.");
-        let val_unshifted_unmasked: u64 =
-            unsafe { *(data.as_ptr().offset(addr as isize) as *const u64) };
+        let val_unshifted_unmasked: u64 = unsafe {
+            *(data[addr..].as_ptr() as *const u64)
+        };
         let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
         (val_shifted & mask)
     }
diff --git a/src/datastruct/stacker/expull.rs b/src/datastruct/stacker/expull.rs
index 68c4e61bd..a6bd49097 100644
--- a/src/datastruct/stacker/expull.rs
+++ b/src/datastruct/stacker/expull.rs
@@ -13,7 +13,7 @@ pub fn jump_needed(val: u32) -> bool {
 }
 
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct ExpUnrolledLinkedList {
     len: u32,
     end: u32,
@@ -52,6 +52,12 @@ impl ExpUnrolledLinkedList {
 }
 
 
+impl HeapAllocable for u32 {
+    fn with_addr(_addr: u32) -> u32 {
+        0u32
+    }
+}
+
 impl HeapAllocable for ExpUnrolledLinkedList {
     fn with_addr(addr: u32) -> ExpUnrolledLinkedList {
         let last_addr = addr + mem::size_of::<u32>() as u32 * 2u32;
diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs
index 625bae327..a3d5748b8 100644
--- a/src/directory/mmap_directory.rs
+++ b/src/directory/mmap_directory.rs
@@ -463,7 +463,6 @@ mod tests {
             assert_eq!(mmap_directory.get_cache_info().mmapped.len(), i + 1);
         }
         let cache_info = mmap_directory.get_cache_info();
-        println!("{:?}", cache_info);
         assert_eq!(cache_info.counters.miss_empty, 30);
         assert_eq!(cache_info.counters.miss_weak, 10);
         assert_eq!(cache_info.mmapped.len(), 10);
diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs
index ad2988ce4..9ae143856 100644
--- a/src/fastfield/mod.rs
+++ b/src/fastfield/mod.rs
@@ -306,10 +306,6 @@ mod tests {
             fast_field_readers.open_reader(*FIELD).unwrap();
         let mut a = 0u64;
         for _ in 0..n {
-            println!("i {}=> {} {}",
-                     a,
-                     fast_field_reader.get(a as u32),
-                     permutation[a as usize]);
             assert_eq!(fast_field_reader.get(a as u32), permutation[a as usize]);
             a = fast_field_reader.get(a as u32);
         }
diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs
index 7208fd037..0a674b4bc 100644
--- a/src/postings/postings_writer.rs
+++ b/src/postings/postings_writer.rs
@@ -88,7 +88,7 @@ impl<'a> MultiFieldPostingsWriter<'a> {
             .cloned()
             .map(|(key, _)| Term::wrap(key).field())
             .enumerate();
-
+        
         let mut prev_field = Field(u32::max_value());
         for (offset, field) in term_offsets_it {
             if field != prev_field {
@@ -215,6 +215,7 @@ impl<'a, Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<'a, Rec> {
                       position: u32,
                       term: &Term,
                       heap: &Heap) {
+        debug_assert!(term.as_slice().len() >= 4);
         let recorder: &mut Rec = term_index.get_or_create(term);
         let current_doc = recorder.current_doc();
         if current_doc != doc {
diff --git a/src/schema/schema.rs b/src/schema/schema.rs
index fcf4c655a..7c5f480dc 100644
--- a/src/schema/schema.rs
+++ b/src/schema/schema.rs
@@ -381,8 +381,6 @@ mod tests {
             }
         }
     ]"#;
-        println!("{}", schema_json);
-        println!("{}", expected);
         assert_eq!(schema_json, expected);
 
         let schema: Schema = serde_json::from_str(expected).unwrap();

From e547e8abad35f0f1c70cdd656973f5f28ed469c1 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Tue, 6 Jun 2017 19:56:26 +0900
Subject: [PATCH 12/16] Closes #184

Resizing the `Vec` was a bad idea: for some stacker operations, we may
have a living reference to an object in the current heap.
---
 src/common/bitpacker.rs         |  4 +-
 src/datastruct/stacker/heap.rs  | 90 ++++++++++++++++++++++++---------
 src/postings/postings_writer.rs |  2 +-
 3 files changed, 69 insertions(+), 27 deletions(-)

diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs
index 7306413db..6aa1f5d44 100644
--- a/src/common/bitpacker.rs
+++ b/src/common/bitpacker.rs
@@ -123,9 +123,7 @@ impl<Data> BitUnpacker<Data>
         let bit_shift = addr_in_bits & 7;
         debug_assert!(addr + 8 <= data.len(),
                       "The fast field field should have been padded with 7 bytes.");
-        let val_unshifted_unmasked: u64 = unsafe {
-            *(data[addr..].as_ptr() as *const u64)
-        };
+        let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
         let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
         (val_shifted & mask)
     }
diff --git a/src/datastruct/stacker/heap.rs b/src/datastruct/stacker/heap.rs
index b4511fb35..e642e5908 100644
--- a/src/datastruct/stacker/heap.rs
+++ b/src/datastruct/stacker/heap.rs
@@ -93,8 +93,9 @@ impl Heap {
 
 struct InnerHeap {
     buffer: Vec<u8>,
+    buffer_len: u32,
     used: u32,
-    has_been_resized: bool,
+    next_heap: Option<Box<InnerHeap>>,
 }
 
 
@@ -103,13 +104,15 @@ impl InnerHeap {
         let buffer: Vec<u8> = vec![0u8; num_bytes];
         InnerHeap {
             buffer: buffer,
+            buffer_len: num_bytes as u32,
+            next_heap: None,
             used: 0u32,
-            has_been_resized: false,
         }
     }
 
     pub fn clear(&mut self) {
         self.used = 0u32;
+        self.next_heap = None;
     }
 
     pub fn capacity(&self) -> u32 {
@@ -119,30 +122,48 @@ impl InnerHeap {
     // Returns the number of free bytes. If the buffer
     // has reached it's capacity and overflowed to another buffer, return 0.
     pub fn num_free_bytes(&self) -> u32 {
-        if self.has_been_resized {
+        if self.next_heap.is_some() {
             0u32
         } else {
-            (self.buffer.len() as u32) - self.used
+            self.buffer_len - self.used
         }
     }
 
     pub fn allocate_space(&mut self, num_bytes: usize) -> u32 {
         let addr = self.used;
         self.used += num_bytes as u32;
-        let buffer_len = self.buffer.len();
-        if self.used > buffer_len as u32 {
-            self.buffer.resize(buffer_len * 2, 0u8);
-            self.has_been_resized = true
+        if self.used <= self.buffer_len {
+            addr
+        } else {
+            if self.next_heap.is_none() {
+                info!(r#"Exceeded heap size.
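[Editor's note] The fix replaces `Vec::resize` -- which may reallocate and
move the buffer while a `get_mut_ref` borrow is still alive -- with a chain
of fixed-size buffers, so an address keeps pointing at the same bytes
forever. A sketch of the address translation the accessors below all
perform (illustrative only, not part of the patch):

    // Every overflow heap has the same capacity `buffer_len`, so an
    // address is resolved by peeling off one buffer length per hop.
    fn resolve(mut addr: u32, buffer_len: u32) -> (usize, u32) {
        let mut depth = 0; // how many `next_heap` hops to follow
        while addr >= buffer_len {
            addr -= buffer_len;
            depth += 1;
        }
        (depth, addr) // (heap in the chain, offset within its buffer)
    }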
+                The segment will be committed right after indexing this document."#,);
+                self.next_heap = Some(Box::new(InnerHeap::with_capacity(self.buffer_len as usize)));
+            }
+            self.next_heap.as_mut().unwrap().allocate_space(num_bytes) + self.buffer_len
         }
-        addr
     }
 
     fn get_slice(&self, start: u32, stop: u32) -> &[u8] {
-        &self.buffer[start as usize..stop as usize]
+        if start >= self.buffer_len {
+            self.next_heap
+                .as_ref()
+                .unwrap()
+                .get_slice(start - self.buffer_len, stop - self.buffer_len)
+        } else {
+            &self.buffer[start as usize..stop as usize]
+        }
     }
 
     fn get_mut_slice(&mut self, start: u32, stop: u32) -> &mut [u8] {
-        &mut self.buffer[start as usize..stop as usize]
+        if start >= self.buffer_len {
+            self.next_heap
+                .as_mut()
+                .unwrap()
+                .get_mut_slice(start - self.buffer_len, stop - self.buffer_len)
+        } else {
+            &mut self.buffer[start as usize..stop as usize]
+        }
     }
 
     fn allocate_and_set(&mut self, data: &[u8]) -> BytesRef {
@@ -156,23 +177,46 @@ impl InnerHeap {
     }
 
     fn get_mut(&mut self, addr: u32) -> *mut u8 {
-        let addr_isize = addr as isize;
-        unsafe { self.buffer.as_mut_ptr().offset(addr_isize) }
+        if addr >= self.buffer_len {
+            self.next_heap
+                .as_mut()
+                .unwrap()
+                .get_mut(addr - self.buffer_len)
+        } else {
+            let addr_isize = addr as isize;
+            unsafe { self.buffer.as_mut_ptr().offset(addr_isize) }
+        }
     }
 
+
+
     fn get_mut_ref<Item>(&mut self, addr: u32) -> &mut Item {
-        let v_ptr_u8 = self.get_mut(addr) as *mut u8;
-        let v_ptr = v_ptr_u8 as *mut Item;
-        unsafe { &mut *v_ptr }
+        if addr >= self.buffer_len {
+            self.next_heap
+                .as_mut()
+                .unwrap()
+                .get_mut_ref(addr - self.buffer_len)
+        } else {
+            let v_ptr_u8 = self.get_mut(addr) as *mut u8;
+            let v_ptr = v_ptr_u8 as *mut Item;
+            unsafe { &mut *v_ptr }
+        }
     }
 
-    fn set<Item>(&mut self, addr: u32, val: &Item) {
-        let v_ptr: *const Item = val as *const Item;
-        let v_ptr_u8: *const u8 = v_ptr as *const u8;
-        debug_assert!(addr + mem::size_of::<Item>() as u32 <= self.used);
-        unsafe {
-            let dest_ptr: *mut u8 = self.get_mut(addr);
-            ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::<Item>());
+    pub fn set<Item>(&mut self, addr: u32, val: &Item) {
+        if addr >= self.buffer_len {
+            self.next_heap
+                .as_mut()
+                .unwrap()
+                .set(addr - self.buffer_len, val);
+        } else {
+            let v_ptr: *const Item = val as *const Item;
+            let v_ptr_u8: *const u8 = v_ptr as *const u8;
+            debug_assert!(addr + mem::size_of::<Item>() as u32 <= self.used);
+            unsafe {
+                let dest_ptr: *mut u8 = self.get_mut(addr);
+                ptr::copy(v_ptr_u8, dest_ptr, mem::size_of::<Item>());
+            }
         }
     }
 }
diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs
index 0a674b4bc..772506bef 100644
--- a/src/postings/postings_writer.rs
+++ b/src/postings/postings_writer.rs
@@ -88,7 +88,7 @@ impl<'a> MultiFieldPostingsWriter<'a> {
             .cloned()
             .map(|(key, _)| Term::wrap(key).field())
             .enumerate();
-        
+
         let mut prev_field = Field(u32::max_value());
         for (offset, field) in term_offsets_it {
             if field != prev_field {

From 90fcfb3f43da1bcf105ac15b2dcdf3167ff9b8c2 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Wed, 7 Jun 2017 09:29:46 +0900
Subject: [PATCH 13/16] issue/188 Using murmurhash

---
 src/datastruct/stacker/hashmap.rs | 95 +++++++++++++++++++++++++++----
 1 file changed, 84 insertions(+), 11 deletions(-)

diff --git a/src/datastruct/stacker/hashmap.rs b/src/datastruct/stacker/hashmap.rs
index ee9de8cee..68b63bae9 100644
--- a/src/datastruct/stacker/hashmap.rs
+++ b/src/datastruct/stacker/hashmap.rs
@@ -1,13 +1,56 @@
 use std::iter;
 use super::heap::{Heap, HeapAllocable, BytesRef};
 
-/// dbj2 hash function
-fn djb2(key: &[u8]) -> u64 {
-    let mut state: u64 = 5381;
-    for &b in key {
-        state = (state << 5).wrapping_add(state).wrapping_add(b as u64);
+
+
+mod murmurhash2 {
+
+    const SEED: u32 = 3_242_157_231u32;
+
+    #[inline(always)]
+    pub fn murmurhash2(key: &[u8]) -> u32 {
+        let mut key_ptr: *const u32 = key.as_ptr() as *const u32;
+        let m: u32 = 0x5bd1e995;
+        let r = 24;
+        let len = key.len() as u32;
+
+        let mut h: u32 = SEED ^ len;
+        let num_blocks = len >> 2;
+        for _ in 0..num_blocks {
+            let mut k: u32 = unsafe { *key_ptr };
+            k = k.wrapping_mul(m);
+            k ^= k >> r;
+            k = k.wrapping_mul(m);
+            h = h.wrapping_mul(m);
+            h ^= k;
+            key_ptr = key_ptr.wrapping_offset(1);
+        }
+
+        // Handle the last few bytes of the input array
+        let remaining = len & 3;
+        let key_ptr_u8: *const u8 = key_ptr as *const u8;
+        match remaining {
+            3 => {
+                h ^= unsafe { *key_ptr_u8.wrapping_offset(2) as u32 } << 16;
+                h ^= unsafe { *key_ptr_u8.wrapping_offset(1) as u32 } << 8;
+                h ^= unsafe { *key_ptr_u8 as u32 };
+                h = h.wrapping_mul(m);
+            }
+            2 => {
+                h ^= unsafe { *key_ptr_u8.wrapping_offset(1) as u32 } << 8;
+                h ^= unsafe { *key_ptr_u8 as u32 };
+                h = h.wrapping_mul(m);
+            }
+            1 => {
+                h ^= unsafe { *key_ptr_u8 as u32 };
+                h = h.wrapping_mul(m);
+            }
+            _ => {}
+        }
+        h ^= h >> 13;
+        h = h.wrapping_mul(m);
+        h ^ (h >> 15)
     }
-    state
 }
 
 impl Default for BytesRef {
@@ -69,7 +112,7 @@ struct QuadraticProbing {
 
 impl QuadraticProbing {
     fn compute(key: &[u8], mask: usize) -> QuadraticProbing {
-        let hash = djb2(key) as usize;
+        let hash = murmurhash2::murmurhash2(key) as usize;
         QuadraticProbing {
             hash: hash,
             i: 0,
@@ -165,8 +208,9 @@ mod tests {
 
     use super::*;
     use super::super::heap::{Heap, HeapAllocable};
-    use super::djb2;
+    use super::murmurhash2::murmurhash2;
     use test::Bencher;
+    use std::collections::HashSet;
     use std::collections::hash_map::DefaultHasher;
     use std::hash::Hasher;
 
@@ -220,10 +264,39 @@ mod tests {
         assert!(iter_values.next().is_none());
     }
 
+    #[test]
+    fn test_murmur() {
+        let s1 = "abcdef";
+        let s2 = "abcdeg";
+        for i in 0..5 {
+            assert_eq!(murmurhash2(&s1[i..5].as_bytes()),
+                       murmurhash2(&s2[i..5].as_bytes()));
+        }
+    }
+
+    #[test]
+    fn test_murmur_collisions() {
+        let mut set: HashSet<u32> = HashSet::default();
+        for i in 0..10_000 {
+            let s = format!("hash{}", i);
+            let hash = murmurhash2(s.as_bytes());
+            set.insert(hash);
+        }
+        assert_eq!(set.len(), 10_000);
+    }
+
     #[bench]
-    fn bench_djb2(bench: &mut Bencher) {
-        let v = String::from("abwer");
-        bench.iter(|| djb2(v.as_bytes()));
+    fn bench_murmurhash_2(b: &mut Bencher) {
+        let keys: Vec<&'static str> =
+            vec!["wer qwe qwe qwe ", "werbq weqweqwe2 ", "weraq weqweqwe3 "];
+        b.iter(|| {
+                   keys.iter()
+                       .map(|&s| s.as_bytes())
+                       .map(murmurhash2)
+                       .map(|h| h as u64)
+                       .last()
+                       .unwrap()
+               });
     }
 
     #[bench]
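[Editor's note] The block loop reads the key through a raw `*const u32`,
which assumes 4-byte loads are fine at any alignment; that holds on
x86/x86_64 but is undefined behavior in Rust's abstract machine. A
stricter variant could read each block with `std::ptr::read_unaligned`
(stable since Rust 1.17), e.g.:

    let k: u32 = unsafe { std::ptr::read_unaligned(key_ptr) };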
From 8875b9794acddd19e828a981ec99bb27e0b57a0d Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Tue, 13 Jun 2017 23:16:42 +0900
Subject: [PATCH 14/16] Added API to get range from fastfield
---
 src/common/bitpacker.rs | 24 ++++++++++++++++++++++++
 src/fastfield/reader.rs | 22 ++++++++++++++++++++++
 2 files changed, 46 insertions(+)

diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs
index 6aa1f5d44..5df1e7222 100644
--- a/src/common/bitpacker.rs
+++ b/src/common/bitpacker.rs
@@ -127,6 +127,30 @@ impl BitUnpacker
         let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
         (val_shifted & mask)
     }
+
+    pub fn get_range(&self, start: u32, output: &mut [u64]) {
+        if self.num_bits == 0 {
+            for val in output.iter_mut() {
+                *val = 0;
+            }
+        }
+        else {
+            let data: &[u8] = &*self.data;
+            let num_bits = self.num_bits;
+            let mask = self.mask;
+
+            let mut addr_in_bits = (start as usize) * num_bits;
+            for i in 0..output.len() {
+                let addr = addr_in_bits >> 3;
+                let bit_shift = addr_in_bits & 7;
+                let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
+                let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
+                output[i] = val_shifted & mask;
+                addr_in_bits += num_bits;
+            }
+        }
+
+    }
 }

diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs
index 7c01d0a8f..8413527b4 100644
--- a/src/fastfield/reader.rs
+++ b/src/fastfield/reader.rs
@@ -13,6 +13,7 @@ use common::bitpacker::compute_num_bits;
 use common::bitpacker::BitUnpacker;
 use schema::FieldType;
 use error::ResultExt;
+use std::mem;
 use common;
 use owning_ref::OwningRef;
@@ -29,6 +30,7 @@ pub trait FastFieldReader: Sized {
     /// This accessor should return as fast as possible.
     fn get(&self, doc: DocId) -> Self::ValueType;
 
+    fn get_range(&self, start: u32, output: &mut [Self::ValueType]);
 
     /// Opens a fast field given a source.
     fn open(source: ReadOnlySource) -> Self;
@@ -80,6 +82,13 @@ impl FastFieldReader for U64FastFieldReader {
         }
     }
 
+    fn get_range(&self, start: u32, output: &mut [Self::ValueType]) {
+        self.bit_unpacker.get_range(start, output);
+        for out in output.iter_mut() {
+            *out += self.min_value;
+        }
+    }
+
     /// Opens a new fast field reader given a read only source.
     ///
     /// # Panics
@@ -181,6 +190,19 @@ impl FastFieldReader for I64FastFieldReader {
         common::u64_to_i64(self.underlying.get(doc))
     }
 
+    ///
+    /// # Panics
+    ///
+    /// May panic or return a nonsensical result if `start + output.len()`
+    /// is greater than the segment's `maxdoc`.
+    fn get_range(&self, start: u32, output: &mut [Self::ValueType]) {
+        let output_u64: &mut [u64] = unsafe { mem::transmute(output) };
+        self.underlying.get_range(start, output_u64);
+        for mut_val in output_u64.iter_mut() {
+            *mut_val ^= 1 << 63;
+        }
+    }
+
     /// Opens a new fast field reader given a read only source.
     ///
     /// # Panics
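The point of `get_range` is to amortize the per-call overhead of `get` when scanning a whole column of values. A hypothetical consumer (not part of the patch; it assumes the `FastFieldReader` trait is importable at the path shown) could batch its reads like this:

    use tantivy::fastfield::FastFieldReader; // assumed import path

    // Sum a u64 fast field by scanning it in fixed-size batches rather
    // than issuing one `get(doc)` call per document.
    fn sum_fast_field<R: FastFieldReader<ValueType = u64>>(reader: &R, max_doc: u32) -> u64 {
        let mut buffer = [0u64; 256];
        let mut sum = 0u64;
        let mut doc = 0u32;
        // Stop a full batch short of `max_doc`: `get_range` may panic (or
        // return garbage) if `start + output.len()` runs past `maxdoc`.
        while doc + 256 <= max_doc {
            reader.get_range(doc, &mut buffer[..]);
            sum += buffer.iter().sum::<u64>();
            doc += 256;
        }
        // Finish the tail one document at a time.
        for d in doc..max_doc {
            sum += reader.get(d);
        }
        sum
    }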
From 93e7f28cc085827d65e28ce00a1240a60273753e Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Wed, 14 Jun 2017 10:46:06 +0900
Subject: [PATCH 15/16] Added unit test
---
 src/common/bitpacker.rs | 30 +++++++++++++++++++++++-------
 src/fastfield/mod.rs    |  6 +++++-
 src/fastfield/reader.rs | 13 +++++++++++++
 3 files changed, 41 insertions(+), 8 deletions(-)

diff --git a/src/common/bitpacker.rs b/src/common/bitpacker.rs
index 5df1e7222..49ea9f9e6 100644
--- a/src/common/bitpacker.rs
+++ b/src/common/bitpacker.rs
@@ -133,23 +133,21 @@ impl BitUnpacker
             for val in output.iter_mut() {
                 *val = 0;
             }
-        }
-        else {
+        } else {
             let data: &[u8] = &*self.data;
             let num_bits = self.num_bits;
             let mask = self.mask;
-
             let mut addr_in_bits = (start as usize) * num_bits;
-            for i in 0..output.len() {
+            for output_val in output.iter_mut() {
                 let addr = addr_in_bits >> 3;
                 let bit_shift = addr_in_bits & 7;
                 let val_unshifted_unmasked: u64 = unsafe { *(data[addr..].as_ptr() as *const u64) };
                 let val_shifted = (val_unshifted_unmasked >> bit_shift) as u64;
-                output[i] = val_shifted & mask;
+                *output_val = val_shifted & mask;
                 addr_in_bits += num_bits;
             }
         }
-
     }
 }
@@ -172,7 +170,7 @@ mod test {
         assert_eq!(compute_num_bits(5_000_000_000), 33u8);
     }
 
-    fn test_bitpacker_util(len: usize, num_bits: usize) {
+    fn create_fastfield_bitpacker(len: usize, num_bits: usize) -> (BitUnpacker<Vec<u8>>, Vec<u64>) {
         let mut data = Vec::new();
         let mut bitpacker = BitPacker::new(num_bits);
         let max_val: u64 = (1 << num_bits) - 1;
@@ -185,6 +183,11 @@ mod test {
         bitpacker.close(&mut data).unwrap();
         assert_eq!(data.len(), (num_bits * len + 7) / 8 + 7);
         let bitunpacker = BitUnpacker::new(data, num_bits);
+        (bitunpacker, vals)
+    }
+
+    fn test_bitpacker_util(len: usize, num_bits: usize) {
+        let (bitunpacker, vals) = create_fastfield_bitpacker(len, num_bits);
         for (i, val) in vals.iter().enumerate() {
             assert_eq!(bitunpacker.get(i), *val);
         }
@@ -198,4 +201,17 @@ mod test {
         test_bitpacker_util(6, 14);
         test_bitpacker_util(1000, 14);
     }
+
+    #[test]
+    fn test_bitpacker_range() {
+        let (bitunpacker, vals) = create_fastfield_bitpacker(100_000, 12);
+        let buffer_len = 100;
+        let mut buffer = vec![0u64; buffer_len];
+        for start in vec![0, 10, 20, 100, 1_000] {
+            bitunpacker.get_range(start as u32, &mut buffer[..]);
+            for i in 0..buffer_len {
+                assert_eq!(buffer[i], vals[start + i]);
+            }
+        }
+    }
 }
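The `assert_eq!` on `data.len()` spells out the invariant that makes the unsafe 8-byte load in `get`/`get_range` sound: the packed values occupy ceil(num_bits * len / 8) bytes, and 7 padding bytes are appended so that an 8-byte read starting at the last value's byte address stays in bounds. A quick check of the arithmetic, with a hypothetical helper mirroring the assertion:

    // Hypothetical helper: expected size of a padded, bit-packed buffer.
    fn expected_buffer_len(len: usize, num_bits: usize) -> usize {
        // ceil(num_bits * len / 8) bytes of packed data + 7 bytes of padding.
        (num_bits * len + 7) / 8 + 7
    }

    #[test]
    fn buffer_len_example() {
        // 1000 values of 14 bits = 14_000 bits = 1750 bytes, plus 7 -> 1757.
        assert_eq!(expected_buffer_len(1000, 14), 1757);
    }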
diff --git a/src/fastfield/mod.rs b/src/fastfield/mod.rs
index 9ae143856..ae18705b9 100644
--- a/src/fastfield/mod.rs
+++ b/src/fastfield/mod.rs
@@ -211,7 +211,6 @@ mod tests {
         }
     }
 
-
     #[test]
     fn test_signed_intfastfield() {
         let path = Path::new("test");
@@ -245,6 +244,11 @@ mod tests {
         for (doc, i) in (-100i64..10_000i64).enumerate() {
             assert_eq!(fast_field_reader.get(doc as u32), i);
         }
+        let mut buffer = vec![0i64; 100];
+        fast_field_reader.get_range(53, &mut buffer[..]);
+        for i in 0..100 {
+            assert_eq!(buffer[i], -100i64 + 53i64 + i as i64);
+        }
     }
 }

diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs
index 8413527b4..aae1dd797 100644
--- a/src/fastfield/reader.rs
+++ b/src/fastfield/reader.rs
@@ -28,8 +28,21 @@ pub trait FastFieldReader: Sized {
     /// Return the value associated to the given document.
     ///
     /// This accessor should return as fast as possible.
+    ///
+    /// # Panics
+    ///
+    /// May panic if `doc` is greater than the segment's
+    /// `maxdoc`.
     fn get(&self, doc: DocId) -> Self::ValueType;
 
+    /// Fills an output buffer with the fast field values
+    /// associated with the `DocId`s going from
+    /// `start` to `start + output.len()`.
+    ///
+    /// # Panics
+    ///
+    /// May panic if `start + output.len()` is greater than
+    /// the segment's `maxdoc`.
     fn get_range(&self, start: u32, output: &mut [Self::ValueType]);
 
     /// Opens a fast field given a source.
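The `*mut_val ^= 1 << 63` in the i64 reader relies on an order-preserving mapping between `i64` and `u64`: flipping the sign bit sends i64::MIN..=i64::MAX monotonically onto 0..=u64::MAX, which is what lets signed values be bit-packed as unsigned ones. A sketch of what `common::i64_to_u64` and `common::u64_to_i64` presumably look like (inferred from the XOR above, not copied from the crate), with the property spelled out as a test:

    // Presumed order-preserving mapping: flip the sign bit.
    fn i64_to_u64(val: i64) -> u64 {
        (val as u64) ^ (1 << 63)
    }

    fn u64_to_i64(val: u64) -> i64 {
        (val ^ (1 << 63)) as i64
    }

    #[test]
    fn sign_flip_is_monotonic_and_invertible() {
        let vals = [i64::min_value(), -100, -1, 0, 1, 100, i64::max_value()];
        for w in vals.windows(2) {
            // Order is preserved...
            assert!(i64_to_u64(w[0]) < i64_to_u64(w[1]));
            // ...and the mapping round-trips.
            assert_eq!(u64_to_i64(i64_to_u64(w[0])), w[0]);
        }
    }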
From e51feea57492d72d5dc6e793b964ee978c358ac4 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Wed, 14 Jun 2017 13:45:07 +0900
Subject: [PATCH 16/16] Removed cargo fmt from travis.
---
 .travis.yml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 4bca1ec5a..cbfbc222b 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,9 +22,7 @@ before_script:
   - |
       pip install 'travis-cargo<0.2' --user &&
       export PATH=$HOME/.local/bin:$PATH
-  - (cargo install rustfmt || true)
 script:
-  - cargo fmt -- --write-mode=diff
   - |
       travis-cargo build &&
      travis-cargo test &&