From 930010aa88c80265dc404d5e9e7ed9b41e3e05dd Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Sun, 28 Jan 2018 00:03:51 +0900 Subject: [PATCH] Unit test passing --- Cargo.toml | 1 - src/collector/count_collector.rs | 6 +-- src/collector/facet_collector.rs | 34 ++++++------ src/core/inverted_index_reader.rs | 9 ++-- src/core/segment_reader.rs | 2 +- src/directory/mmap_directory.rs | 1 - src/directory/mod.rs | 3 -- src/fastfield/multivalued/reader.rs | 15 +++--- src/indexer/index_writer.rs | 21 ++------ src/indexer/merger.rs | 7 ++- src/lib.rs | 1 - src/schema/document.rs | 22 ++++++++ src/schema/facet.rs | 7 +++ src/schema/schema.rs | 4 +- src/store/reader.rs | 13 +++-- src/store/writer.rs | 10 ++-- src/termdict/mod.rs | 22 ++++++++ src/tokenizer/facet_tokenizer.rs | 81 ++++++++++++++++++++++------- 18 files changed, 164 insertions(+), 95 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b7759043e..2936fa760 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ tempdir = "0.3" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -bincode = "0.8" libc = { version = "0.2.20", optional=true } num_cpus = "1.2" itertools = "0.5.9" diff --git a/src/collector/count_collector.rs b/src/collector/count_collector.rs index 1fd9613ec..67f563f1a 100644 --- a/src/collector/count_collector.rs +++ b/src/collector/count_collector.rs @@ -7,6 +7,7 @@ use SegmentLocalId; /// `CountCollector` collector only counts how many /// documents match the query. +#[derive(Default)] pub struct CountCollector { count: usize, } @@ -19,11 +20,6 @@ impl CountCollector { } } -impl Default for CountCollector { - fn default() -> CountCollector { - CountCollector { count: 0 } - } -} impl Collector for CountCollector { fn set_segment(&mut self, _: SegmentLocalId, _: &SegmentReader) -> Result<()> { diff --git a/src/collector/facet_collector.rs b/src/collector/facet_collector.rs index 2464af8ca..1fe4210f0 100644 --- a/src/collector/facet_collector.rs +++ b/src/collector/facet_collector.rs @@ -6,6 +6,7 @@ use std::cell::UnsafeCell; use schema::Facet; use std::collections::BTreeMap; use std::collections::BinaryHeap; +use std::collections::Bound; use termdict::TermDictionary; use termdict::TermStreamer; use termdict::TermStreamerBuilder; @@ -14,6 +15,8 @@ use termdict::TermMerger; use postings::SkipResult; use std::{u64, usize}; use schema::FACET_SEP_BYTE; +use std::iter::Peekable; + use DocId; use Result; @@ -218,12 +221,8 @@ pub struct FacetCollector { collapse: BTreeSet>, } - -use std::iter::Peekable; - fn skip<'a, I: Iterator>>(target: &[u8], collapse_it: &mut Peekable) -> SkipResult { loop { - println!("collapse {:?}, target {:?}", target, collapse_it.peek()); match collapse_it.peek() { Some(facet_bytes) => { match facet_bytes[..].cmp(&target) { @@ -270,7 +269,8 @@ impl FacetCollector { pub fn add_facet(&mut self, facet_from: T) where Facet: From { let facet = Facet::from(facet_from); - let facet_bytes = facet.encoded_bytes(); + let facet_bytes: &[u8] = facet.encoded_bytes(); + self.collapse.remove(&facet_bytes[..0]); for pos in facet_bytes.iter() .cloned() .position(|b| b == FACET_SEP_BYTE) { @@ -306,7 +306,7 @@ impl FacetCollector { } 'outer: loop { // at the begining of this loop, facet_streamer - // is position on a term that has not been processed yet. + // is positionned on a term that has not been processed yet. let skip_result = skip(facet_streamer.key(), &mut collapse_facet_it); match skip_result { SkipResult::Reached => { @@ -316,7 +316,6 @@ impl FacetCollector { self.current_segment_collapse_mapping.push(0); while facet_streamer.advance() { let depth = facet_depth(facet_streamer.key()); - println!("depth {}", depth); if depth <= collapse_depth { continue 'outer; } else if depth == collapse_depth + 1 { @@ -387,7 +386,6 @@ impl FacetCollector { .unwrap_or(0) }) .sum(); - println!("{:?} count {}", facet_merger.key(), count); if count > 0u64 { let bytes = facet_merger.key().to_owned(); facet_counts.insert(Facet::from_encoded(bytes), count); @@ -449,11 +447,19 @@ impl FacetCounts { pub fn get<'a, T>(&'a self, facet_from: T) -> impl Iterator where Facet: From { let facet = Facet::from(facet_from); - let mut facet_after_bytes = facet.encoded_bytes().to_owned(); - facet_after_bytes.push(1u8); - let facet_after = Facet::from_encoded(facet_after_bytes); + let left_bound = Bound::Excluded(facet.clone()); + let right_bound = + if facet.is_root() { + Bound::Unbounded + } else { + let mut facet_after_bytes = facet.encoded_bytes().to_owned(); + facet_after_bytes.push(1u8); + let facet_after = Facet::from_encoded(facet_after_bytes); + Bound::Excluded(facet_after) + }; + self.facet_counts - .range(facet.clone()..facet_after) + .range((left_bound, right_bound)) .map(|(facet, count)| (facet, *count)) } @@ -529,8 +535,6 @@ mod tests { index_writer.add_document(doc); } index_writer.commit().unwrap(); - - index.load_searchers().unwrap(); let searcher = index.searcher(); @@ -589,11 +593,9 @@ mod tests { let mut facet_collector = FacetCollector::for_field(facet_field); facet_collector.add_facet("/"); - searcher.search(&AllQuery, &mut facet_collector).unwrap(); let counts: FacetCounts = facet_collector.harvest(); - { let facets: Vec<(&Facet, u64)> = counts.top_k("/", 3); assert_eq!( diff --git a/src/core/inverted_index_reader.rs b/src/core/inverted_index_reader.rs index aade33e54..06a73e484 100644 --- a/src/core/inverted_index_reader.rs +++ b/src/core/inverted_index_reader.rs @@ -49,7 +49,7 @@ impl InvertedIndexReader { /// Returns the term info associated with the term. pub fn get_term_info(&self, term: &Term) -> Option { - self.termdict.get(term.as_slice()) + self.termdict.get(term.value_bytes()) } /// Return the term dictionary datastructure. @@ -144,9 +144,8 @@ impl InvertedIndexReader { /// Returns the number of documents containing the term. pub fn doc_freq(&self, term: &Term) -> u32 { - match self.get_term_info(term) { - Some(term_info) => term_info.doc_freq, - None => 0, - } + self.get_term_info(term) + .map(|term_info| term_info.doc_freq) + .unwrap_or(0u32) } } diff --git a/src/core/segment_reader.rs b/src/core/segment_reader.rs index c9ee67073..939873d08 100644 --- a/src/core/segment_reader.rs +++ b/src/core/segment_reader.rs @@ -216,7 +216,7 @@ impl SegmentReader { .expect("Lock poisoned. This should never happen") .get(&field) { - Arc::clone(inv_idx_reader); + return Arc::clone(inv_idx_reader); } let termdict_source: ReadOnlySource = self.termdict_composite diff --git a/src/directory/mmap_directory.rs b/src/directory/mmap_directory.rs index 1893519f5..5dc1166af 100644 --- a/src/directory/mmap_directory.rs +++ b/src/directory/mmap_directory.rs @@ -414,7 +414,6 @@ mod tests { assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths); } for (i, path) in paths.iter().enumerate() { - println!("delete paths {:?}", path); mmap_directory.delete(path).unwrap(); assert_eq!(mmap_directory.get_cache_info().mmapped.len(), num_paths - i - 1); } diff --git a/src/directory/mod.rs b/src/directory/mod.rs index 99200c6e3..7d1c6fb17 100644 --- a/src/directory/mod.rs +++ b/src/directory/mod.rs @@ -116,9 +116,6 @@ mod tests { assert!(directory.open_read(*TEST_PATH).is_err()); let _w = directory.open_write(*TEST_PATH).unwrap(); assert!(directory.exists(*TEST_PATH)); - if let Err(e) = directory.open_read(*TEST_PATH) { - println!("{:?}", e); - } assert!(directory.open_read(*TEST_PATH).is_ok()); assert!(directory.delete(*TEST_PATH).is_ok()); } diff --git a/src/fastfield/multivalued/reader.rs b/src/fastfield/multivalued/reader.rs index afd4220d2..bbf5102ac 100644 --- a/src/fastfield/multivalued/reader.rs +++ b/src/fastfield/multivalued/reader.rs @@ -78,34 +78,35 @@ mod tests { let mut facet = Facet::root(); { - facet_reader.facet_from_ord(0, &mut facet); + facet_reader.facet_from_ord(1, &mut facet); assert_eq!(facet, Facet::from("/category")); } { - facet_reader.facet_from_ord(1, &mut facet); + facet_reader.facet_from_ord(2, &mut facet); assert_eq!(facet, Facet::from("/category/cat1")); } { - facet_reader.facet_from_ord(2, &mut facet); + facet_reader.facet_from_ord(3, &mut facet); + assert_eq!(format!("{}", facet), "/category/cat2"); assert_eq!(facet, Facet::from("/category/cat2")); } { - facet_reader.facet_from_ord(3, &mut facet); + facet_reader.facet_from_ord(4, &mut facet); assert_eq!(facet, Facet::from("/category/cat3")); } let mut vals = Vec::new(); { facet_reader.facet_ords(0, &mut vals); - assert_eq!(&vals[..], &[2, 1]); + assert_eq!(&vals[..], &[3, 2]); } { facet_reader.facet_ords(1, &mut vals); - assert_eq!(&vals[..], &[2]); + assert_eq!(&vals[..], &[3]); } { facet_reader.facet_ords(2, &mut vals); - assert_eq!(&vals[..], &[3]); + assert_eq!(&vals[..], &[4]); } diff --git a/src/indexer/index_writer.rs b/src/indexer/index_writer.rs index 083edf11b..d47c7fdc6 100644 --- a/src/indexer/index_writer.rs +++ b/src/indexer/index_writer.rs @@ -701,32 +701,21 @@ mod tests { let num_docs_containing = |s: &str| { let searcher = index.searcher(); - let term_a = Term::from_field_text(text_field, s); - searcher.doc_freq(&term_a) + let term = Term::from_field_text(text_field, s); + searcher.doc_freq(&term) }; { // writing the segment let mut index_writer = index.writer_with_num_threads(3, 40_000_000).unwrap(); - { - let mut doc = Document::default(); - doc.add_text(text_field, "a"); - index_writer.add_document(doc); - } + index_writer.add_document(doc!(text_field=>"a")); index_writer.rollback().unwrap(); assert_eq!(index_writer.commit_opstamp(), 0u64); assert_eq!(num_docs_containing("a"), 0); - { - let mut doc = Document::default(); - doc.add_text(text_field, "b"); - index_writer.add_document(doc); - } - { - let mut doc = Document::default(); - doc.add_text(text_field, "c"); - index_writer.add_document(doc); + index_writer.add_document(doc!(text_field=>"b")); + index_writer.add_document(doc!(text_field=>"c")); } assert_eq!(index_writer.commit().unwrap(), 2u64); index.load_searchers().unwrap(); diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 39bb7225e..901d83fcd 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -17,7 +17,6 @@ use fastfield::FastFieldReader; use store::StoreWriter; use std::cmp::{max, min}; use termdict::TermDictionary; -use schema::Term; use termdict::TermStreamer; pub struct IndexMerger { @@ -261,7 +260,7 @@ impl IndexMerger { ); while merged_terms.advance() { - let term = Term::wrap(merged_terms.key()); + let term_bytes: &[u8] = merged_terms.key(); // Let's compute the list of non-empty posting lists let segment_postings: Vec<_> = merged_terms @@ -271,7 +270,7 @@ impl IndexMerger { let segment_ord = heap_item.segment_ord; let term_info = heap_item.streamer.value(); let segment_reader = &self.readers[heap_item.segment_ord]; - let inverted_index = segment_reader.inverted_index(term.field()); + let inverted_index = segment_reader.inverted_index(indexed_field); let mut segment_postings = inverted_index .read_postings_from_terminfo(term_info, segment_postings_option); if segment_postings.advance() { @@ -292,7 +291,7 @@ impl IndexMerger { // We know that there is at least one document containing // the term, so we add it. - field_serializer.new_term(term.as_ref())?; + field_serializer.new_term(term_bytes)?; // We can now serialize this postings, by pushing each document to the // postings serializer. diff --git a/src/lib.rs b/src/lib.rs index 506c3241d..46f537067 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,7 +56,6 @@ extern crate stable_deref_trait; extern crate tempdir; extern crate time; extern crate uuid; -extern crate bincode; #[cfg(test)] extern crate env_logger; diff --git a/src/schema/document.rs b/src/schema/document.rs index 972e47fe4..7c15d9c16 100644 --- a/src/schema/document.rs +++ b/src/schema/document.rs @@ -1,5 +1,8 @@ use super::*; use itertools::Itertools; +use common::VInt; +use std::io::{self, Read, Write}; +use common::BinarySerializable; /// Tantivy's Document is the object that can /// be indexed and then searched for. @@ -128,7 +131,26 @@ impl Document { } } +impl BinarySerializable for Document { + fn serialize(&self, writer: &mut W) -> io::Result<()> { + let field_values = self.field_values(); + VInt(field_values.len() as u64).serialize(writer)?; + for field_value in field_values { + field_value.serialize(writer)?; + } + Ok(()) + } + fn deserialize(reader: &mut R) -> io::Result { + let num_field_values = VInt::deserialize(reader)?.val() as usize; + let field_values = (0..num_field_values) + .map(|_| { + FieldValue::deserialize(reader) + }) + .collect::>>()?; + Ok(Document::from(field_values)) + } +} #[cfg(test)] mod tests { diff --git a/src/schema/facet.rs b/src/schema/facet.rs index a41df1b49..d8f61eda9 100644 --- a/src/schema/facet.rs +++ b/src/schema/facet.rs @@ -186,6 +186,13 @@ mod tests { use super::Facet; + #[test] + fn test_root() { + assert_eq!(Facet::root(), Facet::from("/")); + assert_eq!(format!("{}", Facet::root()), "/"); + assert!(Facet::root().is_root()); + } + #[test] fn test_facet_display() { { diff --git a/src/schema/schema.rs b/src/schema/schema.rs index 057f2c598..864992790 100644 --- a/src/schema/schema.rs +++ b/src/schema/schema.rs @@ -370,7 +370,7 @@ mod tests { "type": "u64", "options": { "indexed": false, - "fast": true, + "fast": "single", "stored": true } }, @@ -379,7 +379,7 @@ mod tests { "type": "i64", "options": { "indexed": false, - "fast": true, + "fast": "single", "stored": true } } diff --git a/src/store/reader.rs b/src/store/reader.rs index f00b35bee..4ed8cdab2 100644 --- a/src/store/reader.rs +++ b/src/store/reader.rs @@ -7,7 +7,7 @@ use schema::Document; use common::BinarySerializable; use std::mem::size_of; use std::io::{self, Read}; -use bincode; +use common::VInt; use datastruct::SkipList; use lz4; @@ -81,13 +81,12 @@ impl StoreReader { let current_block_mut = self.current_block.borrow_mut(); let mut cursor = ¤t_block_mut[..]; for _ in first_doc_id..doc_id { - let doc_length = u32::deserialize(&mut cursor)?; - cursor = &cursor[doc_length as usize..]; + let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; + cursor = &cursor[doc_length..]; } - let doc_length = u32::deserialize(&mut cursor)? as usize; - let document: Document = bincode::deserialize(&cursor[..doc_length]) - .expect("The docstore is corrupted. Failed to fetch doc"); - Ok(document) + let doc_length = VInt::deserialize(&mut cursor)?.val() as usize; + cursor = &cursor[..doc_length]; + Ok(Document::deserialize(&mut cursor)?) } } diff --git a/src/store/writer.rs b/src/store/writer.rs index e015587d1..1742a3d00 100644 --- a/src/store/writer.rs +++ b/src/store/writer.rs @@ -1,12 +1,11 @@ use directory::WritePtr; use DocId; -use common::BinarySerializable; +use common::{VInt, BinarySerializable}; use std::io::{self, Write}; use super::StoreReader; use lz4; use datastruct::SkipListBuilder; use common::CountingWriter; -use bincode; use schema::Document; const BLOCK_SIZE: usize = 16_384; @@ -49,10 +48,9 @@ impl StoreWriter { /// pub fn store<'a>(&mut self, stored_document: &Document) -> io::Result<()> { self.intermediary_buffer.clear(); - bincode::serialize_into(&mut self.intermediary_buffer, stored_document, bincode::Infinite) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e))?; - let doc_num_bytes = self.intermediary_buffer.len() as u32; - ::serialize(&doc_num_bytes, &mut self.current_block)?; + stored_document.serialize(&mut self.intermediary_buffer)?; + let doc_num_bytes = self.intermediary_buffer.len(); + VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?; self.current_block.write_all(&self.intermediary_buffer[..])?; self.doc += 1; if self.current_block.len() > BLOCK_SIZE { diff --git a/src/termdict/mod.rs b/src/termdict/mod.rs index 856721681..ef07aae86 100644 --- a/src/termdict/mod.rs +++ b/src/termdict/mod.rs @@ -516,6 +516,28 @@ mod tests { } } + #[test] + fn test_empty_string() { + let field_type = FieldType::Str(TEXT); + let buffer: Vec = { + let mut term_dictionary_builder = TermDictionaryBuilderImpl::new(vec![], field_type) + .unwrap(); + term_dictionary_builder + .insert(&[], &make_term_info(1 as u64)).unwrap(); + term_dictionary_builder + .insert(&[1u8], &make_term_info(2 as u64)).unwrap(); + term_dictionary_builder + .finish().unwrap() + }; + let source = ReadOnlySource::from(buffer); + let term_dictionary: TermDictionaryImpl = TermDictionaryImpl::from_source(source); + let mut stream = term_dictionary.stream(); + assert!(stream.advance()); + assert!(stream.key().is_empty()); + assert!(stream.advance()); + assert_eq!(stream.key(), &[1u8]); + assert!(!stream.advance()); + } #[test] fn test_stream_range_boundaries() { diff --git a/src/tokenizer/facet_tokenizer.rs b/src/tokenizer/facet_tokenizer.rs index f20cbc1c0..b6138ec7a 100644 --- a/src/tokenizer/facet_tokenizer.rs +++ b/src/tokenizer/facet_tokenizer.rs @@ -14,9 +14,16 @@ use schema::FACET_SEP_BYTE; #[derive(Clone)] pub struct FacetTokenizer; +#[derive(Debug)] +enum State { + RootFacetNotEmitted, + UpToPosition(usize), //< we already emitted facet prefix up to &text[..cursor] + Terminated, +} + pub struct FacetTokenStream<'a> { text: &'a str, - pos: usize, + state: State, token: Token, } @@ -26,7 +33,7 @@ impl<'a> Tokenizer<'a> for FacetTokenizer { fn token_stream(&self, text: &'a str) -> Self::TokenStreamImpl { FacetTokenStream { text: text, - pos: 0, + state: State::RootFacetNotEmitted, //< pos is the first char that has not been processed yet. token: Token::default(), } } @@ -35,20 +42,36 @@ impl<'a> Tokenizer<'a> for FacetTokenizer { impl<'a> TokenStream for FacetTokenStream<'a> { fn advance(&mut self) -> bool { - let bytes: &[u8] = self.text.as_bytes(); - if self.pos == bytes.len() { - false - } else { - let next_sep_pos = bytes[self.pos + 1..] - .iter() - .cloned() - .position(|b| b == FACET_SEP_BYTE) - .map(|pos| pos + self.pos + 1) - .unwrap_or(bytes.len()); - let facet_prefix = unsafe { str::from_utf8_unchecked(&bytes[self.pos..next_sep_pos]) }; - self.pos = next_sep_pos; - self.token.text.push_str(facet_prefix); - true + match self.state { + State::RootFacetNotEmitted => { + self.state = + if self.text.is_empty() { + State::Terminated + } else { + State::UpToPosition(0) + }; + true + } + State::UpToPosition(cursor) => { + let bytes: &[u8] = self.text.as_bytes(); + if let Some(next_sep_pos) = bytes[cursor+1..] + .iter() + .cloned() + .position(|b| b == FACET_SEP_BYTE) + .map(|pos| cursor + 1 + pos) { + let facet_part = unsafe { str::from_utf8_unchecked(&bytes[cursor..next_sep_pos]) }; + self.token.text.push_str(facet_part); + self.state = State::UpToPosition(next_sep_pos); + } else { + let facet_part = unsafe { str::from_utf8_unchecked(&bytes[cursor..]) }; + self.token.text.push_str(facet_part); + self.state = State::Terminated; + } + true + } + State::Terminated => { + false + } } } @@ -81,9 +104,27 @@ mod tests { .token_stream(unsafe { ::std::str::from_utf8_unchecked(facet.encoded_bytes()) }) .process(&mut add_token); } - assert_eq!(tokens.len(), 3); - assert_eq!(tokens[0], "/top"); - assert_eq!(tokens[1], "/top/a"); - assert_eq!(tokens[2], "/top/a/b"); + assert_eq!(tokens.len(), 4); + assert_eq!(tokens[0], "/"); + assert_eq!(tokens[1], "/top"); + assert_eq!(tokens[2], "/top/a"); + assert_eq!(tokens[3], "/top/a/b"); + } + + #[test] + fn test_facet_tokenizer_root_facets() { + let facet = Facet::root(); + let mut tokens = vec![]; + { + let mut add_token = |token: &Token| { + let facet = Facet::from_encoded(token.text.as_bytes().to_owned()); + tokens.push(format!("{}", facet)); + }; + FacetTokenizer + .token_stream(unsafe { ::std::str::from_utf8_unchecked(facet.encoded_bytes()) }) + .process(&mut add_token); + } + assert_eq!(tokens.len(), 1); + assert_eq!(tokens[0], "/"); } } \ No newline at end of file