Mirror of https://github.com/quickwit-oss/tantivy.git (synced 2026-01-06 01:02:55 +00:00)
Preparing for release
@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.6.0-dev"
version = "0.6.0"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]

@@ -38,9 +38,9 @@ Tantivy is, in fact, strongly inspired by Lucene's design.
- Distributed search and will not be in the scope of tantivy.


# Supported OS
# Supported OS and compiler

Tantivy supports Linux, MacOS and Windows.
Tantivy works on stable rust (>= 1.27) and supports Linux, MacOS and Windows.

# Getting started

@@ -57,8 +57,8 @@ It will walk you through getting a wikipedia search engine up and running in a f

## Development

Tantivy now compiles on stable rust.
To check out and run test, you can simply run :
Tantivy compiles on stable rust but requires `Rust >= 1.27`.
To check out and run tests, you can simply run :

git clone git@github.com:tantivy-search/tantivy.git
cd tantivy

@@ -342,16 +342,19 @@ impl FacetCollector {
pub fn harvest(mut self) -> FacetCounts {
self.finalize_segment();

let collapsed_facet_ords: Vec<&[u64]> = self.segment_counters
let collapsed_facet_ords: Vec<&[u64]> = self
.segment_counters
.iter()
.map(|segment_counter| &segment_counter.facet_ords[..])
.collect();
let collapsed_facet_counts: Vec<&[u64]> = self.segment_counters
let collapsed_facet_counts: Vec<&[u64]> = self
.segment_counters
.iter()
.map(|segment_counter| &segment_counter.facet_counts[..])
.collect();

let facet_streams = self.segment_counters
let facet_streams = self
.segment_counters
.iter()
.map(|seg_counts| seg_counts.facet_reader.facet_dict().range().into_stream())
.collect::<Vec<_>>();
@@ -402,7 +405,8 @@ impl Collector for FacetCollector {

fn collect(&mut self, doc: DocId, _: Score) {
let facet_reader: &mut FacetReader = unsafe {
&mut *self.ff_reader
&mut *self
.ff_reader
.as_ref()
.expect("collect() was called before set_segment. This should never happen.")
.get()
@@ -507,7 +511,7 @@ mod tests {
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);

let mut index_writer = index.writer_with_num_threads(1,3_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let num_facets: usize = 3 * 4 * 5;
let facets: Vec<Facet> = (0..num_facets)
.map(|mut n| {
@@ -587,7 +591,7 @@ mod tests {
.collect();
thread_rng().shuffle(&mut docs[..]);

let mut index_writer = index.writer_with_num_threads(1,3_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
for doc in docs {
index_writer.add_document(doc);
}
@@ -644,7 +648,7 @@ mod bench {
// 40425 docs
thread_rng().shuffle(&mut docs[..]);

let mut index_writer = index.writer_with_num_threads(1,3_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
for doc in docs {
index_writer.add_document(doc);
}

@@ -161,11 +161,13 @@ impl Collector for TopCollector {
fn collect(&mut self, doc: DocId, score: Score) {
if self.at_capacity() {
// It's ok to unwrap as long as a limit of 0 is forbidden.
let limit_doc: GlobalScoredDoc = *self.heap
let limit_doc: GlobalScoredDoc = *self
.heap
.peek()
.expect("Top collector with size 0 is forbidden");
if limit_doc.score < score {
let mut mut_head = self.heap
let mut mut_head = self
.heap
.peek_mut()
.expect("Top collector with size 0 is forbidden");
mut_head.score = score;
@@ -241,5 +243,4 @@ mod tests {
TopCollector::with_limit(0);
}


}

@@ -5,8 +5,6 @@ use std::mem;
use std::ops::Deref;
use std::ptr;




pub(crate) struct BitPacker {
mini_buffer: u64,
mini_buffer_written: usize,

@@ -72,7 +72,8 @@ impl<W: Write> CompositeWrite<W> {
let footer_offset = self.write.written_bytes();
VInt(self.offsets.len() as u64).serialize(&mut self.write)?;

let mut offset_fields: Vec<_> = self.offsets
let mut offset_fields: Vec<_> = self
.offsets
.iter()
.map(|(file_addr, offset)| (*offset, *file_addr))
.collect();

@@ -34,7 +34,8 @@ impl BlockEncoder {
let num_bits = self.bitpacker.num_bits_sorted(offset, block);
self.output[0] = num_bits;
let written_size =
1 + self.bitpacker
1 + self
.bitpacker
.compress_sorted(offset, block, &mut self.output[1..], num_bits);
&self.output[..written_size]
}
@@ -42,7 +43,8 @@ impl BlockEncoder {
pub fn compress_block_unsorted(&mut self, block: &[u32]) -> &[u8] {
let num_bits = self.bitpacker.num_bits(block);
self.output[0] = num_bits;
let written_size = 1 + self.bitpacker
let written_size = 1 + self
.bitpacker
.compress(block, &mut self.output[1..], num_bits);
&self.output[..written_size]
}
@@ -83,7 +85,8 @@ impl BlockDecoder {
pub fn uncompress_block_unsorted<'a>(&mut self, compressed_data: &'a [u8]) -> usize {
let num_bits = compressed_data[0];
self.output_len = COMPRESSION_BLOCK_SIZE;
1 + self.bitpacker
1 + self
.bitpacker
.decompress(&compressed_data[1..], &mut self.output, num_bits)
}


@@ -42,7 +42,8 @@ impl CompressedIntStream {
// no need to read.
self.cached_next_addr
} else {
let next_addr = addr + self.block_decoder
let next_addr = addr + self
.block_decoder
.uncompress_block_unsorted(self.buffer.slice_from(addr));
self.cached_addr = addr;
self.cached_next_addr = next_addr;

@@ -21,13 +21,13 @@ use directory::ManagedDirectory;
use directory::MmapDirectory;
use directory::{Directory, RAMDirectory};
use indexer::index_writer::open_index_writer;
use indexer::index_writer::HEAP_SIZE_MIN;
use indexer::segment_updater::save_new_metas;
use indexer::DirectoryLock;
use num_cpus;
use std::path::Path;
use tokenizer::TokenizerManager;
use IndexWriter;
use indexer::index_writer::HEAP_SIZE_MIN;

const NUM_SEARCHERS: usize = 12;

@@ -155,7 +155,12 @@ impl Index {
) -> Result<IndexWriter> {
let directory_lock = DirectoryLock::lock(self.directory().box_clone())?;
let heap_size_in_bytes_per_thread = overall_heap_size_in_bytes / num_threads;
open_index_writer(self, num_threads, heap_size_in_bytes_per_thread, directory_lock)
open_index_writer(
self,
num_threads,
heap_size_in_bytes_per_thread,
directory_lock,
)
}

/// Creates a multithreaded writer
@@ -186,7 +191,8 @@ impl Index {

/// Returns the list of segments that are searchable
pub fn searchable_segments(&self) -> Result<Vec<Segment>> {
Ok(self.searchable_segment_metas()?
Ok(self
.searchable_segment_metas()?
.into_iter()
.map(|segment_meta| self.segment(segment_meta))
.collect())
@@ -221,7 +227,8 @@ impl Index {

/// Returns the list of segment ids that are searchable.
pub fn searchable_segment_ids(&self) -> Result<Vec<SegmentId>> {
Ok(self.searchable_segment_metas()?
Ok(self
.searchable_segment_metas()?
.iter()
.map(|segment_meta| segment_meta.id())
.collect())

@@ -87,7 +87,8 @@ impl<T> Deref for LeasedItem<T> {
type Target = T;

fn deref(&self) -> &T {
&self.gen_item
&self
.gen_item
.as_ref()
.expect("Unwrapping a leased item should never fail")
.item // unwrap is safe here
@@ -96,7 +97,8 @@ impl<T> Deref for LeasedItem<T> {

impl<T> DerefMut for LeasedItem<T> {
fn deref_mut(&mut self) -> &mut T {
&mut self.gen_item
&mut self
.gen_item
.as_mut()
.expect("Unwrapping a mut leased item should never fail")
.item // unwrap is safe here

@@ -78,7 +78,8 @@ impl Searcher {

/// Return the field searcher associated to a `Field`.
pub fn field(&self, field: Field) -> FieldSearcher {
let inv_index_readers = self.segment_readers
let inv_index_readers = self
.segment_readers
.iter()
.map(|segment_reader| segment_reader.inverted_index(field))
.collect::<Vec<_>>();
@@ -98,7 +99,8 @@ impl FieldSearcher {
/// Returns a Stream over all of the sorted unique terms of
/// for the given field.
pub fn terms(&self) -> TermMerger {
let term_streamers: Vec<_> = self.inv_index_readers
let term_streamers: Vec<_> = self
.inv_index_readers
.iter()
.map(|inverted_index| inverted_index.terms().stream())
.collect();
@@ -108,7 +110,8 @@ impl FieldSearcher {

impl fmt::Debug for Searcher {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let segment_ids = self.segment_readers
let segment_ids = self
.segment_readers
.iter()
.map(|segment_reader| segment_reader.segment_id())
.collect::<Vec<_>>();

@@ -156,11 +156,13 @@ impl SegmentReader {
&FieldType::Bytes => {}
_ => return Err(FastFieldNotAvailableError::new(field_entry)),
}
let idx_reader = self.fast_fields_composite
let idx_reader = self
.fast_fields_composite
.open_read_with_idx(field, 0)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))
.map(FastFieldReader::open)?;
let values = self.fast_fields_composite
let values = self
.fast_fields_composite
.open_read_with_idx(field, 1)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))?;
Ok(BytesFastFieldReader::open(idx_reader, values))
@@ -272,7 +274,8 @@ impl SegmentReader {
/// term dictionary associated to a specific field,
/// and opening the posting list associated to any term.
pub fn inverted_index(&self, field: Field) -> Arc<InvertedIndexReader> {
if let Some(inv_idx_reader) = self.inv_idx_reader_cache
if let Some(inv_idx_reader) = self
.inv_idx_reader_cache
.read()
.expect("Lock poisoned. This should never happen")
.get(&field)
@@ -301,11 +304,13 @@ impl SegmentReader {

let postings_source = postings_source_opt.unwrap();

let termdict_source = self.termdict_composite
let termdict_source = self
.termdict_composite
.open_read(field)
.expect("Failed to open field term dictionary in composite file. Is the field indexed");

let positions_source = self.positions_composite
let positions_source = self
.positions_composite
.open_read(field)
.expect("Index corrupted. Failed to open field positions in composite file.");


@@ -117,7 +117,8 @@ impl ManagedDirectory {
let mut files_to_delete = vec![];
{
// releasing the lock as .delete() will use it too.
let meta_informations_rlock = self.meta_informations
let meta_informations_rlock = self
.meta_informations
.read()
.expect("Managed directory rlock poisoned in garbage collect.");

@@ -170,7 +171,8 @@ impl ManagedDirectory {
if !deleted_files.is_empty() {
// update the list of managed files by removing
// the file that were removed.
let mut meta_informations_wlock = self.meta_informations
let mut meta_informations_wlock = self
.meta_informations
.write()
.expect("Managed directory wlock poisoned (2).");
{
@@ -193,7 +195,8 @@ impl ManagedDirectory {
pub fn protect_file_from_delete(&self, path: &Path) -> FileProtection {
let pathbuf = path.to_owned();
{
let mut meta_informations_wlock = self.meta_informations
let mut meta_informations_wlock = self
.meta_informations
.write()
.expect("Managed file lock poisoned on protect");
*meta_informations_wlock
@@ -215,7 +218,8 @@ impl ManagedDirectory {
/// will not lead to garbage files that will
/// never get removed.
fn register_file_as_managed(&mut self, filepath: &Path) -> io::Result<()> {
let mut meta_wlock = self.meta_informations
let mut meta_wlock = self
.meta_informations
.write()
.expect("Managed file lock poisoned");
let has_changed = meta_wlock.managed_paths.insert(filepath.to_owned());
@@ -248,7 +252,8 @@ impl Directory for ManagedDirectory {

fn delete(&self, path: &Path) -> result::Result<(), DeleteError> {
{
let metas_rlock = self.meta_informations
let metas_rlock = self
.meta_informations
.read()
.expect("poisoned lock in managed directory meta")
if let Some(counter) = metas_rlock.protected_files.get(path) {

@@ -32,7 +32,8 @@ fn open_mmap(full_path: &Path) -> result::Result<Option<MmapReadOnly>, OpenReadE
}
})?;

let meta_data = file.metadata()
let meta_data = file
.metadata()
.map_err(|e| IOError::with_path(full_path.to_owned(), e))?;
if meta_data.len() == 0 {
// if the file size is 0, it will not be possible
@@ -309,7 +310,8 @@ impl Directory for MmapDirectory {
// when the last reference is gone.
mmap_cache.cache.remove(&full_path);
match fs::remove_file(&full_path) {
Ok(_) => self.sync_directory()
Ok(_) => self
.sync_directory()
.map_err(|e| IOError::with_path(path.to_owned(), e).into()),
Err(e) => {
if e.kind() == io::ErrorKind::NotFound {

@@ -170,7 +170,8 @@ impl Directory for RAMDirectory {
let path_buf = PathBuf::from(path);
let vec_writer = VecWriter::new(path_buf.clone(), self.fs.clone());

let exists = self.fs
let exists = self
.fs
.write(path_buf.clone(), &Vec::new())
.map_err(|err| IOError::with_path(path.to_owned(), err))?;


@@ -41,7 +41,8 @@ pub struct DeleteBitSet {
impl DeleteBitSet {
/// Opens a delete bitset given its data source.
pub fn open(data: ReadOnlySource) -> DeleteBitSet {
let num_deleted: usize = data.as_slice()
let num_deleted: usize = data
.as_slice()
.iter()
.map(|b| b.count_ones() as usize)
.sum();

@@ -56,7 +56,8 @@ impl FacetReader {

/// Given a term ordinal returns the term associated to it.
pub fn facet_from_ord(&self, facet_ord: TermOrdinal, output: &mut Facet) {
let found_term = self.term_dict
let found_term = self
.term_dict
.ord_to_term(facet_ord as u64, output.inner_buffer_mut());
assert!(found_term, "Term ordinal {} no found.", facet_ord);
}

@@ -52,7 +52,8 @@ impl DeleteQueue {
//
// Past delete operations are not accessible.
pub fn cursor(&self) -> DeleteCursor {
let last_block = self.inner
let last_block = self
.inner
.read()
.expect("Read lock poisoned when opening delete queue cursor")
.last_block
@@ -92,7 +93,8 @@ impl DeleteQueue {
// be some unflushed operations.
//
fn flush(&self) -> Option<Arc<Block>> {
let mut self_wlock = self.inner
let mut self_wlock = self
.inner
.write()
.expect("Failed to acquire write lock on delete queue writer");

@@ -132,7 +134,8 @@ impl From<DeleteQueue> for NextBlock {
impl NextBlock {
fn next_block(&self) -> Option<Arc<Block>> {
{
let next_read_lock = self.0
let next_read_lock = self
.0
.read()
.expect("Failed to acquire write lock in delete queue");
if let InnerNextBlock::Closed(ref block) = *next_read_lock {
@@ -141,7 +144,8 @@ impl NextBlock {
}
let next_block;
{
let mut next_write_lock = self.0
let mut next_write_lock = self
.0
.write()
.expect("Failed to acquire write lock in delete queue");
match *next_write_lock {

@@ -22,6 +22,7 @@ use indexer::DirectoryLock;
use indexer::MergePolicy;
use indexer::SegmentEntry;
use indexer::SegmentWriter;
use postings::compute_table_size;
use schema::Document;
use schema::IndexRecordOption;
use schema::Term;
@@ -29,14 +30,13 @@ use std::mem;
use std::mem::swap;
use std::thread;
use std::thread::JoinHandle;
use postings::compute_table_size;

// Size of the margin for the heap. A segment is closed when the remaining memory
// in the heap goes below MARGIN_IN_BYTES.
pub const MARGIN_IN_BYTES: usize = 1_000_000;

// We impose the memory per thread to be at least 3 MB.
pub const HEAP_SIZE_MIN: usize = ((MARGIN_IN_BYTES as u32)* 3u32) as usize;
pub const HEAP_SIZE_MIN: usize = ((MARGIN_IN_BYTES as u32) * 3u32) as usize;
pub const HEAP_SIZE_MAX: usize = u32::max_value() as usize - MARGIN_IN_BYTES;

// Add document will block if the number of docs waiting in the queue to be indexed
@@ -46,8 +46,6 @@ const PIPELINE_MAX_SIZE_IN_DOCS: usize = 10_000;
type DocumentSender = chan::Sender<AddOperation>;
type DocumentReceiver = chan::Receiver<AddOperation>;




/// Split the thread memory budget into
/// - the heap size
/// - the hash table "table" itself.
@@ -123,12 +121,12 @@ pub fn open_index_writer(
if heap_size_in_bytes_per_thread < HEAP_SIZE_MIN {
let err_msg = format!(
"The heap size per thread needs to be at least {}.",
HEAP_SIZE_MIN);
HEAP_SIZE_MIN
);
bail!(ErrorKind::InvalidArgument(err_msg));
}
if heap_size_in_bytes_per_thread >= HEAP_SIZE_MAX {
let err_msg = format!(
"The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
let err_msg = format!("The heap size per thread cannot exceed {}", HEAP_SIZE_MAX);
bail!(ErrorKind::InvalidArgument(err_msg));
}
let (document_sender, document_receiver): (DocumentSender, DocumentReceiver) =
@@ -141,10 +139,7 @@ pub fn open_index_writer(
let stamper = Stamper::new(current_opstamp);

let segment_updater =
SegmentUpdater::new(
index.clone(),
stamper.clone(),
&delete_queue.cursor())?;
SegmentUpdater::new(index.clone(), stamper.clone(), &delete_queue.cursor())?;

let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),
@@ -266,19 +261,18 @@ pub fn advance_deletes(
Ok(file_protect)
}

fn index_documents(
memory_budget: usize,
segment: &Segment,
generation: usize,
document_iterator: &mut Iterator<Item = AddOperation>,
segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor,
fn index_documents(
memory_budget: usize,
segment: &Segment,
generation: usize,
document_iterator: &mut Iterator<Item = AddOperation>,
segment_updater: &mut SegmentUpdater,
mut delete_cursor: DeleteCursor,
) -> Result<bool> {
let schema = segment.schema();
let segment_id = segment.id();
let table_size = initial_table_size(memory_budget);
let mut segment_writer =
SegmentWriter::for_segment(table_size, segment.clone(), &schema)?;
let mut segment_writer = SegmentWriter::for_segment(table_size, segment.clone(), &schema)?;
for doc in document_iterator {
segment_writer.add_document(doc, &schema)?;

@@ -348,7 +342,8 @@ impl IndexWriter {
}
drop(self.workers_join_handle);

let result = self.segment_updater
let result = self
.segment_updater
.wait_merging_thread()
.chain_err(|| ErrorKind::ErrorInThread("Failed to join merging thread.".into()));

@@ -493,7 +488,8 @@ impl IndexWriter {
let document_receiver = self.document_receiver.clone();

// take the directory lock to create a new index_writer.
let directory_lock = self._directory_lock
let directory_lock = self
._directory_lock
.take()
.expect("The IndexWriter does not have any lock. This is a bug, please report.");

@@ -649,11 +645,11 @@ impl IndexWriter {
#[cfg(test)]
mod tests {

use super::initial_table_size;
use env_logger;
use error::*;
use indexer::NoMergePolicy;
use schema::{self, Document};
use super::initial_table_size;
use Index;
use Term;

@@ -844,7 +840,6 @@ mod tests {
assert_eq!(num_docs_containing("b"), 100);
}


#[test]
fn test_hashmap_size() {
assert_eq!(initial_table_size(100_000), 12);

@@ -440,7 +440,8 @@ impl IndexMerger {
) -> Result<Option<TermOrdinalMapping>> {
let mut positions_buffer: Vec<u32> = Vec::with_capacity(1_000);
let mut delta_computer = DeltaComputer::new();
let field_readers = self.readers
let field_readers = self
.readers
.iter()
.map(|reader| reader.inverted_index(indexed_field))
.collect::<Vec<_>>();

@@ -59,7 +59,8 @@ impl SegmentRegister {
}

pub fn segment_metas(&self) -> Vec<SegmentMeta> {
let mut segment_ids: Vec<SegmentMeta> = self.segment_states
let mut segment_ids: Vec<SegmentMeta> = self
.segment_states
.values()
.map(|segment_entry| segment_entry.meta().clone())
.collect();

@@ -11,9 +11,8 @@ mod postings_writer;
mod recorder;
mod segment_postings;
mod serializer;
mod term_info;
mod stacker;

mod term_info;

pub(crate) use self::postings_writer::MultiFieldPostingsWriter;
pub use self::serializer::{FieldSerializer, InvertedIndexSerializer};

@@ -1,4 +1,4 @@
use super::stacker::{TermHashMap, Addr, MemoryArena};
use super::stacker::{Addr, MemoryArena, TermHashMap};

use postings::recorder::{NothingRecorder, Recorder, TFAndPositionRecorder, TermFrequencyRecorder};
use postings::UnorderedTermId;
@@ -49,7 +49,6 @@ pub struct MultiFieldPostingsWriter {
per_field_postings_writers: Vec<Box<PostingsWriter>>,
}


impl MultiFieldPostingsWriter {
/// Create a new `MultiFieldPostingsWriter` given
/// a schema and a heap.
@@ -74,7 +73,13 @@ impl MultiFieldPostingsWriter {

pub fn index_text(&mut self, doc: DocId, field: Field, token_stream: &mut TokenStream) -> u32 {
let postings_writer = self.per_field_postings_writers[field.0 as usize].deref_mut();
postings_writer.index_text(&mut self.term_index, doc, field, token_stream, &mut self.heap)
postings_writer.index_text(
&mut self.term_index,
doc,
field,
token_stream,
&mut self.heap,
)
}

pub fn subscribe(&mut self, doc: DocId, term: &Term) -> UnorderedTermId {
@@ -89,8 +94,10 @@ impl MultiFieldPostingsWriter {
&self,
serializer: &mut InvertedIndexSerializer,
) -> Result<HashMap<Field, HashMap<UnorderedTermId, TermOrdinal>>> {
let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self.term_index.iter()
.map(|(term_bytes, addr, bucket_id)| (term_bytes, addr, bucket_id as UnorderedTermId) )
let mut term_offsets: Vec<(&[u8], Addr, UnorderedTermId)> = self
.term_index
.iter()
.map(|(term_bytes, addr, bucket_id)| (term_bytes, addr, bucket_id as UnorderedTermId))
.collect();
term_offsets.sort_by_key(|&(k, _, _)| k);

@@ -147,7 +154,7 @@ impl MultiFieldPostingsWriter {
&term_offsets[start..stop],
&mut field_serializer,
&self.term_index.heap,
&self.heap
&self.heap,
)?;
field_serializer.close()?;
}
@@ -183,7 +190,7 @@ pub trait PostingsWriter {
term_addrs: &[(&[u8], Addr, UnorderedTermId)],
serializer: &mut FieldSerializer,
term_heap: &MemoryArena,
heap: &MemoryArena
heap: &MemoryArena,
) -> io::Result<()>;

/// Tokenize a text and subscribe all of its token.
@@ -238,7 +245,7 @@ impl<Rec: Recorder + 'static> PostingsWriter for SpecializedPostingsWriter<Rec>
doc: DocId,
position: u32,
term: &Term,
heap: &mut MemoryArena
heap: &mut MemoryArena,
) -> UnorderedTermId {
debug_assert!(term.as_slice().len() >= 4);
self.total_num_tokens += 1;

@@ -1,4 +1,4 @@
use super::stacker::{MemoryArena, ExpUnrolledLinkedList};
use super::stacker::{ExpUnrolledLinkedList, MemoryArena};
use postings::FieldSerializer;
use std::{self, io};
use DocId;
@@ -29,11 +29,7 @@ pub trait Recorder: Copy {
/// Close the document. It will help record the term frequency.
fn close_doc(&mut self, heap: &mut MemoryArena);
/// Pushes the postings information to the serializer.
fn serialize(
&self,
serializer: &mut FieldSerializer,
heap: &MemoryArena,
) -> io::Result<()>;
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()>;
}

/// Only records the doc ids
@@ -47,7 +43,7 @@ impl Recorder for NothingRecorder {
fn new(heap: &mut MemoryArena) -> Self {
NothingRecorder {
stack: ExpUnrolledLinkedList::new(heap),
current_doc: u32::max_value()
current_doc: u32::max_value(),
}
}

@@ -64,11 +60,7 @@ impl Recorder for NothingRecorder {

fn close_doc(&mut self, _heap: &mut MemoryArena) {}

fn serialize(
&self,
serializer: &mut FieldSerializer,
heap: &MemoryArena,
) -> io::Result<()> {
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
for doc in self.stack.iter(heap) {
serializer.write_doc(doc, 0u32, &EMPTY_ARRAY)?;
}
@@ -85,7 +77,6 @@ pub struct TermFrequencyRecorder {
}

impl Recorder for TermFrequencyRecorder {

fn new(heap: &mut MemoryArena) -> Self {
TermFrequencyRecorder {
stack: ExpUnrolledLinkedList::new(heap),
@@ -113,14 +104,11 @@ impl Recorder for TermFrequencyRecorder {
self.current_tf = 0;
}

fn serialize(
&self,
serializer: &mut FieldSerializer,
heap: &MemoryArena,
) -> io::Result<()> {
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
// the last document has not been closed...
// its term freq is self.current_tf.
let mut doc_iter = self.stack
let mut doc_iter = self
.stack
.iter(heap)
.chain(Some(self.current_tf).into_iter());

@@ -142,7 +130,6 @@ pub struct TFAndPositionRecorder {
}

impl Recorder for TFAndPositionRecorder {

fn new(heap: &mut MemoryArena) -> Self {
TFAndPositionRecorder {
stack: ExpUnrolledLinkedList::new(heap),
@@ -167,11 +154,7 @@ impl Recorder for TFAndPositionRecorder {
self.stack.push(POSITION_END, heap);
}

fn serialize(
&self,
serializer: &mut FieldSerializer,
heap: &MemoryArena,
) -> io::Result<()> {
fn serialize(&self, serializer: &mut FieldSerializer, heap: &MemoryArena) -> io::Result<()> {
let mut doc_positions = Vec::with_capacity(100);
let mut positions_iter = self.stack.iter(heap);
while let Some(doc) = positions_iter.next() {

@@ -399,7 +399,8 @@ impl BlockSegmentPostings {
/// Returns false iff there was no remaining blocks.
pub fn advance(&mut self) -> bool {
if self.num_bitpacked_blocks > 0 {
let num_consumed_bytes = self.doc_decoder
let num_consumed_bytes = self
.doc_decoder
.uncompress_block_sorted(self.remaining_data.as_ref(), self.doc_offset);
self.remaining_data.advance(num_consumed_bytes);
match self.freq_reading_option {
@@ -409,7 +410,8 @@ impl BlockSegmentPostings {
self.remaining_data.advance(num_bytes_to_skip);
}
FreqReadingOption::ReadFreq => {
let num_consumed_bytes = self.freq_decoder
let num_consumed_bytes = self
.freq_decoder
.uncompress_block_unsorted(self.remaining_data.as_ref());
self.remaining_data.advance(num_consumed_bytes);
}

@@ -160,7 +160,8 @@ impl<'a> FieldSerializer<'a> {
}

fn current_term_info(&self) -> TermInfo {
let (filepos, offset) = self.positions_serializer_opt
let (filepos, offset) = self
.positions_serializer_opt
.as_ref()
.map(|positions_serializer| positions_serializer.addr())
.unwrap_or((0u64, 0u8));
@@ -272,7 +273,8 @@ impl<W: Write> PostingsSerializer<W> {
if self.doc_ids.len() == COMPRESSION_BLOCK_SIZE {
{
// encode the doc ids
let block_encoded: &[u8] = self.block_encoder
let block_encoded: &[u8] = self
.block_encoder
.compress_block_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.last_doc_id_encoded = self.doc_ids[self.doc_ids.len() - 1];
self.postings_write.write_all(block_encoded)?;
@@ -298,14 +300,16 @@ impl<W: Write> PostingsSerializer<W> {
// In that case, the remaining part is encoded
// using variable int encoding.
{
let block_encoded = self.block_encoder
let block_encoded = self
.block_encoder
.compress_vint_sorted(&self.doc_ids, self.last_doc_id_encoded);
self.postings_write.write_all(block_encoded)?;
self.doc_ids.clear();
}
// ... Idem for term frequencies
if self.termfreq_enabled {
let block_encoded = self.block_encoder
let block_encoded = self
.block_encoder
.compress_vint_unsorted(&self.term_freqs[..]);
self.postings_write.write_all(block_encoded)?;
self.term_freqs.clear();

@@ -1,7 +1,7 @@
use super::{MemoryArena, Addr};
use super::{Addr, MemoryArena};

use std::mem;
use common::is_power_of_2;
use std::mem;

const MAX_BLOCK_LEN: u32 = 1u32 << 15;

@@ -57,7 +57,6 @@ pub struct ExpUnrolledLinkedList {
}

impl ExpUnrolledLinkedList {

pub fn new(heap: &mut MemoryArena) -> ExpUnrolledLinkedList {
let addr = heap.allocate_space((FIRST_BLOCK as usize) * mem::size_of::<u32>());
ExpUnrolledLinkedList {
@@ -87,19 +86,20 @@ impl ExpUnrolledLinkedList {
// to the future next block.
let new_block_size: usize = (new_block_len + 1) * mem::size_of::<u32>();
let new_block_addr: Addr = heap.allocate_space(new_block_size);
unsafe { // logic
unsafe {
// logic
heap.write(self.tail, new_block_addr)
};
self.tail = new_block_addr;
}
unsafe { // logic
unsafe {
// logic
heap.write(self.tail, val);
self.tail = self.tail.offset(mem::size_of::<u32>() as u32);
}
}
}


pub struct ExpUnrolledLinkedListIterator<'a> {
heap: &'a MemoryArena,
addr: Addr,
@@ -115,16 +115,17 @@ impl<'a> Iterator for ExpUnrolledLinkedListIterator<'a> {
None
} else {
self.consumed += 1;
let addr: Addr =
if jump_needed(self.consumed).is_some() {
unsafe { // logic
self.heap.read(self.addr)
}
} else {
self.addr
};
let addr: Addr = if jump_needed(self.consumed).is_some() {
unsafe {
// logic
self.heap.read(self.addr)
}
} else {
self.addr
};
self.addr = addr.offset(mem::size_of::<u32>() as u32);
Some(unsafe { // logic
Some(unsafe {
// logic
self.heap.read(addr)
})
}
@@ -134,8 +135,8 @@ impl<'a> Iterator for ExpUnrolledLinkedListIterator<'a> {
#[cfg(test)]
mod tests {

use super::jump_needed;
use super::super::MemoryArena;
use super::jump_needed;
use super::*;

#[test]
@@ -171,19 +172,15 @@ mod tests {
}
}


#[cfg(all(test, feature = "unstable"))]
mod bench {
use super::ExpUnrolledLinkedList;
use test::Bencher;
use tantivy_memory_arena::MemoryArena;
use test::Bencher;

const NUM_STACK: usize = 10_000;
const STACK_SIZE: u32 = 1000;





#[bench]
fn bench_push_vec(bench: &mut Bencher) {
bench.iter(|| {

@@ -28,7 +28,6 @@ use std::ptr;
const NUM_BITS_PAGE_ADDR: usize = 20;
const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 1 MB large



/// Represents a pointer into the `MemoryArena`
/// .
/// Pointer are 32-bits and are split into
@@ -42,7 +41,6 @@ const PAGE_SIZE: usize = 1 << NUM_BITS_PAGE_ADDR; // pages are 1 MB large
pub struct Addr(u32);

impl Addr {

/// Creates a null pointer.
pub fn null_pointer() -> Addr {
Addr(u32::max_value())
@@ -54,7 +52,7 @@ impl Addr {
}

fn new(page_id: usize, local_addr: usize) -> Addr {
Addr( (page_id << NUM_BITS_PAGE_ADDR | local_addr) as u32)
Addr((page_id << NUM_BITS_PAGE_ADDR | local_addr) as u32)
}

fn page_id(&self) -> usize {
@@ -71,7 +69,6 @@ impl Addr {
}
}


/// Trait required for an object to be `storable`.
///
/// # Warning
@@ -86,7 +83,10 @@ pub trait ArenaStorable {
unsafe fn write_into(self, arena: &mut MemoryArena, addr: Addr);
}

impl<V> ArenaStorable for V where V: Copy {
impl<V> ArenaStorable for V
where
V: Copy,
{
fn num_bytes(&self) -> usize {
mem::size_of::<V>()
}
@@ -103,12 +103,11 @@ pub struct MemoryArena {
}

impl MemoryArena {

/// Creates a new memory arena.
pub fn new() -> MemoryArena {
let first_page = Page::new(0);
MemoryArena {
pages: vec![first_page]
pages: vec![first_page],
}
}

@@ -137,7 +136,7 @@ impl MemoryArena {
pub fn write_bytes<B: AsRef<[u8]>>(&mut self, addr: Addr, data: B) {
let bytes = data.as_ref();
self.pages[addr.page_id()]
.get_mut_slice(addr.page_local_addr(), bytes .len())
.get_mut_slice(addr.page_local_addr(), bytes.len())
.copy_from_slice(bytes);
}

@@ -147,8 +146,7 @@ impl MemoryArena {
///
/// Panics if the memory has not been allocated beforehands.
pub fn read_slice(&self, addr: Addr, len: usize) -> &[u8] {
self.pages[addr.page_id()]
.get_slice(addr.page_local_addr(), len)
self.pages[addr.page_id()].get_slice(addr.page_local_addr(), len)
}

unsafe fn get_mut_ptr(&mut self, addr: Addr) -> *mut u8 {
@@ -161,7 +159,9 @@ impl MemoryArena {
pub fn store<Item: ArenaStorable>(&mut self, val: Item) -> Addr {
let num_bytes = val.num_bytes();
let addr = self.allocate_space(num_bytes);
unsafe { self.write(addr, val); };
unsafe {
self.write(addr, val);
};
addr
}

@@ -187,24 +187,24 @@ impl MemoryArena {
}
self.add_page().allocate_space(len).unwrap()
}

}


struct Page {
page_id: usize,
len: usize,
data: Box<[u8]>
data: Box<[u8]>,
}

impl Page {
fn new(page_id: usize) -> Page {
let mut data: Vec<u8> = Vec::with_capacity(PAGE_SIZE);
unsafe { data.set_len(PAGE_SIZE); } // avoid initializing page
unsafe {
data.set_len(PAGE_SIZE);
} // avoid initializing page
Page {
page_id,
len: 0,
data: data.into_boxed_slice()
data: data.into_boxed_slice(),
}
}

@@ -256,29 +256,36 @@ mod tests {
let addr_a = arena.allocate_space(a.len());
arena.write_bytes(addr_a, a);

let addr_b= arena.allocate_space(b.len());
let addr_b = arena.allocate_space(b.len());
arena.write_bytes(addr_b, b);

assert_eq!(arena.read_slice(addr_a, a.len()), a);
assert_eq!(arena.read_slice(addr_b, b.len()), b);
}


#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct MyTest {
pub a: usize,
pub b: u8,
pub c: u32
pub c: u32,
}

#[test]
fn test_store_object() {
let mut arena = MemoryArena::new();
let a = MyTest { a: 143, b: 21, c: 32};
let b = MyTest { a: 113, b: 221, c: 12};
let a = MyTest {
a: 143,
b: 21,
c: 32,
};
let b = MyTest {
a: 113,
b: 221,
c: 12,
};
let addr_a = arena.store(a);
let addr_b = arena.store(b);
assert_eq!(unsafe { arena.read::<MyTest>(addr_a) }, a);
assert_eq!(unsafe { arena.read::<MyTest>(addr_b) }, b);
}
}
}

@@ -1,9 +1,9 @@
mod expull;
mod memory_arena;
mod murmurhash2;
mod term_hashmap;
mod expull;

pub use self::expull::ExpUnrolledLinkedList;
pub use self::memory_arena::{Addr, ArenaStorable, MemoryArena};
use self::murmurhash2::murmurhash2;
pub use self::memory_arena::{Addr, MemoryArena, ArenaStorable};
pub use self::term_hashmap::{compute_table_size, TermHashMap};
pub use self::expull::ExpUnrolledLinkedList;
@@ -44,15 +44,12 @@ pub fn murmurhash2(key: &[u8]) -> u32 {
h ^ (h >> 15)
}




#[cfg(test)]
mod test {

use super::murmurhash2;
use std::collections::HashSet;


#[test]
fn test_murmur() {
let s1 = "abcdef";
@@ -86,4 +83,4 @@ mod test {
}
assert_eq!(set.len(), 10_000);
}
}
}

@@ -1,15 +1,14 @@
use super::{Addr, MemoryArena, ArenaStorable};
use super::murmurhash2;
use super::{Addr, ArenaStorable, MemoryArena};
use std::iter;
use std::mem;
use std::slice;
use super::murmurhash2;

pub type BucketId = usize;


struct KeyBytesValue<'a, V> {
key: &'a [u8],
value: V
value: V,
}

impl<'a, V> KeyBytesValue<'a, V> {
@@ -19,7 +18,9 @@ impl<'a, V> KeyBytesValue<'a, V> {
}

impl<'a, V> ArenaStorable for KeyBytesValue<'a, V>
where V: ArenaStorable {
where
V: ArenaStorable,
{
fn num_bytes(&self) -> usize {
0u16.num_bytes() + self.key.len() + self.value.num_bytes()
}
@@ -33,7 +34,7 @@ impl<'a, V> ArenaStorable for KeyBytesValue<'a, V>

/// Returns the actual memory size in bytes
/// required to create a table of size $2^num_bits$.
pub fn compute_table_size(num_bits: usize) -> usize {
pub fn compute_table_size(num_bits: usize) -> usize {
(1 << num_bits) * mem::size_of::<KeyValue>()
}

@@ -54,7 +55,7 @@ impl Default for KeyValue {
fn default() -> Self {
KeyValue {
key_value_addr: Addr::null_pointer(),
hash: 0u32
hash: 0u32,
}
}
}
@@ -99,7 +100,6 @@ impl QuadraticProbing {
}
}


pub struct Iter<'a> {
hashmap: &'a TermHashMap,
inner: slice::Iter<'a, usize>,
@@ -111,7 +111,8 @@ impl<'a> Iterator for Iter<'a> {
fn next(&mut self) -> Option<Self::Item> {
self.inner.next().cloned().map(move |bucket: usize| {
let kv = self.hashmap.table[bucket];
let (key, offset): (&'a [u8], Addr) = unsafe { self.hashmap.get_key_value(kv.key_value_addr) };
let (key, offset): (&'a [u8], Addr) =
unsafe { self.hashmap.get_key_value(kv.key_value_addr) };
(key, offset, bucket as BucketId)
})
}
@@ -195,14 +196,11 @@ impl TermHashMap {
/// will be in charge of returning a default value.
/// If the key already as an associated value, then it will be passed
/// `Some(previous_value)`.
pub fn mutate_or_create<S, V, TMutator>(
&mut self,
key: S,
mut updater: TMutator) -> BucketId
where
S: AsRef<[u8]>,
V: Copy,
TMutator: FnMut(Option<V>) -> V
pub fn mutate_or_create<S, V, TMutator>(&mut self, key: S, mut updater: TMutator) -> BucketId
where
S: AsRef<[u8]>,
V: Copy,
TMutator: FnMut(Option<V>) -> V,
{
if self.is_saturated() {
self.resize();
@@ -220,11 +218,13 @@ impl TermHashMap {
return bucket as BucketId;
} else if kv.hash == hash {
let (key_matches, val_addr) = {
let (stored_key, val_addr): (&[u8], Addr) = unsafe { self.get_key_value(kv.key_value_addr) };
let (stored_key, val_addr): (&[u8], Addr) =
unsafe { self.get_key_value(kv.key_value_addr) };
(stored_key == key_bytes, val_addr)
};
if key_matches {
unsafe { // logic
unsafe {
// logic
let v = self.heap.read(val_addr);
let new_v = updater(Some(v));
self.heap.write(val_addr, new_v);
@@ -257,9 +257,8 @@ mod bench {
#[cfg(test)]
mod tests {

use std::collections::HashMap;
use super::TermHashMap;

use std::collections::HashMap;

#[test]
fn test_hash_map() {
@@ -286,7 +285,8 @@ mod tests {
let mut vanilla_hash_map = HashMap::new();
let mut iter_values = hash_map.iter();
while let Some((key, addr, _)) = iter_values.next() {
let val: u32 = unsafe { // test
let val: u32 = unsafe {
// test
hash_map.heap.read(addr)
};
vanilla_hash_map.insert(key.to_owned(), val);

@@ -11,49 +11,49 @@ use Result;
/// A weight struct for Fuzzy Term and Regex Queries
pub struct AutomatonWeight<A>
where
A: Automaton,
A: Automaton,
{
field: Field,
automaton: A,
field: Field,
automaton: A,
}

impl<A> AutomatonWeight<A>
where
A: Automaton,
A: Automaton,
{
/// Create a new AutomationWeight
pub fn new(field: Field, automaton: A) -> AutomatonWeight<A> {
AutomatonWeight { field, automaton }
}
/// Create a new AutomationWeight
pub fn new(field: Field, automaton: A) -> AutomatonWeight<A> {
AutomatonWeight { field, automaton }
}

fn automaton_stream<'a>(&'a self, term_dict: &'a TermDictionary) -> TermStreamer<'a, &'a A> {
let term_stream_builder = term_dict.search(&self.automaton);
term_stream_builder.into_stream()
}
fn automaton_stream<'a>(&'a self, term_dict: &'a TermDictionary) -> TermStreamer<'a, &'a A> {
let term_stream_builder = term_dict.search(&self.automaton);
term_stream_builder.into_stream()
}
}

impl<A> Weight for AutomatonWeight<A>
where
A: Automaton,
A: Automaton,
{
fn scorer(&self, reader: &SegmentReader) -> Result<Box<Scorer>> {
let max_doc = reader.max_doc();
let mut doc_bitset = BitSet::with_max_value(max_doc);
fn scorer(&self, reader: &SegmentReader) -> Result<Box<Scorer>> {
let max_doc = reader.max_doc();
let mut doc_bitset = BitSet::with_max_value(max_doc);

let inverted_index = reader.inverted_index(self.field);
let term_dict = inverted_index.terms();
let mut term_stream = self.automaton_stream(term_dict);
while term_stream.advance() {
let term_info = term_stream.value();
let mut block_segment_postings =
inverted_index.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic);
while block_segment_postings.advance() {
for &doc in block_segment_postings.docs() {
doc_bitset.insert(doc);
let inverted_index = reader.inverted_index(self.field);
let term_dict = inverted_index.terms();
let mut term_stream = self.automaton_stream(term_dict);
while term_stream.advance() {
let term_info = term_stream.value();
let mut block_segment_postings = inverted_index
.read_block_postings_from_terminfo(term_info, IndexRecordOption::Basic);
while block_segment_postings.advance() {
for &doc in block_segment_postings.docs() {
doc_bitset.insert(doc);
}
}
}
}
let doc_bitset = BitSetDocSet::from(doc_bitset);
Ok(Box::new(ConstScorer::new(doc_bitset)))
}
let doc_bitset = BitSetDocSet::from(doc_bitset);
Ok(Box::new(ConstScorer::new(doc_bitset)))
}
}

@@ -41,7 +41,8 @@ impl From<Vec<(Occur, Box<Query>)>> for BooleanQuery {

impl Query for BooleanQuery {
fn weight(&self, searcher: &Searcher, scoring_enabled: bool) -> Result<Box<Weight>> {
let sub_weights = self.subqueries
let sub_weights = self
.subqueries
.iter()
.map(|&(ref occur, ref subquery)| {
Ok((*occur, subquery.weight(searcher, scoring_enabled)?))

@@ -228,7 +228,8 @@ where
TOtherScorer: Scorer,
{
fn score(&mut self) -> Score {
self.left.score() + self.right.score()
self.left.score()
+ self.right.score()
+ self.others.iter_mut().map(Scorer::score).sum::<Score>()
}
}

@@ -195,12 +195,16 @@ impl RangeQuery {

/// Lower bound of range
pub fn left_bound(&self) -> Bound<Term> {
map_bound(&self.left_bound, &|bytes| Term::from_field_bytes(self.field, bytes))
map_bound(&self.left_bound, &|bytes| {
Term::from_field_bytes(self.field, bytes)
})
}

/// Upper bound of range
pub fn right_bound(&self) -> Bound<Term> {
map_bound(&self.right_bound, &|bytes| Term::from_field_bytes(self.field, bytes))
map_bound(&self.right_bound, &|bytes| {
Term::from_field_bytes(self.field, bytes)
})
}
}


@@ -65,79 +65,79 @@ use Searcher;
/// ```
#[derive(Debug, Clone)]
pub struct RegexQuery {
regex_pattern: String,
field: Field,
regex_pattern: String,
field: Field,
}

impl RegexQuery {
/// Creates a new Fuzzy Query
pub fn new(regex_pattern: String, field: Field) -> RegexQuery {
RegexQuery {
regex_pattern,
field,
/// Creates a new Fuzzy Query
pub fn new(regex_pattern: String, field: Field) -> RegexQuery {
RegexQuery {
regex_pattern,
field,
}
}
}

fn specialized_weight(&self) -> Result<AutomatonWeight<Regex>> {
let automaton = Regex::new(&self.regex_pattern)
.map_err(|_| ErrorKind::InvalidArgument(self.regex_pattern.clone()))?;
fn specialized_weight(&self) -> Result<AutomatonWeight<Regex>> {
let automaton = Regex::new(&self.regex_pattern)
.map_err(|_| ErrorKind::InvalidArgument(self.regex_pattern.clone()))?;

Ok(AutomatonWeight::new(self.field.clone(), automaton))
}
Ok(AutomatonWeight::new(self.field.clone(), automaton))
}
}

impl Query for RegexQuery {
fn weight(&self, _searcher: &Searcher, _scoring_enabled: bool) -> Result<Box<Weight>> {
Ok(Box::new(self.specialized_weight()?))
}
fn weight(&self, _searcher: &Searcher, _scoring_enabled: bool) -> Result<Box<Weight>> {
Ok(Box::new(self.specialized_weight()?))
}
}

#[cfg(test)]
mod test {
use super::RegexQuery;
use collector::TopCollector;
use schema::SchemaBuilder;
use schema::TEXT;
use tests::assert_nearly_equals;
use Index;
use super::RegexQuery;
use collector::TopCollector;
use schema::SchemaBuilder;
use schema::TEXT;
use tests::assert_nearly_equals;
use Index;

#[test]
pub fn test_regex_query() {
let mut schema_builder = SchemaBuilder::new();
let country_field = schema_builder.add_text_field("country", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
index_writer.add_document(doc!(
#[test]
pub fn test_regex_query() {
let mut schema_builder = SchemaBuilder::new();
let country_field = schema_builder.add_text_field("country", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
{
let mut index_writer = index.writer_with_num_threads(1, 10_000_000).unwrap();
index_writer.add_document(doc!(
country_field => "japan",
));
index_writer.add_document(doc!(
index_writer.add_document(doc!(
country_field => "korea",
));
index_writer.commit().unwrap();
}
index.load_searchers().unwrap();
let searcher = index.searcher();
{
let mut collector = TopCollector::with_limit(2);
index_writer.commit().unwrap();
}
index.load_searchers().unwrap();
let searcher = index.searcher();
{
let mut collector = TopCollector::with_limit(2);

let regex_query = RegexQuery::new("jap[ao]n".to_string(), country_field);
searcher.search(&regex_query, &mut collector).unwrap();
let scored_docs = collector.score_docs();
assert_eq!(scored_docs.len(), 1, "Expected only 1 document");
let (score, _) = scored_docs[0];
assert_nearly_equals(1f32, score);
}
let regex_query = RegexQuery::new("jap[ao]n".to_string(), country_field);
searcher.search(&regex_query, &mut collector).unwrap();
let scored_docs = collector.score_docs();
assert_eq!(scored_docs.len(), 1, "Expected only 1 document");
let (score, _) = scored_docs[0];
assert_nearly_equals(1f32, score);
}

let searcher = index.searcher();
{
let mut collector = TopCollector::with_limit(2);
let searcher = index.searcher();
{
let mut collector = TopCollector::with_limit(2);

let regex_query = RegexQuery::new("jap[A-Z]n".to_string(), country_field);
searcher.search(&regex_query, &mut collector).unwrap();
let scored_docs = collector.score_docs();
assert_eq!(scored_docs.len(), 0, "Expected ZERO document");
let regex_query = RegexQuery::new("jap[A-Z]n".to_string(), country_field);
searcher.search(&regex_query, &mut collector).unwrap();
let scored_docs = collector.score_docs();
assert_eq!(scored_docs.len(), 0, "Expected ZERO document");
}
}
}
}

@@ -12,7 +12,6 @@ pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()>

pub fn decompress(compressed: &[u8], decompressed: &mut Vec<u8>) -> io::Result<()> {
decompressed.clear();
snap::Reader::new(compressed)
.read_to_end(decompressed)?;
snap::Reader::new(compressed).read_to_end(decompressed)?;
Ok(())
}

@@ -33,23 +33,22 @@ and should rely on either

!*/

mod skiplist;
mod reader;
mod skiplist;
mod writer;
pub use self::reader::StoreReader;
pub use self::writer::StoreWriter;

#[cfg(feature="lz4")]
#[cfg(feature = "lz4")]
mod compression_lz4;
#[cfg(feature="lz4")]
#[cfg(feature = "lz4")]
use self::compression_lz4::*;

#[cfg(not(feature="lz4"))]
#[cfg(not(feature = "lz4"))]
mod compression_snap;
#[cfg(not(feature="lz4"))]
#[cfg(not(feature = "lz4"))]
use self::compression_snap::*;


#[cfg(test)]
pub mod tests {


@@ -1,10 +1,10 @@
use Result;

use super::decompress;
use super::skiplist::SkipList;
use common::BinarySerializable;
use common::VInt;
use super::skiplist::SkipList;
use directory::ReadOnlySource;
use super::decompress;
use schema::Document;
use std::cell::RefCell;
use std::io;

@@ -72,7 +72,8 @@ impl<T: BinarySerializable> SkipListBuilder<T> {
let mut skip_pointer = self.data_layer.insert(key, dest)?;
loop {
skip_pointer = match skip_pointer {
Some((skip_doc_id, skip_offset)) => self.get_skip_layer(layer_id)
Some((skip_doc_id, skip_offset)) => self
.get_skip_layer(layer_id)
.insert(skip_doc_id, &skip_offset)?,
None => {
return Ok(());

@@ -1,9 +1,9 @@
use super::compress;
use super::skiplist::SkipListBuilder;
use super::StoreReader;
use common::CountingWriter;
use common::{BinarySerializable, VInt};
use super::skiplist::SkipListBuilder;
use directory::WritePtr;
use super::compress;
use schema::Document;
use std::io::{self, Write};
use DocId;

@@ -93,7 +93,8 @@ fn extract_bits(data: &[u8], addr_bits: usize, num_bits: u8) -> u64 {
let addr_byte = addr_bits / 8;
let bit_shift = (addr_bits % 8) as u64;
assert!(data.len() >= addr_byte + 8);
let val_unshifted_unmasked: u64 = unsafe { // ok thanks to the 7 byte padding on `.close`
let val_unshifted_unmasked: u64 = unsafe {
// ok thanks to the 7 byte padding on `.close`
let addr = data.as_ptr().offset(addr_byte as isize) as *const u64;
ptr::read_unaligned(addr)
};

@@ -164,7 +164,8 @@ impl TermDictionary {
let fst = self.fst_index.as_fst();
let mut node = fst.root();
while ord != 0 || !node.is_final() {
if let Some(transition) = node.transitions()
if let Some(transition) = node
.transitions()
.take_while(|transition| transition.out.value() <= ord)
.last()
{
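
Note on the pattern: aside from the version bump (0.6.0-dev to 0.6.0) and the README updates, nearly every hunk in this commit is a formatting-only rewrite consistent with running a code formatter such as rustfmt: long method chains now put the receiver on its own line, trailing commas are added, and `where` clauses are expanded. A minimal, self-contained sketch of the chain-formatting change follows; the `Searcher` type and `segment_ids` field below are illustrative stand-ins, not tantivy's actual types.

    // Before this commit, chains started on the receiver's line:
    //     let ids: Vec<u32> = self.segment_ids.iter().cloned().collect();
    struct Searcher {
        segment_ids: Vec<u32>,
    }

    impl Searcher {
        fn ids(&self) -> Vec<u32> {
            // After this commit, the receiver stands alone and every
            // `.method()` segment is aligned underneath it.
            self
                .segment_ids
                .iter()
                .cloned()
                .collect()
        }
    }

    fn main() {
        let searcher = Searcher { segment_ids: vec![1, 2, 3] };
        assert_eq!(searcher.ids(), vec![1, 2, 3]);
    }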