diff --git a/src/core/codec.rs b/src/core/codec.rs index 9e750495f..6d2d663dd 100644 --- a/src/core/codec.rs +++ b/src/core/codec.rs @@ -11,6 +11,7 @@ use core::schema::Term; use core::DocId; use std::fs::File; use core::simdcompression; +use core::schema::FieldValue; pub struct SimpleCodec; @@ -18,14 +19,28 @@ pub struct SimpleCodec; // TODO should we vint? pub struct SimpleSegmentSerializer { + segment: Segment, written_bytes_postings: usize, postings_write: File, + store_write: File, term_fst_builder: MapBuilder, // TODO find an alternative to work around the "move" cur_term_num_docs: DocId, encoder: simdcompression::Encoder, } + +impl SimpleSegmentSerializer { + pub fn segment(&self,) -> Segment { + self.segment.clone() + } +} + impl SegmentSerializer<()> for SimpleSegmentSerializer { + + fn store_doc(&mut self, field: &mut Iterator) { + + } + fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()> { self.term_fst_builder.insert(term.as_slice(), self.written_bytes_postings as u64); self.cur_term_num_docs = doc_freq; @@ -77,14 +92,17 @@ impl SimpleCodec { // TODO impl packed int // TODO skip lists // TODO make that part of the codec API - fn serializer(segment: &Segment) -> Result { + pub fn serializer(segment: &Segment) -> Result { let term_write = try!(segment.open_writable(SegmentComponent::TERMS)); let postings_write = try!(segment.open_writable(SegmentComponent::POSTINGS)); + let store_write = try!(segment.open_writable(SegmentComponent::STORE)); let term_fst_builder_result = MapBuilder::new(term_write); let term_fst_builder = term_fst_builder_result.unwrap(); Ok(SimpleSegmentSerializer { + segment: segment.clone(), written_bytes_postings: 0, postings_write: postings_write, + store_write: store_write, term_fst_builder: term_fst_builder, cur_term_num_docs: 0, encoder: simdcompression::Encoder::new(), @@ -93,7 +111,7 @@ impl SimpleCodec { pub fn write(index: &I, segment: &Segment) -> Result<()> { - let serializer = try!(SimpleCodec::serializer(segment)); - index.write(serializer) + let mut serializer = try!(SimpleCodec::serializer(segment)); + index.write(&mut serializer) } } diff --git a/src/core/directory.rs b/src/core/directory.rs index 107ad70d6..97b781464 100644 --- a/src/core/directory.rs +++ b/src/core/directory.rs @@ -331,6 +331,7 @@ pub enum SegmentComponent { POSTINGS, // POSITIONS, TERMS, + STORE, } #[derive(Debug, Clone)] @@ -352,6 +353,7 @@ impl Segment { SegmentComponent::POSTINGS => ".idx", // SegmentComponent::POSITIONS => ".pos", SegmentComponent::TERMS => ".term", + SegmentComponent::STORE => ".store", } } diff --git a/src/core/serial.rs b/src/core/serial.rs index ea573ff79..117efc8a5 100644 --- a/src/core/serial.rs +++ b/src/core/serial.rs @@ -7,16 +7,18 @@ use std::fmt; pub trait SegmentSerializer { fn new_term(&mut self, term: &Term, doc_freq: DocId) -> Result<()>; fn write_docs(&mut self, docs: &[DocId]) -> Result<()>; // TODO add size + fn store_doc(&mut self, field: &mut Iterator); fn close(self,) -> Result; } pub trait SerializableSegment { - fn write>(&self, serializer: SegSer) -> Result; + fn write>(&self, serializer: &mut SegSer) -> Result; } pub struct DebugSegmentSerializer { text: String, + num_docs: u32, } impl fmt::Debug for DebugSegmentSerializer { @@ -28,13 +30,14 @@ impl fmt::Debug for DebugSegmentSerializer { impl DebugSegmentSerializer { pub fn debug_string(index: &S) -> String { - let serializer = DebugSegmentSerializer::new(); - index.write(serializer).unwrap() + let mut serializer = DebugSegmentSerializer::new(); + index.write(&mut serializer).unwrap() } pub fn new() -> DebugSegmentSerializer { DebugSegmentSerializer { text: String::new(), + num_docs: 0, } } } @@ -46,6 +49,17 @@ impl SegmentSerializer for DebugSegmentSerializer { Ok(()) } + fn store_doc(&mut self, fields: &mut Iterator) { + if self.num_docs == 0 { + self.text.push_str(&format!("# STORED DOC\n======\n")) + } + self.text.push_str(&format!("doc {}", self.num_docs)); + for field_value in fields { + self.text.push_str(&format!("field {:?} |", field_value.field)); + self.text.push_str(&format!("value {:?}\n", field_value.text)); + } + } + fn write_docs(&mut self, docs: &[DocId]) -> Result<()> { for doc in docs { self.text.push_str(&format!(" - Doc {:?}\n", doc)); diff --git a/src/core/simdcompression.rs b/src/core/simdcompression.rs index 7186323a4..0e0299ddf 100644 --- a/src/core/simdcompression.rs +++ b/src/core/simdcompression.rs @@ -24,15 +24,14 @@ impl Encoder { } pub fn encode(&mut self, input: &[u32]) -> &[u32] { - - self.input_buffer.clear(); - let input_len = input.len(); - if input_len >= self.input_buffer.len() { - self.input_buffer = (0..input_len as u32).collect(); - self.output_buffer = (0..input_len as u32 + 1000).collect(); - // TODO use resize when available - } - // TODO use clone_from when available + self.input_buffer.clear(); + let input_len = input.len(); + if input_len >= self.input_buffer.len() { + self.input_buffer = (0..input_len as u32).collect(); + self.output_buffer = (0..input_len as u32 + 1000).collect(); + // TODO use resize when available + } + // TODO use clone_from when available unsafe { ptr::copy_nonoverlapping(input.as_ptr(), self.input_buffer.as_mut_ptr(), input_len); let written_size = encode_native( @@ -60,12 +59,11 @@ impl Decoder { compressed_data: &[u32], uncompressed_values: &mut [u32]) -> size_t { unsafe { - let num_elements = decode_native( + return decode_native( compressed_data.as_ptr(), compressed_data.len() as size_t, uncompressed_values.as_mut_ptr(), uncompressed_values.len() as size_t); - return num_elements; } } } @@ -78,11 +76,11 @@ fn test_encode_big() { let mut encoder = Encoder::new(); let input: Vec = (0..100000).into_iter().collect(); let data = encoder.encode(&input); - assert_eq!(data.len(), 4); + assert_eq!(data.len(), 9578); let decoder = Decoder::new(); - let mut data_output: Vec = (0..10000).collect(); - assert_eq!(10000, decoder.decode(&data[0..4], &mut data_output)); - for i in 0..10000 { + let mut data_output: Vec = (0..100000).collect(); + assert_eq!(100000, decoder.decode(&data[0..9578], &mut data_output)); + for i in 0..100000 { assert_eq!(data_output[i], input[i]) ; } } diff --git a/src/core/writer.rs b/src/core/writer.rs index 1d7fd128b..9ca03fae2 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -4,11 +4,12 @@ use std::slice; use core::global::*; use core::schema::*; use core::codec::*; +use std::rc::Rc; use core::directory::Directory; use core::analyzer::SimpleTokenizer; use std::collections::{HashMap, BTreeMap}; use std::collections::{hash_map, btree_map}; -use std::io::{BufWriter, Write}; +use std::io::{Write}; use std::sync::Arc; use std::mem; use byteorder::{NativeEndian, ReadBytesExt, WriteBytesExt}; @@ -40,24 +41,30 @@ impl PostingsWriter { } pub struct IndexWriter { - segment_writer: SegmentWriter, + segment_writer: Rc, directory: Directory, schema: Schema, } + + fn new_segment_writer(directory: &Directory, ) -> SegmentWriter { + let segment = directory.new_segment(); + SegmentWriter::for_segment(segment) +} + impl IndexWriter { pub fn open(directory: &Directory) -> IndexWriter { let schema = directory.schema(); IndexWriter { - segment_writer: SegmentWriter::new(), + segment_writer: Rc::new(new_segment_writer(&directory)), directory: directory.clone(), schema: schema, } } pub fn add(&mut self, doc: Document) { - self.segment_writer.add(doc, &self.schema); + Rc::get_mut(&mut self.segment_writer).unwrap().add(doc, &self.schema); } // TODO remove that some day @@ -66,14 +73,27 @@ impl IndexWriter { } pub fn commit(&mut self,) -> Result { - let segment = self.directory.new_segment(); - try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz))); - // At this point, the segment is written - // We still need to sync all of the file, as well as the parent directory. - try!(self.directory.sync(segment.clone())); - self.directory.publish_segment(segment.clone()); - self.segment_writer = SegmentWriter::new(); - Ok(segment) + // TODO error handling + let mut segment_writer_rc = self.segment_writer.clone(); + self.segment_writer = Rc::new(new_segment_writer(&self.directory)); + let segment_writer_res = Rc::try_unwrap(segment_writer_rc); + match segment_writer_res { + Ok(segment_writer) => { + let segment = segment_writer.segment(); + segment_writer.write_pending(); + // write(self.segment_serializer); + // try!(SimpleCodec::write(&self.segment_writer, &segment).map(|sz| (segment.clone(), sz))); + // At this point, the segment is written + // We still need to sync all of the file, as well as the parent directory. + try!(self.directory.sync(segment.clone())); + self.directory.publish_segment(segment.clone()); + Ok(segment) + }, + Err(_) => { + panic!("error while acquiring segment writer."); + } + } + } } @@ -85,17 +105,42 @@ pub struct SegmentWriter { postings: Vec, term_index: BTreeMap, tokenizer: SimpleTokenizer, + segment_serializer: SimpleSegmentSerializer, } impl SegmentWriter { - fn new() -> SegmentWriter { + // write on disk all of the stuff that + // are still on RAM. + // for this version, that's the term dictionary + // and the postings + fn write_pending(mut self,) -> Result<()> { + //self.write(&mut self.segment_serializer); + { + for (term, postings_id) in self.term_index.iter() { + let doc_ids = &self.postings[postings_id.clone()].doc_ids; + let term_docfreq = doc_ids.len() as u32; + self.segment_serializer.new_term(&term, term_docfreq); + self.segment_serializer.write_docs(&doc_ids); + } + } + self.segment_serializer.close() + } + + pub fn segment(&self,) -> Segment { + self.segment_serializer.segment() + } + + fn for_segment(segment: Segment) -> SegmentWriter { + // TODO handle error + let segment_serializer = SimpleCodec::serializer(&segment).unwrap(); SegmentWriter { num_tokens: 0, max_doc: 0, postings: Vec::new(), term_index: BTreeMap::new(), tokenizer: SimpleTokenizer::new(), + segment_serializer: segment_serializer, } } @@ -113,6 +158,7 @@ impl SegmentWriter { } } } + self.max_doc += 1; } @@ -131,19 +177,17 @@ impl SegmentWriter { pub fn suscribe(&mut self, doc: DocId, term: Term) { self.get_postings_writer(term).suscribe(doc); - } - -} - -impl SerializableSegment for SegmentWriter { - fn write>(&self, mut serializer: SegSer) -> Result { - for (term, postings_id) in self.term_index.iter() { - let doc_ids = &self.postings[postings_id.clone()].doc_ids; - let term_docfreq = doc_ids.len() as u32; - serializer.new_term(&term, term_docfreq); - serializer.write_docs(&doc_ids); - } - serializer.close() - } } +// +// impl SerializableSegment for SegmentWriter { +// fn write>(&self, serializer: &mut SegSer) -> Result { +// for (term, postings_id) in self.term_index.iter() { +// let doc_ids = &self.postings[postings_id.clone()].doc_ids; +// let term_docfreq = doc_ids.len() as u32; +// serializer.new_term(&term, term_docfreq); +// serializer.write_docs(&doc_ids); +// } +// serializer.close() +// } +// } diff --git a/tests/core.rs b/tests/core.rs index 3d3aa1d58..dbd824c24 100644 --- a/tests/core.rs +++ b/tests/core.rs @@ -94,8 +94,8 @@ fn test_indexing() { index_writer.add(doc); } - let debug_serializer = DebugSegmentSerializer::new(); - let segment_str_before_writing = DebugSegmentSerializer::debug_string(index_writer.current_segment_writer()); + //let debug_serializer = DebugSegmentSerializer::new(); + //let segment_str_before_writing = DebugSegmentSerializer::debug_string(index_writer.current_segment_writer()); let commit_result = index_writer.commit(); assert!(commit_result.is_ok()); let segment = commit_result.unwrap();