moved fastfield to its own mod.

This commit is contained in:
Paul Masurel
2016-04-30 19:18:54 +09:00
parent 26826ac4ea
commit 6f148653ff
12 changed files with 295 additions and 278 deletions

View File

@@ -4,7 +4,7 @@ use rustc_serialize::json;
use core::index::Segment;
use core::index::SegmentInfo;
use core::index::SegmentComponent;
use core::fastfield::FastFieldSerializer;
use fastfield::FastFieldSerializer;
use core::store::StoreWriter;
use core::convert_to_ioerror;

View File

@@ -2,7 +2,7 @@ use DocId;
use core::reader::SegmentReader;
use core::searcher::SegmentLocalId;
use core::searcher::DocAddress;
use core::fastfield::U32FastFieldReader;
use fastfield::U32FastFieldReader;
use core::schema::U32Field;
use std::io;

View File

@@ -12,7 +12,7 @@ use std::collections::BinaryHeap;
use datastruct::FstMapIter;
use core::schema::Term;
use core::schema::Schema;
use core::fastfield::FastFieldSerializer;
use fastfield::FastFieldSerializer;
use core::store::StoreWriter;
use core::index::SegmentInfo;
use std::cmp::Ordering;

View File

@@ -8,8 +8,6 @@ pub mod collector;
pub mod serialize;
pub mod store;
pub mod index;
pub mod fastfield;
pub mod fastdivide;
pub mod merger;
pub mod timer;

View File

@@ -17,8 +17,7 @@ use core::timer::OpenTimer;
use core::schema::U32Field;
use core::convert_to_ioerror;
use core::serialize::BinarySerializable;
use core::fastfield::U32FastFieldsReader;
use core::fastfield::U32FastFieldReader;
use fastfield::{U32FastFieldsReader, U32FastFieldReader};
use compression;
use std::mem;

View File

@@ -8,7 +8,7 @@ use core::analyzer::StreamingIterator;
use core::index::Segment;
use core::index::SegmentInfo;
use postings::PostingsWriter;
use core::fastfield::U32FastFieldsWriter;
use fastfield::U32FastFieldsWriter;
use std::clone::Clone;
use std::sync::mpsc;
use std::thread;

View File

@@ -5,6 +5,7 @@ use std::num::Wrapping;
// ported from libdivide.h by ridiculous_fish
const LIBDIVIDE_32_SHIFT_MASK: u8 = 0x1F;
const LIBDIVIDE_ADD_MARKER: u8 = 0x40;
const LIBDIVIDE_U32_SHIFT_PATH: u8 = 0x80;

View File

@@ -1,295 +1,39 @@
use std::io;
use std::io::{SeekFrom, Seek, Write};
use directory::{WritePtr, ReadOnlySource};
use core::serialize::BinarySerializable;
use std::collections::HashMap;
use DocId;
use core::schema::Schema;
use core::schema::Document;
use std::ops::Deref;
use core::fastdivide::count_leading_zeros;
use core::fastdivide::DividerU32;
use core::schema::U32Field;
mod fastdivide;
mod reader;
mod writer;
mod serializer;
pub fn compute_num_bits(amplitude: u32) -> u8 {
pub use self::fastdivide::DividerU32;
pub use self::writer::{U32FastFieldsWriter, U32FastFieldWriter};
pub use self::reader::{U32FastFieldsReader, U32FastFieldReader};
pub use self::serializer::FastFieldSerializer;
use self::fastdivide::count_leading_zeros;
/// Returns the bit width used to pack values of a fast field whose
/// amplitude (`max - min`) is `amplitude`.
///
/// The result is 32 minus the number of leading zeros reported by
/// `count_leading_zeros` for `amplitude`.
fn compute_num_bits(amplitude: u32) -> u8 {
    let leading_zeros = count_leading_zeros(amplitude);
    32u8 - leading_zeros
}
pub struct FastFieldSerializer {
write: WritePtr,
written_size: usize,
fields: Vec<(U32Field, u32)>,
num_bits: u8,
min_value: u32,
field_open: bool,
mini_buffer_written: usize,
mini_buffer: u64,
}
impl FastFieldSerializer {
pub fn new(mut write: WritePtr) -> io::Result<FastFieldSerializer> {
// just making room for the pointer to header.
let written_size: usize = try!(0u32.serialize(&mut write));
Ok(FastFieldSerializer {
write: write,
written_size: written_size,
fields: Vec::new(),
num_bits: 0u8,
field_open: false,
mini_buffer_written: 0,
mini_buffer: 0,
min_value: 0,
})
}
pub fn new_u32_fast_field(&mut self, field: U32Field, min_value: u32, max_value: u32) -> io::Result<()> {
if self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Previous field not closed"));
}
self.min_value = min_value;
self.field_open = true;
self.fields.push((field, self.written_size as u32));
let write: &mut Write = &mut self.write;
self.written_size += try!(min_value.serialize(write));
let amplitude = max_value - min_value;
self.written_size += try!(amplitude.serialize(write));
self.num_bits = compute_num_bits(amplitude);
Ok(())
}
pub fn add_val(&mut self, val: u32) -> io::Result<()> {
let write: &mut Write = &mut self.write;
if self.mini_buffer_written + (self.num_bits as usize) > 64 {
self.written_size += try!(self.mini_buffer.serialize(write));
self.mini_buffer = 0;
self.mini_buffer_written = 0;
}
self.mini_buffer |= ((val - self.min_value) as u64) << self.mini_buffer_written;
self.mini_buffer_written += self.num_bits as usize;
Ok(())
}
pub fn close_field(&mut self,) -> io::Result<()> {
if !self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Current field is already closed"));
}
self.field_open = false;
if self.mini_buffer_written > 0 {
self.mini_buffer_written = 0;
self.written_size += try!(self.mini_buffer.serialize(&mut self.write));
}
self.mini_buffer = 0;
Ok(())
}
pub fn close(mut self,) -> io::Result<usize> {
if self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed"));
}
let header_offset: usize = self.written_size;
self.written_size += try!(self.fields.serialize(&mut self.write));
try!(self.write.seek(SeekFrom::Start(0)));
try!((header_offset as u32).serialize(&mut self.write));
Ok(self.written_size)
}
}
/// Collects the values of every `u32` fast field, one `U32FastFieldWriter`
/// per field.
pub struct U32FastFieldsWriter {
    field_writers: Vec<U32FastFieldWriter>,
}

impl U32FastFieldsWriter {
    /// Builds a writer for the `u32` fields of `schema` whose option reports
    /// `is_fast()`. The field id is the position of the entry in
    /// `schema.get_u32_fields()`.
    pub fn from_schema(schema: &Schema) -> U32FastFieldsWriter {
        let mut fast_fields: Vec<U32Field> = Vec::new();
        for (field_id, field_entry) in schema.get_u32_fields().iter().enumerate() {
            if field_entry.option.is_fast() {
                fast_fields.push(U32Field(field_id as u8));
            }
        }
        U32FastFieldsWriter::new(fast_fields)
    }

    /// Builds a writer holding one `U32FastFieldWriter` per entry of
    /// `fields`, in the same order.
    pub fn new(fields: Vec<U32Field>) -> U32FastFieldsWriter {
        let mut writers = Vec::with_capacity(fields.len());
        for field in fields.iter() {
            writers.push(U32FastFieldWriter::new(field));
        }
        U32FastFieldsWriter {
            field_writers: writers,
        }
    }

    /// Forwards `doc` to every field writer.
    pub fn add_document(&mut self, doc: &Document) {
        for writer in &mut self.field_writers {
            writer.add_document(doc);
        }
    }

    /// Serializes every field in order, stopping at the first error.
    pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> {
        for writer in &self.field_writers {
            try!(writer.serialize(serializer));
        }
        Ok(())
    }
}
/// Buffers the values of a single `u32` fast field until serialization.
pub struct U32FastFieldWriter {
    field: U32Field,
    vals: Vec<u32>,
}

impl U32FastFieldWriter {
    /// Creates an empty writer for a copy of `field`.
    pub fn new(field: &U32Field) -> U32FastFieldWriter {
        let owned_field = field.clone();
        U32FastFieldWriter {
            field: owned_field,
            vals: Vec::new(),
        }
    }

    /// Appends `val` to the buffered values.
    pub fn add_val(&mut self, val: u32) {
        self.vals.push(val);
    }

    /// Records the value `doc` holds for this field, or `0` when
    /// `doc.get_u32` returns `None`.
    pub fn add_document(&mut self, doc: &Document) {
        let val = match doc.get_u32(&self.field) {
            Some(val) => val,
            None => 0u32,
        };
        self.add_val(val);
    }

    /// Writes the field through `serializer`: opens it with the minimum and
    /// maximum of the buffered values (both `0` when there are none), adds
    /// each value in insertion order, then closes the field.
    pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> {
        let min = self.vals.iter().cloned().min().unwrap_or(0);
        let max = self.vals.iter().cloned().max().unwrap_or(min);
        try!(serializer.new_u32_fast_field(self.field.clone(), min, max));
        for &val in &self.vals {
            try!(serializer.add_val(val));
        }
        serializer.close_field()
    }
}
/// Reader for one bit-packed `u32` fast field.
///
/// `open` reads two `u32` values (`min_val`, then the amplitude
/// `max_val - min_val`) from the start of the data; the packed 64-bit words
/// are read starting at byte offset 8. Each value is stored as
/// `value - min_val` on `num_bits` bits, `num_in_pack` values per word.
pub struct U32FastFieldReader {
    // Owner of the bytes `data_ptr` points into.
    _data: ReadOnlySource,
    // Address of byte 8 of `_data`, viewed as a sequence of `u64` words.
    data_ptr: *const u64,
    min_val: u32,
    max_val: u32,
    // Bit width of one packed value; 0 means no bits are stored at all.
    num_bits: u8,
    // Keeps the low `num_bits` bits of a shifted word.
    mask: u32,
    // Number of values packed in one 64-bit word.
    num_in_pack: u32,
    // Precomputed division by `num_in_pack`.
    divider: DividerU32,
}

impl U32FastFieldReader {
    /// Smallest value of the field, as read from its header.
    pub fn min_val(&self,) -> u32 {
        self.min_val
    }

    /// Largest value of the field (`min_val` plus the amplitude read from
    /// the header).
    pub fn max_val(&self,) -> u32 {
        self.max_val
    }

    /// Opens a reader over the bytes of a single field.
    ///
    /// # Errors
    /// * Any error raised while deserializing the two header values.
    /// * `ErrorKind::InvalidData` if `min_val + amplitude` overflows `u32`.
    ///
    /// # Panics
    /// Panics if `data` is shorter than 8 bytes.
    pub fn open(data: ReadOnlySource) -> io::Result<U32FastFieldReader> {
        let min_val;
        let amplitude;
        {
            let mut cursor = data.cursor();
            min_val = try!(u32::deserialize(&mut cursor));
            amplitude = try!(u32::deserialize(&mut cursor));
        }
        let max_val = match min_val.checked_add(amplitude) {
            Some(max_val) => max_val,
            None => {
                return Err(io::Error::new(io::ErrorKind::InvalidData, "Fast field header overflows u32"));
            }
        };
        let num_bits = compute_num_bits(amplitude);
        // Computed on 64 bits: `1u32 << 32` overflows when num_bits is 32.
        let mask = ((1u64 << num_bits) - 1) as u32;
        // A bit width of 0 (presumably reported for an amplitude of 0) would
        // make this a division by zero. `get` never touches the packed data
        // in that case, so any non-zero value works; 64 is the value already
        // used for a 1-bit width.
        let num_in_pack = if num_bits == 0 {
            64u32
        } else {
            64u32 / (num_bits as u32)
        };
        // Slicing instead of indexing byte 8: a field without any packed
        // word is exactly 8 bytes long and `data[8]` would panic.
        let ptr: *const u8 = data.deref()[8..].as_ptr();
        Ok(U32FastFieldReader {
            _data: data,
            data_ptr: ptr as *const u64,
            min_val: min_val,
            max_val: max_val,
            num_bits: num_bits,
            mask: mask,
            num_in_pack: num_in_pack,
            divider: DividerU32::divide_by(num_in_pack),
        })
    }

    /// Returns the value stored for `doc`.
    ///
    /// No bounds check is performed: the reader does not know how many
    /// values were written, so `doc` must be a valid index for this field.
    pub fn get(&self, doc: DocId) -> u32 {
        if self.num_bits == 0 {
            // Nothing is stored per document: every value equals `min_val`.
            return self.min_val;
        }
        let long_addr = self.divider.divide(doc);
        let ord_within_long = doc - long_addr * self.num_in_pack;
        let bit_shift = (self.num_bits as u32) * ord_within_long;
        // NOTE(review): this is a native-endian read through a pointer that
        // is not guaranteed to be 8-byte aligned — confirm both are
        // acceptable on the supported targets.
        let val_unshifted_unmasked: u64 = unsafe { *self.data_ptr.offset(long_addr as isize) };
        let val_shifted = (val_unshifted_unmasked >> bit_shift) as u32;
        self.min_val + (val_shifted & self.mask)
    }
}
/// Gives access to the individual fast fields stored in one source.
pub struct U32FastFieldsReader {
    source: ReadOnlySource,
    // For each field, the `(start, stop)` offsets of its data in `source`.
    field_offsets: HashMap<U32Field, (u32, u32)>,
}

impl U32FastFieldsReader {
    /// Reads the field table of `source`.
    ///
    /// The leading `u32` of the source is the offset of the table. The table
    /// lists the start offset of each field; a field stops where the next
    /// one starts, and the last field stops at the table offset.
    pub fn open(source: ReadOnlySource) -> io::Result<U32FastFieldsReader> {
        let (header_offset, field_offsets) = {
            let mut cursor = source.cursor();
            let header_offset: u32 = try!(u32::deserialize(&mut cursor));
            try!(cursor.seek(SeekFrom::Start(header_offset as u64)));
            let field_offsets: Vec<(U32Field, u32)> = try!(Vec::deserialize(&mut cursor));
            (header_offset, field_offsets)
        };
        let stop_offsets = field_offsets
            .iter()
            .skip(1)
            .map(|entry| entry.1)
            .chain(Some(header_offset));
        let field_offsets_map: HashMap<U32Field, (u32, u32)> = field_offsets
            .iter()
            .zip(stop_offsets)
            .map(|(entry, stop_offset)| (entry.0.clone(), (entry.1, stop_offset)))
            .collect();
        Ok(U32FastFieldsReader {
            field_offsets: field_offsets_map,
            source: source,
        })
    }

    /// Opens a reader over the data of `field`.
    ///
    /// Returns an `InvalidInput` error when `field` is not in the table.
    pub fn get_field(&self, field: &U32Field) -> io::Result<U32FastFieldReader> {
        if let Some(&(start, stop)) = self.field_offsets.get(field) {
            let field_source = self.source.slice(start as usize, stop as usize);
            return U32FastFieldReader::open(field_source);
        }
        Err(io::Error::new(io::ErrorKind::InvalidInput, "Could not find field, has it been set as a fast field?"))
    }
}
#[cfg(test)]
mod tests {
use super::compute_num_bits;
use super::U32FastFieldsReader;
use super::U32FastFieldsWriter;
use super::FastFieldSerializer;
use core::schema::U32Field;
use std::path::Path;
use directory::{Directory, WritePtr, RAMDirectory};
use core::schema::Document;
use core::schema::Schema;
use core::schema::FAST_U32;
use core::fastfield::FastFieldSerializer;
use test::Bencher;
use test;
use rand::Rng;
use rand::SeedableRng;
use rand::XorShiftRng;
#[test]
fn test_compute_num_bits() {
assert_eq!(compute_num_bits(1), 1u8);

111
src/fastfield/reader.rs Normal file
View File

@@ -0,0 +1,111 @@
use std::io;
use directory::ReadOnlySource;
use fastfield::DividerU32;
use core::serialize::BinarySerializable;
use DocId;
use std::collections::HashMap;
use core::schema::U32Field;
use std::io::{SeekFrom, Seek};
use std::ops::Deref;
use super::compute_num_bits;
/// Reader for one bit-packed `u32` fast field.
///
/// `open` reads two `u32` values (`min_val`, then the amplitude
/// `max_val - min_val`) from the start of the data; the packed 64-bit words
/// are read starting at byte offset 8. Each value is stored as
/// `value - min_val` on `num_bits` bits, `num_in_pack` values per word.
pub struct U32FastFieldReader {
    // Owner of the bytes `data_ptr` points into.
    _data: ReadOnlySource,
    // Address of byte 8 of `_data`, viewed as a sequence of `u64` words.
    data_ptr: *const u64,
    min_val: u32,
    max_val: u32,
    // Bit width of one packed value; 0 means no bits are stored at all.
    num_bits: u8,
    // Keeps the low `num_bits` bits of a shifted word.
    mask: u32,
    // Number of values packed in one 64-bit word.
    num_in_pack: u32,
    // Precomputed division by `num_in_pack`.
    divider: DividerU32,
}

impl U32FastFieldReader {
    /// Smallest value of the field, as read from its header.
    pub fn min_val(&self,) -> u32 {
        self.min_val
    }

    /// Largest value of the field (`min_val` plus the amplitude read from
    /// the header).
    pub fn max_val(&self,) -> u32 {
        self.max_val
    }

    /// Opens a reader over the bytes of a single field.
    ///
    /// # Errors
    /// * Any error raised while deserializing the two header values.
    /// * `ErrorKind::InvalidData` if `min_val + amplitude` overflows `u32`.
    ///
    /// # Panics
    /// Panics if `data` is shorter than 8 bytes.
    pub fn open(data: ReadOnlySource) -> io::Result<U32FastFieldReader> {
        let min_val;
        let amplitude;
        {
            let mut cursor = data.cursor();
            min_val = try!(u32::deserialize(&mut cursor));
            amplitude = try!(u32::deserialize(&mut cursor));
        }
        let max_val = match min_val.checked_add(amplitude) {
            Some(max_val) => max_val,
            None => {
                return Err(io::Error::new(io::ErrorKind::InvalidData, "Fast field header overflows u32"));
            }
        };
        let num_bits = compute_num_bits(amplitude);
        // Computed on 64 bits: `1u32 << 32` overflows when num_bits is 32.
        let mask = ((1u64 << num_bits) - 1) as u32;
        // A bit width of 0 (presumably reported for an amplitude of 0) would
        // make this a division by zero. `get` never touches the packed data
        // in that case, so any non-zero value works; 64 is the value already
        // used for a 1-bit width.
        let num_in_pack = if num_bits == 0 {
            64u32
        } else {
            64u32 / (num_bits as u32)
        };
        // Slicing instead of indexing byte 8: a field without any packed
        // word is exactly 8 bytes long and `data[8]` would panic.
        let ptr: *const u8 = data.deref()[8..].as_ptr();
        Ok(U32FastFieldReader {
            _data: data,
            data_ptr: ptr as *const u64,
            min_val: min_val,
            max_val: max_val,
            num_bits: num_bits,
            mask: mask,
            num_in_pack: num_in_pack,
            divider: DividerU32::divide_by(num_in_pack),
        })
    }

    /// Returns the value stored for `doc`.
    ///
    /// No bounds check is performed: the reader does not know how many
    /// values were written, so `doc` must be a valid index for this field.
    pub fn get(&self, doc: DocId) -> u32 {
        if self.num_bits == 0 {
            // Nothing is stored per document: every value equals `min_val`.
            return self.min_val;
        }
        let long_addr = self.divider.divide(doc);
        let ord_within_long = doc - long_addr * self.num_in_pack;
        let bit_shift = (self.num_bits as u32) * ord_within_long;
        // NOTE(review): this is a native-endian read through a pointer that
        // is not guaranteed to be 8-byte aligned — confirm both are
        // acceptable on the supported targets.
        let val_unshifted_unmasked: u64 = unsafe { *self.data_ptr.offset(long_addr as isize) };
        let val_shifted = (val_unshifted_unmasked >> bit_shift) as u32;
        self.min_val + (val_shifted & self.mask)
    }
}
/// Gives access to the individual fast fields stored in one source.
pub struct U32FastFieldsReader {
    source: ReadOnlySource,
    // For each field, the `(start, stop)` offsets of its data in `source`.
    field_offsets: HashMap<U32Field, (u32, u32)>,
}

impl U32FastFieldsReader {
    /// Reads the field table of `source`.
    ///
    /// The leading `u32` of the source is the offset of the table. The table
    /// lists the start offset of each field; a field stops where the next
    /// one starts, and the last field stops at the table offset.
    pub fn open(source: ReadOnlySource) -> io::Result<U32FastFieldsReader> {
        let (header_offset, field_offsets) = {
            let mut cursor = source.cursor();
            let header_offset: u32 = try!(u32::deserialize(&mut cursor));
            try!(cursor.seek(SeekFrom::Start(header_offset as u64)));
            let field_offsets: Vec<(U32Field, u32)> = try!(Vec::deserialize(&mut cursor));
            (header_offset, field_offsets)
        };
        let stop_offsets = field_offsets
            .iter()
            .skip(1)
            .map(|entry| entry.1)
            .chain(Some(header_offset));
        let field_offsets_map: HashMap<U32Field, (u32, u32)> = field_offsets
            .iter()
            .zip(stop_offsets)
            .map(|(entry, stop_offset)| (entry.0.clone(), (entry.1, stop_offset)))
            .collect();
        Ok(U32FastFieldsReader {
            field_offsets: field_offsets_map,
            source: source,
        })
    }

    /// Opens a reader over the data of `field`.
    ///
    /// Returns an `InvalidInput` error when `field` is not in the table.
    pub fn get_field(&self, field: &U32Field) -> io::Result<U32FastFieldReader> {
        if let Some(&(start, stop)) = self.field_offsets.get(field) {
            let field_source = self.source.slice(start as usize, stop as usize);
            return U32FastFieldReader::open(field_source);
        }
        Err(io::Error::new(io::ErrorKind::InvalidInput, "Could not find field, has it been set as a fast field?"))
    }
}

View File

@@ -0,0 +1,87 @@
use core::serialize::BinarySerializable;
use directory::WritePtr;
use core::schema::U32Field;
use std::io;
use std::io::{SeekFrom, Write};
use super::compute_num_bits;
pub struct FastFieldSerializer {
write: WritePtr,
written_size: usize,
fields: Vec<(U32Field, u32)>,
num_bits: u8,
min_value: u32,
field_open: bool,
mini_buffer_written: usize,
mini_buffer: u64,
}
impl FastFieldSerializer {
pub fn new(mut write: WritePtr) -> io::Result<FastFieldSerializer> {
// just making room for the pointer to header.
let written_size: usize = try!(0u32.serialize(&mut write));
Ok(FastFieldSerializer {
write: write,
written_size: written_size,
fields: Vec::new(),
num_bits: 0u8,
field_open: false,
mini_buffer_written: 0,
mini_buffer: 0,
min_value: 0,
})
}
pub fn new_u32_fast_field(&mut self, field: U32Field, min_value: u32, max_value: u32) -> io::Result<()> {
if self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Previous field not closed"));
}
self.min_value = min_value;
self.field_open = true;
self.fields.push((field, self.written_size as u32));
let write: &mut Write = &mut self.write;
self.written_size += try!(min_value.serialize(write));
let amplitude = max_value - min_value;
self.written_size += try!(amplitude.serialize(write));
self.num_bits = compute_num_bits(amplitude);
Ok(())
}
pub fn add_val(&mut self, val: u32) -> io::Result<()> {
let write: &mut Write = &mut self.write;
if self.mini_buffer_written + (self.num_bits as usize) > 64 {
self.written_size += try!(self.mini_buffer.serialize(write));
self.mini_buffer = 0;
self.mini_buffer_written = 0;
}
self.mini_buffer |= ((val - self.min_value) as u64) << self.mini_buffer_written;
self.mini_buffer_written += self.num_bits as usize;
Ok(())
}
pub fn close_field(&mut self,) -> io::Result<()> {
if !self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Current field is already closed"));
}
self.field_open = false;
if self.mini_buffer_written > 0 {
self.mini_buffer_written = 0;
self.written_size += try!(self.mini_buffer.serialize(&mut self.write));
}
self.mini_buffer = 0;
Ok(())
}
pub fn close(mut self,) -> io::Result<usize> {
if self.field_open {
return Err(io::Error::new(io::ErrorKind::Other, "Last field not closed"));
}
let header_offset: usize = self.written_size;
self.written_size += try!(self.fields.serialize(&mut self.write));
try!(self.write.seek(SeekFrom::Start(0)));
try!((header_offset as u32).serialize(&mut self.write));
Ok(self.written_size)
}
}

76
src/fastfield/writer.rs Normal file
View File

@@ -0,0 +1,76 @@
use core::schema::{Schema, U32Field, Document};
use fastfield::FastFieldSerializer;
use std::io;
/// Collects the values of every `u32` fast field, one `U32FastFieldWriter`
/// per field.
pub struct U32FastFieldsWriter {
    field_writers: Vec<U32FastFieldWriter>,
}

impl U32FastFieldsWriter {
    /// Builds a writer for the `u32` fields of `schema` whose option reports
    /// `is_fast()`. The field id is the position of the entry in
    /// `schema.get_u32_fields()`.
    pub fn from_schema(schema: &Schema) -> U32FastFieldsWriter {
        let mut fast_fields: Vec<U32Field> = Vec::new();
        for (field_id, field_entry) in schema.get_u32_fields().iter().enumerate() {
            if field_entry.option.is_fast() {
                fast_fields.push(U32Field(field_id as u8));
            }
        }
        U32FastFieldsWriter::new(fast_fields)
    }

    /// Builds a writer holding one `U32FastFieldWriter` per entry of
    /// `fields`, in the same order.
    pub fn new(fields: Vec<U32Field>) -> U32FastFieldsWriter {
        let mut writers = Vec::with_capacity(fields.len());
        for field in fields.iter() {
            writers.push(U32FastFieldWriter::new(field));
        }
        U32FastFieldsWriter {
            field_writers: writers,
        }
    }

    /// Forwards `doc` to every field writer.
    pub fn add_document(&mut self, doc: &Document) {
        for writer in &mut self.field_writers {
            writer.add_document(doc);
        }
    }

    /// Serializes every field in order, stopping at the first error.
    pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> {
        for writer in &self.field_writers {
            try!(writer.serialize(serializer));
        }
        Ok(())
    }
}
/// Buffers the values of a single `u32` fast field until serialization.
pub struct U32FastFieldWriter {
    field: U32Field,
    vals: Vec<u32>,
}

impl U32FastFieldWriter {
    /// Creates an empty writer for a copy of `field`.
    pub fn new(field: &U32Field) -> U32FastFieldWriter {
        let owned_field = field.clone();
        U32FastFieldWriter {
            field: owned_field,
            vals: Vec::new(),
        }
    }

    /// Appends `val` to the buffered values.
    pub fn add_val(&mut self, val: u32) {
        self.vals.push(val);
    }

    /// Records the value `doc` holds for this field, or `0` when
    /// `doc.get_u32` returns `None`.
    pub fn add_document(&mut self, doc: &Document) {
        let val = match doc.get_u32(&self.field) {
            Some(val) => val,
            None => 0u32,
        };
        self.add_val(val);
    }

    /// Writes the field through `serializer`: opens it with the minimum and
    /// maximum of the buffered values (both `0` when there are none), adds
    /// each value in insertion order, then closes the field.
    pub fn serialize(&self, serializer: &mut FastFieldSerializer) -> io::Result<()> {
        let min = self.vals.iter().cloned().min().unwrap_or(0);
        let max = self.vals.iter().cloned().max().unwrap_or(min);
        try!(serializer.new_u32_fast_field(self.field.clone(), min, max));
        for &val in &self.vals {
            try!(serializer.add_val(val));
        }
        serializer.close_field()
    }
}

View File

@@ -32,6 +32,7 @@ mod datastruct;
mod postings;
mod directory;
mod compression;
mod fastfield;
pub use directory::Directory;
pub use core::analyzer;