mirror of
https://github.com/quickwit-oss/tantivy.git
synced 2026-05-23 11:40:40 +00:00
merging doc store
This commit is contained in:
@@ -7,7 +7,6 @@ use core::index::SegmentComponent;
|
||||
use core::fastfield::FastFieldSerializer;
|
||||
use core::store::StoreWriter;
|
||||
use core::postings::PostingsSerializer;
|
||||
use core::schema::TextFieldValue;
|
||||
use core::convert_to_ioerror;
|
||||
|
||||
pub struct SegmentSerializer {
|
||||
@@ -40,10 +39,8 @@ impl SegmentSerializer {
|
||||
&mut self.fast_field_serializer
|
||||
}
|
||||
|
||||
pub fn store_doc(&mut self, field_values_it: &mut Iterator<Item=&TextFieldValue>) -> io::Result<()> {
|
||||
let field_values: Vec<&TextFieldValue> = field_values_it.collect();
|
||||
try!(self.store_writer.store(&field_values));
|
||||
Ok(())
|
||||
pub fn get_store_writer(&mut self,) -> &mut StoreWriter {
|
||||
&mut self.store_writer
|
||||
}
|
||||
|
||||
pub fn write_segment_info(&mut self, segment_info: &SegmentInfo) -> io::Result<()> {
|
||||
|
||||
@@ -11,6 +11,7 @@ use core::fstmap::FstMapIter;
|
||||
use core::schema::Term;
|
||||
use core::schema::Schema;
|
||||
use core::fastfield::FastFieldSerializer;
|
||||
use core::store::StoreWriter;
|
||||
use core::index::SegmentInfo;
|
||||
use std::cmp::Ordering;
|
||||
use core::schema::U32Field;
|
||||
@@ -188,12 +189,21 @@ impl IndexMerger {
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn write_storable_fields(&self, store_writer: &mut StoreWriter) -> io::Result<()> {
|
||||
for reader in self.readers.iter() {
|
||||
let store_reader = reader.get_store_reader();
|
||||
try!(store_writer.stack_reader(reader.max_doc(), store_reader));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl SerializableSegment for IndexMerger {
|
||||
fn write(&self, mut serializer: SegmentSerializer) -> io::Result<()> {
|
||||
try!(self.write_postings(serializer.get_postings_serializer()));
|
||||
try!(self.write_fast_fields(serializer.get_fast_field_serializer()));
|
||||
try!(self.write_storable_fields(serializer.get_store_writer()));
|
||||
try!(serializer.write_segment_info(&self.segment_info));
|
||||
serializer.close()
|
||||
}
|
||||
@@ -205,12 +215,13 @@ mod tests {
|
||||
use core::schema::Document;
|
||||
use core::index::Index;
|
||||
use core::schema::Term;
|
||||
use core::searcher::DocAddress;
|
||||
use core::collector::TestCollector;
|
||||
|
||||
#[test]
|
||||
fn test_index_merger() {
|
||||
let mut schema = schema::Schema::new();
|
||||
let text_fieldtype = schema::TextOptions::new().set_tokenized_indexed();
|
||||
let text_fieldtype = schema::TextOptions::new().set_tokenized_indexed().set_stored();
|
||||
let text_field = schema.add_text_field("text", &text_fieldtype);
|
||||
let index = Index::create_in_ram(schema);
|
||||
|
||||
@@ -281,6 +292,26 @@ mod tests {
|
||||
get_doc_ids(vec!(Term::from_field_text(&text_field, "b"))),
|
||||
vec!(0, 1, 2, 3, 4,)
|
||||
);
|
||||
{
|
||||
let doc = searcher.doc(&DocAddress(0, 0)).unwrap();
|
||||
assert_eq!(doc.get_first_text(&text_field).unwrap(), "af b");
|
||||
}
|
||||
{
|
||||
let doc = searcher.doc(&DocAddress(0, 1)).unwrap();
|
||||
assert_eq!(doc.get_first_text(&text_field).unwrap(), "a b c");
|
||||
}
|
||||
{
|
||||
let doc = searcher.doc(&DocAddress(0, 2)).unwrap();
|
||||
assert_eq!(doc.get_first_text(&text_field).unwrap(), "a b c d");
|
||||
}
|
||||
{
|
||||
let doc = searcher.doc(&DocAddress(0, 3)).unwrap();
|
||||
assert_eq!(doc.get_first_text(&text_field).unwrap(), "af b");
|
||||
}
|
||||
// {
|
||||
// let doc = searcher.doc(&DocAddress(0, 4)).unwrap();
|
||||
// assert_eq!(doc.get_first_text(&text_field).unwrap(), "a b c g");
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,6 +109,10 @@ impl SegmentReader {
|
||||
self.segment_info.max_doc
|
||||
}
|
||||
|
||||
pub fn get_store_reader(&self,) -> &StoreReader {
|
||||
&self.store_reader
|
||||
}
|
||||
|
||||
|
||||
/// Open a new segment for reading.
|
||||
pub fn open(segment: Segment) -> io::Result<SegmentReader> {
|
||||
|
||||
@@ -17,6 +17,9 @@ use lz4;
|
||||
|
||||
const BLOCK_SIZE: usize = 131_072;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct OffsetIndex(DocId, u64);
|
||||
|
||||
pub struct StoreWriter {
|
||||
doc: DocId,
|
||||
offsets: Vec<OffsetIndex>, // TODO have a better index.
|
||||
@@ -26,9 +29,6 @@ pub struct StoreWriter {
|
||||
current_block: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct OffsetIndex(DocId, u64);
|
||||
|
||||
impl BinarySerializable for OffsetIndex {
|
||||
fn serialize(&self, writer: &mut Write) -> io::Result<usize> {
|
||||
let OffsetIndex(a, b) = *self;
|
||||
@@ -54,6 +54,21 @@ impl StoreWriter {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stack_reader(&mut self, num_docs: DocId, reader: &StoreReader) -> io::Result<()> {
|
||||
println!("num docs {} ", num_docs);
|
||||
if self.current_block.len() > 0 {
|
||||
try!(self.write_and_compress_block());
|
||||
}
|
||||
try!(self.writer.write_all(reader.data.as_slice()));
|
||||
for &OffsetIndex(doc, offset) in reader.offsets.iter() {
|
||||
println!("{:?}", OffsetIndex(self.doc + doc, self.written + offset));
|
||||
self.offsets.push(OffsetIndex(self.doc + doc, self.written + offset));
|
||||
}
|
||||
self.written += reader.data.len() as u64;
|
||||
self.doc += num_docs;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn store<'a>(&mut self, field_values: &Vec<&'a TextFieldValue>) -> io::Result<()> {
|
||||
self.intermediary_buffer.clear();
|
||||
try!((field_values.len() as u32).serialize(&mut self.intermediary_buffer));
|
||||
|
||||
@@ -153,6 +153,18 @@ pub struct SegmentWriter {
|
||||
}
|
||||
|
||||
impl SegmentWriter {
|
||||
|
||||
fn for_segment(segment: Segment, schema: &Schema) -> io::Result<SegmentWriter> {
|
||||
let segment_serializer = try!(SegmentSerializer::for_segment(&segment));
|
||||
Ok(SegmentWriter {
|
||||
max_doc: 0,
|
||||
postings_writer: PostingsWriter::new(),
|
||||
segment_serializer: segment_serializer,
|
||||
tokenizer: SimpleTokenizer::new(),
|
||||
fast_field_writers: U32FastFieldsWriter::from_schema(schema),
|
||||
})
|
||||
}
|
||||
|
||||
// Write on disk all of the stuff that
|
||||
// is still on RAM :
|
||||
// - the dictionary in an fst
|
||||
@@ -170,17 +182,6 @@ impl SegmentWriter {
|
||||
self.segment_serializer.close()
|
||||
}
|
||||
|
||||
fn for_segment(segment: Segment, schema: &Schema) -> io::Result<SegmentWriter> {
|
||||
let segment_serializer = try!(SegmentSerializer::for_segment(&segment));
|
||||
Ok(SegmentWriter {
|
||||
max_doc: 0,
|
||||
postings_writer: PostingsWriter::new(),
|
||||
segment_serializer: segment_serializer,
|
||||
tokenizer: SimpleTokenizer::new(),
|
||||
fast_field_writers: U32FastFieldsWriter::from_schema(schema),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn add_document(&mut self, doc: &Document, schema: &Schema) -> io::Result<()> {
|
||||
let doc_id = self.max_doc;
|
||||
for field_value in doc.text_fields() {
|
||||
@@ -205,11 +206,14 @@ impl SegmentWriter {
|
||||
self.postings_writer.suscribe(doc_id, term);
|
||||
}
|
||||
}
|
||||
let mut stored_fieldvalues_it = doc.text_fields().filter(|text_field_value| {
|
||||
schema.text_field_options(&text_field_value.field).is_stored()
|
||||
});
|
||||
self.fast_field_writers.add_document(&doc);
|
||||
try!(self.segment_serializer.store_doc(&mut stored_fieldvalues_it));
|
||||
|
||||
let stored_fieldvalues: Vec<&TextFieldValue> = doc
|
||||
.text_fields()
|
||||
.filter(|text_field_value| schema.text_field_options(&text_field_value.field).is_stored())
|
||||
.collect();
|
||||
let doc_writer = self.segment_serializer.get_store_writer();
|
||||
try!(doc_writer.store(&stored_fieldvalues));
|
||||
self.max_doc += 1;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user