diff --git a/src/core/codec.rs b/src/core/codec.rs index a65cf2e0b..fcddad101 100644 --- a/src/core/codec.rs +++ b/src/core/codec.rs @@ -4,7 +4,7 @@ use rustc_serialize::json; use core::index::Segment; use core::index::SegmentInfo; use core::index::SegmentComponent; -use core::fastfield::FastFieldSerializer; +use fastfield::FastFieldSerializer; use core::store::StoreWriter; use core::convert_to_ioerror; diff --git a/src/core/collector.rs b/src/core/collector.rs index def40c60b..5067b222a 100644 --- a/src/core/collector.rs +++ b/src/core/collector.rs @@ -2,7 +2,7 @@ use DocId; use core::reader::SegmentReader; use core::searcher::SegmentLocalId; use core::searcher::DocAddress; -use core::fastfield::U32FastFieldReader; +use fastfield::U32FastFieldReader; use core::schema::U32Field; use std::io; diff --git a/src/core/merger.rs b/src/core/merger.rs index 8f7437640..f719cbe8c 100644 --- a/src/core/merger.rs +++ b/src/core/merger.rs @@ -12,7 +12,7 @@ use std::collections::BinaryHeap; use datastruct::FstMapIter; use core::schema::Term; use core::schema::Schema; -use core::fastfield::FastFieldSerializer; +use fastfield::FastFieldSerializer; use core::store::StoreWriter; use core::index::SegmentInfo; use std::cmp::Ordering; diff --git a/src/core/mod.rs b/src/core/mod.rs index a374e2b4b..d1f5d8152 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -8,8 +8,6 @@ pub mod collector; pub mod serialize; pub mod store; pub mod index; -pub mod fastfield; -pub mod fastdivide; pub mod merger; pub mod timer; diff --git a/src/core/reader.rs b/src/core/reader.rs index ef1aedbe1..0e06036fc 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -17,8 +17,7 @@ use core::timer::OpenTimer; use core::schema::U32Field; use core::convert_to_ioerror; use core::serialize::BinarySerializable; -use core::fastfield::U32FastFieldsReader; -use core::fastfield::U32FastFieldReader; +use fastfield::{U32FastFieldsReader, U32FastFieldReader}; use compression; use std::mem; diff --git a/src/core/writer.rs b/src/core/writer.rs index 3c21979b6..96923cc37 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -8,7 +8,7 @@ use core::analyzer::StreamingIterator; use core::index::Segment; use core::index::SegmentInfo; use postings::PostingsWriter; -use core::fastfield::U32FastFieldsWriter; +use fastfield::U32FastFieldsWriter; use std::clone::Clone; use std::sync::mpsc; use std::thread; diff --git a/src/core/fastdivide.rs b/src/fastfield/fastdivide.rs similarity index 99% rename from src/core/fastdivide.rs rename to src/fastfield/fastdivide.rs index 7e0d06620..b568a3142 100644 --- a/src/core/fastdivide.rs +++ b/src/fastfield/fastdivide.rs @@ -5,6 +5,7 @@ use std::num::Wrapping; // ported from libdivide.h by ridiculous_fish + const LIBDIVIDE_32_SHIFT_MASK: u8 = 0x1F; const LIBDIVIDE_ADD_MARKER: u8 = 0x40; const LIBDIVIDE_U32_SHIFT_PATH: u8 = 0x80; diff --git a/src/core/fastfield.rs b/src/fastfield/mod.rs similarity index 52% rename from src/core/fastfield.rs rename to src/fastfield/mod.rs index 8daed2604..33f1d5788 100644 --- a/src/core/fastfield.rs +++ b/src/fastfield/mod.rs @@ -1,295 +1,39 @@ -use std::io; -use std::io::{SeekFrom, Seek, Write}; -use directory::{WritePtr, ReadOnlySource}; -use core::serialize::BinarySerializable; -use std::collections::HashMap; -use DocId; -use core::schema::Schema; -use core::schema::Document; -use std::ops::Deref; -use core::fastdivide::count_leading_zeros; -use core::fastdivide::DividerU32; -use core::schema::U32Field; +mod fastdivide; +mod reader; +mod writer; +mod serializer; -pub fn compute_num_bits(amplitude: u32) -> u8 { + +pub use self::fastdivide::DividerU32; +pub use self::writer::{U32FastFieldsWriter, U32FastFieldWriter}; +pub use self::reader::{U32FastFieldsReader, U32FastFieldReader}; +pub use self::serializer::FastFieldSerializer; + +use self::fastdivide::count_leading_zeros; + +fn compute_num_bits(amplitude: u32) -> u8 { 32u8 - count_leading_zeros(amplitude) } -pub struct FastFieldSerializer { - write: WritePtr, - written_size: usize, - fields: Vec<(U32Field, u32)>, - num_bits: u8, - - min_value: u32, - - field_open: bool, - mini_buffer_written: usize, - mini_buffer: u64, -} - -impl FastFieldSerializer { - pub fn new(mut write: WritePtr) -> io::Result { - // just making room for the pointer to header. - let written_size: usize = try!(0u32.serialize(&mut write)); - Ok(FastFieldSerializer { - write: write, - written_size: written_size, - fields: Vec::new(), - num_bits: 0u8, - field_open: false, - mini_buffer_written: 0, - mini_buffer: 0, - min_value: 0, - }) - } - - pub fn new_u32_fast_field(&mut self, field: U32Field, min_value: u32, max_value: u32) -> io::Result<()> { - if self.field_open { - return Err(io::Error::new(io::ErrorKind::Other, "Previous field not closed")); - } - self.min_value = min_value; - self.field_open = true; - self.fields.push((field, self.written_size as u32)); - let write: &mut Write = &mut self.write; - self.written_size += try!(min_value.serialize(write)); - let amplitude = max_value - min_value; - self.written_size += try!(amplitude.serialize(write)); - self.num_bits = compute_num_bits(amplitude); - Ok(()) - } - - pub fn add_val(&mut self, val: u32) -> io::Result<()> { - let write: &mut Write = &mut self.write; - if self.mini_buffer_written + (self.num_bits as usize) > 64 { - self.written_size += try!(self.mini_buffer.serialize(write)); - self.mini_buffer = 0; - self.mini_buffer_written = 0; - } - self.mini_buffer |= ((val - self.min_value) as u64) << self.mini_buffer_written; - self.mini_buffer_written += self.num_bits as usize; - Ok(()) - } - - pub fn close_field(&mut self,) -> io::Result<()> { - if !self.field_open { - return Err(io::Error::new(io::ErrorKind::Other, "Current field is already closed")); - } - self.field_open = false; - if self.mini_buffer_written > 0 { - self.mini_buffer_written = 0; - self.written_size += try!(self.mini_buffer.serialize(&mut self.write)); - } - self.mini_buffer = 0; - Ok(()) - } - - pub fn close(mut self,) -> io::Result { - if self.field_open { - return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed")); - } - let header_offset: usize = self.written_size; - self.written_size += try!(self.fields.serialize(&mut self.write)); - try!(self.write.seek(SeekFrom::Start(0))); - try!((header_offset as u32).serialize(&mut self.write)); - Ok(self.written_size) - } -} - -pub struct U32FastFieldsWriter { - field_writers: Vec, -} - -impl U32FastFieldsWriter { - - pub fn from_schema(schema: &Schema) -> U32FastFieldsWriter { - let u32_fields: Vec = schema.get_u32_fields() - .iter() - .enumerate() - .filter(|&(_, field_entry)| field_entry.option.is_fast()) - .map(|(field_id, _)| U32Field(field_id as u8)) - .collect(); - U32FastFieldsWriter::new(u32_fields) - } - - pub fn new(fields: Vec) -> U32FastFieldsWriter { - U32FastFieldsWriter { - field_writers: fields - .iter() - .map(|field| U32FastFieldWriter::new(&field)) - .collect(), - } - } - - pub fn add_document(&mut self, doc: &Document) { - for field_writer in self.field_writers.iter_mut() { - field_writer.add_document(doc); - } - } - - pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> { - for field_writer in self.field_writers.iter() { - try!(field_writer.serialize(serializer)); - } - Ok(()) - } -} - -pub struct U32FastFieldWriter { - field: U32Field, - vals: Vec, -} - -impl U32FastFieldWriter { - pub fn new(field: &U32Field) -> U32FastFieldWriter { - U32FastFieldWriter { - field: field.clone(), - vals: Vec::new(), - } - } - - pub fn add_val(&mut self, val: u32) { - self.vals.push(val); - } - - pub fn add_document(&mut self, doc: &Document) { - let val = doc.get_u32(&self.field).unwrap_or(0u32); - self.add_val(val); - } - - pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> { - let zero = 0; - let min = self.vals.iter().min().unwrap_or(&zero).clone(); - let max = self.vals.iter().max().unwrap_or(&min).clone(); - try!(serializer.new_u32_fast_field(self.field.clone(), min, max)); - for val in self.vals.iter() { - try!(serializer.add_val(val.clone())); - } - serializer.close_field() - } -} - -pub struct U32FastFieldReader { - _data: ReadOnlySource, - data_ptr: *const u64, - min_val: u32, - max_val: u32, - num_bits: u8, - mask: u32, - num_in_pack: u32, - divider: DividerU32, -} - -impl U32FastFieldReader { - - pub fn min_val(&self,) -> u32 { - self.min_val - } - - pub fn max_val(&self,) -> u32 { - self.max_val - } - - pub fn open(data: ReadOnlySource) -> io::Result { - let min_val; - let amplitude; - { - let mut cursor = data.cursor(); - min_val = try!(u32::deserialize(&mut cursor)); - amplitude = try!(u32::deserialize(&mut cursor)); - } - let num_bits = compute_num_bits(amplitude); - let mask = (1 << num_bits) - 1; - let num_in_pack = 64u32 / (num_bits as u32); - let ptr: *const u8 = &(data.deref()[8 as usize]); - Ok(U32FastFieldReader { - _data: data, - data_ptr: ptr as *const u64, - min_val: min_val, - max_val: min_val + amplitude, - num_bits: num_bits, - mask: mask, - num_in_pack: num_in_pack, - divider: DividerU32::divide_by(num_in_pack), - }) - } - - pub fn get(&self, doc: DocId) -> u32 { - let long_addr = self.divider.divide(doc); - let ord_within_long = doc - long_addr * self.num_in_pack; - let bit_shift = (self.num_bits as u32) * ord_within_long; - let val_unshifted_unmasked: u64 = unsafe { *self.data_ptr.offset(long_addr as isize) }; - let val_shifted = (val_unshifted_unmasked >> bit_shift) as u32; - return self.min_val + (val_shifted & self.mask); - } -} - -pub struct U32FastFieldsReader { - source: ReadOnlySource, - field_offsets: HashMap, -} - -impl U32FastFieldsReader { - pub fn open(source: ReadOnlySource) -> io::Result { - let header_offset; - let field_offsets: Vec<(U32Field, u32)>; - { - let mut cursor = source.cursor(); - header_offset = try!(u32::deserialize(&mut cursor)); - try!(cursor.seek(SeekFrom::Start(header_offset as u64))); - field_offsets = try!(Vec::deserialize(&mut cursor)); - } - let mut end_offsets: Vec = field_offsets - .iter() - .map(|&(_, offset)| offset.clone()) - .collect(); - end_offsets.push(header_offset); - let mut field_offsets_map: HashMap = HashMap::new(); - for (field_start_offsets, stop_offset) in field_offsets.iter().zip(end_offsets.iter().skip(1)) { - let (field, start_offset) = field_start_offsets.clone(); - field_offsets_map.insert(field.clone(), (start_offset.clone(), stop_offset.clone())); - } - Ok(U32FastFieldsReader { - field_offsets: field_offsets_map, - source: source, - }) - } - - pub fn get_field(&self, field: &U32Field) -> io::Result { - match self.field_offsets.get(field) { - Some(&(start, stop)) => { - let field_source = self.source.slice(start as usize, stop as usize); - U32FastFieldReader::open(field_source) - } - None => { - Err(io::Error::new(io::ErrorKind::InvalidInput, "Could not find field, has it been set as a fast field?")) - } - - } - - } -} - #[cfg(test)] mod tests { use super::compute_num_bits; use super::U32FastFieldsReader; use super::U32FastFieldsWriter; + use super::FastFieldSerializer; use core::schema::U32Field; use std::path::Path; use directory::{Directory, WritePtr, RAMDirectory}; use core::schema::Document; use core::schema::Schema; use core::schema::FAST_U32; - use core::fastfield::FastFieldSerializer; use test::Bencher; use test; use rand::Rng; use rand::SeedableRng; use rand::XorShiftRng; - #[test] fn test_compute_num_bits() { assert_eq!(compute_num_bits(1), 1u8); diff --git a/src/fastfield/reader.rs b/src/fastfield/reader.rs new file mode 100644 index 000000000..38a34834b --- /dev/null +++ b/src/fastfield/reader.rs @@ -0,0 +1,111 @@ +use std::io; +use directory::ReadOnlySource; +use fastfield::DividerU32; +use core::serialize::BinarySerializable; +use DocId; +use std::collections::HashMap; +use core::schema::U32Field; +use std::io::{SeekFrom, Seek}; +use std::ops::Deref; +use super::compute_num_bits; + +pub struct U32FastFieldReader { + _data: ReadOnlySource, + data_ptr: *const u64, + min_val: u32, + max_val: u32, + num_bits: u8, + mask: u32, + num_in_pack: u32, + divider: DividerU32, +} + +impl U32FastFieldReader { + + pub fn min_val(&self,) -> u32 { + self.min_val + } + + pub fn max_val(&self,) -> u32 { + self.max_val + } + + pub fn open(data: ReadOnlySource) -> io::Result { + let min_val; + let amplitude; + { + let mut cursor = data.cursor(); + min_val = try!(u32::deserialize(&mut cursor)); + amplitude = try!(u32::deserialize(&mut cursor)); + } + let num_bits = compute_num_bits(amplitude); + let mask = (1 << num_bits) - 1; + let num_in_pack = 64u32 / (num_bits as u32); + let ptr: *const u8 = &(data.deref()[8 as usize]); + Ok(U32FastFieldReader { + _data: data, + data_ptr: ptr as *const u64, + min_val: min_val, + max_val: min_val + amplitude, + num_bits: num_bits, + mask: mask, + num_in_pack: num_in_pack, + divider: DividerU32::divide_by(num_in_pack), + }) + } + + pub fn get(&self, doc: DocId) -> u32 { + let long_addr = self.divider.divide(doc); + let ord_within_long = doc - long_addr * self.num_in_pack; + let bit_shift = (self.num_bits as u32) * ord_within_long; + let val_unshifted_unmasked: u64 = unsafe { *self.data_ptr.offset(long_addr as isize) }; + let val_shifted = (val_unshifted_unmasked >> bit_shift) as u32; + return self.min_val + (val_shifted & self.mask); + } +} + +pub struct U32FastFieldsReader { + source: ReadOnlySource, + field_offsets: HashMap, +} + +impl U32FastFieldsReader { + pub fn open(source: ReadOnlySource) -> io::Result { + let header_offset; + let field_offsets: Vec<(U32Field, u32)>; + { + let mut cursor = source.cursor(); + header_offset = try!(u32::deserialize(&mut cursor)); + try!(cursor.seek(SeekFrom::Start(header_offset as u64))); + field_offsets = try!(Vec::deserialize(&mut cursor)); + } + let mut end_offsets: Vec = field_offsets + .iter() + .map(|&(_, offset)| offset.clone()) + .collect(); + end_offsets.push(header_offset); + let mut field_offsets_map: HashMap = HashMap::new(); + for (field_start_offsets, stop_offset) in field_offsets.iter().zip(end_offsets.iter().skip(1)) { + let (field, start_offset) = field_start_offsets.clone(); + field_offsets_map.insert(field.clone(), (start_offset.clone(), stop_offset.clone())); + } + Ok(U32FastFieldsReader { + field_offsets: field_offsets_map, + source: source, + }) + } + + pub fn get_field(&self, field: &U32Field) -> io::Result { + match self.field_offsets.get(field) { + Some(&(start, stop)) => { + let field_source = self.source.slice(start as usize, stop as usize); + U32FastFieldReader::open(field_source) + } + None => { + Err(io::Error::new(io::ErrorKind::InvalidInput, "Could not find field, has it been set as a fast field?")) + } + + } + + } +} diff --git a/src/fastfield/serializer.rs b/src/fastfield/serializer.rs new file mode 100644 index 000000000..7167630af --- /dev/null +++ b/src/fastfield/serializer.rs @@ -0,0 +1,87 @@ +use core::serialize::BinarySerializable; +use directory::WritePtr; +use core::schema::U32Field; +use std::io; +use std::io::{SeekFrom, Write}; +use super::compute_num_bits; + +pub struct FastFieldSerializer { + write: WritePtr, + written_size: usize, + fields: Vec<(U32Field, u32)>, + num_bits: u8, + + min_value: u32, + + field_open: bool, + mini_buffer_written: usize, + mini_buffer: u64, +} + +impl FastFieldSerializer { + pub fn new(mut write: WritePtr) -> io::Result { + // just making room for the pointer to header. + let written_size: usize = try!(0u32.serialize(&mut write)); + Ok(FastFieldSerializer { + write: write, + written_size: written_size, + fields: Vec::new(), + num_bits: 0u8, + field_open: false, + mini_buffer_written: 0, + mini_buffer: 0, + min_value: 0, + }) + } + + pub fn new_u32_fast_field(&mut self, field: U32Field, min_value: u32, max_value: u32) -> io::Result<()> { + if self.field_open { + return Err(io::Error::new(io::ErrorKind::Other, "Previous field not closed")); + } + self.min_value = min_value; + self.field_open = true; + self.fields.push((field, self.written_size as u32)); + let write: &mut Write = &mut self.write; + self.written_size += try!(min_value.serialize(write)); + let amplitude = max_value - min_value; + self.written_size += try!(amplitude.serialize(write)); + self.num_bits = compute_num_bits(amplitude); + Ok(()) + } + + pub fn add_val(&mut self, val: u32) -> io::Result<()> { + let write: &mut Write = &mut self.write; + if self.mini_buffer_written + (self.num_bits as usize) > 64 { + self.written_size += try!(self.mini_buffer.serialize(write)); + self.mini_buffer = 0; + self.mini_buffer_written = 0; + } + self.mini_buffer |= ((val - self.min_value) as u64) << self.mini_buffer_written; + self.mini_buffer_written += self.num_bits as usize; + Ok(()) + } + + pub fn close_field(&mut self,) -> io::Result<()> { + if !self.field_open { + return Err(io::Error::new(io::ErrorKind::Other, "Current field is already closed")); + } + self.field_open = false; + if self.mini_buffer_written > 0 { + self.mini_buffer_written = 0; + self.written_size += try!(self.mini_buffer.serialize(&mut self.write)); + } + self.mini_buffer = 0; + Ok(()) + } + + pub fn close(mut self,) -> io::Result { + if self.field_open { + return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed")); + } + let header_offset: usize = self.written_size; + self.written_size += try!(self.fields.serialize(&mut self.write)); + try!(self.write.seek(SeekFrom::Start(0))); + try!((header_offset as u32).serialize(&mut self.write)); + Ok(self.written_size) + } +} diff --git a/src/fastfield/writer.rs b/src/fastfield/writer.rs new file mode 100644 index 000000000..b7e0e0633 --- /dev/null +++ b/src/fastfield/writer.rs @@ -0,0 +1,76 @@ +use core::schema::{Schema, U32Field, Document}; +use fastfield::FastFieldSerializer; +use std::io; + +pub struct U32FastFieldsWriter { + field_writers: Vec, +} + +impl U32FastFieldsWriter { + + pub fn from_schema(schema: &Schema) -> U32FastFieldsWriter { + let u32_fields: Vec = schema.get_u32_fields() + .iter() + .enumerate() + .filter(|&(_, field_entry)| field_entry.option.is_fast()) + .map(|(field_id, _)| U32Field(field_id as u8)) + .collect(); + U32FastFieldsWriter::new(u32_fields) + } + + pub fn new(fields: Vec) -> U32FastFieldsWriter { + U32FastFieldsWriter { + field_writers: fields + .iter() + .map(|field| U32FastFieldWriter::new(&field)) + .collect(), + } + } + + pub fn add_document(&mut self, doc: &Document) { + for field_writer in self.field_writers.iter_mut() { + field_writer.add_document(doc); + } + } + + pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> { + for field_writer in self.field_writers.iter() { + try!(field_writer.serialize(serializer)); + } + Ok(()) + } +} + +pub struct U32FastFieldWriter { + field: U32Field, + vals: Vec, +} + +impl U32FastFieldWriter { + pub fn new(field: &U32Field) -> U32FastFieldWriter { + U32FastFieldWriter { + field: field.clone(), + vals: Vec::new(), + } + } + + pub fn add_val(&mut self, val: u32) { + self.vals.push(val); + } + + pub fn add_document(&mut self, doc: &Document) { + let val = doc.get_u32(&self.field).unwrap_or(0u32); + self.add_val(val); + } + + pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> { + let zero = 0; + let min = self.vals.iter().min().unwrap_or(&zero).clone(); + let max = self.vals.iter().max().unwrap_or(&min).clone(); + try!(serializer.new_u32_fast_field(self.field.clone(), min, max)); + for val in self.vals.iter() { + try!(serializer.add_val(val.clone())); + } + serializer.close_field() + } +} diff --git a/src/lib.rs b/src/lib.rs index 8b7eb3e58..5831e8c63 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,6 +32,7 @@ mod datastruct; mod postings; mod directory; mod compression; +mod fastfield; pub use directory::Directory; pub use core::analyzer;