This commit is contained in:
Paul Masurel
2016-01-22 09:53:50 +09:00
parent e5eba2a530
commit 2d0054f08b
10 changed files with 161 additions and 330 deletions

View File

@@ -14,3 +14,4 @@ rand = "0.3.13"
atomicwrites = "0.0.14"
tempfile = "2.0.0"
rustc-serialize = "0.3.16"
log = "0.3.5"

View File

@@ -7,71 +7,72 @@ use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use core::directory::Segment;
use core::directory::SegmentComponent;
use core::reader::*;
use core::schema::Term;
use core::DocId;
use std::fs::File;
pub struct SimpleCodec;
pub struct SimpleSegmentSerializer {
written_bytes_postings: usize,
postings_write: File,
term_fst_builder: MapBuilder<File>,
cur_term_num_docs: DocId,
}
impl SegmentSerializer for SimpleSegmentSerializer {
fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()> {
// terms arrive in sorted order (the writer iterates a BTreeMap, and Term
// now derives Ord), as the fst builder requires
match self.term_fst_builder.insert(term.as_slice(), self.written_bytes_postings as u64) {
Ok(_) => {},
Err(_) => {
let msg = String::from("Failed inserting the term in the term fst");
return Err(Error::WriteError(msg));
},
}
self.cur_term_num_docs = doc_freq;
// writing the size of the posting list
match self.postings_write.write_u32::<LittleEndian>(doc_freq) {
Ok(_) => {},
Err(_) => {
let msg = String::from("Failed writing posting list length");
return Err(Error::WriteError(msg));
},
}
self.written_bytes_postings += 4;
Ok(())
}
fn add_doc(&mut self, doc_id: DocId) -> Result<()> {
match self.postings_write.write_u32::<LittleEndian>(doc_id as u32) {
Ok(_) => {},
Err(_) => {
let msg = String::from("Failed while writing posting list");
return Err(Error::WriteError(msg));
},
}
self.written_bytes_postings += 4;
Ok(())
}
fn close(&mut self,) -> Result<()> {
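// TODO the term fst builder is never finished: `MapBuilder::finish`
// consumes the builder, so calling it from `&mut self` will require some
// restructuring (e.g. storing the builder in an `Option`)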
Ok(())
}
}
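// A hypothetical sketch (not part of this commit) of how a posting list
// written above could be read back: the fst term dictionary maps each term
// to a byte offset in the postings file, where a little-endian u32 doc
// frequency is followed by that many little-endian u32 doc ids.
//
// fn read_postings<R: Read>(postings_read: &mut R) -> Result<Vec<DocId>> {
//     let doc_freq = match postings_read.read_u32::<LittleEndian>() {
//         Ok(doc_freq) => doc_freq,
//         Err(_) => return Err(Error::WriteError(String::from("Failed reading posting list length"))),
//     };
//     let mut doc_ids: Vec<DocId> = Vec::with_capacity(doc_freq as usize);
//     for _ in 0..doc_freq {
//         match postings_read.read_u32::<LittleEndian>() {
//             Ok(doc_id) => doc_ids.push(doc_id),
//             Err(_) => return Err(Error::WriteError(String::from("Failed reading posting list"))),
//         }
//     }
//     Ok(doc_ids)
// }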
impl SimpleCodec {
// fn write_postings<D: DocCursor, W: Write>(doc_it: D, postings: &mut W) -> Result<usize> {
// let mut written_bytes: usize = 4;
// match postings.write_u32::<LittleEndian>(doc_it.len() as u32) {
// Ok(_) => {},
// Err(_) => {
// let msg = String::from("Failed writing posting list length");
// return Err(Error::WriteError(msg));
// },
// }
// for doc_id in doc_it {
// println!(" Doc {}", doc_id);
// match postings.write_u32::<LittleEndian>(doc_id as u32) {
// Ok(_) => {},
// Err(_) => {
// let msg = String::from("Failed while writing posting list");
// return Err(Error::WriteError(msg));
// },
// }
// written_bytes += 4;
// }
// Ok(written_bytes)
// }
// TODO impl packed int
// TODO skip lists
// TODO make that part of the codec API
fn serializer(segment: &Segment) -> Result<SimpleSegmentSerializer> {
let term_write = try!(segment.open_writable(SegmentComponent::TERMS));
let postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS));
let term_fst_builder = match MapBuilder::new(term_write) {
Ok(builder) => builder,
Err(_) => {
// TODO include cause somehow
return Err(Error::WriteError(String::from("Failed creating the term fst builder")));
},
};
Ok(SimpleSegmentSerializer {
written_bytes_postings: 0,
postings_write: postings_write,
term_fst_builder: term_fst_builder,
cur_term_num_docs: 0,
})
}
// pub fn write<'a, I: SerializableSegment<'a>>(index: &'a I, segment: &'a Segment) -> Result<usize> {
// let term_write = try!(segment.open_writable(SegmentComponent::TERMS));
// let mut postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS));
// let term_trie_builder_result = MapBuilder::new(term_write);
// if term_trie_builder_result.is_err() {
// // TODO include cause somehow
// return Err(Error::WriteError(String::from("Failed creating the term builder")));
// }
// let mut term_buffer: Vec<u8> = Vec::new();
// let mut term_trie_builder = term_trie_builder_result.unwrap();
// let mut term_cursor = index.term_cursor();
// let mut offset: usize = 0;
// loop {
// match term_cursor.next() {
// Some((term, doc_it)) => {
// println!("{:?}", term);
// term.write_into(&mut term_buffer);
// match term_trie_builder.insert(&term_buffer, offset as u64) {
// Ok(_) => {}
// Err(_) => {
// return Err(Error::WriteError(String::from("Failed while inserting into the fst")))
// },
// }
// offset += try!(SimpleCodec::write_postings(doc_it, &mut postings_write));
// },
// None => {
// break;
// }
// }
// }
// term_trie_builder.finish();
// Ok(0)
//
// }
pub fn write<I: SerializableSegment>(index: &I, segment: &Segment) -> Result<()> {
let mut serializer = try!(SimpleCodec::serializer(segment));
index.write(&mut serializer)
}
}
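// Hypothetical usage sketch (not part of this commit): a SegmentWriter
// implements SerializableSegment, so flushing an in-memory segment to disk
// is a single call:
//
//     try!(SimpleCodec::write(&segment_writer, &segment));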

View File

@@ -116,7 +116,7 @@ impl Directory {
f.write_all(encoded.as_bytes())
});
match write_result {
Ok(_) => Ok(()),
Ok(_) => { Ok(()) },
Err(ioerr) => Err(Error::IOError(ioerr.kind(), format!("Failed to write meta file: {:?}", ioerr))),
}
}

View File

@@ -3,6 +3,7 @@ use std::io;
#[derive(Debug)]
pub enum Error {
NotImplementedYet,
WriteError(String),
IOError(io::ErrorKind, String),
FileNotFound(String),

View File

@@ -2,20 +2,9 @@ use std::fmt;
use std::fmt::{Debug, Formatter};
use std::io::prelude::Read;
use core::global::DocId;
// use std::core::slice;
// use core::schema::{Field, Term};
// use std::slice;
use std::vec;
/////////////////////////////
pub trait PostingsWriter {
fn suscribe(&mut self, DocId);
}
////////////////////////////////////

View File

@@ -12,17 +12,16 @@ pub struct FieldValue {
}
#[derive(Clone,PartialEq,PartialOrd,Eq,Hash)]
#[derive(Clone,PartialEq,PartialOrd,Ord,Eq,Hash)]
pub struct Term {
data: Vec<u8>, // avoid copies
// pub field: Field,
// pub text: &'a [u8],
data: Vec<u8>,
}
impl Term {
// TODO avoid all these copies.
// TODO avoid all these copies in Term.
// when the term is in the dictionary, there
// shouldn't be any copy
pub fn field(&self,) -> Field {
Field(self.data[0])
}
@@ -48,9 +47,8 @@ impl Term {
}
}
pub fn write_into(&self, buf: &mut Vec<u8>) {
buf.clear();
buf.extend(&self.data);
pub fn as_slice(&self,)->&[u8] {
&self.data
}
}
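// A plausible sketch (not shown in this diff) of the constructor used by the
// index writer: the first byte of `data` encodes the field (see `field()`
// above) and the remaining bytes hold the token text.
//
// impl Term {
//     pub fn from_field_text(field: Field, text: &str) -> Term {
//         let mut data = Vec::with_capacity(1 + text.len());
//         data.push(field.0);
//         data.extend(text.as_bytes());
//         Term { data: data }
//     }
// }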

View File

@@ -5,7 +5,8 @@ use core::error::{Result, Error};
// change the API to remove the lifetime, by
// "pushing" the data to a SegmentSerializer.
struct DebugSegmentSerialize {
#[derive(Debug)]
pub struct DebugSegmentSerialize {
text: String,
}
@@ -13,6 +14,12 @@ impl DebugSegmentSerialize {
pub fn to_string(&self,) -> &String {
&self.text
}
pub fn new() -> DebugSegmentSerialize {
DebugSegmentSerialize {
text: String::new(),
}
}
}
impl SegmentSerializer for DebugSegmentSerialize {
@@ -25,11 +32,16 @@ impl SegmentSerializer for DebugSegmentSerialize {
self.text.push_str(&format!(" - Doc {:?}\n", doc_id));
Ok(())
}
fn close(&mut self,) -> Result<()> {
Ok(())
}
}
pub trait SegmentSerializer {
fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()>;
fn add_doc(&mut self, doc_id: DocId) -> Result<()>;
fn close(&mut self,) -> Result<()>;
}
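// The calling convention, as exercised by SerializableSegment::write in
// writer.rs: for each term, in sorted order, one `new_term` call followed by
// `doc_freq` calls to `add_doc`, and a single `close` once every term has
// been written. A minimal sketch for a segment whose single term appears in
// docs 1 and 4 (ids made up for illustration):
//
// fn serialize_single_term_segment<S: SegmentSerializer>(serializer: &mut S, term: &Term) -> Result<()> {
//     try!(serializer.new_term(term, 2));
//     try!(serializer.add_doc(1));
//     try!(serializer.add_doc(4));
//     serializer.close()
// }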
pub trait SerializableSegment {

View File

@@ -8,7 +8,6 @@ use core::directory::Directory;
use core::analyzer::tokenize;
use std::collections::{HashMap, BTreeMap};
use std::collections::{hash_map, btree_map};
use core::postings::PostingsWriter;
use std::io::{BufWriter, Write};
use std::mem;
use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt};
@@ -19,19 +18,18 @@ use std::cell::RefCell;
use std::borrow::BorrowMut;
use core::directory::Segment;
pub struct SimplePostingsWriter {
pub struct PostingsWriter {
doc_ids: Vec<DocId>,
}
impl SimplePostingsWriter {
pub fn new() -> SimplePostingsWriter {
SimplePostingsWriter {
impl PostingsWriter {
pub fn new() -> PostingsWriter {
PostingsWriter {
doc_ids: Vec::new(),
}
}
}
impl PostingsWriter for SimplePostingsWriter {
fn suscribe(&mut self, doc_id: DocId) {
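// only append the doc id if it is strictly greater than the last one:
// doc ids arrive in increasing order, and repeated occurrences of a term
// within the same document are recorded only once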
if self.doc_ids.len() == 0 || self.doc_ids[self.doc_ids.len() - 1] < doc_id {
self.doc_ids.push(doc_id);
@@ -39,51 +37,6 @@ impl PostingsWriter for SimplePostingsWriter {
}
}
struct FieldWriter {
postings: Vec<SimplePostingsWriter>,
term_index: BTreeMap<String, usize>,
}
impl FieldWriter {
pub fn new() -> FieldWriter {
FieldWriter {
term_index: BTreeMap::new(),
postings: Vec::new()
}
}
pub fn get_postings_writer(&mut self, term_text: &str) -> &mut SimplePostingsWriter {
match self.term_index.get(term_text) {
Some(unord_id) => {
return &mut self.postings[*unord_id];
},
None => {}
}
let unord_id = self.term_index.len();
self.postings.push(SimplePostingsWriter::new());
self.term_index.insert(String::from(term_text), unord_id.clone());
&mut self.postings[unord_id]
}
pub fn suscribe(&mut self, doc: DocId, term_text: &str) {
self.get_postings_writer(term_text).suscribe(doc);
}
}
pub struct SegmentWriter {
max_doc: DocId,
term_writers: HashMap<Field, FieldWriter>,
}
impl SegmentWriter {
fn new() -> SegmentWriter {
SegmentWriter {
max_doc: 0,
term_writers: HashMap::new(),
}
}
}
pub struct IndexWriter {
segment_writer: SegmentWriter,
directory: Directory,
@@ -115,194 +68,66 @@ impl IndexWriter {
}
pub struct SegmentWriter {
max_doc: DocId,
postings: Vec<PostingsWriter>,
term_index: BTreeMap<Term, usize>,
}
impl SegmentWriter {
fn get_field_writer<'a>(&'a mut self, field: &Field) -> &'a mut FieldWriter {
if !self.term_writers.contains_key(field) {
self.term_writers.insert((*field).clone(), FieldWriter::new());
}
self.term_writers.get_mut(field).unwrap()
}
fn new() -> SegmentWriter {
SegmentWriter {
max_doc: 0,
postings: Vec::new(),
term_index: BTreeMap::new(),
}
}
pub fn add(&mut self, doc: Document) {
let doc_id = self.max_doc;
for field_value in doc {
let field = field_value.field;
let field_writer = self.get_field_writer(&field);
for token in tokenize(&field_value.text) {
field_writer.suscribe(doc_id, token);
let term = Term::from_field_text(field.clone(), token);
self.suscribe(doc_id, term);
}
}
self.max_doc += 1;
}
pub fn get_postings_writer(&mut self, term: Term) -> &mut PostingsWriter {
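// two-step lookup instead of the entry API: return the existing postings
// writer if the term is already known, otherwise register a fresh one
// under the next unordered term id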
match self.term_index.get(&term) {
Some(unord_id) => {
return &mut self.postings[*unord_id];
},
None => {}
}
let unord_id = self.term_index.len();
self.postings.push(PostingsWriter::new());
self.term_index.insert(term, unord_id);
&mut self.postings[unord_id]
}
pub fn suscribe(&mut self, doc: DocId, term: Term) {
self.get_postings_writer(term).suscribe(doc);
}
}
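// Hypothetical usage sketch (Document construction assumed from the test
// setup; SegmentWriter::new is private, so in practice the IndexWriter
// drives this):
//
// let mut doc = Document::new();
// doc.set(Field(1), "a b c d");
// segment_writer.add(doc);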
//////////////////////////////////
// CIWFormCursor
//
// struct CIWFormCursor<'a> {
// term_it: btree_map::Iter<'a, String, usize>, // term -> postings_idx
// postings_map: &'a Vec<SimplePostingsWriter>, // postings_idx -> postings
// }
//
// struct FormPostings<'a> {
// form: &'a str,
// postings: &'a SimplePostingsWriter,
// }
//
// impl<'a> Iterator for CIWFormCursor<'a> {
// type Item = FormPostings<'a>;
//
// fn next(&mut self,) -> Option<FormPostings<'a>> {
// self.term_it.next()
// .map(|(form, postings_idx)| {
// FormPostings {
// form: form,
// postings: unsafe { self.postings_map.get_unchecked(*postings_idx) }
// }
// })
// }
// }
//////////////////////////////////
// CIWDocCursor
//
//
// pub struct CIWTermCursor<'a> {
// field_it: hash_map::Iter<'a, Field, FieldWriter>,
// form_it: CIWFormCursor<'a>,
// current_form_postings: Option<FormPostings<'a>>,
// field: &'a Field,
// }
//
// impl<'a> CIWTermCursor<'a> {
//
// fn advance(&mut self,) -> bool {
// let next_form = self.next_form();
// if next_form {
// true
// }
// else {
// if self.next_field() {
// self.advance()
// }
// else {
// false
// }
// }
// }
//
// fn get_term(&self) -> Term {
// let field = self.field.clone();
// let value = self.current_form_postings.as_ref().unwrap().form;
// Term::from_field_text(field, value)
// }
//
// fn doc_cursor(&self,) -> CIWDocCursor<'a> {
// let postings = self.current_form_postings
// .as_ref()
// .unwrap()
// .postings;
// let num_docs = postings.doc_ids.len() as DocId;
// CIWDocCursor {
// num_docs: num_docs,
// docs_it: postings
// .doc_ids
// .iter(),
// current: None
// }
// }
//
// fn next_form(&mut self,) -> bool {
// match self.form_it.next() {
// Some(form_postings) => {
// self.current_form_postings = Some(form_postings);
// return true;
// },
// None => { false }
// }
// }
//
// // Advance to the next field
// // sets up form_it to iterate on forms
// // returns true iff there was a next field
// fn next_field(&mut self,) -> bool {
// match self.field_it.next() {
// Some((field, field_writer)) => {
// self.form_it = CIWFormCursor {
// term_it: field_writer.term_index.iter(),
// postings_map: &field_writer.postings,
// };
// self.field = field;
// true
// },
// None => false,
// }
// }
// }
//
// impl<'a> TermCursor for CIWTermCursor<'a> {
//
// type DocCur = CIWDocCursor<'a>;
//
// fn next(&mut self,) -> Option<(Term, CIWDocCursor<'a>)> {
// if self.advance() {
// Some((self.get_term(), self.doc_cursor()))
// }
// else {
// None
// }
// }
// }
//
//
// impl<'a> SerializableSegment<'a> for SegmentWriter {
//
// type TermCur = CIWTermCursor<'a>;
//
// fn term_cursor(&'a self) -> CIWTermCursor<'a> {
// let mut field_it: hash_map::Iter<'a, Field, FieldWriter> = self.term_writers.iter();
// let (field, field_writer) = field_it.next().unwrap();
// CIWTermCursor {
// field_it: field_it,
// form_it: CIWFormCursor {
// term_it: field_writer.term_index.iter(),
// postings_map: &field_writer.postings,
// },
// field: field,
// current_form_postings: None,
// }
// // TODO handle having no fields at all
// }
// }
//
// // TODO add positions
//
// pub struct CIWDocCursor<'a> {
// docs_it: slice::Iter<'a, DocId>,
// current: Option<DocId>,
// num_docs: DocId,
// }
//
// impl<'a> Iterator for CIWDocCursor<'a> {
// type Item=DocId;
//
// fn next(&mut self) -> Option<DocId> {
// self.current = self.docs_it.next().map(|x| *x);
// self.current
// }
// }
//
// impl<'a> DocCursor for CIWDocCursor<'a> {
//
// fn doc(&self,) -> DocId {
// self.current.unwrap()
// }
//
// fn len(&self) -> DocId {
// self.num_docs
// }
// }
impl SerializableSegment for SegmentWriter {
fn write<SegSer: SegmentSerializer>(&self, serializer: &mut SegSer) -> Result<()> {
for (term, postings_id) in self.term_index.iter() {
let doc_ids = &self.postings[*postings_id].doc_ids;
let term_docfreq = doc_ids.len() as u32;
// propagate serializer errors instead of silently discarding them
try!(serializer.new_term(term, term_docfreq));
for doc_id in doc_ids {
try!(serializer.add_doc(*doc_id));
}
}
// flush the serializer once every term has been written
serializer.close()
}
}

View File

@@ -2,6 +2,10 @@
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate log;
extern crate fst;
extern crate byteorder;
extern crate memmap;
@@ -10,5 +14,4 @@ extern crate regex;
extern crate rustc_serialize;
extern crate atomicwrites;
pub mod core;

View File

@@ -13,8 +13,6 @@ use tantivy::core::global::*;
use tantivy::core::writer::IndexWriter;
use tantivy::core::directory::{Directory, generate_segment_name, SegmentId};
use std::ops::DerefMut;
use tantivy::core::writer::SimplePostingsWriter;
use tantivy::core::postings::PostingsWriter;
use tantivy::core::reader::SegmentIndexReader;
use std::io::{ BufWriter, Write};
use regex::Regex;
@@ -57,29 +55,32 @@ fn test_indexing() {
doc.set(Field(1), "a b c d");
index_writer.add(doc);
}
let debug_serializer = DebugSegmentSerialize::new();
// let segment_writer = index_writer.current_segment_writer();
let commit_result = index_writer.commit();
println!("{:?}", commit_result);
assert!(commit_result.is_ok());
// reading the segment
println!("------");
{
let segment = commit_result.unwrap();
let index_reader = SegmentIndexReader::open(segment).unwrap();
let mut term_cursor = index_reader.term_cursor();
loop {
match term_cursor.next() {
Some((term, mut doc_cursor)) => {
println!("{:?}", term);
for doc in doc_cursor {
println!(" Doc {}", doc);
}
},
None => {
break;
},
}
}
}
// {
// let segment = commit_result.unwrap();
// let index_reader = SegmentIndexReader::open(segment).unwrap();
// let mut term_cursor = index_reader.term_cursor();
// loop {
// match term_cursor.next() {
// Some((term, mut doc_cursor)) => {
// println!("{:?}", term);
// for doc in doc_cursor {
// println!(" Doc {}", doc);
// }
// },
// None => {
// break;
// },
// }
// }
// }
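// presumably left in on purpose while debugging: a failing assert forces
// the captured println! output above to be shown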
assert!(false);
}
{