From 4dc80cfa25c7a5f8dfd1467659b6ee58ce3be524 Mon Sep 17 00:00:00 2001
From: Paul Masurel
Date: Mon, 21 Feb 2022 09:51:27 +0900
Subject: [PATCH] Removes TokenStream chain. (#1283)

This change is mostly motivated by the introduction of JSON objects.
We need to be able to inject a position object to make the position
shift.
---
 src/indexer/segment_writer.rs       | 21 ++++---
 src/postings/mod.rs                 |  2 +-
 src/postings/postings_writer.rs     | 31 +++++++---
 src/tokenizer/mod.rs                |  2 --
 src/tokenizer/token_stream_chain.rs | 95 -----------------------------
 src/tokenizer/tokenized_string.rs   | 92 +----------------------------
 src/tokenizer/tokenizer.rs          | 32 +---------
 7 files changed, 37 insertions(+), 238 deletions(-)
 delete mode 100644 src/tokenizer/token_stream_chain.rs

diff --git a/src/indexer/segment_writer.rs b/src/indexer/segment_writer.rs
index d2b4fd213..746747e5a 100644
--- a/src/indexer/segment_writer.rs
+++ b/src/indexer/segment_writer.rs
@@ -5,12 +5,13 @@ use crate::fastfield::FastFieldsWriter;
 use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
 use crate::indexer::segment_serializer::SegmentSerializer;
 use crate::postings::{
-    compute_table_size, serialize_postings, IndexingContext, PerFieldPostingsWriter, PostingsWriter,
+    compute_table_size, serialize_postings, IndexingContext, IndexingPosition,
+    PerFieldPostingsWriter, PostingsWriter,
 };
 use crate::schema::{Field, FieldEntry, FieldType, FieldValue, Schema, Term, Type, Value};
 use crate::store::{StoreReader, StoreWriter};
 use crate::tokenizer::{
-    BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, TokenStreamChain, Tokenizer,
+    BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer,
 };
 use crate::{DocId, Document, Opstamp, SegmentComponent};
 
@@ -221,19 +222,19 @@ impl SegmentWriter {
                         }
                     }
 
-                    let num_tokens = if token_streams.is_empty() {
-                        0
-                    } else {
-                        let mut token_stream = TokenStreamChain::new(offsets, token_streams);
+                    let mut indexing_position = IndexingPosition::default();
+                    for mut token_stream in token_streams {
                         postings_writer.index_text(
                             doc_id,
                             field,
-                            &mut token_stream,
+                            &mut *token_stream,
                             term_buffer,
                             indexing_context,
-                        )
-                    };
-                    self.fieldnorms_writer.record(doc_id, field, num_tokens);
+                            &mut indexing_position,
+                        );
+                    }
+                    self.fieldnorms_writer
+                        .record(doc_id, field, indexing_position.num_tokens);
                 }
                 FieldType::U64(_) => {
                     for value in values {
diff --git a/src/postings/mod.rs b/src/postings/mod.rs
index 39b8c2f52..4df8beee7 100644
--- a/src/postings/mod.rs
+++ b/src/postings/mod.rs
@@ -21,7 +21,7 @@ pub use self::block_segment_postings::BlockSegmentPostings;
 pub(crate) use self::indexing_context::IndexingContext;
 pub(crate) use self::per_field_postings_writer::PerFieldPostingsWriter;
 pub use self::postings::Postings;
-pub(crate) use self::postings_writer::{serialize_postings, PostingsWriter};
+pub(crate) use self::postings_writer::{serialize_postings, IndexingPosition, PostingsWriter};
 pub use self::segment_postings::SegmentPostings;
 pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
 pub(crate) use self::skip::{BlockInfo, SkipReader};
diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs
index 6e391ab12..bee4c301c 100644
--- a/src/postings/postings_writer.rs
+++ b/src/postings/postings_writer.rs
@@ -18,6 +18,8 @@ use crate::termdict::TermOrdinal;
 use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN};
 use crate::DocId;
 
+const POSITION_GAP: u32 = 1;
+
 fn make_field_partition(
     term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)],
 ) -> Vec<(Field, Range<usize>)> {
@@ -100,6 +102,12 @@ pub(crate) fn serialize_postings(
     Ok(unordered_term_mappings)
 }
 
+#[derive(Default)]
+pub(crate) struct IndexingPosition {
+    pub num_tokens: u32,
+    pub end_position: u32,
+}
+
 /// The `PostingsWriter` is in charge of receiving documenting
 /// and building a `Segment` in anonymous memory.
 ///
@@ -138,23 +146,30 @@ pub(crate) trait PostingsWriter {
         token_stream: &mut dyn TokenStream,
         term_buffer: &mut Term,
         indexing_context: &mut IndexingContext,
-    ) -> u32 {
+        indexing_position: &mut IndexingPosition,
+    ) {
         term_buffer.set_field(Type::Str, field);
-        let mut sink = |token: &Token| {
+        let mut num_tokens = 0;
+        let mut end_position = 0;
+        token_stream.process(&mut |token: &Token| {
             // We skip all tokens with a len greater than u16.
-            if token.text.len() <= MAX_TOKEN_LEN {
-                term_buffer.set_text(token.text.as_str());
-                self.subscribe(doc_id, token.position as u32, term_buffer, indexing_context);
-            } else {
+            if token.text.len() > MAX_TOKEN_LEN {
                 warn!(
                     "A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
                      MAX_TOKEN_LEN in the documentation for more information.",
                     token.text.len(),
                     MAX_TOKEN_LEN
                 );
+                return;
             }
-        };
-        token_stream.process(&mut sink)
+            term_buffer.set_text(token.text.as_str());
+            let start_position = indexing_position.end_position + token.position as u32;
+            end_position = start_position + token.position_length as u32;
+            self.subscribe(doc_id, start_position, term_buffer, indexing_context);
+            num_tokens += 1;
+        });
+        indexing_position.end_position = end_position + POSITION_GAP;
+        indexing_position.num_tokens += num_tokens;
     }
 
     fn total_num_tokens(&self) -> u64;
diff --git a/src/tokenizer/mod.rs b/src/tokenizer/mod.rs
index a3af31ad5..506b4d60a 100644
--- a/src/tokenizer/mod.rs
+++ b/src/tokenizer/mod.rs
@@ -127,7 +127,6 @@ mod remove_long;
 mod simple_tokenizer;
 mod stemmer;
 mod stop_word_filter;
-mod token_stream_chain;
 mod tokenized_string;
 mod tokenizer;
 mod tokenizer_manager;
@@ -143,7 +142,6 @@ pub use self::remove_long::RemoveLongFilter;
 pub use self::simple_tokenizer::SimpleTokenizer;
 pub use self::stemmer::{Language, Stemmer};
 pub use self::stop_word_filter::StopWordFilter;
-pub(crate) use self::token_stream_chain::TokenStreamChain;
 pub use self::tokenized_string::{PreTokenizedStream, PreTokenizedString};
 pub use self::tokenizer::{
     BoxTokenFilter, BoxTokenStream, TextAnalyzer, Token, TokenFilter, TokenStream, Tokenizer,
diff --git a/src/tokenizer/token_stream_chain.rs b/src/tokenizer/token_stream_chain.rs
deleted file mode 100644
index d5e669dec..000000000
--- a/src/tokenizer/token_stream_chain.rs
+++ /dev/null
@@ -1,95 +0,0 @@
-use std::ops::DerefMut;
-
-use crate::tokenizer::{BoxTokenStream, Token, TokenStream};
-
-const POSITION_GAP: usize = 2;
-
-pub(crate) struct TokenStreamChain<'a> {
-    offsets: Vec<usize>,
-    token_streams: Vec<BoxTokenStream<'a>>,
-    position_shift: usize,
-    stream_idx: usize,
-    token: Token,
-}
-
-impl<'a> TokenStreamChain<'a> {
-    pub fn new(
-        offsets: Vec<usize>,
-        token_streams: Vec<BoxTokenStream<'a>>,
-    ) -> TokenStreamChain<'a> {
-        TokenStreamChain {
-            offsets,
-            stream_idx: 0,
-            token_streams,
-            position_shift: 0,
-            token: Token::default(),
-        }
-    }
-}
-
-impl<'a> TokenStream for TokenStreamChain<'a> {
-    fn advance(&mut self) -> bool {
-        while self.stream_idx < self.token_streams.len() {
-            let token_stream = self.token_streams[self.stream_idx].deref_mut();
-            if token_stream.advance() {
-                let token = token_stream.token();
-                let offset_offset = self.offsets[self.stream_idx];
-                self.token.offset_from = token.offset_from + offset_offset;
-                self.token.offset_to = token.offset_to + offset_offset;
-                self.token.position = token.position + self.position_shift;
-                self.token.text.clear();
-                self.token.text.push_str(token.text.as_str());
-                return true;
-            } else {
-                self.stream_idx += 1;
-                self.position_shift = self.token.position.wrapping_add(POSITION_GAP);
-            }
-        }
-        false
-    }
-
-    fn token(&self) -> &Token {
-        assert!(
-            self.stream_idx <= self.token_streams.len(),
-            "You called .token(), after the end of the token stream has been reached"
-        );
-        &self.token
-    }
-
-    fn token_mut(&mut self) -> &mut Token {
-        assert!(
-            self.stream_idx <= self.token_streams.len(),
-            "You called .token(), after the end of the token stream has been reached"
-        );
-        &mut self.token
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::super::{SimpleTokenizer, TokenStream, Tokenizer};
-    use super::{TokenStreamChain, POSITION_GAP};
-
-    #[test]
-    fn test_chain_first_emits_no_tokens() {
-        let token_streams = vec![
-            SimpleTokenizer.token_stream(""),
-            SimpleTokenizer.token_stream("hello world"),
-        ];
-        let mut token_chain = TokenStreamChain::new(vec![0, 0], token_streams);
-
-        assert!(token_chain.advance());
-        assert_eq!(token_chain.token().text, "hello");
-        assert_eq!(token_chain.token().offset_from, 0);
-        assert_eq!(token_chain.token().offset_to, 5);
-        assert_eq!(token_chain.token().position, POSITION_GAP - 1);
-
-        assert!(token_chain.advance());
-        assert_eq!(token_chain.token().text, "world");
-        assert_eq!(token_chain.token().offset_from, 6);
-        assert_eq!(token_chain.token().offset_to, 11);
-        assert_eq!(token_chain.token().position, POSITION_GAP);
-
-        assert!(!token_chain.advance());
-    }
-}
diff --git a/src/tokenizer/tokenized_string.rs b/src/tokenizer/tokenized_string.rs
index 86c4f9907..da20b7a69 100644
--- a/src/tokenizer/tokenized_string.rs
+++ b/src/tokenizer/tokenized_string.rs
@@ -2,7 +2,7 @@ use std::cmp::Ordering;
 
 use serde::{Deserialize, Serialize};
 
-use crate::tokenizer::{BoxTokenStream, Token, TokenStream, TokenStreamChain};
+use crate::tokenizer::{Token, TokenStream};
 
 /// Struct representing pre-tokenized text
 #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
@@ -40,32 +40,6 @@ impl From<PreTokenizedString> for PreTokenizedStream {
     }
 }
 
-impl PreTokenizedStream {
-    /// Creates a TokenStream from PreTokenizedString array
-    pub fn chain_tokenized_strings<'a>(
-        tok_strings: &'a [&'a PreTokenizedString],
-    ) -> BoxTokenStream {
-        if tok_strings.len() == 1 {
-            PreTokenizedStream::from((*tok_strings[0]).clone()).into()
-        } else {
-            let mut offsets = vec![];
-            let mut total_offset = 0;
-            for &tok_string in tok_strings {
-                offsets.push(total_offset);
-                if let Some(last_token) = tok_string.tokens.last() {
-                    total_offset += last_token.offset_to;
-                }
-            }
-            // TODO remove the string cloning.
-            let token_streams: Vec<BoxTokenStream<'static>> = tok_strings
-                .iter()
-                .map(|&tok_string| PreTokenizedStream::from((*tok_string).clone()).into())
-                .collect();
-            TokenStreamChain::new(offsets, token_streams).into()
-        }
-    }
-}
-
 impl TokenStream for PreTokenizedStream {
     fn advance(&mut self) -> bool {
         self.current_token += 1;
@@ -125,68 +99,4 @@ mod tests {
         }
         assert!(!token_stream.advance());
     }
-
-    #[test]
-    fn test_chain_tokenized_strings() {
-        let tok_text = PreTokenizedString {
-            text: String::from("A a"),
-            tokens: vec![
-                Token {
-                    offset_from: 0,
-                    offset_to: 1,
-                    position: 0,
-                    text: String::from("A"),
-                    position_length: 1,
-                },
-                Token {
-                    offset_from: 2,
-                    offset_to: 3,
-                    position: 1,
-                    text: String::from("a"),
-                    position_length: 1,
-                },
-            ],
-        };
-
-        let chain_parts = vec![&tok_text, &tok_text];
-
-        let mut token_stream = PreTokenizedStream::chain_tokenized_strings(&chain_parts[..]);
-
-        let expected_tokens = vec![
-            Token {
-                offset_from: 0,
-                offset_to: 1,
-                position: 0,
-                text: String::from("A"),
-                position_length: 1,
-            },
-            Token {
-                offset_from: 2,
-                offset_to: 3,
-                position: 1,
-                text: String::from("a"),
-                position_length: 1,
-            },
-            Token {
-                offset_from: 3,
-                offset_to: 4,
-                position: 3,
-                text: String::from("A"),
-                position_length: 1,
-            },
-            Token {
-                offset_from: 5,
-                offset_to: 6,
-                position: 4,
-                text: String::from("a"),
-                position_length: 1,
-            },
-        ];
-
-        for expected_token in expected_tokens {
-            assert!(token_stream.advance());
-            assert_eq!(token_stream.token(), &expected_token);
-        }
-        assert!(!token_stream.advance());
-    }
 }
diff --git a/src/tokenizer/tokenizer.rs b/src/tokenizer/tokenizer.rs
index 2de1ce730..e895ad0f1 100644
--- a/src/tokenizer/tokenizer.rs
+++ b/src/tokenizer/tokenizer.rs
@@ -5,8 +5,6 @@ use std::ops::{Deref, DerefMut};
 
 use serde::{Deserialize, Serialize};
 
-use crate::tokenizer::TokenStreamChain;
-
 /// Token
 #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
 pub struct Token {
@@ -84,31 +82,6 @@ impl TextAnalyzer {
         self
     }
 
-    /// Tokenize an array`&str`
-    ///
-    /// The resulting `BoxTokenStream` is equivalent to what would be obtained if the &str were
-    /// one concatenated `&str`, with an artificial position gap of `2` between the different fields
-    /// to prevent accidental `PhraseQuery` to match accross two terms.
-    pub fn token_stream_texts<'a>(&self, texts: &'a [&'a str]) -> BoxTokenStream<'a> {
-        assert!(!texts.is_empty());
-        if texts.len() == 1 {
-            self.token_stream(texts[0])
-        } else {
-            let mut offsets = vec![];
-            let mut total_offset = 0;
-            for &text in texts {
-                offsets.push(total_offset);
-                total_offset += text.len();
-            }
-            let token_streams: Vec<BoxTokenStream<'a>> = texts
-                .iter()
-                .cloned()
-                .map(|text| self.token_stream(text))
-                .collect();
-            From::from(TokenStreamChain::new(offsets, token_streams))
-        }
-    }
-
     /// Creates a token stream for a given `str`.
     pub fn token_stream<'a>(&self, text: &'a str) -> BoxTokenStream<'a> {
         let mut token_stream = self.tokenizer.token_stream(text);
@@ -284,13 +257,10 @@ pub trait TokenStream {
     /// and push the tokens to a sink function.
     ///
     /// Remove this.
-    fn process(&mut self, sink: &mut dyn FnMut(&Token)) -> u32 {
-        let mut num_tokens_pushed = 0u32;
+    fn process(&mut self, sink: &mut dyn FnMut(&Token)) {
        while self.advance() {
             sink(self.token());
-            num_tokens_pushed += 1u32;
         }
-        num_tokens_pushed
     }
 }
 
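A minimal standalone sketch of the position bookkeeping that replaces TokenStreamChain. This is not tantivy code: the Token, IndexingPosition, POSITION_GAP, and index_token_stream items below are simplified stand-ins that only mirror the arithmetic of the new PostingsWriter::index_text, so you can see how tokens from two token streams of the same field end up separated by a position gap.

// Standalone illustration only -- simplified stand-ins, not tantivy's API.

/// The parts of a token that matter for position assignment.
struct Token {
    text: String,
    position: usize,
    position_length: usize,
}

/// Mirrors the `IndexingPosition` struct added by this patch.
#[derive(Default, Debug)]
struct IndexingPosition {
    num_tokens: u32,
    end_position: u32,
}

const POSITION_GAP: u32 = 1;

/// Mirrors the position arithmetic of the new `index_text`: every token is
/// shifted by the `end_position` reached by the streams indexed before it,
/// and a gap is added once the stream is exhausted.
fn index_token_stream(tokens: &[Token], pos: &mut IndexingPosition) -> Vec<(String, u32)> {
    let mut indexed = Vec::new();
    let mut end_position = 0;
    for token in tokens {
        let start_position = pos.end_position + token.position as u32;
        end_position = start_position + token.position_length as u32;
        indexed.push((token.text.clone(), start_position));
        pos.num_tokens += 1;
    }
    pos.end_position = end_position + POSITION_GAP;
    indexed
}

fn main() {
    let tok = |text: &str, position: usize| Token {
        text: text.to_string(),
        position,
        position_length: 1,
    };
    // Two token streams indexed back to back for the same field, as the new
    // loop in `SegmentWriter` now does instead of chaining the streams.
    let mut pos = IndexingPosition::default();
    let first = index_token_stream(&[tok("hello", 0), tok("world", 1)], &mut pos);
    let second = index_token_stream(&[tok("foo", 0), tok("bar", 1)], &mut pos);
    println!("{first:?}");  // [("hello", 0), ("world", 1)]
    println!("{second:?}"); // [("foo", 3), ("bar", 4)]
    println!("{pos:?}");    // IndexingPosition { num_tokens: 4, end_position: 6 }
}

With POSITION_GAP set to 1, the last token of one value and the first token of the next are never at consecutive positions, so a PhraseQuery cannot match across the two values -- the same guarantee the removed token_stream_texts doc comment described with its artificial gap of 2.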