From e6d062bf2d50e3cf786d200789072f63c550236e Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 12 Jan 2026 10:54:32 +0100 Subject: [PATCH] Minor refactoring in PostingsSerializer Removes the Write generic argument from PostingsSerializer. This removes a useless generic. Paves the way for codecs. Removes one useless CountingWriter layer. etc. --- src/postings/segment_postings.rs | 7 ++- src/postings/serializer.rs | 86 +++++++++++++++----------------- 2 files changed, 42 insertions(+), 51 deletions(-) diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index d9ba33eb2..e9046bd3c 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -70,13 +70,13 @@ impl SegmentPostings { let mut buffer = Vec::new(); { let mut postings_serializer = - PostingsSerializer::new(&mut buffer, 0.0, IndexRecordOption::Basic, None); + PostingsSerializer::new(0.0, IndexRecordOption::Basic, None); postings_serializer.new_term(docs.len() as u32, false); for &doc in docs { postings_serializer.write_doc(doc, 1u32); } postings_serializer - .close_term(docs.len() as u32) + .close_term(docs.len() as u32, &mut buffer) .expect("In memory Serialization should never fail."); } let block_segment_postings = BlockSegmentPostings::open( @@ -115,7 +115,6 @@ impl SegmentPostings { }) .unwrap_or(0.0); let mut postings_serializer = PostingsSerializer::new( - &mut buffer, average_field_norm, IndexRecordOption::WithFreqs, fieldnorm_reader, @@ -125,7 +124,7 @@ impl SegmentPostings { postings_serializer.write_doc(doc, tf); } postings_serializer - .close_term(doc_and_tfs.len() as u32) + .close_term(doc_and_tfs.len() as u32, &mut buffer) .unwrap(); let block_segment_postings = BlockSegmentPostings::open( doc_and_tfs.len() as u32, diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index c0ee8483c..08c3c7542 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -104,10 +104,12 @@ impl InvertedIndexSerializer { ///
the serialization of a specific field. pub struct FieldSerializer<'a> { term_dictionary_builder: TermDictionaryBuilder<&'a mut CountingWriter>, - postings_serializer: PostingsSerializer<&'a mut CountingWriter>, + postings_serializer: PostingsSerializer, positions_serializer_opt: Option>>, current_term_info: TermInfo, term_open: bool, + postings_write: &'a mut CountingWriter, + postings_start_offset: u64, } impl<'a> FieldSerializer<'a> { @@ -128,27 +130,30 @@ impl<'a> FieldSerializer<'a> { .as_ref() .map(|ff_reader| total_num_tokens as Score / ff_reader.num_docs() as Score) .unwrap_or(0.0); - let postings_serializer = PostingsSerializer::new( - postings_write, - average_fieldnorm, - index_record_option, - fieldnorm_reader, - ); + let postings_serializer = + PostingsSerializer::new(average_fieldnorm, index_record_option, fieldnorm_reader); let positions_serializer_opt = if index_record_option.has_positions() { Some(PositionSerializer::new(positions_write)) } else { None }; + let postings_start_offset = postings_write.written_bytes(); Ok(FieldSerializer { term_dictionary_builder, postings_serializer, positions_serializer_opt, current_term_info: TermInfo::default(), term_open: false, + postings_write, + postings_start_offset, }) } + fn postings_offset(&self) -> usize { + (self.postings_write.written_bytes() - self.postings_start_offset) as usize + } + fn current_term_info(&self) -> TermInfo { let positions_start = if let Some(positions_serializer) = self.positions_serializer_opt.as_ref() { @@ -156,7 +161,7 @@ impl<'a> FieldSerializer<'a> { } else { 0u64 } as usize; - let addr = self.postings_serializer.written_bytes() as usize; + let addr = self.postings_offset(); TermInfo { doc_freq: 0, postings_range: addr..addr, @@ -213,21 +218,22 @@ impl<'a> FieldSerializer<'a> { crate::fail_point!("FieldSerializer::close_term", |msg: Option| { Err(io::Error::new(io::ErrorKind::Other, format!("{msg:?}"))) }); - if self.term_open { - self.postings_serializer - 
.close_term(self.current_term_info.doc_freq)?; - self.current_term_info.postings_range.end = - self.postings_serializer.written_bytes() as usize; - if let Some(positions_serializer) = self.positions_serializer_opt.as_mut() { - positions_serializer.close_term()?; - self.current_term_info.positions_range.end = - positions_serializer.written_bytes() as usize; - } - self.term_dictionary_builder - .insert_value(&self.current_term_info)?; - self.term_open = false; + if !self.term_open { + return Ok(()); + }; + + self.postings_serializer + .close_term(self.current_term_info.doc_freq, self.postings_write)?; + self.current_term_info.postings_range.end = self.postings_offset(); + if let Some(positions_serializer) = self.positions_serializer_opt.as_mut() { + positions_serializer.close_term()?; + self.current_term_info.positions_range.end = + positions_serializer.written_bytes() as usize; } + self.term_dictionary_builder + .insert_value(&self.current_term_info)?; + self.term_open = false; Ok(()) } @@ -237,7 +243,7 @@ impl<'a> FieldSerializer<'a> { if let Some(positions_serializer) = self.positions_serializer_opt { positions_serializer.close()?; } - self.postings_serializer.close()?; + self.postings_write.flush()?; self.term_dictionary_builder.finish()?; Ok(()) } @@ -291,8 +297,7 @@ impl Block { } } -pub struct PostingsSerializer { - output_write: CountingWriter, +pub struct PostingsSerializer { last_doc_id_encoded: u32, block_encoder: BlockEncoder, @@ -310,16 +315,13 @@ pub struct PostingsSerializer { term_has_freq: bool, } -impl PostingsSerializer { +impl PostingsSerializer { pub fn new( - write: W, avg_fieldnorm: Score, mode: IndexRecordOption, fieldnorm_reader: Option, - ) -> PostingsSerializer { + ) -> PostingsSerializer { PostingsSerializer { - output_write: CountingWriter::wrap(write), - block_encoder: BlockEncoder::new(), block: Box::new(Block::new()), @@ -422,11 +424,11 @@ impl PostingsSerializer { } } - fn close(mut self) -> io::Result<()> { - 
self.postings_write.flush() - } - - pub fn close_term(&mut self, doc_freq: u32) -> io::Result<()> { + pub fn close_term( + &mut self, + doc_freq: u32, + output_write: &mut impl std::io::Write, + ) -> io::Result<()> { if !self.block.is_empty() { // we have doc ids waiting to be written // this happens when the number of doc ids is @@ -451,26 +453,16 @@ impl PostingsSerializer { } if doc_freq >= COMPRESSION_BLOCK_SIZE as u32 { let skip_data = self.skip_write.data(); - VInt(skip_data.len() as u64).serialize(&mut self.output_write)?; - self.output_write.write_all(skip_data)?; + VInt(skip_data.len() as u64).serialize(output_write)?; + output_write.write_all(skip_data)?; } - self.output_write.write_all(&self.postings_write[..])?; + output_write.write_all(&self.postings_write[..])?; self.skip_write.clear(); self.postings_write.clear(); self.bm25_weight = None; Ok(()) } - /// Returns the number of bytes written in the postings write object - /// at this point. - /// When called before writing the postings of a term, this value is used as - /// start offset. - /// When called after writing the postings of a term, this value is used as a - /// end offset. - fn written_bytes(&self) -> u64 { - self.output_write.written_bytes() - } - fn clear(&mut self) { self.block.clear(); self.last_doc_id_encoded = 0;