Removes TokenStream chain. (#1283)

This change is mostly motivated by the introduction of the JSON object type.

We need to be able to inject a position object so that the position shift can be
carried over from one token stream to the next.
Author:    Paul Masurel
Date:      2022-02-21 09:51:27 +09:00
Committer: GitHub
Parent:    cef145790c
Commit:    4dc80cfa25
7 changed files with 37 additions and 238 deletions
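
In short: the removed `TokenStreamChain` spliced the token streams of a multi-valued field into a single stream, shifting positions by a gap between values. After this commit each stream is indexed on its own, and an `IndexingPosition` carries the accumulated token count and end position from one `index_text` call to the next. The sketch below is a simplified, standalone model of that bookkeeping, not tantivy's actual API (`Token`, `index_text` and the returned postings `Vec` are stand-ins); it only mirrors the arithmetic of the new `index_text` default implementation further down.

    // Simplified model of the position bookkeeping introduced by this commit.
    // `Token` and `index_text` are illustrative stand-ins, not tantivy's real types.
    const POSITION_GAP: u32 = 1;

    struct Token {
        position: u32,        // position within its own token stream
        position_length: u32, // usually 1
        text: String,
    }

    #[derive(Default, Debug)]
    struct IndexingPosition {
        num_tokens: u32,
        end_position: u32,
    }

    // Shift each token's position by the end position accumulated from the
    // streams indexed before it, then advance the shared IndexingPosition.
    fn index_text(tokens: &[Token], indexing_position: &mut IndexingPosition) -> Vec<(String, u32)> {
        let mut postings = Vec::new();
        let mut end_position = 0;
        for token in tokens {
            let start_position = indexing_position.end_position + token.position;
            end_position = start_position + token.position_length;
            postings.push((token.text.clone(), start_position));
        }
        indexing_position.num_tokens += tokens.len() as u32;
        indexing_position.end_position = end_position + POSITION_GAP;
        postings
    }

    fn main() {
        let stream_a = vec![
            Token { position: 0, position_length: 1, text: "hello".into() },
            Token { position: 1, position_length: 1, text: "world".into() },
        ];
        let stream_b = vec![Token { position: 0, position_length: 1, text: "again".into() }];

        // The same IndexingPosition is reused for every value of the field,
        // so the second stream starts after the first one, plus the gap.
        let mut indexing_position = IndexingPosition::default();
        println!("{:?}", index_text(&stream_a, &mut indexing_position)); // [("hello", 0), ("world", 1)]
        println!("{:?}", index_text(&stream_b, &mut indexing_position)); // [("again", 3)]
        println!("total tokens: {}", indexing_position.num_tokens);      // total tokens: 3
    }

Note that the deleted `TokenStreamChain` applied a gap of 2 to the last token's position, whereas the new code adds a gap of 1 on top of the last token's end position (`position + position_length`), which is what makes the shift aware of `position_length`.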

View File

@@ -5,12 +5,13 @@ use crate::fastfield::FastFieldsWriter;
 use crate::fieldnorm::{FieldNormReaders, FieldNormsWriter};
 use crate::indexer::segment_serializer::SegmentSerializer;
 use crate::postings::{
-    compute_table_size, serialize_postings, IndexingContext, PerFieldPostingsWriter, PostingsWriter,
+    compute_table_size, serialize_postings, IndexingContext, IndexingPosition,
+    PerFieldPostingsWriter, PostingsWriter,
 };
 use crate::schema::{Field, FieldEntry, FieldType, FieldValue, Schema, Term, Type, Value};
 use crate::store::{StoreReader, StoreWriter};
 use crate::tokenizer::{
-    BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, TokenStreamChain, Tokenizer,
+    BoxTokenStream, FacetTokenizer, PreTokenizedStream, TextAnalyzer, Tokenizer,
 };
 use crate::{DocId, Document, Opstamp, SegmentComponent};
@@ -221,19 +222,19 @@ impl SegmentWriter {
                         }
                     }
-                    let num_tokens = if token_streams.is_empty() {
-                        0
-                    } else {
-                        let mut token_stream = TokenStreamChain::new(offsets, token_streams);
+                    let mut indexing_position = IndexingPosition::default();
+                    for mut token_stream in token_streams {
                         postings_writer.index_text(
                             doc_id,
                             field,
-                            &mut token_stream,
+                            &mut *token_stream,
                             term_buffer,
                             indexing_context,
-                        )
-                    };
-                    self.fieldnorms_writer.record(doc_id, field, num_tokens);
+                            &mut indexing_position,
+                        );
+                    }
+                    self.fieldnorms_writer
+                        .record(doc_id, field, indexing_position.num_tokens);
                 }
                 FieldType::U64(_) => {
                     for value in values {

View File

@@ -21,7 +21,7 @@ pub use self::block_segment_postings::BlockSegmentPostings;
 pub(crate) use self::indexing_context::IndexingContext;
 pub(crate) use self::per_field_postings_writer::PerFieldPostingsWriter;
 pub use self::postings::Postings;
-pub(crate) use self::postings_writer::{serialize_postings, PostingsWriter};
+pub(crate) use self::postings_writer::{serialize_postings, IndexingPosition, PostingsWriter};
 pub use self::segment_postings::SegmentPostings;
 pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};
 pub(crate) use self::skip::{BlockInfo, SkipReader};

View File

@@ -18,6 +18,8 @@ use crate::termdict::TermOrdinal;
 use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN};
 use crate::DocId;

+const POSITION_GAP: u32 = 1;
+
 fn make_field_partition(
     term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)],
 ) -> Vec<(Field, Range<usize>)> {
@@ -100,6 +102,12 @@ pub(crate) fn serialize_postings(
     Ok(unordered_term_mappings)
 }

+#[derive(Default)]
+pub(crate) struct IndexingPosition {
+    pub num_tokens: u32,
+    pub end_position: u32,
+}
+
 /// The `PostingsWriter` is in charge of receiving documents
 /// and building a `Segment` in anonymous memory.
 ///
@@ -138,23 +146,30 @@ pub(crate) trait PostingsWriter {
         token_stream: &mut dyn TokenStream,
         term_buffer: &mut Term,
         indexing_context: &mut IndexingContext,
-    ) -> u32 {
+        indexing_position: &mut IndexingPosition,
+    ) {
         term_buffer.set_field(Type::Str, field);
-        let mut sink = |token: &Token| {
+        let mut num_tokens = 0;
+        let mut end_position = 0;
+        token_stream.process(&mut |token: &Token| {
             // We skip all tokens with a len greater than u16.
-            if token.text.len() <= MAX_TOKEN_LEN {
-                term_buffer.set_text(token.text.as_str());
-                self.subscribe(doc_id, token.position as u32, term_buffer, indexing_context);
-            } else {
+            if token.text.len() > MAX_TOKEN_LEN {
                 warn!(
                     "A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
                      MAX_TOKEN_LEN in the documentation for more information.",
                     token.text.len(),
                     MAX_TOKEN_LEN
                 );
+                return;
             }
-        };
-        token_stream.process(&mut sink)
+            term_buffer.set_text(token.text.as_str());
+            let start_position = indexing_position.end_position + token.position as u32;
+            end_position = start_position + token.position_length as u32;
+            self.subscribe(doc_id, start_position, term_buffer, indexing_context);
+            num_tokens += 1;
+        });
+        indexing_position.end_position = end_position + POSITION_GAP;
+        indexing_position.num_tokens += num_tokens;
     }

     fn total_num_tokens(&self) -> u64;

View File

@@ -127,7 +127,6 @@ mod remove_long;
 mod simple_tokenizer;
 mod stemmer;
 mod stop_word_filter;
-mod token_stream_chain;
 mod tokenized_string;
 mod tokenizer;
 mod tokenizer_manager;
@@ -143,7 +142,6 @@ pub use self::remove_long::RemoveLongFilter;
 pub use self::simple_tokenizer::SimpleTokenizer;
 pub use self::stemmer::{Language, Stemmer};
 pub use self::stop_word_filter::StopWordFilter;
-pub(crate) use self::token_stream_chain::TokenStreamChain;
 pub use self::tokenized_string::{PreTokenizedStream, PreTokenizedString};
 pub use self::tokenizer::{
     BoxTokenFilter, BoxTokenStream, TextAnalyzer, Token, TokenFilter, TokenStream, Tokenizer,

View File

@@ -1,95 +0,0 @@
-use std::ops::DerefMut;
-
-use crate::tokenizer::{BoxTokenStream, Token, TokenStream};
-
-const POSITION_GAP: usize = 2;
-
-pub(crate) struct TokenStreamChain<'a> {
-    offsets: Vec<usize>,
-    token_streams: Vec<BoxTokenStream<'a>>,
-    position_shift: usize,
-    stream_idx: usize,
-    token: Token,
-}
-
-impl<'a> TokenStreamChain<'a> {
-    pub fn new(
-        offsets: Vec<usize>,
-        token_streams: Vec<BoxTokenStream<'a>>,
-    ) -> TokenStreamChain<'a> {
-        TokenStreamChain {
-            offsets,
-            stream_idx: 0,
-            token_streams,
-            position_shift: 0,
-            token: Token::default(),
-        }
-    }
-}
-
-impl<'a> TokenStream for TokenStreamChain<'a> {
-    fn advance(&mut self) -> bool {
-        while self.stream_idx < self.token_streams.len() {
-            let token_stream = self.token_streams[self.stream_idx].deref_mut();
-            if token_stream.advance() {
-                let token = token_stream.token();
-                let offset_offset = self.offsets[self.stream_idx];
-                self.token.offset_from = token.offset_from + offset_offset;
-                self.token.offset_to = token.offset_to + offset_offset;
-                self.token.position = token.position + self.position_shift;
-                self.token.text.clear();
-                self.token.text.push_str(token.text.as_str());
-                return true;
-            } else {
-                self.stream_idx += 1;
-                self.position_shift = self.token.position.wrapping_add(POSITION_GAP);
-            }
-        }
-        false
-    }
-
-    fn token(&self) -> &Token {
-        assert!(
-            self.stream_idx <= self.token_streams.len(),
-            "You called .token(), after the end of the token stream has been reached"
-        );
-        &self.token
-    }
-
-    fn token_mut(&mut self) -> &mut Token {
-        assert!(
-            self.stream_idx <= self.token_streams.len(),
-            "You called .token(), after the end of the token stream has been reached"
-        );
-        &mut self.token
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::super::{SimpleTokenizer, TokenStream, Tokenizer};
-    use super::{TokenStreamChain, POSITION_GAP};
-
-    #[test]
-    fn test_chain_first_emits_no_tokens() {
-        let token_streams = vec![
-            SimpleTokenizer.token_stream(""),
-            SimpleTokenizer.token_stream("hello world"),
-        ];
-        let mut token_chain = TokenStreamChain::new(vec![0, 0], token_streams);
-        assert!(token_chain.advance());
-        assert_eq!(token_chain.token().text, "hello");
-        assert_eq!(token_chain.token().offset_from, 0);
-        assert_eq!(token_chain.token().offset_to, 5);
-        assert_eq!(token_chain.token().position, POSITION_GAP - 1);
-        assert!(token_chain.advance());
-        assert_eq!(token_chain.token().text, "world");
-        assert_eq!(token_chain.token().offset_from, 6);
-        assert_eq!(token_chain.token().offset_to, 11);
-        assert_eq!(token_chain.token().position, POSITION_GAP);
-        assert!(!token_chain.advance());
-    }
-}

View File

@@ -2,7 +2,7 @@ use std::cmp::Ordering;

 use serde::{Deserialize, Serialize};

-use crate::tokenizer::{BoxTokenStream, Token, TokenStream, TokenStreamChain};
+use crate::tokenizer::{Token, TokenStream};

 /// Struct representing pre-tokenized text
 #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
@@ -40,32 +40,6 @@ impl From<PreTokenizedString> for PreTokenizedStream {
     }
 }

-impl PreTokenizedStream {
-    /// Creates a TokenStream from PreTokenizedString array
-    pub fn chain_tokenized_strings<'a>(
-        tok_strings: &'a [&'a PreTokenizedString],
-    ) -> BoxTokenStream {
-        if tok_strings.len() == 1 {
-            PreTokenizedStream::from((*tok_strings[0]).clone()).into()
-        } else {
-            let mut offsets = vec![];
-            let mut total_offset = 0;
-            for &tok_string in tok_strings {
-                offsets.push(total_offset);
-                if let Some(last_token) = tok_string.tokens.last() {
-                    total_offset += last_token.offset_to;
-                }
-            }
-            // TODO remove the string cloning.
-            let token_streams: Vec<BoxTokenStream<'static>> = tok_strings
-                .iter()
-                .map(|&tok_string| PreTokenizedStream::from((*tok_string).clone()).into())
-                .collect();
-            TokenStreamChain::new(offsets, token_streams).into()
-        }
-    }
-}
-
 impl TokenStream for PreTokenizedStream {
     fn advance(&mut self) -> bool {
         self.current_token += 1;
@@ -125,68 +99,4 @@ mod tests {
         }
         assert!(!token_stream.advance());
     }
-
-    #[test]
-    fn test_chain_tokenized_strings() {
-        let tok_text = PreTokenizedString {
-            text: String::from("A a"),
-            tokens: vec![
-                Token {
-                    offset_from: 0,
-                    offset_to: 1,
-                    position: 0,
-                    text: String::from("A"),
-                    position_length: 1,
-                },
-                Token {
-                    offset_from: 2,
-                    offset_to: 3,
-                    position: 1,
-                    text: String::from("a"),
-                    position_length: 1,
-                },
-            ],
-        };
-        let chain_parts = vec![&tok_text, &tok_text];
-        let mut token_stream = PreTokenizedStream::chain_tokenized_strings(&chain_parts[..]);
-        let expected_tokens = vec![
-            Token {
-                offset_from: 0,
-                offset_to: 1,
-                position: 0,
-                text: String::from("A"),
-                position_length: 1,
-            },
-            Token {
-                offset_from: 2,
-                offset_to: 3,
-                position: 1,
-                text: String::from("a"),
-                position_length: 1,
-            },
-            Token {
-                offset_from: 3,
-                offset_to: 4,
-                position: 3,
-                text: String::from("A"),
-                position_length: 1,
-            },
-            Token {
-                offset_from: 5,
-                offset_to: 6,
-                position: 4,
-                text: String::from("a"),
-                position_length: 1,
-            },
-        ];
-        for expected_token in expected_tokens {
-            assert!(token_stream.advance());
-            assert_eq!(token_stream.token(), &expected_token);
-        }
-        assert!(!token_stream.advance());
-    }
 }

View File

@@ -5,8 +5,6 @@ use std::ops::{Deref, DerefMut};

 use serde::{Deserialize, Serialize};

-use crate::tokenizer::TokenStreamChain;
-
 /// Token
 #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
 pub struct Token {
@@ -84,31 +82,6 @@ impl TextAnalyzer {
         self
     }

-    /// Tokenize an array`&str`
-    ///
-    /// The resulting `BoxTokenStream` is equivalent to what would be obtained if the &str were
-    /// one concatenated `&str`, with an artificial position gap of `2` between the different fields
-    /// to prevent accidental `PhraseQuery` to match accross two terms.
-    pub fn token_stream_texts<'a>(&self, texts: &'a [&'a str]) -> BoxTokenStream<'a> {
-        assert!(!texts.is_empty());
-        if texts.len() == 1 {
-            self.token_stream(texts[0])
-        } else {
-            let mut offsets = vec![];
-            let mut total_offset = 0;
-            for &text in texts {
-                offsets.push(total_offset);
-                total_offset += text.len();
-            }
-            let token_streams: Vec<BoxTokenStream<'a>> = texts
-                .iter()
-                .cloned()
-                .map(|text| self.token_stream(text))
-                .collect();
-            From::from(TokenStreamChain::new(offsets, token_streams))
-        }
-    }
-
     /// Creates a token stream for a given `str`.
     pub fn token_stream<'a>(&self, text: &'a str) -> BoxTokenStream<'a> {
         let mut token_stream = self.tokenizer.token_stream(text);
@@ -284,13 +257,10 @@ pub trait TokenStream {
     /// and push the tokens to a sink function.
     ///
    /// Remove this.
-    fn process(&mut self, sink: &mut dyn FnMut(&Token)) -> u32 {
-        let mut num_tokens_pushed = 0u32;
+    fn process(&mut self, sink: &mut dyn FnMut(&Token)) {
         while self.advance() {
             sink(self.token());
-            num_tokens_pushed += 1u32;
         }
-        num_tokens_pushed
     }
 }