postings and fastfields

This commit is contained in:
Paul Masurel
2016-04-05 00:39:31 +09:00
parent 2a0d46f921
commit 6c7650fa1c
5 changed files with 133 additions and 79 deletions

View File

@@ -177,6 +177,7 @@ pub struct U32FastFieldReader {
_data: ReadOnlySource,
data_ptr: *const u64,
min_val: u32,
max_val: u32,
num_bits: u8,
mask: u32,
num_in_pack: u32,
@@ -184,6 +185,15 @@ pub struct U32FastFieldReader {
}
impl U32FastFieldReader {
pub fn min_val(&self,) -> u32 {
self.min_val
}
pub fn max_val(&self,) -> u32 {
self.max_val
}
pub fn open(data: ReadOnlySource) -> io::Result<U32FastFieldReader> {
let min_val;
let amplitude;
@@ -200,6 +210,7 @@ impl U32FastFieldReader {
_data: data,
data_ptr: ptr as *const u64,
min_val: min_val,
max_val: min_val + amplitude,
num_bits: num_bits,
mask: mask,
num_in_pack: num_in_pack,

View File

@@ -5,102 +5,148 @@ use core::schema::DocId;
use core::index::SerializableSegment;
use core::codec::SegmentSerializer;
use core::postings::PostingsSerializer;
use core::postings::TermInfo;
use std::collections::BinaryHeap;
use core::serialize::BinarySerializable;
use core::fstmap::FstMapIter;
use std::cmp::Ordering;
use std::cmp::Ord;
use core::schema::Term;
use core::schema::Schema;
use core::fastfield::FastFieldSerializer;
use core::schema::U32Field;
use std::cmp::min;
use std::cmp::max;
pub struct StreamUnion<'a, V: 'static + BinarySerializable + Ord + Clone> {
streams: Vec<FstMapIter<'a, V>>,
heap: BinaryHeap<(&'a [u8], usize, V)>,
//heap: BinaryHeap<(&'a [u8], usize)>,
// heap: BinaryHeap<usize>,
struct PostingsMerger<'a> {
doc_ids: Vec<DocId>,
doc_offsets: Vec<DocId>,
heap: BinaryHeap<(Vec<u8>, usize, TermInfo)>,
term_streams: Vec<FstMapIter<'a, TermInfo>>,
readers: &'a Vec<SegmentReader>,
}
impl<'a, V: 'static + Ord + BinarySerializable + Clone> StreamUnion<'a, V> {
impl<'a> PostingsMerger<'a> {
fn new(readers: &'a Vec<SegmentReader>) -> PostingsMerger<'a> {
let doc_offsets: Vec<DocId> = readers
.iter()
.map(|reader| reader.max_doc())
.collect();
let term_streams = readers
.iter()
.map(|reader| reader.term_infos().stream())
.collect();
let mut postings_merger = PostingsMerger {
heap: BinaryHeap::new(),
term_streams: term_streams,
doc_ids: Vec::new(),
doc_offsets: doc_offsets,
readers: readers,
};
for segment_ord in 0..readers.len() {
postings_merger.push_next_segment_el(segment_ord);
}
postings_merger
}
pub fn open(mut streams: Vec<FstMapIter<'a, V>>) -> StreamUnion<'a, V> {
let mut heap = BinaryHeap::new();
fn push_next_segment_el(&mut self, segment_ord: usize) {
match self.term_streams[segment_ord].next() {
Some((term, val)) => {
let it = (Vec::from(term), segment_ord, val.clone());
self.heap.push(it);
}
None => {}
}
}
fn append_segment(&mut self, segment_ord: usize, term_info: TermInfo) {
{
let streams_it = streams.iter_mut();
loop {
match streams_it {
Some(fst_map_it) => {
}
None => {
break;
}
}
let offset = self.doc_offsets[segment_ord];
let reader = &self.readers[segment_ord];
for doc_id in reader.read_postings(term_info.postings_offset) {
self.doc_ids.push(offset + doc_id);
}
}
let (k, v) = streams.iter_mut().next().unwrap().next().unwrap();
// for (i, stream) in streams.iter_mut().enumerate() {
// match stream.next() {
// Some(kv) => {
// let (key, val): (&'a [u8], V) = kv;
// //heap.push((key.clone(), i.clone(), val.clone()));
// // let c: &'a [u8] = key;
// // heap.push((key.clone(), i.clone()));
// //heap.push(i.clone());
// // heap.push(i.clone());
// },
// None => {},
// }
// }
StreamUnion {
streams: streams,
heap: heap,
self.push_next_segment_el(segment_ord);
}
fn next(&mut self,) -> Option<(Vec<u8>, &Vec<DocId>)> {
// TODO remove the Vec<u8> allocations
match self.heap.pop() {
Some((term, segment_ord, term_info)) => {
self.doc_ids.clear();
self.append_segment(segment_ord, term_info);
loop {
match self.heap.peek() {
Some(&(ref next_term, _, _)) if next_term == &term => {},
_ => { break; }
}
let (_, segment_ord, next_term_info) = self.heap.pop().unwrap();
self.append_segment(segment_ord, next_term_info);
}
Some((term, &self.doc_ids))
},
None => None
}
}
pub fn next(&mut self) -> Option<(&[u8], V)> {
// match self.heap.pop() {
// Some((k, i, v)) => {
// let mut vals = Vec::new();
// match self.streams[i].next() {
// Some((k_next,v_next)) => {
// self.heap.push((k_next, i, v_next));
// },
// None => {}
// }
//
// },
// None => None,
// }
None
}
}
struct IndexMerger {
schema: Schema,
readers: Vec<SegmentReader>,
offsets: Vec<DocId>,
}
impl IndexMerger {
pub fn open(segments: &Vec<Segment>) -> io::Result<IndexMerger> {
pub fn open(schema: Schema, segments: &Vec<Segment>) -> io::Result<IndexMerger> {
let mut readers = Vec::new();
let mut offsets = Vec::new();
for segment in segments.iter() {
let reader = try!(SegmentReader::open(segment.clone()));
offsets.push(reader.max_doc());
readers.push(reader);
}
Ok(IndexMerger {
schema: schema,
readers: readers,
offsets: offsets,
})
}
fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> io::Result<()> {
for reader in self.readers.iter() {
let term_infos = reader.term_infos();
let term_stream = term_infos.stream();
fn write_fast_fields(&self, fast_field_serializer: &mut FastFieldSerializer) -> io::Result<()> {
for field in self.schema
.get_u32_fields()
.iter()
.enumerate()
.filter(|&(_, field_entry)| field_entry.option.is_fast())
.map(|(field_id, _)| U32Field(field_id as u8)) {
let mut u32_readers = Vec::new();
let mut min_val = u32::min_value();
let mut max_val = 0;
for reader in self.readers.iter() {
let u32_reader = try!(reader.get_fast_field_reader(&field));
min_val = min(min_val, u32_reader.min_val());
max_val = max(max_val, u32_reader.max_val());
u32_readers.push((reader.max_doc(), u32_reader));
}
fast_field_serializer.new_u32_fast_field(field, min_val, max_val);
for (max_doc, u32_reader) in u32_readers {
for doc_id in 0..max_doc {
let val = u32_reader.get(doc_id);
try!(fast_field_serializer.add_val(val));
}
}
try!(fast_field_serializer.close_field());
}
Ok(())
}
fn write_postings(&self, postings_serializer: &mut PostingsSerializer) -> io::Result<()> {
let mut postings_merger = PostingsMerger::new(&self.readers);
loop {
match postings_merger.next() {
Some((term, doc_ids)) => {
try!(postings_serializer.new_term(&Term::from(&term), doc_ids.len() as DocId));
try!(postings_serializer.write_docs(doc_ids));
}
None => { break; }
}
}
Ok(())
}
@@ -109,6 +155,7 @@ impl IndexMerger {
impl SerializableSegment for IndexMerger {
fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> {
try!(self.write_postings(serializer.get_postings_serializer()));
try!(self.write_fast_fields(serializer.get_fast_field_serializer()));
Ok(())
}
}

View File

@@ -11,7 +11,7 @@ use core::serialize::BinarySerializable;
use std::io::{Read, Write};
use std::io;
#[derive(Debug)]
#[derive(Debug,Ord,PartialOrd,Eq,PartialEq,Clone)]
pub struct TermInfo {
pub doc_freq: u32,
pub postings_offset: u32,

View File

@@ -30,7 +30,7 @@ impl fmt::Debug for SegmentReader {
pub struct SegmentPostings {
doc_id: usize,
doc_ids: Vec<u32>,
doc_ids: Vec<DocId>,
}
impl SegmentPostings {
@@ -109,6 +109,7 @@ impl SegmentReader {
self.segment_info.max_doc
}
/// Open a new segment for reading.
pub fn open(segment: Segment) -> io::Result<SegmentReader> {
let segment_info_reader = try!(segment.open_read(SegmentComponent::INFO));
@@ -148,7 +149,7 @@ impl SegmentReader {
self.fast_fields_reader.get_field(u32_field)
}
fn read_postings(&self, offset: u32) -> SegmentPostings {
pub fn read_postings(&self, offset: u32) -> SegmentPostings {
let postings_data = &self.postings_data.as_slice()[(offset as usize)..];
SegmentPostings::from_data(&postings_data)
}

View File

@@ -11,14 +11,12 @@ use core::postings::PostingsWriter;
use core::fastfield::U32FastFieldsWriter;
use std::clone::Clone;
use std::sync::mpsc;
use std::sync::mpsc::channel;
use std::thread;
use std::sync::Mutex;
use std::sync::mpsc::SyncSender;
use std::sync::mpsc::Receiver;
use std::thread::JoinHandle;
use std::sync::Arc;
use std::rc::Rc;
pub struct IndexWriter {
// segment_writers: Vec<SegmentWriter>,
@@ -36,8 +34,7 @@ impl IndexWriter {
let schema = index.schema();
let (queue_input, queue_output): (SyncSender<ArcDoc>, Receiver<ArcDoc>) = mpsc::sync_channel(10_000);
let queue_output_sendable = Arc::new(Mutex::new(queue_output));
let threads = (0..num_threads).map(|thread_id| {
let threads = (0..num_threads).map(|_| {
let queue_output_clone = queue_output_sendable.clone();
let mut index_clone = index.clone();
let schema_clone = schema.clone();
@@ -49,8 +46,6 @@ impl IndexWriter {
let mut docs_remaining = true;
while docs_remaining {
let segment = index_clone.new_segment();
let segment_clone = segment.clone();
let mut doc;
{
match queue_output_clone.lock().unwrap().recv() {
@@ -61,8 +56,7 @@ impl IndexWriter {
let mut segment_writer = SegmentWriter::for_segment(segment.clone(), &schema_clone).unwrap();
segment_writer.add_document(&*doc, &schema_clone).unwrap();
for i in 0..(225_000 - 1) {
for _ in 0..(225_000 - 1) {
{
let queue = queue_output_clone.lock().unwrap();
match queue.recv() {
@@ -96,11 +90,12 @@ impl IndexWriter {
})
}
pub fn wait(self,) {
pub fn wait(self,) -> thread::Result<()> {
drop(self.queue_input);
for thread in self.threads {
thread.join();
try!(thread.join());
}
Ok(())
}
pub fn add_document(&mut self, doc: Document) -> io::Result<()> {