From 3e1e09fa1f8d8485e69bbc6c1e730a52de12e0f0 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Wed, 6 Apr 2016 10:50:26 +0900 Subject: [PATCH] merging doc store --- src/core/codec.rs | 7 ++----- src/core/merger.rs | 33 ++++++++++++++++++++++++++++++++- src/core/reader.rs | 4 ++++ src/core/store.rs | 21 ++++++++++++++++++--- src/core/writer.rs | 34 +++++++++++++++++++--------------- 5 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/core/codec.rs b/src/core/codec.rs index 274b8400e..12a2cf4ca 100644 --- a/src/core/codec.rs +++ b/src/core/codec.rs @@ -7,7 +7,6 @@ use core::index::SegmentComponent; use core::fastfield::FastFieldSerializer; use core::store::StoreWriter; use core::postings::PostingsSerializer; -use core::schema::TextFieldValue; use core::convert_to_ioerror; pub struct SegmentSerializer { @@ -40,10 +39,8 @@ impl SegmentSerializer { &mut self.fast_field_serializer } - pub fn store_doc(&mut self, field_values_it: &mut Iterator) -> io::Result<()> { - let field_values: Vec<&TextFieldValue> = field_values_it.collect(); - try!(self.store_writer.store(&field_values)); - Ok(()) + pub fn get_store_writer(&mut self,) -> &mut StoreWriter { + &mut self.store_writer } pub fn write_segment_info(&mut self, segment_info: &SegmentInfo) -> io::Result<()> { diff --git a/src/core/merger.rs b/src/core/merger.rs index 04dca5a23..f13e8ac05 100644 --- a/src/core/merger.rs +++ b/src/core/merger.rs @@ -11,6 +11,7 @@ use core::fstmap::FstMapIter; use core::schema::Term; use core::schema::Schema; use core::fastfield::FastFieldSerializer; +use core::store::StoreWriter; use core::index::SegmentInfo; use std::cmp::Ordering; use core::schema::U32Field; @@ -188,12 +189,21 @@ impl IndexMerger { } Ok(()) } + + fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> io::Result<()> { + for reader in self.readers.iter() { + let store_reader = reader.get_store_reader(); + try!(store_writer.stack_reader(reader.max_doc(), store_reader)); + } + Ok(()) + } } impl SerializableSegment for IndexMerger { fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> { try!(self.write_postings(serializer.get_postings_serializer())); try!(self.write_fast_fields(serializer.get_fast_field_serializer())); + try!(self.write_storable_fields(serializer.get_store_writer())); try!(serializer.write_segment_info(&self.segment_info)); serializer.close() } @@ -205,12 +215,13 @@ mod tests { use core::schema::Document; use core::index::Index; use core::schema::Term; + use core::searcher::DocAddress; use core::collector::TestCollector; #[test] fn test_index_merger() { let mut schema = schema::Schema::new(); - let text_fieldtype = schema::TextOptions::new().set_tokenized_indexed(); + let text_fieldtype = schema::TextOptions::new().set_tokenized_indexed().set_stored(); let text_field = schema.add_text_field("text", &text_fieldtype); let index = Index::create_in_ram(schema); @@ -281,6 +292,26 @@ mod tests { get_doc_ids(vec!(Term::from_field_text(&text_field, "b"))), vec!(0, 1, 2, 3, 4,) ); + { + let doc = searcher.doc(&DocAddress(0, 0)).unwrap(); + assert_eq!(doc.get_first_text(&text_field).unwrap(), "af b"); + } + { + let doc = searcher.doc(&DocAddress(0, 1)).unwrap(); + assert_eq!(doc.get_first_text(&text_field).unwrap(), "a b c"); + } + { + let doc = searcher.doc(&DocAddress(0, 2)).unwrap(); + assert_eq!(doc.get_first_text(&text_field).unwrap(), "a b c d"); + } + { + let doc = searcher.doc(&DocAddress(0, 3)).unwrap(); + assert_eq!(doc.get_first_text(&text_field).unwrap(), "af b"); + } + // { + // let doc = searcher.doc(&DocAddress(0, 4)).unwrap(); + // assert_eq!(doc.get_first_text(&text_field).unwrap(), "a b c g"); + // } } } } diff --git a/src/core/reader.rs b/src/core/reader.rs index 2fd7640d5..7d77e8c87 100644 --- a/src/core/reader.rs +++ b/src/core/reader.rs @@ -109,6 +109,10 @@ impl SegmentReader { self.segment_info.max_doc } + pub fn get_store_reader(&self,) -> &StoreReader { + &self.store_reader + } + /// Open a new segment for reading. pub fn open(segment: Segment) -> io::Result { diff --git a/src/core/store.rs b/src/core/store.rs index 40c4e26ab..188bcd42f 100644 --- a/src/core/store.rs +++ b/src/core/store.rs @@ -17,6 +17,9 @@ use lz4; const BLOCK_SIZE: usize = 131_072; +#[derive(Debug, Clone, PartialEq, Eq)] +struct OffsetIndex(DocId, u64); + pub struct StoreWriter { doc: DocId, offsets: Vec, // TODO have a better index. @@ -26,9 +29,6 @@ pub struct StoreWriter { current_block: Vec, } -#[derive(Debug, Clone, PartialEq, Eq)] -struct OffsetIndex(DocId, u64); - impl BinarySerializable for OffsetIndex { fn serialize(&self, writer: &mut Write) -> io::Result { let OffsetIndex(a, b) = *self; @@ -54,6 +54,21 @@ impl StoreWriter { } } + pub fn stack_reader(&mut self, num_docs: DocId, reader: &StoreReader) -> io::Result<()> { + println!("num docs {} ", num_docs); + if self.current_block.len() > 0 { + try!(self.write_and_compress_block()); + } + try!(self.writer.write_all(reader.data.as_slice())); + for &OffsetIndex(doc, offset) in reader.offsets.iter() { + println!("{:?}", OffsetIndex(self.doc + doc, self.written + offset)); + self.offsets.push(OffsetIndex(self.doc + doc, self.written + offset)); + } + self.written += reader.data.len() as u64; + self.doc += num_docs; + Ok(()) + } + pub fn store<'a>(&mut self, field_values: &Vec<&'a TextFieldValue>) -> io::Result<()> { self.intermediary_buffer.clear(); try!((field_values.len() as u32).serialize(&mut self.intermediary_buffer)); diff --git a/src/core/writer.rs b/src/core/writer.rs index 71d1a4769..522630649 100644 --- a/src/core/writer.rs +++ b/src/core/writer.rs @@ -153,6 +153,18 @@ pub struct SegmentWriter { } impl SegmentWriter { + + fn for_segment(segment: Segment, schema: &Schema) -> io::Result { + let segment_serializer = try!(SegmentSerializer::for_segment(&segment)); + Ok(SegmentWriter { + max_doc: 0, + postings_writer: PostingsWriter::new(), + segment_serializer: segment_serializer, + tokenizer: SimpleTokenizer::new(), + fast_field_writers: U32FastFieldsWriter::from_schema(schema), + }) + } + // Write on disk all of the stuff that // is still on RAM : // - the dictionary in an fst @@ -170,17 +182,6 @@ impl SegmentWriter { self.segment_serializer.close() } - fn for_segment(segment: Segment, schema: &Schema) -> io::Result { - let segment_serializer = try!(SegmentSerializer::for_segment(&segment)); - Ok(SegmentWriter { - max_doc: 0, - postings_writer: PostingsWriter::new(), - segment_serializer: segment_serializer, - tokenizer: SimpleTokenizer::new(), - fast_field_writers: U32FastFieldsWriter::from_schema(schema), - }) - } - pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> { let doc_id = self.max_doc; for field_value in doc.text_fields() { @@ -205,11 +206,14 @@ impl SegmentWriter { self.postings_writer.suscribe(doc_id, term); } } - let mut stored_fieldvalues_it = doc.text_fields().filter(|text_field_value| { - schema.text_field_options(&text_field_value.field).is_stored() - }); self.fast_field_writers.add_document(&doc); - try!(self.segment_serializer.store_doc(&mut stored_fieldvalues_it)); + + let stored_fieldvalues: Vec<&TextFieldValue> = doc + .text_fields() + .filter(|text_field_value| schema.text_field_options(&text_field_value.field).is_stored()) + .collect(); + let doc_writer = self.segment_serializer.get_store_writer(); + try!(doc_writer.store(&stored_fieldvalues)); self.max_doc += 1; Ok(()) }