Compare commits

..

16 Commits

Author SHA1 Message Date
Paul Masurel
4c846b1202 Added NRT directory kinda working 2019-04-05 10:07:29 +09:00
Paul Masurel
fac0013454 Added flush 2019-04-04 09:36:06 +09:00
Paul Masurel
95db5d9999 Merge branch 'master' into softcommits 2019-03-23 18:08:07 +09:00
Paul Masurel
7f0372fa97 reader 2019-02-16 16:09:16 +09:00
Paul Masurel
f8fdf68fcb unit test 2019-02-16 15:49:22 +09:00
Paul Masurel
c00e95cd04 Uncommited is not SegmentRegisters 2019-02-15 22:44:14 +09:00
Paul Masurel
a623d8f6d9 Added SegmentAvailable readonly view 2019-02-15 08:58:08 +09:00
Paul Masurel
b3ede2dd7e softcommits 2019-02-13 21:29:54 +09:00
Paul Masurel
b68686f040 opstamp constraint 2019-02-12 18:14:07 +09:00
Paul Masurel
629d3fb37f Added opstamp 2019-02-12 08:49:23 +09:00
Paul Masurel
f513f10e05 fmt 2019-02-08 15:04:35 +09:00
Paul Masurel
f262d4cc22 code cleaning 2019-02-08 14:54:34 +09:00
Paul Masurel
91e89714f4 Added soft commits 2019-02-08 14:42:52 +09:00
Paul Masurel
6fd3cb1254 Renaming 2019-02-06 05:48:15 +01:00
Paul Masurel
549b4e66e5 Using the new API 2019-02-06 00:17:56 +01:00
Paul Masurel
d9b2bf98e2 First stab 2019-02-05 21:23:07 +01:00
74 changed files with 981 additions and 5503 deletions

View File

@@ -29,7 +29,7 @@ addons:
matrix:
include:
# Android
- env: TARGET=aarch64-linux-android DISABLE_TESTS=1
- env: TARGET=aarch64-linux-android DISABLE_TESTS
#- env: TARGET=arm-linux-androideabi DISABLE_TESTS=1
#- env: TARGET=armv7-linux-androideabi DISABLE_TESTS=1
#- env: TARGET=i686-linux-android DISABLE_TESTS=1
@@ -61,9 +61,6 @@ before_script:
script:
- bash ci/script.sh
after_success:
- cargo doc-upload
before_deploy:
- sh ci/before_deploy.sh
@@ -71,11 +68,6 @@ cache: cargo
before_cache:
# Travis can't cache files that are not readable by "others"
- chmod -R a+r $HOME/.cargo
- find ./target/debug -type f -maxdepth 1 -delete
- rm -f ./target/.rustc_info.json
- rm -fr ./target/debug/{deps,.fingerprint}/tantivy*
- rm -r target/debug/examples/
- ls -1 examples/ | sed -e 's/\.rs$//' | xargs -I "{}" find target/* -name "*{}*" -type f -delete
#branches:
# only:

View File

@@ -1,41 +1,3 @@
Tantivy 0.10.0
=====================
*Tantivy 0.10.0 index format is compatible with the index format in 0.9.0.*
- Added an ASCII folding filter (@drusellers)
- Bugfix in `query.count` in presence of deletes (@pmasurel)
Minor
---------
- Small simplification of the code.
Calling .freq() or .doc() when .advance() has never been called
on segment postings should panic from now on.
- Tokens exceeding `u16::max_value() - 4` chars are discarded silently instead of panicking.
- Fast fields are now preloaded when the `SegmentReader` is created.
- `IndexMeta` is now public. (@hntd187)
- `IndexWriter` `add_document`, `delete_term`. `IndexWriter` is `Sync`, making it possible to use it with a `
Arc<RwLock<IndexWriter>>`. `add_document` and `delete_term` can
only require a read lock. (@pmasurel)
- Introducing `Opstamp` as an expressive type alias for `u64`. (@petr-tik)
- Stamper now relies on `AtomicU64` on all platforms (@petr-tik)
## How to update?
Your existing indexes are usable as is, but you may need some
trivial updates.
### Fast fields
Fast fields used to be accessed directly from the `SegmentReader`.
The API changed, you are now required to acquire your fast field reader via the
`segment_reader.fast_fields()`, and use one of the typed method:
- `.u64()`, `.i64()` if your field is single-valued ;
- `.u64s()`, `.i64s()` if your field is multi-valued ;
- `.bytes()` if your field is bytes fast field.
Tantivy 0.9.0
=====================
*0.9.0 index format is not compatible with the
@@ -55,35 +17,6 @@ previous index format.*
- Added IndexReader. By default, index is reloaded automatically upon new commits (@fulmicoton)
- SIMD linear search within blocks (@fulmicoton)
## How to update ?
tantivy 0.9 brought some API breaking change.
To update from tantivy 0.8, you will need to go through the following steps.
- `schema::INT_INDEXED` and `schema::INT_STORED` should be replaced by `schema::INDEXED` and `schema::INT_STORED`.
- The index now does not hold the pool of searcher anymore. You are required to create an intermediary object called
`IndexReader` for this.
```rust
// create the reader. You typically need to create 1 reader for the entire
// lifetime of you program.
let reader = index.reader()?;
// Acquire a searcher (previously `index.searcher()`) is now written:
let searcher = reader.searcher();
// With the default setting of the reader, you are not required to
// call `index.load_searchers()` anymore.
//
// The IndexReader will pick up that change automatically, regardless
// of whether the update was done in a different process or not.
// If this behavior is not wanted, you can create your reader with
// the `ReloadPolicy::Manual`, and manually decide when to reload the index
// by calling `reader.reload()?`.
```
Tantivy 0.8.2
=====================
Fixing build for x86_64 platforms. (#496)

View File

@@ -1,6 +1,6 @@
[package]
name = "tantivy"
version = "0.10.0-dev"
version = "0.9.0"
authors = ["Paul Masurel <paul.masurel@gmail.com>"]
license = "MIT"
categories = ["database-implementations", "data-structures"]
@@ -23,7 +23,7 @@ snap = {version="0.2"}
atomicwrites = {version="0.2.2", optional=true}
tempfile = "3.0"
log = "0.4"
combine = ">=3.6.0,<4.0.0"
combine = "3"
tempdir = "0.3"
serde = "1.0"
serde_derive = "1.0"

View File

@@ -18,8 +18,8 @@ use tantivy::fastfield::FastFieldReader;
use tantivy::query::QueryParser;
use tantivy::schema::Field;
use tantivy::schema::{Schema, FAST, INDEXED, TEXT};
use tantivy::Index;
use tantivy::SegmentReader;
use tantivy::{Index, TantivyError};
#[derive(Default)]
struct Stats {
@@ -75,18 +75,9 @@ impl Collector for StatsCollector {
fn for_segment(
&self,
_segment_local_id: u32,
segment_reader: &SegmentReader,
segment: &SegmentReader,
) -> tantivy::Result<StatsSegmentCollector> {
let fast_field_reader = segment_reader
.fast_fields()
.u64(self.field)
.ok_or_else(|| {
let field_name = segment_reader.schema().get_field_name(self.field);
TantivyError::SchemaError(format!(
"Field {:?} is not a u64 fast field.",
field_name
))
})?;
let fast_field_reader = segment.fast_field_reader(self.field)?;
Ok(StatsSegmentCollector {
fast_field_reader,
stats: Stats::default(),

View File

@@ -1,107 +0,0 @@
// # Indexing from different threads.
//
// It is fairly common to have to index from different threads.
// Tantivy forbids to create more than one `IndexWriter` at a time.
//
// This `IndexWriter` itself has its own multithreaded layer, so managing your own
// indexing threads will not help. However, it can still be useful for some applications.
//
// For instance, if preparing documents to send to tantivy before indexing is the bottleneck of
// your application, it is reasonable to have multiple threads.
//
// Another very common reason to want to index from multiple threads, is implementing a webserver
// with CRUD capabilities. The server framework will most likely handle request from
// different threads.
//
// The recommended way to address both of these use case is to wrap your `IndexWriter` into a
// `Arc<RwLock<IndexWriter>>`.
//
// While this is counterintuitive, adding and deleting documents do not require mutability
// over the `IndexWriter`, so several threads will be able to do this operation concurrently.
//
// The example below does not represent an actual real-life use case (who would spawn thread to
// index a single document?), but aims at demonstrating the mechanism that makes indexing
// from several threads possible.
extern crate tempdir;
// ---
// Importing tantivy...
#[macro_use]
extern crate tantivy;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::Opstamp;
use tantivy::{Index, IndexWriter};
fn main() -> tantivy::Result<()> {
// # Defining the schema
let mut schema_builder = Schema::builder();
let title = schema_builder.add_text_field("title", TEXT | STORED);
let body = schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let index_writer: Arc<RwLock<IndexWriter>> = Arc::new(RwLock::new(index.writer(50_000_000)?));
// # First indexing thread.
let index_writer_clone_1 = index_writer.clone();
thread::spawn(move || {
// we index 100 times the document... for the sake of the example.
for i in 0..100 {
let opstamp = {
// A read lock is sufficient here.
let index_writer_rlock = index_writer_clone_1.read().unwrap();
index_writer_rlock.add_document(
doc!(
title => "Of Mice and Men",
body => "A few miles south of Soledad, the Salinas River drops in close to the hillside \
bank and runs deep and green. The water is warm too, for it has slipped twinkling \
over the yellow sands in the sunlight before reaching the narrow pool. On one \
side of the river the golden foothill slopes curve up to the strong and rocky \
Gabilan Mountains, but on the valley side the water is lined with trees—willows \
fresh and green with every spring, carrying in their lower leaf junctures the \
debris of the winters flooding; and sycamores with mottled, white, recumbent \
limbs and branches that arch over the pool"
))
};
println!("add doc {} from thread 1 - opstamp {}", i, opstamp);
thread::sleep(Duration::from_millis(20));
}
});
// # Second indexing thread.
let index_writer_clone_2 = index_writer.clone();
// For convenience, tantivy also comes with a macro to
// reduce the boilerplate above.
thread::spawn(move || {
// we index 100 times the document... for the sake of the example.
for i in 0..100 {
// A read lock is sufficient here.
let opstamp = {
let index_writer_rlock = index_writer_clone_2.read().unwrap();
index_writer_rlock.add_document(doc!(
title => "Manufacturing consent",
body => "Some great book description..."
))
};
println!("add doc {} from thread 2 - opstamp {}", i, opstamp);
thread::sleep(Duration::from_millis(10));
}
});
// # In the main thread, we commit 10 times, once every 500ms.
for _ in 0..10 {
let opstamp: Opstamp = {
// Committing or rollbacking on the other hand requires write lock. This will block other threads.
let mut index_writer_wlock = index_writer.write().unwrap();
index_writer_wlock.commit().unwrap()
};
println!("committed with opstamp {}", opstamp);
thread::sleep(Duration::from_millis(500));
}
Ok(())
}

View File

@@ -17,7 +17,6 @@ use Result;
use Score;
use SegmentLocalId;
use SegmentReader;
use TantivyError;
struct Hit<'a> {
count: u64,
@@ -265,10 +264,7 @@ impl Collector for FacetCollector {
_: SegmentLocalId,
reader: &SegmentReader,
) -> Result<FacetSegmentCollector> {
let field_name = reader.schema().get_field_name(self.field);
let facet_reader = reader.facet_reader(self.field).ok_or_else(|| {
TantivyError::SchemaError(format!("Field {:?} is not a facet field.", field_name))
})?;
let facet_reader = reader.facet_reader(self.field)?;
let mut collapse_mapping = Vec::new();
let mut counts = Vec::new();

View File

@@ -2,7 +2,6 @@ use super::Collector;
use super::SegmentCollector;
use collector::Fruit;
use std::marker::PhantomData;
use std::ops::Deref;
use DocId;
use Result;
use Score;
@@ -200,10 +199,7 @@ impl<'a> Collector for MultiCollector<'a> {
}
fn requires_scoring(&self) -> bool {
self.collector_wrappers
.iter()
.map(Deref::deref)
.any(Collector::requires_scoring)
self.collector_wrappers.iter().any(|c| c.requires_scoring())
}
fn merge_fruits(&self, segments_multifruits: Vec<MultiFruit>) -> Result<MultiFruit> {

View File

@@ -114,15 +114,11 @@ impl Collector for FastFieldTestCollector {
fn for_segment(
&self,
_: SegmentLocalId,
segment_reader: &SegmentReader,
reader: &SegmentReader,
) -> Result<FastFieldSegmentCollector> {
let reader = segment_reader
.fast_fields()
.u64(self.field)
.expect("Requested field is not a fast field.");
Ok(FastFieldSegmentCollector {
vals: Vec::new(),
reader,
reader: reader.fast_field_reader(self.field)?,
})
}
@@ -174,14 +170,11 @@ impl Collector for BytesFastFieldTestCollector {
fn for_segment(
&self,
_segment_local_id: u32,
segment_reader: &SegmentReader,
segment: &SegmentReader,
) -> Result<BytesFastFieldSegmentCollector> {
Ok(BytesFastFieldSegmentCollector {
vals: Vec::new(),
reader: segment_reader
.fast_fields()
.bytes(self.field)
.expect("Field is not a bytes fast field."),
reader: segment.bytes_fast_field_reader(self.field)?,
})
}
@@ -198,7 +191,7 @@ impl SegmentCollector for BytesFastFieldSegmentCollector {
type Fruit = Vec<u8>;
fn collect(&mut self, doc: u32, _score: f32) {
let data = self.reader.get_bytes(doc);
let data = self.reader.get_val(doc);
self.vals.extend(data);
}

View File

@@ -98,11 +98,11 @@ where
.collect())
}
pub(crate) fn for_segment<F: PartialOrd>(
pub(crate) fn for_segment(
&self,
segment_id: SegmentLocalId,
_: &SegmentReader,
) -> Result<TopSegmentCollector<F>> {
) -> Result<TopSegmentCollector<T>> {
Ok(TopSegmentCollector::new(segment_id, self.limit))
}
}

View File

@@ -5,12 +5,10 @@ use collector::SegmentCollector;
use fastfield::FastFieldReader;
use fastfield::FastValue;
use schema::Field;
use std::marker::PhantomData;
use DocAddress;
use Result;
use SegmentLocalId;
use SegmentReader;
use TantivyError;
/// The Top Field Collector keeps track of the K documents
/// sorted by a fast field in the index
@@ -108,15 +106,8 @@ impl<T: FastValue + PartialOrd + Send + Sync + 'static> Collector for TopDocsByF
reader: &SegmentReader,
) -> Result<TopFieldSegmentCollector<T>> {
let collector = self.collector.for_segment(segment_local_id, reader)?;
let reader = reader.fast_fields().u64(self.field).ok_or_else(|| {
let field_name = reader.schema().get_field_name(self.field);
TantivyError::SchemaError(format!("Failed to find fast field reader {:?}", field_name))
})?;
Ok(TopFieldSegmentCollector {
collector,
reader,
_type: PhantomData,
})
let reader = reader.fast_field_reader(self.field)?;
Ok(TopFieldSegmentCollector { collector, reader })
}
fn requires_scoring(&self) -> bool {
@@ -131,10 +122,9 @@ impl<T: FastValue + PartialOrd + Send + Sync + 'static> Collector for TopDocsByF
}
}
pub struct TopFieldSegmentCollector<T> {
collector: TopSegmentCollector<u64>,
reader: FastFieldReader<u64>,
_type: PhantomData<T>,
pub struct TopFieldSegmentCollector<T: FastValue + PartialOrd> {
collector: TopSegmentCollector<T>,
reader: FastFieldReader<T>,
}
impl<T: FastValue + PartialOrd + Send + Sync + 'static> SegmentCollector
@@ -148,11 +138,7 @@ impl<T: FastValue + PartialOrd + Send + Sync + 'static> SegmentCollector
}
fn harvest(self) -> Vec<(T, DocAddress)> {
self.collector
.harvest()
.into_iter()
.map(|(val, doc_address)| (T::from_u64(val), doc_address))
.collect()
self.collector.harvest()
}
}
@@ -249,7 +235,7 @@ mod tests {
.for_segment(0, segment)
.map(|_| ())
.unwrap_err(),
TantivyError::SchemaError(_)
TantivyError::FastFieldError(_)
);
}

View File

@@ -13,6 +13,7 @@ pub use self::serialize::{BinarySerializable, FixedSize};
pub use self::vint::{read_u32_vint, serialize_vint_u32, write_u32_vint, VInt};
pub use byteorder::LittleEndian as Endianness;
/// Segment's max doc must be `< MAX_DOC_LIMIT`.
///
/// We do not allow segments with more than

View File

@@ -340,7 +340,7 @@ impl Index {
Ok(self
.searchable_segment_metas()?
.iter()
.map(SegmentMeta::id)
.map(|segment_meta| segment_meta.id())
.collect())
}
}

View File

@@ -2,7 +2,6 @@ use core::SegmentMeta;
use schema::Schema;
use serde_json;
use std::fmt;
use Opstamp;
/// Meta information about the `Index`.
///
@@ -16,7 +15,7 @@ use Opstamp;
pub struct IndexMeta {
pub segments: Vec<SegmentMeta>,
pub schema: Schema,
pub opstamp: Opstamp,
pub opstamp: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
}

View File

@@ -59,7 +59,7 @@ impl Searcher {
) -> Searcher {
let store_readers = segment_readers
.iter()
.map(SegmentReader::get_store_reader)
.map(|segment_reader| segment_reader.get_store_reader())
.collect();
Searcher {
schema,
@@ -218,7 +218,7 @@ impl fmt::Debug for Searcher {
let segment_ids = self
.segment_readers
.iter()
.map(SegmentReader::segment_id)
.map(|segment_reader| segment_reader.segment_id())
.collect::<Vec<_>>();
write!(f, "Searcher({:?})", segment_ids)
}

View File

@@ -10,7 +10,6 @@ use schema::Schema;
use std::fmt;
use std::path::PathBuf;
use std::result;
use Opstamp;
use Result;
/// A segment is a piece of the index.
@@ -51,7 +50,7 @@ impl Segment {
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),

View File

@@ -5,7 +5,6 @@ use serde;
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
use Opstamp;
lazy_static! {
static ref INVENTORY: Inventory<InnerSegmentMeta> = { Inventory::new() };
@@ -14,7 +13,7 @@ lazy_static! {
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
num_deleted_docs: u32,
opstamp: Opstamp,
opstamp: u64,
}
/// `SegmentMeta` contains simple meta information about a segment.
@@ -137,9 +136,9 @@ impl SegmentMeta {
self.max_doc() - self.num_deleted_docs()
}
/// Returns the `Opstamp` of the last delete operation
/// Returns the opstamp of the last delete operation
/// taken in account in this segment.
pub fn delete_opstamp(&self) -> Option<Opstamp> {
pub fn delete_opstamp(&self) -> Option<u64> {
self.tracked
.deletes
.as_ref()
@@ -153,7 +152,7 @@ impl SegmentMeta {
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta {
let delete_meta = DeleteMeta {
num_deleted_docs,
opstamp,

View File

@@ -5,10 +5,14 @@ use core::Segment;
use core::SegmentComponent;
use core::SegmentId;
use directory::ReadOnlySource;
use error::TantivyError;
use fastfield::DeleteBitSet;
use fastfield::FacetReader;
use fastfield::FastFieldReaders;
use fastfield::FastFieldReader;
use fastfield::{self, FastFieldNotAvailableError};
use fastfield::{BytesFastFieldReader, FastValue, MultiValueIntFastFieldReader};
use fieldnorm::FieldNormReader;
use schema::Cardinality;
use schema::Field;
use schema::FieldType;
use schema::Schema;
@@ -47,7 +51,7 @@ pub struct SegmentReader {
postings_composite: CompositeFile,
positions_composite: CompositeFile,
positions_idx_composite: CompositeFile,
fast_fields_readers: Arc<FastFieldReaders>,
fast_fields_composite: CompositeFile,
fieldnorms_composite: CompositeFile,
store_source: ReadOnlySource,
@@ -101,21 +105,93 @@ impl SegmentReader {
///
/// # Panics
/// May panic if the index is corrupted.
pub fn fast_fields(&self) -> &FastFieldReaders {
&self.fast_fields_readers
pub fn fast_field_reader<Item: FastValue>(
&self,
field: Field,
) -> fastfield::Result<FastFieldReader<Item>> {
let field_entry = self.schema.get_field_entry(field);
if Item::fast_field_cardinality(field_entry.field_type()) == Some(Cardinality::SingleValue)
{
self.fast_fields_composite
.open_read(field)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))
.map(FastFieldReader::open)
} else {
Err(FastFieldNotAvailableError::new(field_entry))
}
}
pub(crate) fn fast_field_reader_with_idx<Item: FastValue>(
&self,
field: Field,
idx: usize,
) -> fastfield::Result<FastFieldReader<Item>> {
if let Some(ff_source) = self.fast_fields_composite.open_read_with_idx(field, idx) {
Ok(FastFieldReader::open(ff_source))
} else {
let field_entry = self.schema.get_field_entry(field);
Err(FastFieldNotAvailableError::new(field_entry))
}
}
/// Accessor to the `MultiValueIntFastFieldReader` associated to a given `Field`.
/// May panick if the field is not a multivalued fastfield of the type `Item`.
pub fn multi_fast_field_reader<Item: FastValue>(
&self,
field: Field,
) -> fastfield::Result<MultiValueIntFastFieldReader<Item>> {
let field_entry = self.schema.get_field_entry(field);
if Item::fast_field_cardinality(field_entry.field_type()) == Some(Cardinality::MultiValues)
{
let idx_reader = self.fast_field_reader_with_idx(field, 0)?;
let vals_reader = self.fast_field_reader_with_idx(field, 1)?;
Ok(MultiValueIntFastFieldReader::open(idx_reader, vals_reader))
} else {
Err(FastFieldNotAvailableError::new(field_entry))
}
}
/// Accessor to the `BytesFastFieldReader` associated to a given `Field`.
pub fn bytes_fast_field_reader(&self, field: Field) -> fastfield::Result<BytesFastFieldReader> {
let field_entry = self.schema.get_field_entry(field);
match *field_entry.field_type() {
FieldType::Bytes => {}
_ => return Err(FastFieldNotAvailableError::new(field_entry)),
}
let idx_reader = self
.fast_fields_composite
.open_read_with_idx(field, 0)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))
.map(FastFieldReader::open)?;
let values = self
.fast_fields_composite
.open_read_with_idx(field, 1)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))?;
Ok(BytesFastFieldReader::open(idx_reader, values))
}
/// Accessor to the `FacetReader` associated to a given `Field`.
pub fn facet_reader(&self, field: Field) -> Option<FacetReader> {
pub fn facet_reader(&self, field: Field) -> Result<FacetReader> {
let field_entry = self.schema.get_field_entry(field);
if field_entry.field_type() != &FieldType::HierarchicalFacet {
return None;
return Err(TantivyError::InvalidArgument(format!(
"The field {:?} is not a \
hierarchical facet.",
field_entry
)));
}
let term_ords_reader = self.fast_fields().u64s(field)?;
let termdict_source = self.termdict_composite.open_read(field)?;
let term_ords_reader = self.multi_fast_field_reader(field)?;
let termdict_source = self.termdict_composite.open_read(field).ok_or_else(|| {
TantivyError::InvalidArgument(format!(
"The field \"{}\" is a hierarchical \
but this segment does not seem to have the field term \
dictionary.",
field_entry.name()
))
})?;
let termdict = TermDictionary::from_source(&termdict_source);
let facet_reader = FacetReader::new(term_ords_reader, termdict);
Some(facet_reader)
Ok(facet_reader)
}
/// Accessor to the segment's `Field norms`'s reader.
@@ -171,12 +247,8 @@ impl SegmentReader {
}
};
let schema = segment.schema();
let fast_fields_data = segment.open_read(SegmentComponent::FASTFIELDS)?;
let fast_fields_composite = CompositeFile::open(&fast_fields_data)?;
let fast_field_readers =
Arc::new(FastFieldReaders::load_all(&schema, &fast_fields_composite)?);
let fieldnorms_data = segment.open_read(SegmentComponent::FIELDNORMS)?;
let fieldnorms_composite = CompositeFile::open(&fieldnorms_data)?;
@@ -188,13 +260,14 @@ impl SegmentReader {
None
};
let schema = segment.schema();
Ok(SegmentReader {
inv_idx_reader_cache: Arc::new(RwLock::new(HashMap::new())),
max_doc: segment.meta().max_doc(),
num_docs: segment.meta().num_docs(),
termdict_composite,
postings_composite,
fast_fields_readers: fast_field_readers,
fast_fields_composite,
fieldnorms_composite,
segment_id: segment.id(),
store_source,
@@ -308,12 +381,12 @@ impl SegmentReader {
self.postings_composite.space_usage(),
self.positions_composite.space_usage(),
self.positions_idx_composite.space_usage(),
self.fast_fields_readers.space_usage(),
self.fast_fields_composite.space_usage(),
self.fieldnorms_composite.space_usage(),
self.get_store_reader().space_usage(),
self.delete_bitset_opt
.as_ref()
.map(DeleteBitSet::space_usage)
.map(|x| x.space_usage())
.unwrap_or(0),
)
}

View File

@@ -48,14 +48,14 @@ impl RetryPolicy {
///
/// It is transparently associated to a lock file, that gets deleted
/// on `Drop.` The lock is released automatically on `Drop`.
pub struct DirectoryLock(Box<Drop + Send + Sync + 'static>);
pub struct DirectoryLock(Box<Drop + Send + 'static>);
struct DirectoryLockGuard {
directory: Box<Directory>,
path: PathBuf,
}
impl<T: Drop + Send + Sync + 'static> From<Box<T>> for DirectoryLock {
impl<T: Drop + Send + 'static> From<Box<T>> for DirectoryLock {
fn from(underlying: Box<T>) -> Self {
DirectoryLock(underlying)
}
@@ -205,6 +205,16 @@ pub trait Directory: DirectoryClone + fmt::Debug + Send + Sync + 'static {
/// `OnCommit` `ReloadPolicy`. Not implementing watch in a `Directory` only prevents the
/// `OnCommit` `ReloadPolicy` to work properly.
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle;
/// Ensure that all volatile files reach are persisted (in directory where that makes sense.)
///
/// In order to make Near Real Time efficient, tantivy introduced the notion of soft_commit vs
/// commit. Commit will call `.flush()`, while softcommit won't.
///
/// `meta.json` should be the last file to be flushed.
fn flush(&self) -> io::Result<()> {
Ok(())
}
}
/// DirectoryClone

View File

@@ -368,7 +368,7 @@ impl Drop for ReleaseLockFile {
/// This Write wraps a File, but has the specificity of
/// call `sync_all` on flush.
struct SafeFileWriter(File);
pub struct SafeFileWriter(File);
impl SafeFileWriter {
fn new(file: File) -> SafeFileWriter {

View File

@@ -13,6 +13,7 @@ mod managed_directory;
mod ram_directory;
mod read_only_source;
mod watch_event_router;
mod nrt_directory;
/// Errors specific to the directory module.
pub mod error;

View File

@@ -0,0 +1,195 @@
use directory::Directory;
use std::path::{PathBuf, Path};
use directory::ReadOnlySource;
use directory::error::OpenReadError;
use directory::error::DeleteError;
use std::io::{BufWriter, Cursor};
use directory::SeekableWrite;
use directory::error::OpenWriteError;
use directory::WatchHandle;
use directory::ram_directory::InnerRamDirectory;
use std::sync::RwLock;
use std::sync::Arc;
use directory::WatchCallback;
use std::fmt;
use std::io;
use std::io::{Seek, Write};
use directory::DirectoryClone;
const BUFFER_LEN: usize = 1_000_000;
pub enum NRTWriter {
InRam {
buffer: Cursor<Vec<u8>>,
path: PathBuf,
nrt_directory: NRTDirectory
},
UnderlyingFile(BufWriter<Box<SeekableWrite>>)
}
impl NRTWriter {
pub fn new(path: PathBuf, nrt_directory: NRTDirectory) -> NRTWriter {
NRTWriter::InRam {
buffer: Cursor::new(Vec::with_capacity(BUFFER_LEN)),
path,
nrt_directory,
}
}
}
impl io::Seek for NRTWriter {
fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
match self {
NRTWriter::InRam { buffer, path, nrt_directory } => {
buffer.seek(pos)
}
NRTWriter::UnderlyingFile(file) => {
file.seek(pos)
}
}
}
}
impl io::Write for NRTWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.write_all(buf)?;
Ok(buf.len())
}
fn flush(&mut self) -> io::Result<()> {
match self {
NRTWriter::InRam { buffer, path, nrt_directory } => {
let mut cache_wlock = nrt_directory.cache.write().unwrap();
cache_wlock.write(path.clone(), buffer.get_ref());
Ok(())
}
NRTWriter::UnderlyingFile(file) => {
file.flush()
}
}
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
// Working around the borrow checker.
let mut underlying_write_opt: Option<BufWriter<Box<SeekableWrite>>> = None;
if let NRTWriter::InRam { buffer, path, nrt_directory } = self {
if buffer.get_ref().len() + buf.len() > BUFFER_LEN {
// We can't keep this in RAM. Let's move it to the underlying directory.
underlying_write_opt = Some(nrt_directory.open_write(path)
.map_err(|open_err| {
io::Error::new(io::ErrorKind::Other, open_err)
})?);
}
}
if let Some(underlying_write) = underlying_write_opt {
*self = NRTWriter::UnderlyingFile(underlying_write);
}
match self {
NRTWriter::InRam { buffer, path, nrt_directory } => {
assert!(buffer.get_ref().len() + buf.len() <= BUFFER_LEN);
buffer.write_all(buf)
}
NRTWriter::UnderlyingFile(file) => {
file.write_all(buf)
}
}
}
}
pub struct NRTDirectory {
underlying: Box<Directory>,
cache: Arc<RwLock<InnerRamDirectory>>,
}
impl Clone for NRTDirectory {
fn clone(&self) -> Self {
NRTDirectory {
underlying: self.underlying.box_clone(),
cache: self.cache.clone()
}
}
}
impl NRTDirectory {
fn wrap(underlying: Box<Directory>) -> NRTDirectory {
NRTDirectory {
underlying,
cache: Default::default()
}
}
}
impl fmt::Debug for NRTDirectory {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "NRTDirectory({:?})", self.underlying)
}
}
impl Directory for NRTDirectory {
fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
unimplemented!()
}
fn delete(&self, path: &Path) -> Result<(), DeleteError> {
// We explicitly release the lock, to prevent a panic on the underlying directory
// to poison the lock.
//
// File can only go from cache to underlying so the result does not lead to
// any inconsistency.
{
let mut cache_wlock = self.cache.write().unwrap();
if cache_wlock.exists(path) {
return cache_wlock.delete(path);
}
}
self.underlying.delete(path)
}
fn exists(&self, path: &Path) -> bool {
// We explicitly release the lock, to prevent a panic on the underlying directory
// to poison the lock.
//
// File can only go from cache to underlying so the result does not lead to
// any inconsistency.
{
let rlock_cache = self.cache.read().unwrap();
if rlock_cache.exists(path) {
return true;
}
}
self.underlying.exists(path)
}
fn open_write(&mut self, path: &Path) -> Result<BufWriter<Box<SeekableWrite>>, OpenWriteError> {
let mut cache_wlock = self.cache.write().unwrap();
// TODO might poison our lock. I don't know have a sound solution yet.
let path_buf = path.to_owned();
if self.underlying.exists(path) {
return Err(OpenWriteError::FileAlreadyExists(path_buf));
}
let exists = cache_wlock.write(path_buf.clone(), &[]);
// force the creation of the file to mimic the MMap directory.
if exists {
Err(OpenWriteError::FileAlreadyExists(path_buf))
} else {
let vec_writer = NRTWriter::new(path_buf.clone(), self.clone());
Ok(BufWriter::new(Box::new(vec_writer)))
}
}
fn atomic_read(&self, path: &Path) -> Result<Vec<u8>, OpenReadError> {
self.underlying.atomic_read(path)
}
fn atomic_write(&mut self, path: &Path, data: &[u8]) -> io::Result<()> {
self.underlying.atomic_write(path, data)
}
fn watch(&self, watch_callback: WatchCallback) -> WatchHandle {
self.underlying.watch(watch_callback)
}
}

View File

@@ -71,36 +71,36 @@ impl Write for VecWriter {
}
#[derive(Default)]
struct InnerDirectory {
pub(crate) struct InnerRamDirectory {
fs: HashMap<PathBuf, ReadOnlySource>,
watch_router: WatchCallbackList,
}
impl InnerDirectory {
fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
impl InnerRamDirectory {
pub fn write(&mut self, path: PathBuf, data: &[u8]) -> bool {
let data = ReadOnlySource::new(Vec::from(data));
self.fs.insert(path, data).is_some()
}
fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
pub fn open_read(&self, path: &Path) -> Result<ReadOnlySource, OpenReadError> {
self.fs
.get(path)
.ok_or_else(|| OpenReadError::FileDoesNotExist(PathBuf::from(path)))
.map(Clone::clone)
.map(|el| el.clone())
}
fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> {
pub fn delete(&mut self, path: &Path) -> result::Result<(), DeleteError> {
match self.fs.remove(path) {
Some(_) => Ok(()),
None => Err(DeleteError::FileDoesNotExist(PathBuf::from(path))),
}
}
fn exists(&self, path: &Path) -> bool {
pub fn exists(&self, path: &Path) -> bool {
self.fs.contains_key(path)
}
fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
pub fn watch(&mut self, watch_handle: WatchCallback) -> WatchHandle {
self.watch_router.subscribe(watch_handle)
}
}
@@ -118,7 +118,7 @@ impl fmt::Debug for RAMDirectory {
///
#[derive(Clone, Default)]
pub struct RAMDirectory {
fs: Arc<RwLock<InnerDirectory>>,
fs: Arc<RwLock<InnerRamDirectory>>,
}
impl RAMDirectory {

View File

@@ -1,5 +1,4 @@
use common::BitSet;
use fastfield::DeleteBitSet;
use std::borrow::Borrow;
use std::borrow::BorrowMut;
use std::cmp::Ordering;
@@ -96,23 +95,9 @@ pub trait DocSet {
}
/// Returns the number documents matching.
/// Calling this method consumes the `DocSet`.
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
let mut count = 0u32;
while self.advance() {
if !delete_bitset.is_deleted(self.doc()) {
count += 1u32;
}
}
count
}
/// Returns the count of documents, deleted or not.
/// Calling this method consumes the `DocSet`.
///
/// Of course, the result is an upper bound of the result
/// given by `count()`.
fn count_including_deleted(&mut self) -> u32 {
/// Calling this method consumes the `DocSet`.
fn count(&mut self) -> u32 {
let mut count = 0u32;
while self.advance() {
count += 1u32;
@@ -142,14 +127,9 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
unboxed.size_hint()
}
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
fn count(&mut self) -> u32 {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.count(delete_bitset)
}
fn count_including_deleted(&mut self) -> u32 {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.count_including_deleted()
unboxed.count()
}
fn append_to_bitset(&mut self, bitset: &mut BitSet) {

View File

@@ -23,14 +23,14 @@ mod tests {
index_writer.add_document(doc!(field=>vec![0u8; 1000]));
assert!(index_writer.commit().is_ok());
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let bytes_reader = segment_reader.fast_fields().bytes(field).unwrap();
let reader = searcher.segment_reader(0);
let bytes_reader = reader.bytes_fast_field_reader(field).unwrap();
assert_eq!(bytes_reader.get_bytes(0), &[0u8, 1, 2, 3]);
assert!(bytes_reader.get_bytes(1).is_empty());
assert_eq!(bytes_reader.get_bytes(2), &[255u8]);
assert_eq!(bytes_reader.get_bytes(3), &[1u8, 3, 5, 7, 9]);
assert_eq!(bytes_reader.get_val(0), &[0u8, 1, 2, 3]);
assert!(bytes_reader.get_val(1).is_empty());
assert_eq!(bytes_reader.get_val(2), &[255u8]);
assert_eq!(bytes_reader.get_val(3), &[1u8, 3, 5, 7, 9]);
let long = vec![0u8; 1000];
assert_eq!(bytes_reader.get_bytes(4), long.as_slice());
assert_eq!(bytes_reader.get_val(4), long.as_slice());
}
}

View File

@@ -14,7 +14,6 @@ use DocId;
///
/// Reading the value for a document is done by reading the start index for it,
/// and the start index for the next document, and keeping the bytes in between.
#[derive(Clone)]
pub struct BytesFastFieldReader {
idx_reader: FastFieldReader<u64>,
values: OwningRef<ReadOnlySource, [u8]>,
@@ -29,20 +28,10 @@ impl BytesFastFieldReader {
BytesFastFieldReader { idx_reader, values }
}
fn range(&self, doc: DocId) -> (usize, usize) {
/// Returns the bytes associated to the given `doc`
pub fn get_val(&self, doc: DocId) -> &[u8] {
let start = self.idx_reader.get(doc) as usize;
let stop = self.idx_reader.get(doc + 1) as usize;
(start, stop)
}
/// Returns the bytes associated to the given `doc`
pub fn get_bytes(&self, doc: DocId) -> &[u8] {
let (start, stop) = self.range(doc);
&self.values[start..stop]
}
/// Returns the overall number of bytes in this bytes fast field.
pub fn total_num_bytes(&self) -> usize {
self.values.len()
}
}

View File

@@ -53,18 +53,16 @@ impl DeleteBitSet {
}
}
/// Returns true iff the document is still "alive". In other words, if it has not been deleted.
pub fn is_alive(&self, doc: DocId) -> bool {
!self.is_deleted(doc)
}
/// Returns true iff the document has been marked as deleted.
#[inline(always)]
/// Returns whether the document has been marked as deleted.
pub fn is_deleted(&self, doc: DocId) -> bool {
let byte_offset = doc / 8u32;
let b: u8 = (*self.data)[byte_offset as usize];
let shift = (doc & 7u32) as u8;
b & (1u8 << shift) != 0
if self.len == 0 {
false
} else {
let byte_offset = doc / 8u32;
let b: u8 = (*self.data)[byte_offset as usize];
let shift = (doc & 7u32) as u8;
b & (1u8 << shift) != 0
}
}
/// Summarize total space usage of this bitset.

View File

@@ -30,7 +30,6 @@ pub use self::error::{FastFieldNotAvailableError, Result};
pub use self::facet_reader::FacetReader;
pub use self::multivalued::{MultiValueIntFastFieldReader, MultiValueIntFastFieldWriter};
pub use self::reader::FastFieldReader;
pub use self::readers::FastFieldReaders;
pub use self::serializer::FastFieldSerializer;
pub use self::writer::{FastFieldsWriter, IntFastFieldWriter};
use common;
@@ -44,7 +43,6 @@ mod error;
mod facet_reader;
mod multivalued;
mod reader;
mod readers;
mod serializer;
mod writer;
@@ -80,6 +78,10 @@ impl FastValue for u64 {
*self
}
fn as_u64(&self) -> u64 {
*self
}
fn fast_field_cardinality(field_type: &FieldType) -> Option<Cardinality> {
match *field_type {
FieldType::U64(ref integer_options) => integer_options.get_fastfield_cardinality(),
@@ -87,10 +89,6 @@ impl FastValue for u64 {
_ => None,
}
}
fn as_u64(&self) -> u64 {
*self
}
}
impl FastValue for i64 {

View File

@@ -37,7 +37,9 @@ mod tests {
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let mut vals = Vec::new();
let multi_value_reader = segment_reader.fast_fields().u64s(field).unwrap();
let multi_value_reader = segment_reader
.multi_fast_field_reader::<u64>(field)
.unwrap();
{
multi_value_reader.get_vals(2, &mut vals);
assert_eq!(&vals, &[4u64]);
@@ -196,9 +198,9 @@ mod tests {
assert!(index_writer.commit().is_ok());
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);
let reader = searcher.segment_reader(0);
let mut vals = Vec::new();
let multi_value_reader = segment_reader.fast_fields().i64s(field).unwrap();
let multi_value_reader = reader.multi_fast_field_reader::<i64>(field).unwrap();
{
multi_value_reader.get_vals(2, &mut vals);
assert_eq!(&vals, &[-4i64]);

View File

@@ -26,13 +26,6 @@ impl<Item: FastValue> MultiValueIntFastFieldReader<Item> {
}
}
pub(crate) fn into_u64s_reader(self) -> MultiValueIntFastFieldReader<u64> {
MultiValueIntFastFieldReader {
idx_reader: self.idx_reader,
vals_reader: self.vals_reader.into_u64_reader(),
}
}
/// Returns `(start, stop)`, such that the values associated
/// to the given document are `start..stop`.
fn range(&self, doc: DocId) -> (u64, u64) {
@@ -48,24 +41,13 @@ impl<Item: FastValue> MultiValueIntFastFieldReader<Item> {
vals.resize(len, Item::default());
self.vals_reader.get_range_u64(start, &mut vals[..]);
}
/// Returns the number of values associated with the document `DocId`.
pub fn num_vals(&self, doc: DocId) -> usize {
let (start, stop) = self.range(doc);
(stop - start) as usize
}
/// Returns the overall number of values in this field .
pub fn total_num_vals(&self) -> u64 {
self.idx_reader.max_value()
}
}
#[cfg(test)]
mod tests {
use core::Index;
use schema::{Facet, Schema};
use schema::{Document, Facet, Schema};
#[test]
fn test_multifastfield_reader() {
@@ -76,12 +58,22 @@ mod tests {
let mut index_writer = index
.writer_with_num_threads(1, 30_000_000)
.expect("Failed to create index writer.");
index_writer.add_document(doc!(
facet_field => Facet::from("/category/cat2"),
facet_field => Facet::from("/category/cat1"),
));
index_writer.add_document(doc!(facet_field => Facet::from("/category/cat2")));
index_writer.add_document(doc!(facet_field => Facet::from("/category/cat3")));
{
let mut doc = Document::new();
doc.add_facet(facet_field, "/category/cat2");
doc.add_facet(facet_field, "/category/cat1");
index_writer.add_document(doc);
}
{
let mut doc = Document::new();
doc.add_facet(facet_field, "/category/cat2");
index_writer.add_document(doc);
}
{
let mut doc = Document::new();
doc.add_facet(facet_field, "/category/cat3");
index_writer.add_document(doc);
}
index_writer.commit().expect("Commit failed");
let searcher = index.reader().unwrap().searcher();
let segment_reader = searcher.segment_reader(0);

View File

@@ -50,15 +50,6 @@ impl<Item: FastValue> FastFieldReader<Item> {
}
}
pub(crate) fn into_u64_reader(self) -> FastFieldReader<u64> {
FastFieldReader {
bit_unpacker: self.bit_unpacker,
min_value_u64: self.min_value_u64,
max_value_u64: self.max_value_u64,
_phantom: PhantomData,
}
}
/// Return the value associated to the given document.
///
/// This accessor should return as fast as possible.

View File

@@ -1,191 +0,0 @@
use common::CompositeFile;
use fastfield::BytesFastFieldReader;
use fastfield::MultiValueIntFastFieldReader;
use fastfield::{FastFieldNotAvailableError, FastFieldReader};
use schema::{Cardinality, Field, FieldType, Schema};
use space_usage::PerFieldSpaceUsage;
use std::collections::HashMap;
use Result;
/// Provides access to all of the FastFieldReader.
///
/// Internally, `FastFieldReaders` have preloaded fast field readers,
/// and just wraps several `HashMap`.
pub struct FastFieldReaders {
fast_field_i64: HashMap<Field, FastFieldReader<i64>>,
fast_field_u64: HashMap<Field, FastFieldReader<u64>>,
fast_field_i64s: HashMap<Field, MultiValueIntFastFieldReader<i64>>,
fast_field_u64s: HashMap<Field, MultiValueIntFastFieldReader<u64>>,
fast_bytes: HashMap<Field, BytesFastFieldReader>,
fast_fields_composite: CompositeFile,
}
enum FastType {
I64,
U64,
}
fn type_and_cardinality(field_type: &FieldType) -> Option<(FastType, Cardinality)> {
match field_type {
FieldType::U64(options) => options
.get_fastfield_cardinality()
.map(|cardinality| (FastType::U64, cardinality)),
FieldType::I64(options) => options
.get_fastfield_cardinality()
.map(|cardinality| (FastType::I64, cardinality)),
FieldType::HierarchicalFacet => Some((FastType::U64, Cardinality::MultiValues)),
_ => None,
}
}
impl FastFieldReaders {
pub(crate) fn load_all(
schema: &Schema,
fast_fields_composite: &CompositeFile,
) -> Result<FastFieldReaders> {
let mut fast_field_readers = FastFieldReaders {
fast_field_i64: Default::default(),
fast_field_u64: Default::default(),
fast_field_i64s: Default::default(),
fast_field_u64s: Default::default(),
fast_bytes: Default::default(),
fast_fields_composite: fast_fields_composite.clone(),
};
for (field_id, field_entry) in schema.fields().iter().enumerate() {
let field = Field(field_id as u32);
let field_type = field_entry.field_type();
if field_type == &FieldType::Bytes {
let idx_reader = fast_fields_composite
.open_read_with_idx(field, 0)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))
.map(FastFieldReader::open)?;
let data = fast_fields_composite
.open_read_with_idx(field, 1)
.ok_or_else(|| FastFieldNotAvailableError::new(field_entry))?;
fast_field_readers
.fast_bytes
.insert(field, BytesFastFieldReader::open(idx_reader, data));
} else if let Some((fast_type, cardinality)) = type_and_cardinality(field_type) {
match cardinality {
Cardinality::SingleValue => {
if let Some(fast_field_data) = fast_fields_composite.open_read(field) {
match fast_type {
FastType::U64 => {
let fast_field_reader = FastFieldReader::open(fast_field_data);
fast_field_readers
.fast_field_u64
.insert(field, fast_field_reader);
}
FastType::I64 => {
fast_field_readers.fast_field_i64.insert(
field,
FastFieldReader::open(fast_field_data.clone()),
);
}
}
} else {
return Err(From::from(FastFieldNotAvailableError::new(field_entry)));
}
}
Cardinality::MultiValues => {
let idx_opt = fast_fields_composite.open_read_with_idx(field, 0);
let data_opt = fast_fields_composite.open_read_with_idx(field, 1);
if let (Some(fast_field_idx), Some(fast_field_data)) = (idx_opt, data_opt) {
let idx_reader = FastFieldReader::open(fast_field_idx);
match fast_type {
FastType::I64 => {
let vals_reader = FastFieldReader::open(fast_field_data);
let multivalued_int_fast_field =
MultiValueIntFastFieldReader::open(idx_reader, vals_reader);
fast_field_readers
.fast_field_i64s
.insert(field, multivalued_int_fast_field);
}
FastType::U64 => {
let vals_reader = FastFieldReader::open(fast_field_data);
let multivalued_int_fast_field =
MultiValueIntFastFieldReader::open(idx_reader, vals_reader);
fast_field_readers
.fast_field_u64s
.insert(field, multivalued_int_fast_field);
}
}
} else {
return Err(From::from(FastFieldNotAvailableError::new(field_entry)));
}
}
}
}
}
Ok(fast_field_readers)
}
pub(crate) fn space_usage(&self) -> PerFieldSpaceUsage {
self.fast_fields_composite.space_usage()
}
/// Returns the `u64` fast field reader reader associated to `field`.
///
/// If `field` is not a u64 fast field, this method returns `None`.
pub fn u64(&self, field: Field) -> Option<FastFieldReader<u64>> {
self.fast_field_u64.get(&field).cloned()
}
/// If the field is a u64-fast field return the associated reader.
/// If the field is a i64-fast field, return the associated u64 reader. Values are
/// mapped from i64 to u64 using a (well the, it is unique) monotonic mapping. ///
///
/// This method is useful when merging segment reader.
pub(crate) fn u64_lenient(&self, field: Field) -> Option<FastFieldReader<u64>> {
if let Some(u64_ff_reader) = self.u64(field) {
return Some(u64_ff_reader);
}
if let Some(i64_ff_reader) = self.i64(field) {
return Some(i64_ff_reader.into_u64_reader());
}
None
}
/// Returns the `i64` fast field reader reader associated to `field`.
///
/// If `field` is not a i64 fast field, this method returns `None`.
pub fn i64(&self, field: Field) -> Option<FastFieldReader<i64>> {
self.fast_field_i64.get(&field).cloned()
}
/// Returns a `u64s` multi-valued fast field reader reader associated to `field`.
///
/// If `field` is not a u64 multi-valued fast field, this method returns `None`.
pub fn u64s(&self, field: Field) -> Option<MultiValueIntFastFieldReader<u64>> {
self.fast_field_u64s.get(&field).cloned()
}
/// If the field is a u64s-fast field return the associated reader.
/// If the field is a i64s-fast field, return the associated u64s reader. Values are
/// mapped from i64 to u64 using a (well the, it is unique) monotonic mapping.
///
/// This method is useful when merging segment reader.
pub(crate) fn u64s_lenient(&self, field: Field) -> Option<MultiValueIntFastFieldReader<u64>> {
if let Some(u64s_ff_reader) = self.u64s(field) {
return Some(u64s_ff_reader);
}
if let Some(i64s_ff_reader) = self.i64s(field) {
return Some(i64s_ff_reader.into_u64s_reader());
}
None
}
/// Returns a `i64s` multi-valued fast field reader reader associated to `field`.
///
/// If `field` is not a i64 multi-valued fast field, this method returns `None`.
pub fn i64s(&self, field: Field) -> Option<MultiValueIntFastFieldReader<i64>> {
self.fast_field_i64s.get(&field).cloned()
}
/// Returns the `bytes` fast field reader associated to `field`.
///
/// If `field` is not a bytes fast field, returns `None`.
pub fn bytes(&self, field: Field) -> Option<BytesFastFieldReader> {
self.fast_bytes.get(&field).cloned()
}
}

View File

@@ -2,7 +2,6 @@ use super::operation::DeleteOperation;
use std::mem;
use std::ops::DerefMut;
use std::sync::{Arc, RwLock};
use Opstamp;
// The DeleteQueue is similar in conceptually to a multiple
// consumer single producer broadcast channel.
@@ -180,12 +179,17 @@ pub struct DeleteCursor {
}
impl DeleteCursor {
pub fn empty() -> DeleteCursor {
DeleteQueue::new().cursor()
}
/// Skips operations and position it so that
/// - either all of the delete operation currently in the
/// queue are consume and the next get will return None.
/// - the next get will return the first operation with an
/// `opstamp >= target_opstamp`.
pub fn skip_to(&mut self, target_opstamp: Opstamp) {
pub fn skip_to(&mut self, target_opstamp: u64) {
// TODO Can be optimize as we work with block.
while self.is_behind_opstamp(target_opstamp) {
self.advance();
@@ -193,7 +197,7 @@ impl DeleteCursor {
}
#[cfg_attr(feature = "cargo-clippy", allow(clippy::wrong_self_convention))]
fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool {
fn is_behind_opstamp(&mut self, target_opstamp: u64) -> bool {
self.get()
.map(|operation| operation.opstamp < target_opstamp)
.unwrap_or(false)

View File

@@ -1,6 +1,5 @@
use std::sync::Arc;
use DocId;
use Opstamp;
// Doc to opstamp is used to identify which
// document should be deleted.
@@ -24,7 +23,7 @@ pub enum DocToOpstampMapping {
}
impl From<Vec<u64>> for DocToOpstampMapping {
fn from(opstamps: Vec<Opstamp>) -> DocToOpstampMapping {
fn from(opstamps: Vec<u64>) -> DocToOpstampMapping {
DocToOpstampMapping::WithMap(Arc::new(opstamps))
}
}
@@ -36,7 +35,7 @@ impl DocToOpstampMapping {
//
// The edge case opstamp = some doc opstamp is in practise
// never called.
pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId {
pub fn compute_doc_limit(&self, target_opstamp: u64) -> DocId {
match *self {
DocToOpstampMapping::WithMap(ref doc_opstamps) => {
match doc_opstamps.binary_search(&target_opstamp) {

View File

@@ -30,7 +30,6 @@ use std::ops::Range;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use Opstamp;
use Result;
// Size of the margin for the heap. A segment is closed when the remaining memory
@@ -100,7 +99,7 @@ pub struct IndexWriter {
delete_queue: DeleteQueue,
stamper: Stamper,
committed_opstamp: Opstamp,
committed_opstamp: u64,
}
/// Open a new index writer. Attempts to acquire a lockfile.
@@ -178,7 +177,7 @@ pub fn compute_deleted_bitset(
segment_reader: &SegmentReader,
delete_cursor: &mut DeleteCursor,
doc_opstamps: &DocToOpstampMapping,
target_opstamp: Opstamp,
target_opstamp: u64,
) -> Result<bool> {
let mut might_have_changed = false;
@@ -220,7 +219,7 @@ pub fn compute_deleted_bitset(
pub fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: Opstamp,
target_opstamp: u64,
) -> Result<()> {
{
if segment_entry.meta().delete_opstamp() == Some(target_opstamp) {
@@ -260,7 +259,7 @@ pub fn advance_deletes(
write_delete_bitset(&delete_bitset, &mut delete_file)?;
}
}
segment_entry.set_meta(segment.meta().clone());
segment_entry.set_meta(target_opstamp, segment.meta().clone());
Ok(())
}
@@ -300,11 +299,11 @@ fn index_documents(
// the worker thread.
assert!(num_docs > 0);
let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;
let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
let segment_meta = SegmentMeta::new(segment_id, num_docs);
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
let delete_bitset_opt = if delete_cursor.get().is_some() {
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
@@ -327,7 +326,12 @@ fn index_documents(
// to even open the segment.
None
};
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, delete_bitset_opt);
let segment_entry = SegmentEntry::new(
segment_meta,
delete_cursor,
delete_bitset_opt,
last_docstamp,
);
Ok(segment_updater.add_segment(generation, segment_entry))
}
@@ -362,9 +366,9 @@ impl IndexWriter {
}
#[doc(hidden)]
pub fn add_segment(&mut self, segment_meta: SegmentMeta) {
pub fn add_segment(&mut self, segment_meta: SegmentMeta, opstamp: u64) {
let delete_cursor = self.delete_queue.cursor();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None);
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor, None, opstamp);
self.segment_updater
.add_segment(self.generation, segment_entry);
}
@@ -495,7 +499,7 @@ impl IndexWriter {
/// state as it was after the last commit.
///
/// The opstamp at the last commit is returned.
pub fn rollback(&mut self) -> Result<Opstamp> {
pub fn rollback(&mut self) -> Result<()> {
info!("Rolling back to opstamp {}", self.committed_opstamp);
// marks the segment updater as killed. From now on, all
@@ -528,9 +532,9 @@ impl IndexWriter {
//
// This will reach an end as the only document_sender
// was dropped with the index_writer.
for _ in document_receiver.clone() {}
for _ in document_receiver.iter() {}
Ok(self.committed_opstamp)
Ok(())
}
/// Prepares a commit.
@@ -555,6 +559,16 @@ impl IndexWriter {
/// using this API.
/// See [`PreparedCommit::set_payload()`](PreparedCommit.html)
pub fn prepare_commit(&mut self) -> Result<PreparedCommit> {
info!("Preparing commit");
self.prepare_commit_internal(false)
}
pub fn prepare_commit_soft(&mut self) -> Result<PreparedCommit> {
info!("Preparing soft commit");
self.prepare_commit_internal(true)
}
pub(crate) fn prepare_commit_internal(&mut self, soft: bool) -> Result<PreparedCommit> {
// Here, because we join all of the worker threads,
// all of the segment update for this commit have been
// sent.
@@ -568,7 +582,7 @@ impl IndexWriter {
info!("Preparing commit");
// this will drop the current document channel
// and recreate a new one.
// and recreate a new one channels.
self.recreate_document_channel();
let former_workers_join_handle = mem::replace(&mut self.workers_join_handle, Vec::new());
@@ -577,13 +591,13 @@ impl IndexWriter {
let indexing_worker_result = worker_handle
.join()
.map_err(|e| TantivyError::ErrorInThread(format!("{:?}", e)))?;
indexing_worker_result?;
// add a new worker for the next generation.
// add a new worker for the next generation, whether the worker failed or not.
self.add_indexing_worker()?;
indexing_worker_result?;
}
let commit_opstamp = self.stamper.stamp();
let prepared_commit = PreparedCommit::new(self, commit_opstamp);
let prepared_commit = PreparedCommit::new(self, commit_opstamp, soft);
info!("Prepared commit {}", commit_opstamp);
Ok(prepared_commit)
}
@@ -602,10 +616,15 @@ impl IndexWriter {
/// Commit returns the `opstamp` of the last document
/// that made it in the commit.
///
pub fn commit(&mut self) -> Result<Opstamp> {
pub fn commit(&mut self) -> Result<u64> {
self.prepare_commit()?.commit()
}
pub fn soft_commit(&mut self) -> Result<u64> {
self.prepare_commit_soft()?.commit()
}
pub(crate) fn segment_updater(&self) -> &SegmentUpdater {
&self.segment_updater
}
@@ -618,7 +637,7 @@ impl IndexWriter {
///
/// Like adds, the deletion itself will be visible
/// only after calling `commit()`.
pub fn delete_term(&self, term: Term) -> Opstamp {
pub fn delete_term(&mut self, term: Term) -> u64 {
let opstamp = self.stamper.stamp();
let delete_operation = DeleteOperation { opstamp, term };
self.delete_queue.push(delete_operation);
@@ -632,7 +651,7 @@ impl IndexWriter {
///
/// This is also the opstamp of the commit that is currently
/// available for searchers.
pub fn commit_opstamp(&self) -> Opstamp {
pub fn commit_opstamp(&self) -> u64 {
self.committed_opstamp
}
@@ -646,7 +665,7 @@ impl IndexWriter {
///
/// Currently it represents the number of documents that
/// have been added since the creation of the index.
pub fn add_document(&self, document: Document) -> Opstamp {
pub fn add_document(&mut self, document: Document) -> u64 {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation { opstamp, document };
let send_result = self.operation_sender.send(vec![add_operation]);
@@ -663,7 +682,7 @@ impl IndexWriter {
/// The total number of stamps generated by this method is `count + 1`;
/// each operation gets a stamp from the `stamps` iterator and `last_opstamp`
/// is for the batch itself.
fn get_batch_opstamps(&self, count: Opstamp) -> (Opstamp, Range<Opstamp>) {
fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range<u64>) {
let Range { start, end } = self.stamper.stamps(count + 1u64);
let last_opstamp = end - 1;
let stamps = Range {
@@ -689,7 +708,7 @@ impl IndexWriter {
/// Like adds and deletes (see `IndexWriter.add_document` and
/// `IndexWriter.delete_term`), the changes made by calling `run` will be
/// visible to readers only after calling `commit()`.
pub fn run(&self, user_operations: Vec<UserOperation>) -> Opstamp {
pub fn run(&mut self, user_operations: Vec<UserOperation>) -> u64 {
let count = user_operations.len() as u64;
if count == 0 {
return self.stamper.stamp();
@@ -733,6 +752,7 @@ mod tests {
use Index;
use ReloadPolicy;
use Term;
use IndexReader;
#[test]
fn test_operations_group() {
@@ -740,7 +760,7 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let operations = vec![
UserOperation::Add(doc!(text_field=>"a")),
UserOperation::Add(doc!(text_field=>"b")),
@@ -802,7 +822,7 @@ mod tests {
fn test_empty_operations_group() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer(3_000_000).unwrap();
let mut index_writer = index.writer(3_000_000).unwrap();
let operations1 = vec![];
let batch_opstamp1 = index_writer.run(operations1);
assert_eq!(batch_opstamp1, 0u64);
@@ -866,6 +886,13 @@ mod tests {
let _index_writer_two = index.writer(3_000_000).unwrap();
}
fn num_docs_containing_text(reader: &IndexReader, term: &str) -> u64 {
let searcher = reader.searcher();
let text_field = reader.schema().get_field("text").unwrap();
let term = Term::from_field_text(text_field, term);
searcher.doc_freq(&term)
}
#[test]
fn test_commit_and_rollback() {
let mut schema_builder = schema::Schema::builder();
@@ -882,9 +909,12 @@ mod tests {
searcher.doc_freq(&term)
};
let mut index_writer = index.writer(3_000_000).unwrap();
assert_eq!(index_writer.commit_opstamp(), 0u64);
assert_eq!(num_docs_containing_text(&reader, "a"), 0);
{
// writing the segment
let mut index_writer = index.writer(3_000_000).unwrap();
index_writer.add_document(doc!(text_field=>"a"));
index_writer.rollback().unwrap();
assert_eq!(index_writer.commit_opstamp(), 0u64);
@@ -903,6 +933,35 @@ mod tests {
reader.searcher();
}
#[test]
fn test_softcommit_and_rollback() {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let reader = index.reader().unwrap();
// writing the segment
let mut index_writer = index.writer(3_000_000).unwrap();
index_writer.add_document(doc!(text_field=>"a"));
index_writer.rollback().unwrap();
assert_eq!(index_writer.commit_opstamp(), 0u64);
assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
{
index_writer.add_document(doc!(text_field=>"b"));
index_writer.add_document(doc!(text_field=>"c"));
}
assert!(index_writer.soft_commit().is_ok());
reader.reload().unwrap(); // we need to load soft committed stuff.
assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
assert_eq!(num_docs_containing_text(&reader, "b"), 1u64);
assert_eq!(num_docs_containing_text(&reader, "c"), 1u64);
index_writer.rollback().unwrap();
reader.reload().unwrap();
assert_eq!(num_docs_containing_text(&reader, "a"), 0u64);
assert_eq!(num_docs_containing_text(&reader, "b"), 0u64);
assert_eq!(num_docs_containing_text(&reader, "c"), 0u64);
}
#[test]
fn test_with_merges() {
let mut schema_builder = schema::Schema::builder();
@@ -936,7 +995,7 @@ mod tests {
reader.reload().unwrap();
assert_eq!(num_docs_containing("a"), 200);
assert_eq!(num_docs_containing_text(&reader, "a"), 200);
assert!(index.searchable_segments().unwrap().len() < 8);
}
}
@@ -979,7 +1038,7 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let reader = index.reader();
{
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();

View File

@@ -52,7 +52,7 @@ impl MergePolicy for LogMergePolicy {
let mut size_sorted_tuples = segments
.iter()
.map(SegmentMeta::num_docs)
.map(|x| x.num_docs())
.enumerate()
.collect::<Vec<(usize, u32)>>();

View File

@@ -1,6 +1,5 @@
use census::{Inventory, TrackedObject};
use std::collections::HashSet;
use Opstamp;
use SegmentId;
#[derive(Default)]
@@ -18,8 +17,8 @@ impl MergeOperationInventory {
}
}
/// A `MergeOperation` has two roles.
/// It carries all of the information required to describe a merge:
/// A `MergeOperation` has two role.
/// It carries all of the information required to describe a merge :
/// - `target_opstamp` is the opstamp up to which we want to consume the
/// delete queue and reflect their deletes.
/// - `segment_ids` is the list of segment to be merged.
@@ -36,14 +35,14 @@ pub struct MergeOperation {
}
struct InnerMergeOperation {
target_opstamp: Opstamp,
target_opstamp: u64,
segment_ids: Vec<SegmentId>,
}
impl MergeOperation {
pub fn new(
inventory: &MergeOperationInventory,
target_opstamp: Opstamp,
target_opstamp: u64,
segment_ids: Vec<SegmentId>,
) -> MergeOperation {
let inner_merge_operation = InnerMergeOperation {
@@ -55,7 +54,7 @@ impl MergeOperation {
}
}
pub fn target_opstamp(&self) -> Opstamp {
pub fn target_opstamp(&self) -> u64 {
self.inner.target_opstamp
}

View File

@@ -3,7 +3,6 @@ use core::Segment;
use core::SegmentReader;
use core::SerializableSegment;
use docset::DocSet;
use fastfield::BytesFastFieldReader;
use fastfield::DeleteBitSet;
use fastfield::FastFieldReader;
use fastfield::FastFieldSerializer;
@@ -73,7 +72,7 @@ fn compute_min_max_val(
// some deleted documents,
// we need to recompute the max / min
(0..max_doc)
.filter(|doc_id| delete_bitset.is_alive(*doc_id))
.filter(|doc_id| !delete_bitset.is_deleted(*doc_id))
.map(|doc_id| u64_reader.get(doc_id))
.minmax()
.into_option()
@@ -240,10 +239,7 @@ impl IndexMerger {
let mut max_value = u64::min_value();
for reader in &self.readers {
let u64_reader: FastFieldReader<u64> = reader
.fast_fields()
.u64_lenient(field)
.expect("Failed to find a reader for single fast field. This is a tantivy bug and it should never happen.");
let u64_reader: FastFieldReader<u64> = reader.fast_field_reader(field)?;
if let Some((seg_min_val, seg_max_val)) =
compute_min_max_val(&u64_reader, reader.max_doc(), reader.delete_bitset())
{
@@ -286,28 +282,24 @@ impl IndexMerger {
fast_field_serializer: &mut FastFieldSerializer,
) -> Result<()> {
let mut total_num_vals = 0u64;
let mut u64s_readers: Vec<MultiValueIntFastFieldReader<u64>> = Vec::new();
// In the first pass, we compute the total number of vals.
//
// This is required by the bitpacker, as it needs to know
// what should be the bit length use for bitpacking.
for reader in &self.readers {
let u64s_reader = reader.fast_fields()
.u64s_lenient(field)
.expect("Failed to find index for multivalued field. This is a bug in tantivy, please report.");
let idx_reader = reader.fast_field_reader_with_idx::<u64>(field, 0)?;
if let Some(delete_bitset) = reader.delete_bitset() {
for doc in 0u32..reader.max_doc() {
if delete_bitset.is_alive(doc) {
let num_vals = u64s_reader.num_vals(doc) as u64;
total_num_vals += num_vals;
if !delete_bitset.is_deleted(doc) {
let start = idx_reader.get(doc);
let end = idx_reader.get(doc + 1);
total_num_vals += end - start;
}
}
} else {
total_num_vals += u64s_reader.total_num_vals();
total_num_vals += idx_reader.max_value();
}
u64s_readers.push(u64s_reader);
}
// We can now create our `idx` serializer, and in a second pass,
@@ -315,10 +307,13 @@ impl IndexMerger {
let mut serialize_idx =
fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?;
let mut idx = 0;
for (segment_reader, u64s_reader) in self.readers.iter().zip(&u64s_readers) {
for doc in segment_reader.doc_ids_alive() {
for reader in &self.readers {
let idx_reader = reader.fast_field_reader_with_idx::<u64>(field, 0)?;
for doc in reader.doc_ids_alive() {
serialize_idx.add_val(idx)?;
idx += u64s_reader.num_vals(doc) as u64;
let start = idx_reader.get(doc);
let end = idx_reader.get(doc + 1);
idx += end - start;
}
}
serialize_idx.add_val(idx)?;
@@ -349,10 +344,8 @@ impl IndexMerger {
for (segment_ord, segment_reader) in self.readers.iter().enumerate() {
let term_ordinal_mapping: &[TermOrdinal] =
term_ordinal_mappings.get_segment(segment_ord);
let ff_reader: MultiValueIntFastFieldReader<u64> = segment_reader
.fast_fields()
.u64s(field)
.expect("Could not find multivalued u64 fast value reader.");
let ff_reader: MultiValueIntFastFieldReader<u64> =
segment_reader.multi_fast_field_reader(field)?;
// TODO optimize if no deletes
for doc in segment_reader.doc_ids_alive() {
ff_reader.get_vals(doc, &mut vals);
@@ -384,8 +377,6 @@ impl IndexMerger {
let mut vals = Vec::with_capacity(100);
let mut ff_readers = Vec::new();
// Our values are bitpacked and we need to know what should be
// our bitwidth and our minimum value before serializing any values.
//
@@ -394,10 +385,7 @@ impl IndexMerger {
// maximum value and initialize our Serializer.
for reader in &self.readers {
let ff_reader: MultiValueIntFastFieldReader<u64> =
reader.fast_fields().u64s_lenient(field).expect(
"Failed to find multivalued fast field reader. This is a bug in \
tantivy. Please report.",
);
reader.multi_fast_field_reader(field)?;
for doc in reader.doc_ids_alive() {
ff_reader.get_vals(doc, &mut vals);
for &val in &vals {
@@ -405,7 +393,6 @@ impl IndexMerger {
max_value = cmp::max(val, max_value);
}
}
ff_readers.push(ff_reader);
// TODO optimize when no deletes
}
@@ -418,7 +405,9 @@ impl IndexMerger {
{
let mut serialize_vals = fast_field_serializer
.new_u64_fast_field_with_idx(field, min_value, max_value, 1)?;
for (reader, ff_reader) in self.readers.iter().zip(ff_readers) {
for reader in &self.readers {
let ff_reader: MultiValueIntFastFieldReader<u64> =
reader.multi_fast_field_reader(field)?;
// TODO optimize if no deletes
for doc in reader.doc_ids_alive() {
ff_reader.get_vals(doc, &mut vals);
@@ -437,53 +426,19 @@ impl IndexMerger {
field: Field,
fast_field_serializer: &mut FastFieldSerializer,
) -> Result<()> {
let mut total_num_vals = 0u64;
let mut bytes_readers: Vec<BytesFastFieldReader> = Vec::new();
for reader in &self.readers {
let bytes_reader = reader.fast_fields().bytes(field).expect(
"Failed to find bytes fast field reader. This is a bug in tantivy, please report.",
);
if let Some(delete_bitset) = reader.delete_bitset() {
for doc in 0u32..reader.max_doc() {
if delete_bitset.is_alive(doc) {
let num_vals = bytes_reader.get_bytes(doc).len() as u64;
total_num_vals += num_vals;
}
}
} else {
total_num_vals += bytes_reader.total_num_bytes() as u64;
}
bytes_readers.push(bytes_reader);
}
{
// We can now create our `idx` serializer, and in a second pass,
// can effectively push the different indexes.
let mut serialize_idx =
fast_field_serializer.new_u64_fast_field_with_idx(field, 0, total_num_vals, 0)?;
let mut idx = 0;
for (segment_reader, bytes_reader) in self.readers.iter().zip(&bytes_readers) {
for doc in segment_reader.doc_ids_alive() {
serialize_idx.add_val(idx)?;
idx += bytes_reader.get_bytes(doc).len() as u64;
}
}
serialize_idx.add_val(idx)?;
serialize_idx.close_field()?;
}
self.write_fast_field_idx(field, fast_field_serializer)?;
let mut serialize_vals = fast_field_serializer.new_bytes_fast_field_with_idx(field, 1)?;
for segment_reader in &self.readers {
let bytes_reader = segment_reader.fast_fields().bytes(field)
.expect("Failed to find bytes field in fast field reader. This is a bug in tantivy. Please report.");
for reader in &self.readers {
let bytes_reader = reader.bytes_fast_field_reader(field)?;
// TODO: optimize if no deletes
for doc in segment_reader.doc_ids_alive() {
let val = bytes_reader.get_bytes(doc);
for doc in reader.doc_ids_alive() {
let val = bytes_reader.get_val(doc);
serialize_vals.write_all(val)?;
}
}
serialize_vals.flush()?;
Ok(())
}
@@ -1024,16 +979,14 @@ mod tests {
let score_field_reader = searcher
.segment_reader(0)
.fast_fields()
.u64(score_field)
.fast_field_reader::<u64>(score_field)
.unwrap();
assert_eq!(score_field_reader.min_value(), 4000);
assert_eq!(score_field_reader.max_value(), 7000);
let score_field_reader = searcher
.segment_reader(1)
.fast_fields()
.u64(score_field)
.fast_field_reader::<u64>(score_field)
.unwrap();
assert_eq!(score_field_reader.min_value(), 1);
assert_eq!(score_field_reader.max_value(), 3);
@@ -1084,8 +1037,7 @@ mod tests {
);
let score_field_reader = searcher
.segment_reader(0)
.fast_fields()
.u64(score_field)
.fast_field_reader::<u64>(score_field)
.unwrap();
assert_eq!(score_field_reader.min_value(), 3);
assert_eq!(score_field_reader.max_value(), 7000);
@@ -1131,8 +1083,7 @@ mod tests {
);
let score_field_reader = searcher
.segment_reader(0)
.fast_fields()
.u64(score_field)
.fast_field_reader::<u64>(score_field)
.unwrap();
assert_eq!(score_field_reader.min_value(), 3);
assert_eq!(score_field_reader.max_value(), 7000);
@@ -1184,8 +1135,7 @@ mod tests {
);
let score_field_reader = searcher
.segment_reader(0)
.fast_fields()
.u64(score_field)
.fast_field_reader::<u64>(score_field)
.unwrap();
assert_eq!(score_field_reader.min_value(), 6000);
assert_eq!(score_field_reader.max_value(), 7000);
@@ -1431,7 +1381,7 @@ mod tests {
{
let segment = searcher.segment_reader(0u32);
let ff_reader = segment.fast_fields().u64s(int_field).unwrap();
let ff_reader = segment.multi_fast_field_reader(int_field).unwrap();
ff_reader.get_vals(0, &mut vals);
assert_eq!(&vals, &[1, 2]);
@@ -1466,7 +1416,7 @@ mod tests {
{
let segment = searcher.segment_reader(1u32);
let ff_reader = segment.fast_fields().u64s(int_field).unwrap();
let ff_reader = segment.multi_fast_field_reader(int_field).unwrap();
ff_reader.get_vals(0, &mut vals);
assert_eq!(&vals, &[28, 27]);
@@ -1476,7 +1426,7 @@ mod tests {
{
let segment = searcher.segment_reader(2u32);
let ff_reader = segment.fast_fields().u64s(int_field).unwrap();
let ff_reader = segment.multi_fast_field_reader(int_field).unwrap();
ff_reader.get_vals(0, &mut vals);
assert_eq!(&vals, &[20]);
}
@@ -1509,7 +1459,7 @@ mod tests {
.collect::<Vec<_>>()
);
let segment = searcher.segment_reader(0u32);
let ff_reader = segment.fast_fields().u64s(int_field).unwrap();
let ff_reader = segment.multi_fast_field_reader(int_field).unwrap();
ff_reader.get_vals(0, &mut vals);
assert_eq!(&vals, &[1, 2]);

View File

@@ -1,18 +1,17 @@
use schema::Document;
use schema::Term;
use Opstamp;
/// Timestamped Delete operation.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct DeleteOperation {
pub opstamp: Opstamp,
pub opstamp: u64,
pub term: Term,
}
/// Timestamped Add operation.
#[derive(Eq, PartialEq, Debug)]
pub struct AddOperation {
pub opstamp: Opstamp,
pub opstamp: u64,
pub document: Document,
}

View File

@@ -1,24 +1,29 @@
use super::IndexWriter;
use Opstamp;
use Result;
/// A prepared commit
pub struct PreparedCommit<'a> {
index_writer: &'a mut IndexWriter,
payload: Option<String>,
opstamp: Opstamp,
opstamp: u64,
soft: bool,
}
impl<'a> PreparedCommit<'a> {
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp) -> PreparedCommit {
pub(crate) fn new(
index_writer: &'a mut IndexWriter,
opstamp: u64,
soft: bool,
) -> PreparedCommit {
PreparedCommit {
index_writer,
payload: None,
opstamp,
soft,
}
}
pub fn opstamp(&self) -> Opstamp {
pub fn opstamp(&self) -> u64 {
self.opstamp
}
@@ -26,15 +31,15 @@ impl<'a> PreparedCommit<'a> {
self.payload = Some(payload.to_string())
}
pub fn abort(self) -> Result<Opstamp> {
pub fn abort(self) -> Result<()> {
self.index_writer.rollback()
}
pub fn commit(self) -> Result<Opstamp> {
pub fn commit(self) -> Result<u64> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()
.commit(self.opstamp, self.payload)?;
.commit(self.opstamp, self.payload, self.soft)?;
Ok(self.opstamp)
}
}

View File

@@ -22,6 +22,7 @@ pub struct SegmentEntry {
meta: SegmentMeta,
delete_bitset: Option<BitSet>,
delete_cursor: DeleteCursor,
opstamp: u64,
}
impl SegmentEntry {
@@ -30,14 +31,20 @@ impl SegmentEntry {
segment_meta: SegmentMeta,
delete_cursor: DeleteCursor,
delete_bitset: Option<BitSet>,
opstamp: u64,
) -> SegmentEntry {
SegmentEntry {
meta: segment_meta,
delete_bitset,
delete_cursor,
opstamp,
}
}
pub fn opstamp(&self) -> u64 {
self.opstamp
}
/// Return a reference to the segment entry deleted bitset.
///
/// `DocId` in this bitset are flagged as deleted.
@@ -46,7 +53,8 @@ impl SegmentEntry {
}
/// Set the `SegmentMeta` for this segment.
pub fn set_meta(&mut self, segment_meta: SegmentMeta) {
pub fn set_meta(&mut self, opstamp: u64, segment_meta: SegmentMeta) {
self.opstamp = opstamp;
self.meta = segment_meta;
}

View File

@@ -11,11 +11,47 @@ use std::path::PathBuf;
use std::sync::RwLock;
use std::sync::{RwLockReadGuard, RwLockWriteGuard};
use Result as TantivyResult;
use std::sync::Arc;
use std::collections::HashMap;
/// Provides a read-only view of the available segments.
#[derive(Clone)]
pub struct AvailableSegments {
registers: Arc<RwLock<SegmentRegisters>>,
}
impl AvailableSegments {
pub fn committed(&self) -> Vec<SegmentMeta> {
self.registers
.read()
.unwrap()
.committed
.segment_metas()
}
pub fn soft_committed(&self) -> Vec<SegmentMeta> {
self.registers
.read()
.unwrap()
.soft_committed
.segment_metas()
}
}
#[derive(Default)]
struct SegmentRegisters {
uncommitted: SegmentRegister,
uncommitted: HashMap<SegmentId, SegmentEntry>,
committed: SegmentRegister,
/// soft commits can advance committed segment to a future delete
/// opstamp.
///
/// In that case the same `SegmentId` can appear in both `committed`
/// and in `committed_in_the_future`.
///
/// We do not consider these segments for merges.
soft_committed: SegmentRegister,
/// `DeleteCursor`, positionned on the soft commit.
delete_cursor: DeleteCursor,
}
/// The segment manager stores the list of segments
@@ -23,9 +59,8 @@ struct SegmentRegisters {
///
/// It guarantees the atomicity of the
/// changes (merges especially)
#[derive(Default)]
pub struct SegmentManager {
registers: RwLock<SegmentRegisters>,
registers: Arc<RwLock<SegmentRegisters>>
}
impl Debug for SegmentManager {
@@ -46,11 +81,17 @@ pub fn get_mergeable_segments(
let registers_lock = segment_manager.read();
(
registers_lock
.committed
.soft_committed
.get_mergeable_segments(in_merge_segment_ids),
registers_lock
.uncommitted
.get_mergeable_segments(in_merge_segment_ids),
.values()
.map(|segment_entry| segment_entry.meta())
.filter(|segment_meta| {
!in_merge_segment_ids.contains(&segment_meta.id())
})
.cloned()
.collect::<Vec<_>>()
)
}
@@ -58,21 +99,22 @@ impl SegmentManager {
pub fn from_segments(
segment_metas: Vec<SegmentMeta>,
delete_cursor: &DeleteCursor,
opstamp: u64,
) -> SegmentManager {
SegmentManager {
registers: RwLock::new(SegmentRegisters {
uncommitted: SegmentRegister::default(),
committed: SegmentRegister::new(segment_metas, delete_cursor),
}),
registers: Arc::new(RwLock::new(SegmentRegisters {
uncommitted: HashMap::default(),
committed: SegmentRegister::new(segment_metas.clone(), opstamp),
soft_committed: SegmentRegister::new(segment_metas, opstamp),
delete_cursor: delete_cursor.clone(),
}))
}
}
/// Returns all of the segment entries (committed or uncommitted)
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
let registers_lock = self.read();
let mut segment_entries = registers_lock.uncommitted.segment_entries();
segment_entries.extend(registers_lock.committed.segment_entries());
segment_entries
pub fn available_segments_view(&self) -> AvailableSegments {
AvailableSegments {
registers: self.registers.clone()
}
}
/// List the files that are useful to the index.
@@ -108,44 +150,76 @@ impl SegmentManager {
let mut registers_lock = self.write();
registers_lock
.committed
.segment_entries()
.segment_metas()
.iter()
.filter(|segment| segment.meta().num_docs() == 0)
.for_each(|segment| {
.filter(|segment_meta| segment_meta.num_docs() == 0)
.for_each(|segment_meta| {
registers_lock
.committed
.remove_segment(&segment.segment_id())
.remove_segment(&segment_meta.id())
});
registers_lock
.soft_committed
.segment_metas()
.iter()
.filter(|segment_meta| segment_meta.num_docs() == 0)
.for_each(|segment_meta| {
registers_lock
.committed
.remove_segment(&segment_meta.id())
});
}
pub fn commit(&self, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
registers_lock.committed.clear();
registers_lock.uncommitted.clear();
for segment_entry in segment_entries {
registers_lock.committed.add_segment_entry(segment_entry);
}
/// Returns all of the segment entries (soft committed or uncommitted)
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
let registers_lock = self.read();
let mut segment_entries: Vec<SegmentEntry > = registers_lock.uncommitted.values().cloned().collect();
segment_entries.extend(registers_lock.soft_committed.segment_entries(&registers_lock.delete_cursor).into_iter());
segment_entries
}
/// Marks a list of segments as in merge.
pub fn commit(&self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
registers_lock.uncommitted.clear();
registers_lock
.committed
.set_commit(opstamp, segment_entries.clone());
registers_lock
.soft_committed
.set_commit(opstamp, segment_entries);
registers_lock.delete_cursor.skip_to(opstamp);
}
pub fn soft_commit(&self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
let mut registers_lock = self.write();
registers_lock.uncommitted.clear();
registers_lock
.soft_committed
.set_commit(opstamp, segment_entries);
registers_lock.delete_cursor.skip_to(opstamp);
}
/// Gets the list of segment_entries associated to a list of `segment_ids`.
/// This method is used when starting a merge operations.
///
/// Returns an error if some segments are missing, or if
/// the `segment_ids` are not either all committed or all
/// the `segment_ids` are not either all soft_committed or all
/// uncommitted.
pub fn start_merge(&self, segment_ids: &[SegmentId]) -> TantivyResult<Vec<SegmentEntry>> {
let registers_lock = self.read();
let mut segment_entries = vec![];
if registers_lock.uncommitted.contains_all(segment_ids) {
if segment_ids.iter().all(|segment_id| registers_lock.uncommitted.contains_key(segment_id)) {
for segment_id in segment_ids {
let segment_entry = registers_lock.uncommitted
.get(segment_id)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
segment_entries.push(segment_entry.clone());
}
} else if registers_lock.committed.contains_all(segment_ids) {
} else if registers_lock.soft_committed.contains_all(segment_ids) {
for segment_id in segment_ids {
let segment_entry = registers_lock.committed
.get(segment_id)
let segment_entry = registers_lock.soft_committed
.get(segment_id, &registers_lock.delete_cursor)
.expect("Segment id not found {}. Should never happen because of the contains all if-block.");
segment_entries.push(segment_entry);
}
@@ -160,35 +234,32 @@ impl SegmentManager {
pub fn add_segment(&self, segment_entry: SegmentEntry) {
let mut registers_lock = self.write();
registers_lock.uncommitted.add_segment_entry(segment_entry);
registers_lock
.uncommitted
.insert(segment_entry.segment_id(), segment_entry);
}
pub fn end_merge(
&self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_entry: SegmentEntry,
after_merge_segment_entry: SegmentEntry
) {
let mut registers_lock = self.write();
let target_register: &mut SegmentRegister = {
if registers_lock
if before_merge_segment_ids.iter().all(|seg_id|
registers_lock
.uncommitted
.contains_all(before_merge_segment_ids)
{
&mut registers_lock.uncommitted
} else if registers_lock
.committed
.contains_all(before_merge_segment_ids)
{
&mut registers_lock.committed
} else {
warn!("couldn't find segment in SegmentManager");
return;
.contains_key(seg_id))
{
for segment_id in before_merge_segment_ids {
registers_lock.uncommitted.remove(&segment_id);
}
};
for segment_id in before_merge_segment_ids {
target_register.remove_segment(segment_id);
registers_lock.uncommitted.insert(after_merge_segment_entry.segment_id(),
after_merge_segment_entry);
} else {
registers_lock.committed.receive_merge(&before_merge_segment_ids, &after_merge_segment_entry);
registers_lock.soft_committed.receive_merge(&before_merge_segment_ids, &after_merge_segment_entry)
}
target_register.add_segment_entry(after_merge_segment_entry);
}
pub fn committed_segment_metas(&self) -> Vec<SegmentMeta> {

View File

@@ -16,7 +16,8 @@ use std::fmt::{self, Debug, Formatter};
/// merge candidates.
#[derive(Default)]
pub struct SegmentRegister {
segment_states: HashMap<SegmentId, SegmentEntry>,
segment_states: HashMap<SegmentId, SegmentMeta>,
opstamp_constraint: u64,
}
impl Debug for SegmentRegister {
@@ -41,23 +42,28 @@ impl SegmentRegister {
) -> Vec<SegmentMeta> {
self.segment_states
.values()
.filter(|segment_entry| !in_merge_segment_ids.contains(&segment_entry.segment_id()))
.map(|segment_entry| segment_entry.meta().clone())
.filter(|segment_meta| !in_merge_segment_ids.contains(&segment_meta.id()))
.cloned()
.collect()
}
pub fn segment_entries(&self) -> Vec<SegmentEntry> {
self.segment_states.values().cloned().collect()
}
pub fn segment_metas(&self) -> Vec<SegmentMeta> {
let mut segment_ids: Vec<SegmentMeta> = self
let mut segment_metas: Vec<SegmentMeta> = self
.segment_states
.values()
.map(|segment_entry| segment_entry.meta().clone())
.cloned()
.collect();
segment_ids.sort_by_key(SegmentMeta::id);
segment_ids
segment_metas.sort_by_key(|meta| meta.id());
segment_metas
}
pub fn segment_entries(&self, delete_cursor: &DeleteCursor) -> Vec<SegmentEntry> {
self.segment_states
.values()
.map(|segment_meta| {
SegmentEntry::new(segment_meta.clone(), delete_cursor.clone(), None, self.opstamp_constraint)
})
.collect()
}
pub fn contains_all(&self, segment_ids: &[SegmentId]) -> bool {
@@ -66,27 +72,77 @@ impl SegmentRegister {
.all(|segment_id| self.segment_states.contains_key(segment_id))
}
pub fn add_segment_entry(&mut self, segment_entry: SegmentEntry) {
pub fn receive_merge(&mut self,
before_merge_segment_ids: &[SegmentId],
after_merge_segment_entry: &SegmentEntry) {
if after_merge_segment_entry.opstamp() != self.opstamp_constraint {
return;
}
if !self.contains_all(before_merge_segment_ids) {
return;
}
for segment_id in before_merge_segment_ids {
self.segment_states.remove(segment_id);
}
self.register_segment_entry(after_merge_segment_entry.clone());
}
/// Registers a `SegmentEntry`.
///
/// If a segment entry associated to this `SegmentId` is already there,
/// override it with the new `SegmentEntry`.
pub fn register_segment_entry(&mut self, segment_entry: SegmentEntry) {
if self.opstamp_constraint != segment_entry.opstamp() {
panic!(format!(
"Invalid segment. Expect opstamp {}, got {}.",
self.opstamp_constraint,
segment_entry.opstamp()
));
}
if segment_entry.meta().num_docs() == 0 {
return;
}
let segment_id = segment_entry.segment_id();
self.segment_states.insert(segment_id, segment_entry);
// Check that we are ok with deletes.
self.segment_states.insert(segment_id, segment_entry.meta().clone());
}
pub fn set_commit(&mut self, opstamp: u64, segment_entries: Vec<SegmentEntry>) {
self.segment_states.clear();
self.opstamp_constraint = opstamp;
for segment_entry in segment_entries {
self.register_segment_entry(segment_entry);
}
}
pub fn remove_segment(&mut self, segment_id: &SegmentId) {
self.segment_states.remove(segment_id);
self.segment_states.remove(&segment_id);
}
pub fn get(&self, segment_id: &SegmentId) -> Option<SegmentEntry> {
self.segment_states.get(segment_id).cloned()
pub fn get(&self, segment_id: &SegmentId, delete_cursor: &DeleteCursor) -> Option<SegmentEntry> {
self.segment_states
.get(&segment_id)
.map(|segment_meta|
SegmentEntry::new(
segment_meta.clone(),
delete_cursor.clone(),
None,
self.opstamp_constraint
))
}
pub fn new(segment_metas: Vec<SegmentMeta>, delete_cursor: &DeleteCursor) -> SegmentRegister {
pub fn new(
segment_metas: Vec<SegmentMeta>,
opstamp: u64,
) -> SegmentRegister {
let mut segment_states = HashMap::new();
for segment_meta in segment_metas {
let segment_id = segment_meta.id();
let segment_entry = SegmentEntry::new(segment_meta, delete_cursor.clone(), None);
segment_states.insert(segment_id, segment_entry);
segment_states.insert(segment_meta.id(), segment_meta);
}
SegmentRegister {
segment_states,
opstamp_constraint: opstamp,
}
SegmentRegister { segment_states }
}
}
@@ -115,22 +171,22 @@ mod tests {
let segment_id_merged = SegmentId::generate_random();
{
let segment_meta = SegmentMeta::new(segment_id_a, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
let segment_meta = SegmentMeta::new(segment_id_a, 1u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None, 0u64);
segment_register.register_segment_entry(segment_entry);
}
assert_eq!(segment_ids(&segment_register), vec![segment_id_a]);
{
let segment_meta = SegmentMeta::new(segment_id_b, 0u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
let segment_meta = SegmentMeta::new(segment_id_b, 2u32);
let segment_entry = SegmentEntry::new(segment_meta, delete_queue.cursor(), None, 0u64);
segment_register.register_segment_entry(segment_entry);
}
segment_register.remove_segment(&segment_id_a);
segment_register.remove_segment(&segment_id_b);
{
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 0u32);
let segment_entry = SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None);
segment_register.add_segment_entry(segment_entry);
let segment_meta_merged = SegmentMeta::new(segment_id_merged, 3u32);
let segment_entry =
SegmentEntry::new(segment_meta_merged, delete_queue.cursor(), None, 0u64);
segment_register.receive_merge(&[segment_id_a, segment_id_b], &segment_entry);
segment_register.register_segment_entry(segment_entry);
}
assert_eq!(segment_ids(&segment_register), vec![segment_id_merged]);
}

View File

@@ -36,7 +36,6 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
use std::thread::JoinHandle;
use Opstamp;
use Result;
/// Save the index meta file.
@@ -126,7 +125,7 @@ fn perform_merge(
let segment_meta = SegmentMeta::new(merged_segment.id(), num_docs);
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None);
let after_merge_segment_entry = SegmentEntry::new(segment_meta.clone(), delete_cursor, None, target_opstamp);
Ok(after_merge_segment_entry)
}
@@ -156,8 +155,11 @@ impl SegmentUpdater {
stamper: Stamper,
delete_cursor: &DeleteCursor,
) -> Result<SegmentUpdater> {
let index_meta = index.load_metas()?;
let segments = index.searchable_segment_metas()?;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor);
let opstamp = index_meta.opstamp;
let segment_manager = SegmentManager::from_segments(segments, delete_cursor, opstamp);
let pool = CpuPoolBuilder::new()
.name_prefix("segment_updater")
.pool_size(1)
@@ -225,7 +227,7 @@ impl SegmentUpdater {
///
/// Tne method returns copies of the segment entries,
/// updated with the delete information.
fn purge_deletes(&self, target_opstamp: Opstamp) -> Result<Vec<SegmentEntry>> {
fn purge_deletes(&self, target_opstamp: u64) -> Result<Vec<SegmentEntry>> {
let mut segment_entries = self.0.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
let segment = self.0.index.segment(segment_entry.meta().clone());
@@ -234,7 +236,7 @@ impl SegmentUpdater {
Ok(segment_entries)
}
pub fn save_metas(&self, opstamp: Opstamp, commit_message: Option<String>) {
pub fn save_metas(&self, opstamp: u64, commit_message: Option<String>) {
if self.is_alive() {
let index = &self.0.index;
let directory = index.directory();
@@ -281,14 +283,30 @@ impl SegmentUpdater {
.garbage_collect(|| self.0.segment_manager.list_files());
}
pub fn commit(&self, opstamp: Opstamp, payload: Option<String>) -> Result<()> {
pub fn commit(&self, opstamp: u64, payload: Option<String>, soft: bool) -> Result<()> {
self.run_async(move |segment_updater| {
if segment_updater.is_alive() {
let segment_entries = segment_updater
.purge_deletes(opstamp)
.expect("Failed purge deletes");
segment_updater.0.segment_manager.commit(segment_entries);
segment_updater.save_metas(opstamp, payload);
if soft {
// Soft commit.
//
// The list `segment_entries` above is what we might want to use as searchable
// segment. However, we do not want to mark them as committed, and we want
// to keep the current set of committed segment.
segment_updater.0.segment_manager.soft_commit(opstamp, segment_entries);
// ... We do not save the meta file.
} else {
// Hard_commit. We register the new segment entries as committed.
segment_updater
.0
.segment_manager
.commit(opstamp, segment_entries);
// TODO error handling.
segment_updater.save_metas(opstamp, payload);
segment_updater.0.index.directory().flush().unwrap();
}
segment_updater.garbage_collect_files_exec();
segment_updater.consider_merge_options();
}

View File

@@ -5,7 +5,6 @@ use fastfield::FastFieldsWriter;
use fieldnorm::FieldNormsWriter;
use indexer::segment_serializer::SegmentSerializer;
use postings::MultiFieldPostingsWriter;
use schema::FieldEntry;
use schema::FieldType;
use schema::Schema;
use schema::Term;
@@ -16,7 +15,6 @@ use tokenizer::BoxedTokenizer;
use tokenizer::FacetTokenizer;
use tokenizer::{TokenStream, Tokenizer};
use DocId;
use Opstamp;
use Result;
/// A `SegmentWriter` is in charge of creating segment index from a
@@ -30,7 +28,7 @@ pub struct SegmentWriter {
segment_serializer: SegmentSerializer,
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: FieldNormsWriter,
doc_opstamps: Vec<Opstamp>,
doc_opstamps: Vec<u64>,
tokenizers: Vec<Option<Box<BoxedTokenizer>>>,
}
@@ -55,7 +53,7 @@ impl SegmentWriter {
schema
.fields()
.iter()
.map(FieldEntry::field_type)
.map(|field_entry| field_entry.field_type())
.map(|field_type| match *field_type {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()

View File

@@ -1,27 +1,70 @@
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use Opstamp;
/// Stamper provides Opstamps, which is just an auto-increment id to label
/// an operation.
///
/// Cloning does not "fork" the stamp generation. The stamper actually wraps an `Arc`.
// AtomicU64 have not landed in stable.
// For the moment let's just use AtomicUsize on
// x86/64 bit platform, and a mutex on other platform.
#[cfg(target_arch = "x86_64")]
mod archicture_impl {
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Default)]
pub struct AtomicU64Ersatz(AtomicUsize);
impl AtomicU64Ersatz {
pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize))
}
pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
self.0.fetch_add(val as usize, order) as u64
}
}
}
#[cfg(not(target_arch = "x86_64"))]
mod archicture_impl {
use std::sync::atomic::Ordering;
/// Under other architecture, we rely on a mutex.
use std::sync::RwLock;
#[derive(Default)]
pub struct AtomicU64Ersatz(RwLock<u64>);
impl AtomicU64Ersatz {
pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
AtomicU64Ersatz(RwLock::new(first_opstamp))
}
pub fn fetch_add(&self, incr: u64, _order: Ordering) -> u64 {
let mut lock = self.0.write().unwrap();
let previous_val = *lock;
*lock = previous_val + incr;
previous_val
}
}
}
use self::archicture_impl::AtomicU64Ersatz;
#[derive(Clone, Default)]
pub struct Stamper(Arc<AtomicU64>);
pub struct Stamper(Arc<AtomicU64Ersatz>);
impl Stamper {
pub fn new(first_opstamp: Opstamp) -> Stamper {
Stamper(Arc::new(AtomicU64::new(first_opstamp)))
pub fn new(first_opstamp: u64) -> Stamper {
Stamper(Arc::new(AtomicU64Ersatz::new(first_opstamp)))
}
pub fn stamp(&self) -> Opstamp {
pub fn stamp(&self) -> u64 {
self.0.fetch_add(1u64, Ordering::SeqCst) as u64
}
/// Given a desired count `n`, `stamps` returns an iterator that
/// will supply `n` number of u64 stamps.
pub fn stamps(&self, n: u64) -> Range<Opstamp> {
pub fn stamps(&self, n: u64) -> Range<u64> {
let start = self.0.fetch_add(n, Ordering::SeqCst);
Range {
start,
@@ -49,5 +92,4 @@ mod test {
assert_eq!(stamper.stamps(3u64), (12..15));
assert_eq!(stamper.stamp(), 15u64);
}
}

View File

@@ -226,7 +226,7 @@ mod docset;
pub use self::docset::{DocSet, SkipResult};
pub use core::SegmentComponent;
pub use core::{Index, Searcher, Segment, SegmentId, SegmentMeta, IndexMeta};
pub use core::{Index, Searcher, Segment, SegmentId, SegmentMeta};
pub use core::{InvertedIndexReader, SegmentReader};
pub use directory::Directory;
pub use indexer::IndexWriter;
@@ -254,16 +254,6 @@ pub mod merge_policy {
/// as they are added in the segment.
pub type DocId = u32;
/// A u64 assigned to every operation incrementally
///
/// All operations modifying the index receives an monotonic Opstamp.
/// The resulting state of the index is consistent with the opstamp ordering.
///
/// For instance, a commit with opstamp `32_423` will reflect all Add and Delete operations
/// with an opstamp `<= 32_423`. A delete operation with opstamp n will no affect a document added
/// with opstamp `n+1`.
pub type Opstamp = u64;
/// A f32 that represents the relevance of the document to the query
///
/// This is modelled internally as a `f32`. The
@@ -886,28 +876,28 @@ mod tests {
let searcher = reader.searcher();
let segment_reader: &SegmentReader = searcher.segment_reader(0);
{
let fast_field_reader_opt = segment_reader.fast_fields().u64(text_field);
assert!(fast_field_reader_opt.is_none());
let fast_field_reader_res = segment_reader.fast_field_reader::<u64>(text_field);
assert!(fast_field_reader_res.is_err());
}
{
let fast_field_reader_opt = segment_reader.fast_fields().u64(stored_int_field);
assert!(fast_field_reader_opt.is_none());
let fast_field_reader_res = segment_reader.fast_field_reader::<u64>(stored_int_field);
assert!(fast_field_reader_res.is_err());
}
{
let fast_field_reader_opt = segment_reader.fast_fields().u64(fast_field_signed);
assert!(fast_field_reader_opt.is_none());
let fast_field_reader_res = segment_reader.fast_field_reader::<u64>(fast_field_signed);
assert!(fast_field_reader_res.is_err());
}
{
let fast_field_reader_opt = segment_reader.fast_fields().i64(fast_field_signed);
assert!(fast_field_reader_opt.is_some());
let fast_field_reader = fast_field_reader_opt.unwrap();
let fast_field_reader_res = segment_reader.fast_field_reader::<i64>(fast_field_signed);
assert!(fast_field_reader_res.is_ok());
let fast_field_reader = fast_field_reader_res.unwrap();
assert_eq!(fast_field_reader.get(0), 4i64)
}
{
let fast_field_reader_opt = segment_reader.fast_fields().i64(fast_field_signed);
assert!(fast_field_reader_opt.is_some());
let fast_field_reader = fast_field_reader_opt.unwrap();
let fast_field_reader_res = segment_reader.fast_field_reader::<i64>(fast_field_signed);
assert!(fast_field_reader_res.is_ok());
let fast_field_reader = fast_field_reader_res.unwrap();
assert_eq!(fast_field_reader.get(0), 4i64)
}
}

View File

@@ -1,5 +1,3 @@
use postings::compression::AlignedBuffer;
/// This modules define the logic used to search for a doc in a given
/// block. (at most 128 docs)
///
@@ -8,7 +6,7 @@ use postings::compression::AlignedBuffer;
#[cfg(target_arch = "x86_64")]
mod sse2 {
use postings::compression::{AlignedBuffer, COMPRESSION_BLOCK_SIZE};
use postings::compression::COMPRESSION_BLOCK_SIZE;
use std::arch::x86_64::__m128i as DataType;
use std::arch::x86_64::_mm_add_epi32 as op_add;
use std::arch::x86_64::_mm_cmplt_epi32 as op_lt;
@@ -25,9 +23,9 @@ mod sse2 {
///
/// There is no early exit here. We simply count the
/// number of elements that are `< target`.
pub(crate) fn linear_search_sse2_128(arr: &AlignedBuffer, target: u32) -> usize {
pub fn linear_search_sse2_128(arr: &[u32], target: u32) -> usize {
unsafe {
let ptr = arr as *const AlignedBuffer as *const DataType;
let ptr = arr.as_ptr() as *const DataType;
let vkey = set1(target as i32);
let mut cnt = set0();
// We work over 4 `__m128i` at a time.
@@ -49,16 +47,14 @@ mod sse2 {
#[cfg(test)]
mod test {
use super::linear_search_sse2_128;
use postings::compression::{AlignedBuffer, COMPRESSION_BLOCK_SIZE};
#[test]
fn test_linear_search_sse2_128_u32() {
let mut block = [0u32; COMPRESSION_BLOCK_SIZE];
for el in 0u32..128u32 {
block[el as usize] = el * 2 + 1 << 18;
for i in 0..23 {
dbg!(i);
let arr: Vec<u32> = (0..128).map(|el| el * 2 + 1 << 18).collect();
assert_eq!(linear_search_sse2_128(&arr, arr[64] + 1), 65);
}
let target = block[64] + 1;
assert_eq!(linear_search_sse2_128(&AlignedBuffer(block), target), 65);
}
}
}
@@ -131,21 +127,17 @@ impl BlockSearcher {
/// then we use a different implementation that does an exhaustive linear search over
/// the full block whenever the block is full (`len == 128`). It is surprisingly faster, most likely because of the lack
/// of branch.
pub(crate) fn search_in_block(
self,
block_docs: &AlignedBuffer,
len: usize,
start: usize,
target: u32,
) -> usize {
pub fn search_in_block(&self, block_docs: &[u32], start: usize, target: u32) -> usize {
#[cfg(target_arch = "x86_64")]
{
use postings::compression::COMPRESSION_BLOCK_SIZE;
if self == BlockSearcher::SSE2 && len == COMPRESSION_BLOCK_SIZE {
return sse2::linear_search_sse2_128(block_docs, target);
if *self == BlockSearcher::SSE2 {
if block_docs.len() == COMPRESSION_BLOCK_SIZE {
return sse2::linear_search_sse2_128(block_docs, target);
}
}
}
start + galloping(&block_docs.0[start..len], target)
start + galloping(&block_docs[start..], target)
}
}
@@ -166,7 +158,6 @@ mod tests {
use super::exponential_search;
use super::linear_search;
use super::BlockSearcher;
use postings::compression::{AlignedBuffer, COMPRESSION_BLOCK_SIZE};
#[test]
fn test_linear_search() {
@@ -195,19 +186,8 @@ mod tests {
fn util_test_search_in_block(block_searcher: BlockSearcher, block: &[u32], target: u32) {
let cursor = search_in_block_trivial_but_slow(block, target);
assert!(block.len() < COMPRESSION_BLOCK_SIZE);
let mut output_buffer = [u32::max_value(); COMPRESSION_BLOCK_SIZE];
output_buffer[..block.len()].copy_from_slice(block);
for i in 0..cursor {
assert_eq!(
block_searcher.search_in_block(
&AlignedBuffer(output_buffer),
block.len(),
i,
target
),
cursor
);
assert_eq!(block_searcher.search_in_block(block, i, target), cursor);
}
}

View File

@@ -46,11 +46,11 @@ impl BlockEncoder {
/// We ensure that the OutputBuffer is align on 128 bits
/// in order to run SSE2 linear search on it.
#[repr(align(128))]
pub(crate) struct AlignedBuffer(pub [u32; COMPRESSION_BLOCK_SIZE]);
struct OutputBuffer([u32; COMPRESSION_BLOCK_SIZE + 1]);
pub struct BlockDecoder {
bitpacker: BitPacker4x,
output: AlignedBuffer,
output: OutputBuffer,
pub output_len: usize,
}
@@ -60,9 +60,11 @@ impl BlockDecoder {
}
pub fn with_val(val: u32) -> BlockDecoder {
let mut output = [val; COMPRESSION_BLOCK_SIZE + 1];
output[COMPRESSION_BLOCK_SIZE] = 0u32;
BlockDecoder {
bitpacker: BitPacker4x::new(),
output: AlignedBuffer([val; COMPRESSION_BLOCK_SIZE]),
output: OutputBuffer(output),
output_len: 0,
}
}
@@ -89,11 +91,6 @@ impl BlockDecoder {
&self.output.0[..self.output_len]
}
#[inline]
pub(crate) fn output_aligned(&self) -> (&AlignedBuffer, usize) {
(&self.output, self.output_len)
}
#[inline]
pub fn output(&self, idx: usize) -> u32 {
self.output.0[idx]

View File

@@ -55,15 +55,13 @@ pub mod tests {
use fieldnorm::FieldNormReader;
use indexer::operation::AddOperation;
use indexer::SegmentWriter;
use merge_policy::NoMergePolicy;
use query::Scorer;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use schema::Field;
use schema::IndexRecordOption;
use schema::{Document, Schema, Term, INDEXED, STRING, TEXT};
use schema::{Field, TextOptions};
use schema::{IndexRecordOption, TextFieldIndexing};
use std::iter;
use tokenizer::{SimpleTokenizer, MAX_TOKEN_LEN};
use DocId;
use Score;
@@ -162,52 +160,6 @@ pub mod tests {
}
}
#[test]
pub fn test_drop_token_that_are_too_long() {
let ok_token_text: String = iter::repeat('A').take(MAX_TOKEN_LEN).collect();
let mut exceeding_token_text: String = iter::repeat('A').take(MAX_TOKEN_LEN + 1).collect();
exceeding_token_text.push_str(" hello");
let mut schema_builder = Schema::builder();
let text_options = TextOptions::default().set_indexing_options(
TextFieldIndexing::default()
.set_index_option(IndexRecordOption::WithFreqsAndPositions)
.set_tokenizer("simple_no_truncation"),
);
let text_field = schema_builder.add_text_field("text", text_options);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema.clone());
index
.tokenizers()
.register("simple_no_truncation", SimpleTokenizer);
let reader = index.reader().unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.set_merge_policy(Box::new(NoMergePolicy));
{
index_writer.add_document(doc!(text_field=>exceeding_token_text));
index_writer.commit().unwrap();
reader.reload().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(0u32);
let inverted_index = segment_reader.inverted_index(text_field);
assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
assert_eq!(&bytes, b"hello");
}
{
index_writer.add_document(doc!(text_field=>ok_token_text.clone()));
index_writer.commit().unwrap();
reader.reload().unwrap();
let searcher = reader.searcher();
let segment_reader = searcher.segment_reader(1u32);
let inverted_index = segment_reader.inverted_index(text_field);
assert_eq!(inverted_index.terms().num_terms(), 1);
let mut bytes = vec![];
assert!(inverted_index.terms().ord_to_term(0, &mut bytes));
assert_eq!(&bytes[..], ok_token_text.as_bytes());
}
}
#[test]
pub fn test_position_and_fieldnorm1() {
let mut positions = Vec::new();

View File

@@ -12,8 +12,8 @@ use std::io;
use std::marker::PhantomData;
use std::ops::DerefMut;
use termdict::TermOrdinal;
use tokenizer::Token;
use tokenizer::TokenStream;
use tokenizer::{Token, MAX_TOKEN_LEN};
use DocId;
use Result;
@@ -210,18 +210,8 @@ pub trait PostingsWriter {
) -> u32 {
let mut term = Term::for_field(field);
let mut sink = |token: &Token| {
// We skip all tokens with a len greater than u16.
if token.text.len() <= MAX_TOKEN_LEN {
term.set_text(token.text.as_str());
self.subscribe(term_index, doc_id, token.position as u32, &term, heap);
} else {
info!(
"A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
MAX_TOKEN_LEN in the documentation for more information.",
token.text.len(),
MAX_TOKEN_LEN
);
}
term.set_text(token.text.as_str());
self.subscribe(term_index, doc_id, token.position as u32, &term, heap);
};
token_stream.process(&mut sink)
}

View File

@@ -4,7 +4,7 @@ use common::{BinarySerializable, VInt};
use docset::{DocSet, SkipResult};
use owned_read::OwnedRead;
use positions::PositionReader;
use postings::compression::{compressed_block_size, AlignedBuffer};
use postings::compression::compressed_block_size;
use postings::compression::{BlockDecoder, VIntDecoder, COMPRESSION_BLOCK_SIZE};
use postings::serializer::PostingsSerializer;
use postings::BlockSearcher;
@@ -130,11 +130,9 @@ impl DocSet for SegmentPostings {
// next needs to be called a first time to point to the correct element.
#[inline]
fn advance(&mut self) -> bool {
if self.position_computer.is_some() && self.cur < COMPRESSION_BLOCK_SIZE {
if self.position_computer.is_some() {
let term_freq = self.term_freq() as usize;
if let Some(position_computer) = self.position_computer.as_mut() {
position_computer.add_skip(term_freq);
}
self.position_computer.as_mut().unwrap().add_skip(term_freq);
}
self.cur += 1;
if self.cur >= self.block_cursor.block_len() {
@@ -169,6 +167,7 @@ impl DocSet for SegmentPostings {
// skip blocks until one that might contain the target
// check if we need to go to the next block
let need_positions = self.position_computer.is_some();
let mut sum_freqs_skipped: u32 = 0;
if !self
.block_cursor
@@ -182,7 +181,7 @@ impl DocSet for SegmentPostings {
// we are not in the right block.
//
// First compute all of the freqs skipped from the current block.
if self.position_computer.is_some() {
if need_positions {
sum_freqs_skipped = self.block_cursor.freqs()[self.cur..].iter().sum();
match self.block_cursor.skip_to(target) {
BlockSegmentPostingsSkipResult::Success(block_skip_freqs) => {
@@ -201,21 +200,24 @@ impl DocSet for SegmentPostings {
self.cur = 0;
}
let cur = self.cur;
// we're in the right block now, start with an exponential search
let (output, len) = self.block_cursor.docs_aligned();
let block_docs = self.block_cursor.docs();
let new_cur = self
.block_searcher
.search_in_block(&output, len, cur, target);
if let Some(position_computer) = self.position_computer.as_mut() {
sum_freqs_skipped += self.block_cursor.freqs()[cur..new_cur].iter().sum::<u32>();
position_computer.add_skip(sum_freqs_skipped as usize);
.search_in_block(&block_docs, self.cur, target);
if need_positions {
sum_freqs_skipped += self.block_cursor.freqs()[self.cur..new_cur]
.iter()
.sum::<u32>();
self.position_computer
.as_mut()
.unwrap()
.add_skip(sum_freqs_skipped as usize);
}
self.cur = new_cur;
// `doc` is now the first element >= `target`
let doc = output.0[new_cur];
let doc = block_docs[new_cur];
debug_assert!(doc >= target);
if doc == target {
SkipResult::Reached
@@ -225,16 +227,12 @@ impl DocSet for SegmentPostings {
}
/// Return the current document's `DocId`.
///
/// # Panics
///
/// Will panics if called without having called advance before.
#[inline]
fn doc(&self) -> DocId {
let docs = self.block_cursor.docs();
debug_assert!(
self.cur < docs.len(),
"Have you forgotten to call `.advance()` at least once before calling `.doc()` ."
"Have you forgotten to call `.advance()` at least once before calling .doc()."
);
docs[self.cur]
}
@@ -266,33 +264,17 @@ impl HasLen for SegmentPostings {
}
impl Postings for SegmentPostings {
/// Returns the frequency associated to the current document.
/// If the schema is set up so that no frequency have been encoded,
/// this method should always return 1.
///
/// # Panics
///
/// Will panics if called without having called advance before.
fn term_freq(&self) -> u32 {
debug_assert!(
// Here we do not use the len of `freqs()`
// because it is actually ok to request for the freq of doc
// even if no frequency were encoded for the field.
//
// In that case we hit the block just as if the frequency had been
// decoded. The block is simply prefilled by the value 1.
self.cur < COMPRESSION_BLOCK_SIZE,
"Have you forgotten to call `.advance()` at least once before calling \
`.term_freq()`."
);
self.block_cursor.freq(self.cur)
}
fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
let term_freq = self.term_freq() as usize;
if let Some(position_comp) = self.position_computer.as_mut() {
output.resize(term_freq, 0u32);
position_comp.positions_with_offset(offset, &mut output[..]);
if self.position_computer.is_some() {
output.resize(self.term_freq() as usize, 0u32);
self.position_computer
.as_mut()
.unwrap()
.positions_with_offset(offset, &mut output[..])
} else {
output.clear();
}
@@ -414,10 +396,6 @@ impl BlockSegmentPostings {
self.doc_decoder.output_array()
}
pub(crate) fn docs_aligned(&self) -> (&AlignedBuffer, usize) {
self.doc_decoder.output_aligned()
}
/// Return the document at index `idx` of the block.
#[inline]
pub fn doc(&self, idx: usize) -> u32 {
@@ -614,7 +592,6 @@ mod tests {
use common::HasLen;
use core::Index;
use docset::DocSet;
use postings::postings::Postings;
use schema::IndexRecordOption;
use schema::Schema;
use schema::Term;
@@ -631,18 +608,6 @@ mod tests {
assert_eq!(postings.len(), 0);
}
#[test]
#[should_panic(expected = "Have you forgotten to call `.advance()`")]
fn test_panic_if_doc_called_before_advance() {
SegmentPostings::empty().doc();
}
#[test]
#[should_panic(expected = "Have you forgotten to call `.advance()`")]
fn test_panic_if_freq_called_before_advance() {
SegmentPostings::empty().term_freq();
}
#[test]
fn test_empty_block_segment_postings() {
let mut postings = BlockSegmentPostings::empty();

View File

@@ -14,7 +14,7 @@ use termdict::{TermDictionaryBuilder, TermOrdinal};
use DocId;
use Result;
/// `InvertedIndexSerializer` is in charge of serializing
/// `PostingsSerializer` is in charge of serializing
/// postings on disk, in the
/// * `.idx` (inverted index)
/// * `.pos` (positions file)
@@ -54,7 +54,7 @@ pub struct InvertedIndexSerializer {
}
impl InvertedIndexSerializer {
/// Open a new `InvertedIndexSerializer` for the given segment
/// Open a new `PostingsSerializer` for the given segment
fn create(
terms_write: CompositeWrite<WritePtr>,
postings_write: CompositeWrite<WritePtr>,
@@ -175,7 +175,7 @@ impl<'a> FieldSerializer<'a> {
let positions_idx = self
.positions_serializer_opt
.as_ref()
.map(PositionSerializer::positions_idx)
.map(|positions_serializer| positions_serializer.positions_idx())
.unwrap_or(0u64);
TermInfo {
doc_freq: 0,

View File

@@ -205,332 +205,4 @@ mod tests {
assert_eq!(score_docs(&boolean_query), vec![0.977973, 0.84699446]);
}
}
/*
DoC 0
{
"_index": "test",
"_type": "_doc",
"_id": "0",
"matched": true,
"explanation": {
"value": 6.2610235,
"description": "max of:",
"details": [{
"value": 6.1969156,
"description": "sum of:",
"details": [{
"value": 6.1969156,
"description": "weight(text:оксана in 561) [PerFieldSimilarity], result of:",
"details": [{
"value": 6.1969156,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 5.65998,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 3,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.49766606,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 19.0,
"description": "dl, length of field",
"details": []
}, {
"value": 24.105577,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}]
}, {
"value": 6.2610235,
"description": "sum of:",
"details": [{
"value": 6.2610235,
"description": "weight(title:оксана in 561) [PerFieldSimilarity], result of:",
"details": [{
"value": 6.2610235,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 5.4086657,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 4,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.52617776,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 4.0,
"description": "dl, length of field",
"details": []
}, {
"value": 5.99502,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}]
}]
}
}
doc 2
{
"_index": "test",
"_type": "_doc",
"_id": "2",
"matched": true,
"explanation": {
"value": 11.911896,
"description": "max of:",
"details": [{
"value": 11.911896,
"description": "sum of:",
"details": [{
"value": 5.4068284,
"description": "weight(title:оксана in 0) [PerFieldSimilarity], result of:",
"details": [{
"value": 5.4068284,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 5.4086657,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 4,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.45439103,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 6.0,
"description": "dl, length of field",
"details": []
}, {
"value": 5.99502,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}, {
"value": 6.505067,
"description": "weight(title:лифенко in 0) [PerFieldSimilarity], result of:",
"details": [{
"value": 6.505067,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 6.5072775,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 1,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.45439103,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 6.0,
"description": "dl, length of field",
"details": []
}, {
"value": 5.99502,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}]
}]
}
}
*/
// motivated by #554
#[test]
fn test_bm25_several_fields() {
let mut schema_builder = Schema::builder();
let title = schema_builder.add_text_field("title", TEXT);
let text = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.add_document(doc!(
// tf = 1 0
title => "Законы притяжения Оксана Кулакова",
// tf = 1 0
text => "Законы притяжения Оксана Кулакова] \n\nТема: Сексуальное искусство, Женственность\nТип товара: Запись вебинара (аудио)\nПродолжительность: 1,5 часа\n\nСсылка на вебинар:\n ",
));
index_writer.add_document(doc!(
// tf = 1 0
title => "Любимые русские пироги (Оксана Путан)",
// tf = 2 0
text => "http://i95.fastpic.ru/big/2017/0628/9a/615b9c8504d94a3893d7f496ac53539a.jpg \n\nОт издателя\nОксана Путан профессиональный повар, автор кулинарных книг и известный кулинарный блогер. Ее рецепты отличаются практичностью, доступностью и пользуются огромной популярностью в русскоязычном интернете. Это третья книга автора о самом вкусном и ароматном настоящих русских пирогах и выпечке!\nДаже новички на кухне легко готовят по ее рецептам. Оксана описывает процесс приготовления настолько подробно и понятно, что вам остается только наслаждаться готовкой и не тратить время на лишние усилия. Готовьте легко и просто!\n\nhttps://www.ozon.ru/context/detail/id/139872462/"
));
index_writer.add_document(doc!(
// tf = 1 1
title => "PDF Мастер Класс \"Морячок\" (Оксана Лифенко)",
// tf = 0 0
text => "https://i.ibb.co/pzvHrDN/I3d U T6 Gg TM.jpg\nhttps://i.ibb.co/NFrb6v6/N0ls Z9nwjb U.jpg\nВ описание входит штаны, кофта, берет, матросский воротник. Описание продается в формате PDF, состоит из 12 страниц формата А4 и может быть напечатано на любом принтере.\nОписание предназначено для кукол BJD RealPuki от FairyLand, но может подойти и другим подобным куклам. Также вы можете вязать этот наряд из обычной пряжи, и он подойдет для куколок побольше.\nhttps://vk.com/market 95724412?w=product 95724412_2212"
));
for _ in 0..1_000 {
index_writer.add_document(doc!(
title => "a b d e f g",
text => "maitre corbeau sur un arbre perche tenait dans son bec un fromage Maitre rnard par lodeur alleche lui tint a peu pres ce langage."
));
}
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![title, text]);
let query = query_parser
.parse_query("Оксана Лифенко")
.unwrap();
let weight = query.weight(&searcher, true).unwrap();
let mut scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
// let mut scores = vec![];
// while
println!("=====|");
scorer.advance();
dbg!("scorer.score()");
assert!(false);
// scores.push(scorer.score());
// assert_eq!(scores, &[0.8017307, 0.72233325, 1.0300813]);
}
// motivated by #554
#[test]
fn test_bm25_several_fields_bbb() {
let mut schema_builder = Schema::builder();
let text = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.add_document(doc!(
text => "Законы притяжения Оксана Кулакова] \n\nТема: Сексуальное искусство, Женственность\nТип товара: Запись вебинара (аудио)\nПродолжительность: 1,5 часа\n\nСсылка на вебинар:\n ",
));
index_writer.add_document(doc!(
text => "http://i95.fastpic.ru/big/2017/0628/9a/615b9c8504d94a3893d7f496ac53539a.jpg \n\nОт издателя\nОксана Путан профессиональный повар, автор кулинарных книг и известный кулинарный блогер. Ее рецепты отличаются практичностью, доступностью и пользуются огромной популярностью в русскоязычном интернете. Это третья книга автора о самом вкусном и ароматном настоящих русских пирогах и выпечке!\nДаже новички на кухне легко готовят по ее рецептам. Оксана описывает процесс приготовления настолько подробно и понятно, что вам остается только наслаждаться готовкой и не тратить время на лишние усилия. Готовьте легко и просто!\n\nhttps://www.ozon.ru/context/detail/id/139872462/"
));
index_writer.add_document(doc!(
text => "https://i.ibb.co/pzvHrDN/I3d U T6 Gg TM.jpg\nhttps://i.ibb.co/NFrb6v6/N0ls Z9nwjb U.jpg\nВ описание входит штаны, кофта, берет, матросский воротник. Описание продается в формате PDF, состоит из 12 страниц формата А4 и может быть напечатано на любом принтере.\nОписание предназначено для кукол BJD RealPuki от FairyLand, но может подойти и другим подобным куклам. Также вы можете вязать этот наряд из обычной пряжи, и он подойдет для куколок побольше.\nhttps://vk.com/market 95724412?w=product 95724412_2212"
));
for _ in 0..100 {
index_writer.add_document(doc!(
text => "maitre corbeau sur un arbre perche tenait dans son bec un fromage Maitre rnard par lodeur alleche lui tint a peu pres ce langage."
));
}
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![text]);
let query = query_parser
.parse_query("Оксана Лифенко")
.unwrap();
let weight = query.weight(&searcher, true).unwrap();
let mut scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
let mut scores = vec![];
while scorer.advance() {
scores.push(scorer.score());
}
assert_eq!(scores, &[0.8017307, 0.72233325, 1.0300813]);
index_writer.commit().unwrap();
}
}

View File

@@ -4,7 +4,6 @@ use error::TantivyError;
use query::bm25::BM25Weight;
use query::Query;
use query::Weight;
use schema::IndexRecordOption;
use schema::{Field, Term};
use std::collections::BTreeSet;
use Result;
@@ -84,7 +83,7 @@ impl Query for PhraseQuery {
let has_positions = field_entry
.field_type()
.get_index_record_option()
.map(IndexRecordOption::has_positions)
.map(|index_record_option| index_record_option.has_positions())
.unwrap_or(false);
if !has_positions {
let field_name = field_entry.name();

View File

@@ -1,7 +1,6 @@
#![cfg_attr(feature = "cargo-clippy", allow(clippy::unneeded_field_pattern))]
#![cfg_attr(feature = "cargo-clippy", allow(clippy::toplevel_ref_arg))]
use super::query_grammar;
use super::user_input_ast::*;
use combine::char::*;
use combine::error::StreamError;
@@ -23,7 +22,7 @@ parser! {
parser! {
fn word[I]()(I) -> String
where [I: Stream<Item = char>] {
many1(satisfy(char::is_alphanumeric))
many1(satisfy(|c: char| c.is_alphanumeric()))
.and_then(|s: String| {
match s.as_str() {
"OR" => Err(StreamErrorFor::<I>::unexpected_static_message("OR")),
@@ -63,7 +62,7 @@ parser! {
fn negative_number[I]()(I) -> String
where [I: Stream<Item = char>]
{
(char('-'), many1(satisfy(char::is_numeric)))
(char('-'), many1(satisfy(|c: char| c.is_numeric())))
.map(|(s1, s2): (char, String)| format!("{}{}", s1, s2))
}
}
@@ -185,7 +184,7 @@ parser! {
}
)
)
.map(query_grammar::Element::into_dnf)
.map(|el| el.into_dnf())
.map(|fnd| {
if fnd.len() == 1 {
UserInputAST::and(fnd.into_iter().next().unwrap()) //< safe

View File

@@ -98,20 +98,4 @@ mod tests {
}
}
#[test]
fn test_term_query_count_when_there_are_deletes() {
let mut schema_builder = Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 5_000_000).unwrap();
index_writer.add_document(doc!(text_field=>"a b"));
index_writer.add_document(doc!(text_field=>"a c"));
index_writer.delete_term(Term::from_field_text(text_field, "b"));
index_writer.commit().unwrap();
let term_a = Term::from_field_text(text_field, "a");
let term_query = TermQuery::new(term_a, IndexRecordOption::Basic);
let reader = index.reader().unwrap();
assert_eq!(term_query.count(&*reader.searcher()).unwrap(), 1);
}
}

View File

@@ -39,15 +39,15 @@ impl Weight for TermWeight {
}
fn count(&self, reader: &SegmentReader) -> Result<u32> {
if let Some(delete_bitset) = reader.delete_bitset() {
Ok(self.scorer(reader)?.count(delete_bitset))
} else {
if reader.num_deleted_docs() == 0 {
let field = self.term.field();
Ok(reader
.inverted_index(field)
.get_term_info(&self.term)
.map(|term_info| term_info.doc_freq)
.unwrap_or(0))
} else {
Ok(self.scorer(reader)?.count())
}
}
}

View File

@@ -96,7 +96,7 @@ fn refill<TScorer: Scorer, TScoreCombiner: ScoreCombiner>(
impl<TScorer: Scorer, TScoreCombiner: ScoreCombiner> Union<TScorer, TScoreCombiner> {
fn refill(&mut self) -> bool {
if let Some(min_doc) = self.docsets.iter().map(DocSet::doc).min() {
if let Some(min_doc) = self.docsets.iter_mut().map(|docset| docset.doc()).min() {
self.offset = min_doc;
self.cursor = 0;
refill(
@@ -145,7 +145,7 @@ where
}
}
fn count_including_deleted(&mut self) -> u32 {
fn count(&mut self) -> u32 {
let mut count = self.bitsets[self.cursor..HORIZON_NUM_TINYBITSETS]
.iter()
.map(|bitset| bitset.len())
@@ -163,8 +163,6 @@ where
count
}
// TODO implement `count` efficiently.
fn skip_next(&mut self, target: DocId) -> SkipResult {
if !self.advance() {
return SkipResult::End;
@@ -302,7 +300,7 @@ mod tests {
count += 1;
}
assert!(!union_expected.advance());
assert_eq!(count, make_union().count_including_deleted());
assert_eq!(count, make_union().count());
}
#[test]

View File

@@ -13,11 +13,6 @@ pub trait Weight: Send + Sync + 'static {
/// Returns the number documents within the given `SegmentReader`.
fn count(&self, reader: &SegmentReader) -> Result<u32> {
let mut scorer = self.scorer(reader)?;
if let Some(delete_bitset) = reader.delete_bitset() {
Ok(scorer.count(delete_bitset))
} else {
Ok(scorer.count_including_deleted())
}
Ok(self.scorer(reader)?.count())
}
}

View File

@@ -10,6 +10,7 @@ use Index;
use Result;
use Searcher;
use SegmentReader;
use schema::Schema;
/// Defines when a new version of the index should be reloaded.
///
@@ -158,6 +159,11 @@ pub struct IndexReader {
}
impl IndexReader {
pub fn schema(&self) -> Schema {
self.inner.index.schema()
}
/// Update searchers so that they reflect the state of the last
/// `.commit()`.
///

View File

@@ -128,7 +128,7 @@ impl Document {
self.field_values
.iter()
.filter(|field_value| field_value.field() == field)
.map(FieldValue::value)
.map(|field_value| field_value.value())
.collect()
}
@@ -137,7 +137,7 @@ impl Document {
self.field_values
.iter()
.find(|field_value| field_value.field() == field)
.map(FieldValue::value)
.map(|field_value| field_value.value())
}
}

View File

@@ -4,7 +4,6 @@ use schema::{IntOptions, TextOptions};
use schema::Facet;
use schema::IndexRecordOption;
use schema::TextFieldIndexing;
use schema::Value;
use serde_json::Value as JsonValue;
@@ -95,7 +94,7 @@ impl FieldType {
match *self {
FieldType::Str(ref text_options) => text_options
.get_indexing_options()
.map(TextFieldIndexing::index_option),
.map(|indexing_options| indexing_options.index_option()),
FieldType::U64(ref int_options)
| FieldType::I64(ref int_options)
| FieldType::Date(ref int_options) => {

View File

@@ -130,16 +130,7 @@ impl SchemaBuilder {
self.add_field(field_entry)
}
/// Adds a fast bytes field to the schema.
///
/// Bytes field are not searchable and are only used
/// as fast field, to associate any kind of payload
/// to a document.
///
/// For instance, learning-to-rank often requires to access
/// some document features at scoring time.
/// These can be serializing and stored as a bytes field to
/// get access rapidly when scoring each document.
/// Adds a fast bytes field to the schema
pub fn add_bytes_field(&mut self, field_name: &str) -> Field {
let field_entry = FieldEntry::new_bytes(field_name.to_string());
self.add_field(field_entry)
@@ -233,7 +224,7 @@ impl Schema {
let field_name = self.get_field_name(field);
let values: Vec<Value> = field_values
.into_iter()
.map(FieldValue::value)
.map(|field_val| field_val.value())
.cloned()
.collect();
field_map.insert(field_name.to_string(), values);

View File

@@ -1,7 +1,6 @@
use htmlescape::encode_minimal;
use query::Query;
use schema::Field;
use schema::Value;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
@@ -304,7 +303,7 @@ impl SnippetGenerator {
let text: String = doc
.get_all(self.field)
.into_iter()
.flat_map(Value::text)
.flat_map(|val| val.text())
.collect::<Vec<&str>>()
.join(" ");
self.snippet(&text)

View File

@@ -227,7 +227,7 @@ pub struct PerFieldSpaceUsage {
impl PerFieldSpaceUsage {
pub(crate) fn new(fields: HashMap<Field, FieldUsage>) -> PerFieldSpaceUsage {
let total = fields.values().map(FieldUsage::total).sum();
let total = fields.values().map(|x| x.total()).sum();
PerFieldSpaceUsage { fields, total }
}

File diff suppressed because it is too large Load Diff

View File

@@ -44,17 +44,18 @@ where
}
fn advance(&mut self) -> bool {
if !self.tail.advance() {
return false;
}
if self.token_mut().text.is_ascii() {
// fast track for ascii.
self.token_mut().text.make_ascii_lowercase();
if self.tail.advance() {
if self.token_mut().text.is_ascii() {
// fast track for ascii.
self.token_mut().text.make_ascii_lowercase();
} else {
to_lowercase_unicode(&mut self.tail.token_mut().text, &mut self.buffer);
mem::swap(&mut self.tail.token_mut().text, &mut self.buffer);
}
true
} else {
to_lowercase_unicode(&mut self.tail.token_mut().text, &mut self.buffer);
mem::swap(&mut self.tail.token_mut().text, &mut self.buffer);
false
}
true
}
}

View File

@@ -97,8 +97,6 @@
//! If you built your schema programmatically, a complete example
//! could like this for instance.
//!
//! Note that tokens with a len greater or equal to [`MAX_TOKEN_LEN`](./constant.MAX_TOKEN_LEN.html).
//!
//! # Example
//!
//! ```
@@ -131,7 +129,6 @@
//! ```
//!
mod alphanum_only;
mod ascii_folding_filter;
mod facet_tokenizer;
mod lower_caser;
mod ngram_tokenizer;
@@ -145,7 +142,6 @@ mod tokenizer;
mod tokenizer_manager;
pub use self::alphanum_only::AlphaNumOnlyFilter;
pub use self::ascii_folding_filter::AsciiFoldingFilter;
pub use self::facet_tokenizer::FacetTokenizer;
pub use self::lower_caser::LowerCaser;
pub use self::ngram_tokenizer::NgramTokenizer;
@@ -161,13 +157,6 @@ pub use self::tokenizer::BoxedTokenizer;
pub use self::tokenizer::{Token, TokenFilter, TokenStream, Tokenizer};
pub use self::tokenizer_manager::TokenizerManager;
/// Maximum authorized len (in bytes) for a token.
///
/// Tokenizer are in charge of not emitting tokens larger than this value.
/// Currently, if a faulty tokenizer implementation emits tokens with a length larger than
/// `2^16 - 1 - 4`, the token will simply be ignored downstream.
pub const MAX_TOKEN_LEN: usize = u16::max_value() as usize - 4;
#[cfg(test)]
pub mod tests {
use super::{
@@ -239,27 +228,27 @@ pub mod tests {
fn test_non_en_tokenizer() {
let tokenizer_manager = TokenizerManager::default();
tokenizer_manager.register(
"el_stem",
"es_stem",
SimpleTokenizer
.filter(RemoveLongFilter::limit(40))
.filter(LowerCaser)
.filter(Stemmer::new(Language::Greek)),
.filter(Stemmer::new(Language::Spanish)),
);
let en_tokenizer = tokenizer_manager.get("el_stem").unwrap();
let en_tokenizer = tokenizer_manager.get("es_stem").unwrap();
let mut tokens: Vec<Token> = vec![];
{
let mut add_token = |token: &Token| {
tokens.push(token.clone());
};
en_tokenizer
.token_stream("Καλημέρα, χαρούμενε φορολογούμενε!")
.token_stream("Hola, feliz contribuyente!")
.process(&mut add_token);
}
assert_eq!(tokens.len(), 3);
assert_token(&tokens[0], 0, "καλημερ", 0, 16);
assert_token(&tokens[1], 1, "χαρουμεν", 18, 36);
assert_token(&tokens[2], 2, "φορολογουμεν", 37, 63);
assert_token(&tokens[0], 0, "hola", 0, 4);
assert_token(&tokens[1], 1, "feliz", 6, 11);
assert_token(&tokens[2], 2, "contribuyent", 12, 25);
}
#[test]

View File

@@ -29,9 +29,12 @@ impl<'a> Tokenizer<'a> for RawTokenizer {
impl TokenStream for RawTokenStream {
fn advance(&mut self) -> bool {
let result = self.has_token;
self.has_token = false;
result
if self.has_token {
self.has_token = false;
true
} else {
false
}
}
fn token(&self) -> &Token {

View File

@@ -91,6 +91,7 @@ where
return true;
}
}
false
}
}

View File

@@ -38,16 +38,23 @@ impl<'a> TokenStream for SimpleTokenStream<'a> {
fn advance(&mut self) -> bool {
self.token.text.clear();
self.token.position = self.token.position.wrapping_add(1);
while let Some((offset_from, c)) = self.chars.next() {
if c.is_alphanumeric() {
let offset_to = self.search_token_end();
self.token.offset_from = offset_from;
self.token.offset_to = offset_to;
self.token.text.push_str(&self.text[offset_from..offset_to]);
return true;
loop {
match self.chars.next() {
Some((offset_from, c)) => {
if c.is_alphanumeric() {
let offset_to = self.search_token_end();
self.token.offset_from = offset_from;
self.token.offset_to = offset_to;
self.token.text.push_str(&self.text[offset_from..offset_to]);
return true;
}
}
None => {
return false;
}
}
}
false
}
fn token(&self) -> &Token {

View File

@@ -2,6 +2,7 @@
use super::{Token, TokenFilter, TokenStream};
use rust_stemmers::{self, Algorithm};
use std::sync::Arc;
/// Available stemmer languages.
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Copy, Clone)]
@@ -56,14 +57,14 @@ impl Language {
/// Tokens are expected to be lowercased beforehand.
#[derive(Clone)]
pub struct Stemmer {
stemmer_algorithm: Algorithm,
stemmer_algorithm: Arc<Algorithm>,
}
impl Stemmer {
/// Creates a new Stemmer `TokenFilter` for a given language algorithm.
pub fn new(language: Language) -> Stemmer {
Stemmer {
stemmer_algorithm: language.algorithm(),
stemmer_algorithm: Arc::new(language.algorithm()),
}
}
}
@@ -82,7 +83,7 @@ where
type ResultTokenStream = StemmerTokenStream<TailTokenStream>;
fn transform(&self, token_stream: TailTokenStream) -> Self::ResultTokenStream {
let inner_stemmer = rust_stemmers::Stemmer::create(self.stemmer_algorithm);
let inner_stemmer = rust_stemmers::Stemmer::create(Algorithm::English);
StemmerTokenStream::wrap(inner_stemmer, token_stream)
}
}
@@ -108,14 +109,15 @@ where
}
fn advance(&mut self) -> bool {
if !self.tail.advance() {
return false;
if self.tail.advance() {
// TODO remove allocation
let stemmed_str: String = self.stemmer.stem(&self.token().text).into_owned();
self.token_mut().text.clear();
self.token_mut().text.push_str(&stemmed_str);
true
} else {
false
}
// TODO remove allocation
let stemmed_str: String = self.stemmer.stem(&self.token().text).into_owned();
self.token_mut().text.clear();
self.token_mut().text.push_str(&stemmed_str);
true
}
}

View File

@@ -104,6 +104,7 @@ where
return true;
}
}
false
}
}

View File

@@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, RwLock};
use tokenizer::box_tokenizer;
use tokenizer::stemmer::Language;
@@ -47,8 +46,7 @@ impl TokenizerManager {
.read()
.expect("Acquiring the lock should never fail")
.get(tokenizer_name)
.map(Deref::deref)
.map(BoxedTokenizer::boxed_clone)
.map(|boxed_tokenizer| boxed_tokenizer.boxed_clone())
}
}