First stab

Paul Masurel
2019-02-05 20:53:10 +01:00
parent 04e9606638
commit d9b2bf98e2
7 changed files with 145 additions and 66 deletions

View File

@@ -1,19 +1,14 @@
use super::pool::LeasedItem;
use super::pool::Pool;
use super::segment::create_segment;
use super::segment::Segment;
use core::searcher::Searcher;
use core::Executor;
use core::IndexMeta;
use core::SegmentId;
use core::SegmentMeta;
use core::SegmentReader;
use core::META_FILEPATH;
use directory::ManagedDirectory;
#[cfg(feature = "mmap")]
use directory::MmapDirectory;
use directory::INDEX_WRITER_LOCK;
use directory::META_LOCK;
use directory::{Directory, RAMDirectory};
use error::DataCorruption;
use error::TantivyError;
@@ -21,6 +16,8 @@ use indexer::index_writer::open_index_writer;
use indexer::index_writer::HEAP_SIZE_MIN;
use indexer::segment_updater::save_new_metas;
use num_cpus;
use reader::IndexReaderBuilder;
use reader::{IndexReader, ReloadPolicy};
use schema::Field;
use schema::FieldType;
use schema::Schema;
@@ -28,7 +25,6 @@ use serde_json;
use std::borrow::BorrowMut;
use std::fmt;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokenizer::BoxedTokenizer;
use tokenizer::TokenizerManager;
@@ -52,8 +48,6 @@ fn load_metas(directory: &Directory) -> Result<IndexMeta> {
pub struct Index {
directory: ManagedDirectory,
schema: Schema,
num_searchers: Arc<AtomicUsize>,
searcher_pool: Arc<Pool<Searcher>>,
executor: Arc<Executor>,
tokenizers: TokenizerManager,
}
@@ -158,16 +152,12 @@ impl Index {
/// Creates a new index given a directory and an `IndexMeta`.
fn create_from_metas(directory: ManagedDirectory, metas: &IndexMeta) -> Result<Index> {
let schema = metas.schema.clone();
let n_cpus = num_cpus::get();
let index = Index {
directory,
schema,
num_searchers: Arc::new(AtomicUsize::new(n_cpus)),
searcher_pool: Arc::new(Pool::new()),
tokenizers: TokenizerManager::default(),
executor: Arc::new(Executor::single_thread()),
};
index.load_searchers()?;
Ok(index)
}
@@ -197,6 +187,14 @@ impl Index {
}
}
/// Creates an `IndexReader` for this index, using the given reload policy.
pub fn reader(&self, reload_policy: ReloadPolicy) -> IndexReader {
self.reader_builder().reload_policy(reload_policy).into()
}
/// Returns an `IndexReaderBuilder` for this index, to build an
/// `IndexReader` with non-default settings.
pub fn reader_builder(&self) -> IndexReaderBuilder {
IndexReaderBuilder::new(self.clone())
}
/// Opens a new directory from an index path.
#[cfg(feature = "mmap")]
pub fn open_in_dir<P: AsRef<Path>>(directory_path: P) -> Result<Index> {
@@ -335,53 +333,6 @@ impl Index {
.map(|segment_meta| segment_meta.id())
.collect())
}
/// Sets the number of searchers to use
///
/// Only takes effect on the next call to `load_searchers`.
pub fn set_num_searchers(&mut self, num_searchers: usize) {
self.num_searchers.store(num_searchers, Ordering::Release);
}
/// Update searchers so that they reflect the state of the last
/// `.commit()`.
///
/// If indexing happens in the same process as searching,
/// you most likely want to call `.load_searchers()` right after each
/// successful call to `.commit()`.
///
/// If indexing and searching happen in different processes, the way to
/// get the freshest `index` at all times is to watch `meta.json` and
/// call `load_searchers` whenever a change happens.
pub fn load_searchers(&self) -> Result<()> {
let _meta_lock = self.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = self.searchable_segments()?;
let segment_readers: Vec<SegmentReader> = searchable_segments
.iter()
.map(SegmentReader::open)
.collect::<Result<_>>()?;
let schema = self.schema();
let num_searchers: usize = self.num_searchers.load(Ordering::Acquire);
let searchers = (0..num_searchers)
.map(|_| Searcher::new(schema.clone(), self.clone(), segment_readers.clone()))
.collect();
self.searcher_pool.publish_new_generation(searchers);
Ok(())
}
/// Returns a searcher
///
/// This method should be called every single time a search
/// query is performed.
/// The searchers are taken from a pool of `num_searchers` searchers.
/// If no searcher is available, this call may block.
///
/// The same searcher must be used for a given query, as it ensures
/// the use of a consistent segment set.
pub fn searcher(&self) -> LeasedItem<Searcher> {
self.searcher_pool.acquire()
}
}
impl fmt::Debug for Index {
@@ -395,8 +346,6 @@ impl Clone for Index {
Index {
directory: self.directory.clone(),
schema: self.schema.clone(),
num_searchers: Arc::clone(&self.num_searchers),
searcher_pool: Arc::clone(&self.searcher_pool),
tokenizers: self.tokenizers.clone(),
executor: self.executor.clone(),
}
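For downstream users, the visible change in this file is the removal of `Index::load_searchers` and `Index::searcher` in favor of the new reader type. A minimal migration sketch, assuming the re-exports added in `src/lib.rs` below (the helper function name is illustrative):

use tantivy::{Index, ReloadPolicy, Result};

// Sketch: searchers are now leased from an IndexReader
// instead of being pooled on the Index itself.
fn searchable_segment_count(index: &Index) -> Result<usize> {
    // Previously: index.load_searchers()?; let searcher = index.searcher();
    let reader = index.reader(ReloadPolicy::MANUAL);
    reader.load_searchers()?;
    let searcher = reader.searcher();
    Ok(searcher.segment_readers().len())
}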

View File

@@ -2,7 +2,6 @@ mod executor;
pub mod index;
mod index_meta;
mod inverted_index_reader;
mod pool;
pub mod searcher;
mod segment;
mod segment_component;

View File

@@ -210,6 +210,9 @@ pub mod space_usage;
pub mod store;
pub mod termdict;
mod reader;
pub use self::reader::{IndexReader, IndexReaderBuilder, ReloadPolicy};
mod snippet;
pub use self::snippet::{Snippet, SnippetGenerator};
@@ -298,6 +301,7 @@ mod tests {
use Index;
use IndexWriter;
use Postings;
use ReloadPolicy;
pub fn assert_nearly_equals(expected: f32, val: f32) {
assert!(
@@ -386,7 +390,8 @@ mod tests {
index_writer.commit().unwrap();
}
{
index.load_searchers().unwrap();
let index_reader = index.reader(ReloadPolicy::MANUAL);
index_reader.load_searchers().unwrap();
let searcher = index_reader.searcher();
let term_a = Term::from_field_text(text_field, "a");
assert_eq!(searcher.doc_freq(&term_a), 3);

View File

@@ -151,7 +151,8 @@ impl PositionReader {
if self.ahead != Some(0) {
// the block currently available is not the block
// for the current position
self.bit_packer.decompress(position_data, self.buffer.as_mut(), num_bits);
self.bit_packer
.decompress(position_data, self.buffer.as_mut(), num_bits);
self.ahead = Some(0);
}
let block_len = compressed_block_size(num_bits);

View File

@@ -1,9 +1,9 @@
use bitpacking::BitPacker;
use bitpacking::BitPacker4x;
use common::BinarySerializable;
use common::CountingWriter;
use positions::{COMPRESSION_BLOCK_SIZE, LONG_SKIP_INTERVAL};
use std::io::{self, Write};
use bitpacking::BitPacker4x;
pub struct PositionSerializer<W: io::Write> {
bit_packer: BitPacker4x,
@@ -53,7 +53,9 @@ impl<W: io::Write> PositionSerializer<W> {
fn flush_block(&mut self) -> io::Result<()> {
let num_bits = self.bit_packer.num_bits(&self.block[..]);
self.write_skiplist.write_all(&[num_bits])?;
let written_len = self.bit_packer.compress(&self.block[..], &mut self.buffer, num_bits);
let written_len = self
.bit_packer
.compress(&self.block[..], &mut self.buffer, num_bits);
self.write_stream.write_all(&self.buffer[..written_len])?;
self.block.clear();
if (self.num_ints % LONG_SKIP_INTERVAL) == 0u64 {
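The serializer's `flush_block` relies on the `bitpacking` crate's contract: `num_bits` picks a per-block bit width, `compress` returns the number of bytes written, and `decompress` reverses it given the same width. A standalone round-trip sketch of that contract (the buffer sizing is my assumption, not taken from this diff):

use bitpacking::{BitPacker, BitPacker4x};

fn roundtrip(block: &[u32; BitPacker4x::BLOCK_LEN]) {
    let bit_packer = BitPacker4x::new();
    // Bit width needed for the largest value in the block.
    let num_bits = bit_packer.num_bits(block);
    // Worst case: 32 bits per integer, i.e. 4 bytes each.
    let mut compressed = vec![0u8; 4 * BitPacker4x::BLOCK_LEN];
    let written = bit_packer.compress(block, &mut compressed[..], num_bits);
    let mut decompressed = [0u32; BitPacker4x::BLOCK_LEN];
    bit_packer.decompress(&compressed[..written], &mut decompressed[..], num_bits);
    assert_eq!(&decompressed[..], &block[..]);
}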

src/reader/mod.rs Normal file
View File

@@ -0,0 +1,123 @@
mod pool;
use self::pool::{LeasedItem, Pool};
use core::Segment;
use directory::Directory;
use directory::META_LOCK;
use num_cpus;
use std::sync::Arc;
use Index;
use Result;
use Searcher;
use SegmentReader;
/// Defines when a new generation of searchers is created.
#[derive(Clone, Copy)]
pub enum ReloadPolicy {
/// Searchers are only reloaded on explicit calls to `load_searchers`.
MANUAL,
// NEAR_REAL_TIME(target_ms),
/// Searchers are meant to be reloaded after each commit.
ON_COMMIT,
}
#[derive(Clone)]
pub struct IndexReaderBuilder {
num_searchers: usize,
reload_policy: ReloadPolicy,
index: Index,
}
impl IndexReaderBuilder {
pub(crate) fn new(index: Index) -> IndexReaderBuilder {
IndexReaderBuilder {
num_searchers: num_cpus::get(),
reload_policy: ReloadPolicy::MANUAL,
index,
}
}
/// Sets the reload policy of the `IndexReader` being built.
pub fn reload_policy(mut self, reload_policy: ReloadPolicy) -> IndexReaderBuilder {
self.reload_policy = reload_policy;
self
}
}
impl Into<IndexReader> for IndexReaderBuilder {
fn into(self) -> IndexReader {
IndexReader::new(self.index, self.num_searchers, self.reload_policy)
}
}
struct InnerIndexReader {
num_searchers: usize,
searcher_pool: Pool<Searcher>,
reload_policy: ReloadPolicy,
index: Index,
}
impl InnerIndexReader {
fn load_searchers(&self) -> Result<()> {
let _meta_lock = self.index.directory().acquire_lock(&META_LOCK)?;
let searchable_segments = self.searchable_segments()?;
let segment_readers: Vec<SegmentReader> = searchable_segments
.iter()
.map(SegmentReader::open)
.collect::<Result<_>>()?;
let schema = self.index.schema();
let searchers = (0..self.num_searchers)
.map(|_| Searcher::new(schema.clone(), self.index.clone(), segment_readers.clone()))
.collect();
self.searcher_pool.publish_new_generation(searchers);
Ok(())
}
/// Returns the list of segments that are searchable
fn searchable_segments(&self) -> Result<Vec<Segment>> {
self.index.searchable_segments()
}
fn searcher(&self) -> LeasedItem<Searcher> {
self.searcher_pool.acquire()
}
}
pub struct IndexReader {
inner: Arc<InnerIndexReader>,
}
impl IndexReader {
/// Update searchers so that they reflect the state of the last
/// `.commit()`.
///
/// If indexing happens in the same process as searching,
/// you most likely want to call `.load_searchers()` right after each
/// successful call to `.commit()`.
///
/// If indexing and searching happen in different processes, the way to
/// get the freshest `index` at all times is to watch `meta.json` and
/// call `load_searchers` whenever a change happens.
pub fn load_searchers(&self) -> Result<()> {
self.inner.load_searchers()
}
/// Returns a searcher
///
/// This method should be called every single time a search
/// query is performed.
/// The searchers are taken from a pool of `num_searchers` searchers.
/// If no searcher is available, this call may block.
///
/// The same searcher must be used for a given query, as it ensures
/// the use of a consistent segment set.
pub fn searcher(&self) -> LeasedItem<Searcher> {
self.inner.searcher()
}
pub(crate) fn new(
index: Index,
num_searchers: usize,
reload_policy: ReloadPolicy,
) -> IndexReader {
IndexReader {
inner: Arc::new(InnerIndexReader {
index,
num_searchers,
searcher_pool: Pool::new(),
reload_policy,
}),
}
}
}
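Taken together, an end-to-end sketch of the flow this new module enables, assuming the pre-existing schema and indexing APIs are unchanged by this commit (the field name and writer parameters are illustrative):

use tantivy::schema::{SchemaBuilder, TEXT};
use tantivy::{Index, ReloadPolicy};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = SchemaBuilder::default();
    let text = schema_builder.add_text_field("text", TEXT);
    let schema = schema_builder.build();
    let index = Index::create_in_ram(schema);

    let mut index_writer = index.writer_with_num_threads(1, 50_000_000)?;
    let mut doc = tantivy::Document::default();
    doc.add_text(text, "hello reader");
    index_writer.add_document(doc);
    index_writer.commit()?;

    // With ReloadPolicy::MANUAL, the searcher generation only changes
    // when load_searchers() is called explicitly.
    let reader = index.reader(ReloadPolicy::MANUAL);
    reader.load_searchers()?;
    let searcher = reader.searcher();
    assert_eq!(searcher.segment_readers().len(), 1);
    Ok(())
}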