(test not passing)

This commit is contained in:
Paul Masurel
2016-08-29 00:51:45 +09:00
parent 619b65b0f3
commit b10227ea44
6 changed files with 242 additions and 254 deletions

View File

@@ -13,6 +13,7 @@ use std::collections::HashSet;
use indexer::merger::IndexMerger;
use core::SegmentId;
use std::mem::swap;
use postings::BlockStore;
use chan;
use Result;
@@ -33,10 +34,12 @@ pub struct IndexWriter {
const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
fn index_documents(segment: Segment,
fn index_documents(block_store: &mut BlockStore,
segment: Segment,
schema: &Schema,
document_iterator: &mut Iterator<Item=Document>) -> Result<usize> {
let mut segment_writer = try!(SegmentWriter::for_segment(segment, &schema));
block_store.clear();
let mut segment_writer = try!(SegmentWriter::for_segment(block_store, segment, &schema));
for doc in document_iterator {
try!(segment_writer.add_document(&doc, &schema));
}
@@ -63,6 +66,7 @@ impl IndexWriter {
let document_receiver_clone = self.document_receiver.clone();
let target_num_docs = self.target_num_docs;
let join_handle: JoinHandle<()> = thread::spawn(move || {
let mut block_store = BlockStore::allocate(100_000);
loop {
let segment = index.new_segment();
let segment_id = segment.id();
@@ -75,7 +79,7 @@ impl IndexWriter {
// creating a new segment's files
// if no document are available.
if document_iterator.peek().is_some() {
let index_result = index_documents(segment, &schema, &mut document_iterator)
let index_result = index_documents(&mut block_store, segment, &schema, &mut document_iterator)
.map(|num_docs| (segment_id, num_docs));
segment_ready_sender_clone.send(index_result);
}

View File

@@ -19,8 +19,10 @@ use schema::TextIndexingOptions;
use postings::SpecializedPostingsWriter;
use postings::{NothingRecorder, TermFrequencyRecorder, TFAndPositionRecorder};
use indexer::segment_serializer::SegmentSerializer;
use postings::BlockStore;
pub struct SegmentWriter {
pub struct SegmentWriter<'a> {
block_store: &'a mut BlockStore,
max_doc: DocId,
tokenizer: SimpleTokenizer,
per_field_postings_writers: Vec<Box<PostingsWriter>>,
@@ -61,9 +63,9 @@ fn posting_from_field_entry(field_entry: &FieldEntry) -> Box<PostingsWriter> {
}
impl SegmentWriter {
impl<'a> SegmentWriter<'a> {
pub fn for_segment(mut segment: Segment, schema: &Schema) -> Result<SegmentWriter> {
pub fn for_segment(block_store: &'a mut BlockStore, mut segment: Segment, schema: &Schema) -> Result<SegmentWriter<'a>> {
let segment_serializer = try!(SegmentSerializer::for_segment(&mut segment));
let per_field_postings_writers = schema.fields()
.iter()
@@ -72,6 +74,7 @@ impl SegmentWriter {
})
.collect();
Ok(SegmentWriter {
block_store: block_store,
max_doc: 0,
per_field_postings_writers: per_field_postings_writers,
fieldnorms_writer: create_fieldnorms_writer(schema),
@@ -80,7 +83,7 @@ impl SegmentWriter {
fast_field_writers: U32FastFieldsWriter::from_schema(schema),
})
}
// Write on disk all of the stuff that
// is still on RAM :
// - the dictionary in an fst
@@ -91,9 +94,10 @@ impl SegmentWriter {
pub fn finalize(mut self,) -> Result<()> {
let segment_info = self.segment_info();
for per_field_postings_writer in self.per_field_postings_writers.iter_mut() {
per_field_postings_writer.close();
per_field_postings_writer.close(&mut self.block_store);
}
write(&self.per_field_postings_writers,
write(&mut self.block_store,
&self.per_field_postings_writers,
&self.fast_field_writers,
&self.fieldnorms_writer,
segment_info,
@@ -120,7 +124,7 @@ impl SegmentWriter {
match tokens.next() {
Some(token) => {
let term = Term::from_field_text(field, token);
field_posting_writers.suscribe(doc_id, pos, term);
field_posting_writers.suscribe(&mut self.block_store, doc_id, pos, term);
pos += 1;
num_tokens += 1;
},
@@ -130,7 +134,7 @@ impl SegmentWriter {
}
else {
let term = Term::from_field_text(field, field_value.value().text());
field_posting_writers.suscribe(doc_id, 0, term);
field_posting_writers.suscribe(&mut self.block_store, doc_id, 0, term);
}
pos += 1;
// THIS is to avoid phrase query accross field repetition.
@@ -146,7 +150,7 @@ impl SegmentWriter {
if u32_options.is_indexed() {
for field_value in field_values {
let term = Term::from_field_u32(field_value.field(), field_value.value().u32_value());
field_posting_writers.suscribe(doc_id, 0, term);
field_posting_writers.suscribe(&mut self.block_store, doc_id, 0, term);
}
}
}
@@ -181,13 +185,14 @@ impl SegmentWriter {
}
fn write(per_field_postings_writers: &Vec<Box<PostingsWriter>>,
fn write(block_store: &BlockStore,
per_field_postings_writers: &Vec<Box<PostingsWriter>>,
fast_field_writers: &U32FastFieldsWriter,
fieldnorms_writer: &U32FastFieldsWriter,
segment_info: SegmentInfo,
mut serializer: SegmentSerializer) -> Result<()> {
for per_field_postings_writer in per_field_postings_writers.iter() {
try!(per_field_postings_writer.serialize(serializer.get_postings_serializer()));
try!(per_field_postings_writer.serialize(block_store, serializer.get_postings_serializer()));
}
try!(fast_field_writers.serialize(serializer.get_fast_field_serializer()));
try!(fieldnorms_writer.serialize(serializer.get_fieldnorms_serializer()));
@@ -196,9 +201,10 @@ fn write(per_field_postings_writers: &Vec<Box<PostingsWriter>>,
Ok(())
}
impl SerializableSegment for SegmentWriter {
impl<'a> SerializableSegment for SegmentWriter<'a> {
fn write(&self, serializer: SegmentSerializer) -> Result<()> {
write(&self.per_field_postings_writers,
write(&self.block_store,
&self.per_field_postings_writers,
&self.fast_field_writers,
&self.fieldnorms_writer,
self.segment_info(),

View File

@@ -1,7 +1,6 @@
use compression::NUM_DOCS_PER_BLOCK;
use DocId;
const BLOCK_SIZE: u32 = NUM_DOCS_PER_BLOCK as u32;
pub const BLOCK_SIZE: u32 = NUM_DOCS_PER_BLOCK as u32;
struct Block {
data: [u32; BLOCK_SIZE as usize],
@@ -27,36 +26,43 @@ struct ListInfo {
pub struct BlockStore {
lists: Vec<ListInfo>,
blocks: Vec<Block>,
free_blocks: Vec<u32>,
free_block_id: usize,
}
impl BlockStore {
pub fn allocate(num_blocks: usize) -> BlockStore {
pub fn allocate(num_blocks: usize) -> BlockStore {
BlockStore {
lists: Vec::with_capacity(1_000_000),
lists: Vec::with_capacity(100_000),
blocks: (0 .. num_blocks).map(|_| Block::new()).collect(),
free_blocks: (0u32 .. num_blocks as u32).collect()
free_block_id: 0,
}
}
fn new_list(&mut self, first_el: u32) -> u32 {
pub fn new_list(&mut self) -> u32 {
let res = self.lists.len() as u32;
let new_block_id = self.new_block().unwrap();
self.blocks[new_block_id as usize].data[0] = first_el;
self.lists.push(ListInfo {
first: new_block_id,
last: new_block_id,
len: 1,
len: 0,
});
res
}
pub fn clear(&mut self,) {
self.free_block_id = 0;
}
fn new_block(&mut self,) -> Option<u32> {
self.free_blocks.pop()
.map(|block_id| {
self.blocks[block_id as usize].next = u32::max_value();
block_id
})
let block_id = self.free_block_id;
self.free_block_id += 1;
if block_id >= self.blocks.len() {
None
}
else {
self.blocks[block_id].next = u32::max_value();
Some(block_id as u32)
}
}
fn get_list_info(&mut self, list_id: u32) -> &mut ListInfo {
@@ -66,9 +72,7 @@ impl BlockStore {
fn block_id_to_append(&mut self, list_id: u32) -> u32 {
let list_info: ListInfo = self.lists[list_id as usize];
// get_list_info(list_id).len % BLOCK_SIZE == 0;
// let new_block_required: bool = self.get_list_info(list_id).len % BLOCK_SIZE == 0;
if list_info.len % BLOCK_SIZE == 0 {
if list_info.len != 0 && list_info.len % BLOCK_SIZE == 0 {
// we need to add a fresh new block.
let new_block_id: u32 = { self.new_block().unwrap() };
let last_block_id: usize;
@@ -87,7 +91,6 @@ impl BlockStore {
}
pub fn push(&mut self, list_id: u32, val: u32) {
let new_block_required: bool = self.get_list_info(list_id).len % BLOCK_SIZE == 0;
let block_id: u32 = self.block_id_to_append(list_id);
let list_len: u32;
{
@@ -139,107 +142,23 @@ impl<'a> Iterator for BlockIterator<'a> {
}
pub struct BlockAppender {
blocks: Vec<Box<[DocId; NUM_DOCS_PER_BLOCK]>>,
doc_freq: usize,
}
impl BlockAppender {
pub fn new() -> BlockAppender {
BlockAppender {
blocks: Vec::new(),
doc_freq: 0,
}
}
pub fn push(&mut self, doc_id: DocId) {
if self.doc_freq % NUM_DOCS_PER_BLOCK == 0 {
self.blocks.push(Box::new([0u32; NUM_DOCS_PER_BLOCK ]));
}
self.blocks[self.doc_freq / NUM_DOCS_PER_BLOCK][self.doc_freq % NUM_DOCS_PER_BLOCK] = doc_id;
self.doc_freq += 1;
}
pub fn last(&self) -> Option<DocId> {
if self.doc_freq == 0 {
return None
}
else {
Some(self.get(self.doc_freq - 1))
}
}
pub fn len(&self,) -> usize {
self.doc_freq
}
pub fn get(&self, cursor: usize) -> DocId {
self.blocks[cursor / NUM_DOCS_PER_BLOCK][cursor % NUM_DOCS_PER_BLOCK]
}
pub fn iter(&self,) -> IterBlockAppender {
IterBlockAppender {
cursor: 0,
block_appender: &self,
}
}
}
pub struct IterBlockAppender<'a> {
cursor: usize,
block_appender: &'a BlockAppender,
}
impl<'a> Iterator for IterBlockAppender<'a> {
type Item = DocId;
fn next(&mut self) -> Option<u32> {
if self.cursor == self.block_appender.doc_freq {
return None
}
else {
let res = self.block_appender.get(self.cursor);
self.cursor += 1;
Some(res)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
pub fn test_block_store() {
let mut block_store = BlockStore::allocate(1_000);
let list_2 = block_store.new_list(0);
let list_3 = block_store.new_list(0);
let list_4 = block_store.new_list(0);
let list_5 = block_store.new_list(0);
for i in 1 .. 2_000 {
let list_2 = block_store.new_list();
let list_3 = block_store.new_list();
let list_4 = block_store.new_list();
let list_5 = block_store.new_list();
for i in 0 .. 2_000 {
block_store.push(list_2, i * 2);
block_store.push(list_3, i * 3);
}
for i in 1 .. 10 {
for i in 0 .. 10 {
block_store.push(list_4, i * 4);
block_store.push(list_5, i * 5);
}
@@ -262,4 +181,4 @@ mod tests {
assert!(list4_iter.next().is_none());
assert!(list5_iter.next().is_none());
}
}
}

View File

@@ -12,7 +12,7 @@ mod freq_handler;
mod docset;
mod scored_docset;
mod segment_postings_option;
mod block_appender;
mod block_store;
pub use self::docset::{SkipResult, DocSet};
pub use self::offset_postings::OffsetPostings;
@@ -31,6 +31,7 @@ pub use self::freq_handler::FreqHandler;
pub use self::scored_docset::ScoredDocSet;
pub use self::postings::HasLen;
pub use self::segment_postings_option::SegmentPostingsOption;
pub use self::block_store::BlockStore;
#[cfg(test)]
mod tests {
@@ -71,8 +72,9 @@ mod tests {
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
let segment = index.new_segment();
let mut block_store = BlockStore::allocate(1_000);
{
let mut segment_writer = SegmentWriter::for_segment(segment.clone(), &schema).unwrap();
let mut segment_writer = SegmentWriter::for_segment(&mut block_store, segment.clone(), &schema).unwrap();
{
let mut doc = Document::new();
doc.add_text(text_field, "a b a c a d a a.");

View File

@@ -4,73 +4,39 @@ use schema::Term;
use postings::PostingsSerializer;
use std::io;
use postings::Recorder;
use postings::block_appender::BlockAppender;
use postings::block_store::BlockStore;
struct TermPostingsWriter<Rec: Recorder + 'static> {
doc_ids: BlockAppender,
recorder: Rec,
}
impl<Rec: Recorder + 'static> TermPostingsWriter<Rec> {
pub fn new() -> TermPostingsWriter<Rec> {
TermPostingsWriter {
doc_ids: BlockAppender::new(),
recorder: Recorder::new(),
}
}
fn close_doc(&mut self,) {
self.recorder.close_doc();
}
pub fn doc_freq(&self) -> u32 {
self.doc_ids.len() as u32
}
pub fn suscribe(&mut self, doc: DocId, pos: u32) {
match self.doc_ids.last() {
Some(last_doc) => {
if last_doc != doc {
self.close_doc();
self.doc_ids.push(doc);
}
},
None => {
self.doc_ids.push(doc)
},
}
self.recorder.record_position(pos);
}
pub fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> {
for (i, doc) in self.doc_ids.iter().enumerate() {
let (term_freq, position_deltas) = self.recorder.get_tf_and_posdeltas(i);
try!(serializer.write_doc(doc.clone(), term_freq, position_deltas));
}
Ok(())
}
}
pub trait PostingsWriter {
fn close(&mut self,);
fn close(&mut self, block_store: &mut BlockStore);
fn suscribe(&mut self, doc: DocId, pos: u32, term: Term);
fn suscribe(&mut self, block_store: &mut BlockStore, doc: DocId, pos: u32, term: Term);
fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()>;
fn serialize(&self, block_store: &BlockStore, serializer: &mut PostingsSerializer) -> io::Result<()>;
}
pub struct SpecializedPostingsWriter<Rec: Recorder + 'static> {
postings: Vec<TermPostingsWriter<Rec>>,
term_index: HashMap<Term, usize>,
term_index: HashMap<Term, Rec>,
}
fn get_or_create_recorder<'a, Rec: Recorder>(term: Term, term_index: &'a mut HashMap<Term, Rec>, block_store: &mut BlockStore) -> &'a mut Rec {
if term_index.contains_key(&term) {
term_index.get_mut(&term).unwrap()
}
else {
let recorder = Rec::new(block_store);
term_index
.entry(term)
.or_insert(recorder)
}
}
impl<Rec: Recorder + 'static> SpecializedPostingsWriter<Rec> {
pub fn new() -> SpecializedPostingsWriter<Rec> {
SpecializedPostingsWriter {
postings: Vec::new(),
term_index: HashMap::new(),
}
}
@@ -78,48 +44,38 @@ impl<Rec: Recorder + 'static> SpecializedPostingsWriter<Rec> {
pub fn new_boxed() -> Box<PostingsWriter> {
Box::new(Self::new())
}
fn get_term_postings(&mut self, term: Term) -> &mut TermPostingsWriter<Rec> {
let unord_id: usize = {
let num_terms = self.term_index.len();
let postings = &mut self.postings;
self.term_index
.entry(term)
.or_insert_with(|| {
let unord_id = num_terms;
postings.push(TermPostingsWriter::new());
unord_id
}).clone()
};
&mut self.postings[unord_id]
}
}
impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec> {
fn close(&mut self,) {
for term_postings_writer in self.postings.iter_mut() {
term_postings_writer.close_doc();
fn close(&mut self, block_store: &mut BlockStore) {
for recorder in self.term_index.values_mut() {
recorder.close_doc(block_store);
}
}
fn suscribe(&mut self, doc: DocId, pos: u32, term: Term) {
let doc_ids: &mut TermPostingsWriter<Rec> = self.get_term_postings(term);
doc_ids.suscribe(doc, pos);
fn suscribe(&mut self, block_store: &mut BlockStore, doc: DocId, position: u32, term: Term) {
let mut recorder = get_or_create_recorder(term, &mut self.term_index, block_store);
let current_doc = recorder.current_doc();
if current_doc != doc {
if current_doc != u32::max_value() {
recorder.close_doc(block_store);
}
recorder.new_doc(block_store, doc);
}
recorder.record_position(block_store, position);
}
fn serialize(&self, serializer: &mut PostingsSerializer) -> io::Result<()> {
let mut term_offsets: Vec<(&Term, usize)> = self.term_index
fn serialize(&self, block_store: &BlockStore, serializer: &mut PostingsSerializer) -> io::Result<()> {
let mut term_offsets: Vec<(&Term, &Rec)> = self.term_index
.iter()
.map(|(k,v)| (k, *v))
.map(|(k,v)| (k, v))
.collect();
term_offsets.sort();
for (term, postings_id) in term_offsets {
let term_postings_writer = &self.postings[postings_id];
let term_docfreq = term_postings_writer.doc_freq();
try!(serializer.new_term(term, term_docfreq));
try!(term_postings_writer.serialize(serializer));
term_offsets.sort_by_key(|&(k, _v)| k);
for (term, recorder) in term_offsets {
try!(serializer.new_term(term, recorder.doc_freq()));
try!(recorder.serialize(serializer, block_store));
try!(serializer.close_term());
}
Ok(())

View File

@@ -1,98 +1,199 @@
use postings::block_store::BlockStore;
use DocId;
use std::io;
use postings::PostingsSerializer;
const EMPTY_ARRAY: [u32; 0] = [0u32; 0];
const POSITION_END: u32 = 4294967295;
pub trait Recorder {
fn new() -> Self;
fn record_position(&mut self, position: u32);
fn close_doc(&mut self,);
fn get_tf_and_posdeltas(&self, i: usize) -> (u32, &[u32]);
fn current_doc(&self,) -> u32;
fn new(block_store: &mut BlockStore) -> Self;
fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId);
fn record_position(&mut self, block_store: &mut BlockStore, position: u32);
fn close_doc(&mut self, block_store: &mut BlockStore);
fn doc_freq(&self,) -> u32;
fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()>;
}
const EMPTY_ARRAY: [u32; 0] = [];
pub struct NothingRecorder;
pub struct NothingRecorder {
list_id: u32,
current_doc: DocId,
doc_freq: u32,
}
impl Recorder for NothingRecorder {
fn new() -> Self {
NothingRecorder
fn current_doc(&self,) -> DocId {
self.current_doc
}
fn record_position(&mut self, _position: u32) {
fn new(block_store: &mut BlockStore) -> Self {
NothingRecorder {
list_id: block_store.new_list(),
current_doc: u32::max_value(),
doc_freq: 0u32,
}
}
fn close_doc(&mut self,) {
fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId) {
self.current_doc = doc;
block_store.push(self.list_id, doc);
self.doc_freq += 1;
}
fn get_tf_and_posdeltas(&self, _: usize) -> (u32, &[u32]) {
(0u32, &EMPTY_ARRAY)
fn record_position(&mut self, _block_store: &mut BlockStore, _position: u32) {
}
fn close_doc(&mut self, _block_store: &mut BlockStore) {
}
fn doc_freq(&self,) -> u32 {
self.doc_freq
}
fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()> {
let doc_id_iter = block_store.iter_list(self.list_id);
for doc in doc_id_iter {
try!(serializer.write_doc(doc, 0u32, &EMPTY_ARRAY));
}
Ok(())
}
}
pub struct TermFrequencyRecorder {
term_freqs: Vec<u32>,
list_id: u32,
current_doc: DocId,
current_tf: u32,
doc_freq: u32,
}
impl Recorder for TermFrequencyRecorder {
fn new() -> Self {
fn new(block_store: &mut BlockStore) -> Self {
TermFrequencyRecorder {
term_freqs: Vec::new(),
list_id: block_store.new_list(),
current_doc: u32::max_value(),
current_tf: 0u32,
doc_freq: 0u32,
}
}
fn record_position(&mut self, _position: u32) {
fn current_doc(&self,) -> DocId {
self.current_doc
}
fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId) {
self.doc_freq += 1u32;
self.current_doc = doc;
block_store.push(self.list_id, doc);
}
fn record_position(&mut self, _block_store: &mut BlockStore, _position: u32) {
self.current_tf += 1;
}
fn close_doc(&mut self,) {
fn close_doc(&mut self, block_store: &mut BlockStore) {
assert!(self.current_tf > 0);
self.term_freqs.push(self.current_tf);
block_store.push(self.list_id, self.current_tf);
self.current_tf = 0;
}
fn get_tf_and_posdeltas(&self, i: usize) -> (u32, &[u32]) {
(self.term_freqs[i], &EMPTY_ARRAY)
fn doc_freq(&self,) -> u32 {
self.doc_freq
}
fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()> {
let mut doc_iter = block_store.iter_list(self.list_id);
loop {
if let Some(doc) = doc_iter.next() {
if let Some(term_freq) = doc_iter.next() {
try!(serializer.write_doc(doc, term_freq, &EMPTY_ARRAY));
continue;
}
}
break;
}
Ok(())
}
}
pub struct TFAndPositionRecorder {
cumulated_tfs: Vec<u32>,
positions: Vec<u32>,
cumulated_tf: u32,
current_pos: u32,
list_id: u32,
current_doc: DocId,
doc_freq: u32,
}
impl Recorder for TFAndPositionRecorder {
fn new() -> Self {
fn new(block_store: &mut BlockStore) -> Self {
TFAndPositionRecorder {
cumulated_tfs: vec!(0u32),
cumulated_tf: 0u32,
positions: Vec::new(),
current_pos: 0u32,
list_id: block_store.new_list(),
current_doc: u32::max_value(),
doc_freq: 0u32,
}
}
fn record_position(&mut self, position: u32) {
self.cumulated_tf += 1;
self.positions.push(position - self.current_pos);
self.current_pos = position;
fn current_doc(&self,) -> DocId {
self.current_doc
}
fn close_doc(&mut self,) {
self.cumulated_tfs.push(self.cumulated_tf);
self.current_pos = 0;
fn new_doc(&mut self, block_store: &mut BlockStore, doc: DocId) {
self.doc_freq += 1;
self.current_doc = doc;
block_store.push(self.list_id, doc);
}
fn get_tf_and_posdeltas(&self, i: usize) -> (u32, &[u32]) {
let tf = self.cumulated_tfs[i+1] - self.cumulated_tfs[i];
let pos_idx = self.cumulated_tfs[i] as usize;
let posdeltas = &self.positions[pos_idx..pos_idx + tf as usize];
(tf, posdeltas)
fn record_position(&mut self, block_store: &mut BlockStore, position: u32) {
block_store.push(self.list_id, position);
}
fn close_doc(&mut self, block_store: &mut BlockStore) {
block_store.push(self.list_id, POSITION_END);
}
fn doc_freq(&self,) -> u32 {
self.doc_freq
}
fn serialize(&self, serializer: &mut PostingsSerializer, block_store: &BlockStore) -> io::Result<()> {
let mut positions = Vec::with_capacity(100);
let mut doc_iter = block_store.iter_list(self.list_id);
loop {
if let Some(doc) = doc_iter.next() {
let mut prev_position = 0;
positions.clear();
loop {
match doc_iter.next() {
Some(position) => {
if position == POSITION_END {
break;
}
else {
positions.push(position - prev_position);
prev_position = position;
}
}
None => {
panic!("This should never happen. Pleasee report the bug.");
}
}
}
try!(serializer.write_doc(doc, positions.len() as u32, &positions));
}
else {
break;
}
}
Ok(())
}
}