Compare commits

..

3 Commits

Author SHA1 Message Date
Paul Masurel
a7a98b11d7 exploratory 2019-05-22 10:18:53 +09:00
Paul Masurel
a18932165f for_each in union 2019-05-07 08:08:55 +09:00
Paul Masurel
8f82d0b773 Added impl for for_each specific to unions. 2019-05-05 17:31:32 +09:00
22 changed files with 226 additions and 529 deletions

View File

@@ -61,9 +61,6 @@ before_script:
script:
- bash ci/script.sh
after_success:
- cargo doc-upload
before_deploy:
- sh ci/before_deploy.sh

View File

@@ -9,20 +9,14 @@ Tantivy 0.10.0
Minor
---------
- Small simplification of the code.
Calling .freq() or .doc() when .advance() has never been called
Calling .freq() or .doc() when .advance() has never
on segment postings should panic from now on.
- Tokens exceeding `u16::max_value() - 4` chars are discarded silently instead of panicking.
- Fast fields are now preloaded when the `SegmentReader` is created.
- `IndexMeta` is now public. (@hntd187)
- `IndexWriter` `add_document`, `delete_term`. `IndexWriter` is `Sync`, making it possible to use it with a `
Arc<RwLock<IndexWriter>>`. `add_document` and `delete_term` can
only require a read lock. (@pmasurel)
- Introducing `Opstamp` as an expressive type alias for `u64`. (@petr-tik)
- Stamper now relies on `AtomicU64` on all platforms (@petr-tik)
## How to update?
Your existing indexes are usable as is, but you may need some
Your existing indexes are usable as is. Your may or may need some
trivial updates.
### Fast fields

View File

@@ -1,107 +0,0 @@
// # Indexing from different threads.
//
// It is fairly common to have to index from different threads.
// Tantivy forbids to create more than one `IndexWriter` at a time.
//
// This `IndexWriter` itself has its own multithreaded layer, so managing your own
// indexing threads will not help. However, it can still be useful for some applications.
//
// For instance, if preparing documents to send to tantivy before indexing is the bottleneck of
// your application, it is reasonable to have multiple threads.
//
// Another very common reason to want to index from multiple threads, is implementing a webserver
// with CRUD capabilities. The server framework will most likely handle request from
// different threads.
//
// The recommended way to address both of these use case is to wrap your `IndexWriter` into a
// `Arc<RwLock<IndexWriter>>`.
//
// While this is counterintuitive, adding and deleting documents do not require mutability
// over the `IndexWriter`, so several threads will be able to do this operation concurrently.
//
// The example below does not represent an actual real-life use case (who would spawn thread to
// index a single document?), but aims at demonstrating the mechanism that makes indexing
// from several threads possible.
extern crate tempdir;
// ---
// Importing tantivy...
#[macro_use]
extern crate tantivy;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::Opstamp;
use tantivy::{Index, IndexWriter};
fn main() -> tantivy::Result<()> {
// # Defining the schema
let mut schema_builder = Schema::builder();
let title = schema_builder.add_text_field("title", TEXT | STORED);
let body = schema_builder.add_text_field("body", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let index_writer: Arc<RwLock<IndexWriter>> = Arc::new(RwLock::new(index.writer(50_000_000)?));
// # First indexing thread.
let index_writer_clone_1 = index_writer.clone();
thread::spawn(move || {
// we index 100 times the document... for the sake of the example.
for i in 0..100 {
let opstamp = {
// A read lock is sufficient here.
let index_writer_rlock = index_writer_clone_1.read().unwrap();
index_writer_rlock.add_document(
doc!(
title => "Of Mice and Men",
body => "A few miles south of Soledad, the Salinas River drops in close to the hillside \
bank and runs deep and green. The water is warm too, for it has slipped twinkling \
over the yellow sands in the sunlight before reaching the narrow pool. On one \
side of the river the golden foothill slopes curve up to the strong and rocky \
Gabilan Mountains, but on the valley side the water is lined with trees—willows \
fresh and green with every spring, carrying in their lower leaf junctures the \
debris of the winters flooding; and sycamores with mottled, white, recumbent \
limbs and branches that arch over the pool"
))
};
println!("add doc {} from thread 1 - opstamp {}", i, opstamp);
thread::sleep(Duration::from_millis(20));
}
});
// # Second indexing thread.
let index_writer_clone_2 = index_writer.clone();
// For convenience, tantivy also comes with a macro to
// reduce the boilerplate above.
thread::spawn(move || {
// we index 100 times the document... for the sake of the example.
for i in 0..100 {
// A read lock is sufficient here.
let opstamp = {
let index_writer_rlock = index_writer_clone_2.read().unwrap();
index_writer_rlock.add_document(doc!(
title => "Manufacturing consent",
body => "Some great book description..."
))
};
println!("add doc {} from thread 2 - opstamp {}", i, opstamp);
thread::sleep(Duration::from_millis(10));
}
});
// # In the main thread, we commit 10 times, once every 500ms.
for _ in 0..10 {
let opstamp: Opstamp = {
// Committing or rollbacking on the other hand requires write lock. This will block other threads.
let mut index_writer_wlock = index_writer.write().unwrap();
index_writer_wlock.commit().unwrap()
};
println!("committed with opstamp {}", opstamp);
thread::sleep(Duration::from_millis(500));
}
Ok(())
}

View File

@@ -2,7 +2,6 @@ use core::SegmentMeta;
use schema::Schema;
use serde_json;
use std::fmt;
use Opstamp;
/// Meta information about the `Index`.
///
@@ -16,7 +15,7 @@ use Opstamp;
pub struct IndexMeta {
pub segments: Vec<SegmentMeta>,
pub schema: Schema,
pub opstamp: Opstamp,
pub opstamp: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
}

View File

@@ -10,7 +10,6 @@ use schema::Schema;
use std::fmt;
use std::path::PathBuf;
use std::result;
use Opstamp;
use Result;
/// A segment is a piece of the index.
@@ -51,7 +50,7 @@ impl Segment {
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),

View File

@@ -5,7 +5,6 @@ use serde;
use std::collections::HashSet;
use std::fmt;
use std::path::PathBuf;
use Opstamp;
lazy_static! {
static ref INVENTORY: Inventory<InnerSegmentMeta> = { Inventory::new() };
@@ -14,7 +13,7 @@ lazy_static! {
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
num_deleted_docs: u32,
opstamp: Opstamp,
opstamp: u64,
}
/// `SegmentMeta` contains simple meta information about a segment.
@@ -137,9 +136,9 @@ impl SegmentMeta {
self.max_doc() - self.num_deleted_docs()
}
/// Returns the `Opstamp` of the last delete operation
/// Returns the opstamp of the last delete operation
/// taken in account in this segment.
pub fn delete_opstamp(&self) -> Option<Opstamp> {
pub fn delete_opstamp(&self) -> Option<u64> {
self.tracked
.deletes
.as_ref()
@@ -153,7 +152,7 @@ impl SegmentMeta {
}
#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta {
let delete_meta = DeleteMeta {
num_deleted_docs,
opstamp,

View File

@@ -48,14 +48,14 @@ impl RetryPolicy {
///
/// It is transparently associated to a lock file, that gets deleted
/// on `Drop.` The lock is released automatically on `Drop`.
pub struct DirectoryLock(Box<Drop + Send + Sync + 'static>);
pub struct DirectoryLock(Box<Drop + Send + 'static>);
struct DirectoryLockGuard {
directory: Box<Directory>,
path: PathBuf,
}
impl<T: Drop + Send + Sync + 'static> From<Box<T>> for DirectoryLock {
impl<T: Drop + Send + 'static> From<Box<T>> for DirectoryLock {
fn from(underlying: Box<T>) -> Self {
DirectoryLock(underlying)
}

View File

@@ -142,6 +142,11 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
unboxed.size_hint()
}
fn append_to_bitset(&mut self, bitset: &mut BitSet) {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.append_to_bitset(bitset);
}
fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.count(delete_bitset)
@@ -151,9 +156,4 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.count_including_deleted()
}
fn append_to_bitset(&mut self, bitset: &mut BitSet) {
let unboxed: &mut TDocSet = self.borrow_mut();
unboxed.append_to_bitset(bitset);
}
}

View File

@@ -2,7 +2,6 @@ use super::operation::DeleteOperation;
use std::mem;
use std::ops::DerefMut;
use std::sync::{Arc, RwLock};
use Opstamp;
// The DeleteQueue is similar in conceptually to a multiple
// consumer single producer broadcast channel.
@@ -185,7 +184,7 @@ impl DeleteCursor {
/// queue are consume and the next get will return None.
/// - the next get will return the first operation with an
/// `opstamp >= target_opstamp`.
pub fn skip_to(&mut self, target_opstamp: Opstamp) {
pub fn skip_to(&mut self, target_opstamp: u64) {
// TODO Can be optimize as we work with block.
while self.is_behind_opstamp(target_opstamp) {
self.advance();
@@ -193,7 +192,7 @@ impl DeleteCursor {
}
#[cfg_attr(feature = "cargo-clippy", allow(clippy::wrong_self_convention))]
fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool {
fn is_behind_opstamp(&mut self, target_opstamp: u64) -> bool {
self.get()
.map(|operation| operation.opstamp < target_opstamp)
.unwrap_or(false)

View File

@@ -1,6 +1,5 @@
use std::sync::Arc;
use DocId;
use Opstamp;
// Doc to opstamp is used to identify which
// document should be deleted.
@@ -24,7 +23,7 @@ pub enum DocToOpstampMapping {
}
impl From<Vec<u64>> for DocToOpstampMapping {
fn from(opstamps: Vec<Opstamp>) -> DocToOpstampMapping {
fn from(opstamps: Vec<u64>) -> DocToOpstampMapping {
DocToOpstampMapping::WithMap(Arc::new(opstamps))
}
}
@@ -36,7 +35,7 @@ impl DocToOpstampMapping {
//
// The edge case opstamp = some doc opstamp is in practise
// never called.
pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId {
pub fn compute_doc_limit(&self, target_opstamp: u64) -> DocId {
match *self {
DocToOpstampMapping::WithMap(ref doc_opstamps) => {
match doc_opstamps.binary_search(&target_opstamp) {

View File

@@ -30,7 +30,6 @@ use std::ops::Range;
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use Opstamp;
use Result;
// Size of the margin for the heap. A segment is closed when the remaining memory
@@ -100,7 +99,7 @@ pub struct IndexWriter {
delete_queue: DeleteQueue,
stamper: Stamper,
committed_opstamp: Opstamp,
committed_opstamp: u64,
}
/// Open a new index writer. Attempts to acquire a lockfile.
@@ -178,7 +177,7 @@ pub fn compute_deleted_bitset(
segment_reader: &SegmentReader,
delete_cursor: &mut DeleteCursor,
doc_opstamps: &DocToOpstampMapping,
target_opstamp: Opstamp,
target_opstamp: u64,
) -> Result<bool> {
let mut might_have_changed = false;
@@ -220,7 +219,7 @@ pub fn compute_deleted_bitset(
pub fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: Opstamp,
target_opstamp: u64,
) -> Result<()> {
{
if segment_entry.meta().delete_opstamp() == Some(target_opstamp) {
@@ -300,11 +299,11 @@ fn index_documents(
// the worker thread.
assert!(num_docs > 0);
let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;
let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
let segment_meta = SegmentMeta::new(segment_id, num_docs);
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
let delete_bitset_opt = if delete_cursor.get().is_some() {
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
@@ -495,7 +494,7 @@ impl IndexWriter {
/// state as it was after the last commit.
///
/// The opstamp at the last commit is returned.
pub fn rollback(&mut self) -> Result<Opstamp> {
pub fn rollback(&mut self) -> Result<()> {
info!("Rolling back to opstamp {}", self.committed_opstamp);
// marks the segment updater as killed. From now on, all
@@ -530,7 +529,7 @@ impl IndexWriter {
// was dropped with the index_writer.
for _ in document_receiver.clone() {}
Ok(self.committed_opstamp)
Ok(())
}
/// Prepares a commit.
@@ -568,7 +567,7 @@ impl IndexWriter {
info!("Preparing commit");
// this will drop the current document channel
// and recreate a new one.
// and recreate a new one channels.
self.recreate_document_channel();
let former_workers_join_handle = mem::replace(&mut self.workers_join_handle, Vec::new());
@@ -602,7 +601,7 @@ impl IndexWriter {
/// Commit returns the `opstamp` of the last document
/// that made it in the commit.
///
pub fn commit(&mut self) -> Result<Opstamp> {
pub fn commit(&mut self) -> Result<u64> {
self.prepare_commit()?.commit()
}
@@ -618,7 +617,7 @@ impl IndexWriter {
///
/// Like adds, the deletion itself will be visible
/// only after calling `commit()`.
pub fn delete_term(&self, term: Term) -> Opstamp {
pub fn delete_term(&mut self, term: Term) -> u64 {
let opstamp = self.stamper.stamp();
let delete_operation = DeleteOperation { opstamp, term };
self.delete_queue.push(delete_operation);
@@ -632,7 +631,7 @@ impl IndexWriter {
///
/// This is also the opstamp of the commit that is currently
/// available for searchers.
pub fn commit_opstamp(&self) -> Opstamp {
pub fn commit_opstamp(&self) -> u64 {
self.committed_opstamp
}
@@ -646,7 +645,7 @@ impl IndexWriter {
///
/// Currently it represents the number of documents that
/// have been added since the creation of the index.
pub fn add_document(&self, document: Document) -> Opstamp {
pub fn add_document(&mut self, document: Document) -> u64 {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation { opstamp, document };
let send_result = self.operation_sender.send(vec![add_operation]);
@@ -663,7 +662,7 @@ impl IndexWriter {
/// The total number of stamps generated by this method is `count + 1`;
/// each operation gets a stamp from the `stamps` iterator and `last_opstamp`
/// is for the batch itself.
fn get_batch_opstamps(&self, count: Opstamp) -> (Opstamp, Range<Opstamp>) {
fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range<u64>) {
let Range { start, end } = self.stamper.stamps(count + 1u64);
let last_opstamp = end - 1;
let stamps = Range {
@@ -689,7 +688,7 @@ impl IndexWriter {
/// Like adds and deletes (see `IndexWriter.add_document` and
/// `IndexWriter.delete_term`), the changes made by calling `run` will be
/// visible to readers only after calling `commit()`.
pub fn run(&self, user_operations: Vec<UserOperation>) -> Opstamp {
pub fn run(&mut self, user_operations: Vec<UserOperation>) -> u64 {
let count = user_operations.len() as u64;
if count == 0 {
return self.stamper.stamp();
@@ -740,7 +739,7 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", schema::TEXT);
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
let operations = vec![
UserOperation::Add(doc!(text_field=>"a")),
UserOperation::Add(doc!(text_field=>"b")),
@@ -802,7 +801,7 @@ mod tests {
fn test_empty_operations_group() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer(3_000_000).unwrap();
let mut index_writer = index.writer(3_000_000).unwrap();
let operations1 = vec![];
let batch_opstamp1 = index_writer.run(operations1);
assert_eq!(batch_opstamp1, 0u64);

View File

@@ -1,6 +1,5 @@
use census::{Inventory, TrackedObject};
use std::collections::HashSet;
use Opstamp;
use SegmentId;
#[derive(Default)]
@@ -18,8 +17,8 @@ impl MergeOperationInventory {
}
}
/// A `MergeOperation` has two roles.
/// It carries all of the information required to describe a merge:
/// A `MergeOperation` has two role.
/// It carries all of the information required to describe a merge :
/// - `target_opstamp` is the opstamp up to which we want to consume the
/// delete queue and reflect their deletes.
/// - `segment_ids` is the list of segment to be merged.
@@ -36,14 +35,14 @@ pub struct MergeOperation {
}
struct InnerMergeOperation {
target_opstamp: Opstamp,
target_opstamp: u64,
segment_ids: Vec<SegmentId>,
}
impl MergeOperation {
pub fn new(
inventory: &MergeOperationInventory,
target_opstamp: Opstamp,
target_opstamp: u64,
segment_ids: Vec<SegmentId>,
) -> MergeOperation {
let inner_merge_operation = InnerMergeOperation {
@@ -55,7 +54,7 @@ impl MergeOperation {
}
}
pub fn target_opstamp(&self) -> Opstamp {
pub fn target_opstamp(&self) -> u64 {
self.inner.target_opstamp
}

View File

@@ -1,18 +1,17 @@
use schema::Document;
use schema::Term;
use Opstamp;
/// Timestamped Delete operation.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct DeleteOperation {
pub opstamp: Opstamp,
pub opstamp: u64,
pub term: Term,
}
/// Timestamped Add operation.
#[derive(Eq, PartialEq, Debug)]
pub struct AddOperation {
pub opstamp: Opstamp,
pub opstamp: u64,
pub document: Document,
}

View File

@@ -1,16 +1,15 @@
use super::IndexWriter;
use Opstamp;
use Result;
/// A prepared commit
pub struct PreparedCommit<'a> {
index_writer: &'a mut IndexWriter,
payload: Option<String>,
opstamp: Opstamp,
opstamp: u64,
}
impl<'a> PreparedCommit<'a> {
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp) -> PreparedCommit {
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
PreparedCommit {
index_writer,
payload: None,
@@ -18,7 +17,7 @@ impl<'a> PreparedCommit<'a> {
}
}
pub fn opstamp(&self) -> Opstamp {
pub fn opstamp(&self) -> u64 {
self.opstamp
}
@@ -26,11 +25,11 @@ impl<'a> PreparedCommit<'a> {
self.payload = Some(payload.to_string())
}
pub fn abort(self) -> Result<Opstamp> {
pub fn abort(self) -> Result<()> {
self.index_writer.rollback()
}
pub fn commit(self) -> Result<Opstamp> {
pub fn commit(self) -> Result<u64> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()

View File

@@ -36,7 +36,6 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::thread;
use std::thread::JoinHandle;
use Opstamp;
use Result;
/// Save the index meta file.
@@ -225,7 +224,7 @@ impl SegmentUpdater {
///
/// Tne method returns copies of the segment entries,
/// updated with the delete information.
fn purge_deletes(&self, target_opstamp: Opstamp) -> Result<Vec<SegmentEntry>> {
fn purge_deletes(&self, target_opstamp: u64) -> Result<Vec<SegmentEntry>> {
let mut segment_entries = self.0.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
let segment = self.0.index.segment(segment_entry.meta().clone());
@@ -234,7 +233,7 @@ impl SegmentUpdater {
Ok(segment_entries)
}
pub fn save_metas(&self, opstamp: Opstamp, commit_message: Option<String>) {
pub fn save_metas(&self, opstamp: u64, commit_message: Option<String>) {
if self.is_alive() {
let index = &self.0.index;
let directory = index.directory();
@@ -281,7 +280,7 @@ impl SegmentUpdater {
.garbage_collect(|| self.0.segment_manager.list_files());
}
pub fn commit(&self, opstamp: Opstamp, payload: Option<String>) -> Result<()> {
pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
self.run_async(move |segment_updater| {
if segment_updater.is_alive() {
let segment_entries = segment_updater

View File

@@ -16,7 +16,6 @@ use tokenizer::BoxedTokenizer;
use tokenizer::FacetTokenizer;
use tokenizer::{TokenStream, Tokenizer};
use DocId;
use Opstamp;
use Result;
/// A `SegmentWriter` is in charge of creating segment index from a
@@ -30,7 +29,7 @@ pub struct SegmentWriter {
segment_serializer: SegmentSerializer,
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: FieldNormsWriter,
doc_opstamps: Vec<Opstamp>,
doc_opstamps: Vec<u64>,
tokenizers: Vec<Option<Box<BoxedTokenizer>>>,
}

View File

@@ -1,27 +1,70 @@
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use Opstamp;
/// Stamper provides Opstamps, which is just an auto-increment id to label
/// an operation.
///
/// Cloning does not "fork" the stamp generation. The stamper actually wraps an `Arc`.
// AtomicU64 have not landed in stable.
// For the moment let's just use AtomicUsize on
// x86/64 bit platform, and a mutex on other platform.
#[cfg(target_arch = "x86_64")]
mod archicture_impl {
use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Default)]
pub struct AtomicU64Ersatz(AtomicUsize);
impl AtomicU64Ersatz {
pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize))
}
pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
self.0.fetch_add(val as usize, order) as u64
}
}
}
#[cfg(not(target_arch = "x86_64"))]
mod archicture_impl {
use std::sync::atomic::Ordering;
/// Under other architecture, we rely on a mutex.
use std::sync::RwLock;
#[derive(Default)]
pub struct AtomicU64Ersatz(RwLock<u64>);
impl AtomicU64Ersatz {
pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
AtomicU64Ersatz(RwLock::new(first_opstamp))
}
pub fn fetch_add(&self, incr: u64, _order: Ordering) -> u64 {
let mut lock = self.0.write().unwrap();
let previous_val = *lock;
*lock = previous_val + incr;
previous_val
}
}
}
use self::archicture_impl::AtomicU64Ersatz;
#[derive(Clone, Default)]
pub struct Stamper(Arc<AtomicU64>);
pub struct Stamper(Arc<AtomicU64Ersatz>);
impl Stamper {
pub fn new(first_opstamp: Opstamp) -> Stamper {
Stamper(Arc::new(AtomicU64::new(first_opstamp)))
pub fn new(first_opstamp: u64) -> Stamper {
Stamper(Arc::new(AtomicU64Ersatz::new(first_opstamp)))
}
pub fn stamp(&self) -> Opstamp {
pub fn stamp(&self) -> u64 {
self.0.fetch_add(1u64, Ordering::SeqCst) as u64
}
/// Given a desired count `n`, `stamps` returns an iterator that
/// will supply `n` number of u64 stamps.
pub fn stamps(&self, n: u64) -> Range<Opstamp> {
pub fn stamps(&self, n: u64) -> Range<u64> {
let start = self.0.fetch_add(n, Ordering::SeqCst);
Range {
start,
@@ -49,5 +92,4 @@ mod test {
assert_eq!(stamper.stamps(3u64), (12..15));
assert_eq!(stamper.stamp(), 15u64);
}
}

View File

@@ -226,7 +226,7 @@ mod docset;
pub use self::docset::{DocSet, SkipResult};
pub use core::SegmentComponent;
pub use core::{Index, Searcher, Segment, SegmentId, SegmentMeta, IndexMeta};
pub use core::{Index, Searcher, Segment, SegmentId, SegmentMeta};
pub use core::{InvertedIndexReader, SegmentReader};
pub use directory::Directory;
pub use indexer::IndexWriter;
@@ -254,16 +254,6 @@ pub mod merge_policy {
/// as they are added in the segment.
pub type DocId = u32;
/// A u64 assigned to every operation incrementally
///
/// All operations modifying the index receives an monotonic Opstamp.
/// The resulting state of the index is consistent with the opstamp ordering.
///
/// For instance, a commit with opstamp `32_423` will reflect all Add and Delete operations
/// with an opstamp `<= 32_423`. A delete operation with opstamp n will no affect a document added
/// with opstamp `n+1`.
pub type Opstamp = u64;
/// A f32 that represents the relevance of the document to the query
///
/// This is modelled internally as a `f32`. The

View File

@@ -205,332 +205,4 @@ mod tests {
assert_eq!(score_docs(&boolean_query), vec![0.977973, 0.84699446]);
}
}
/*
DoC 0
{
"_index": "test",
"_type": "_doc",
"_id": "0",
"matched": true,
"explanation": {
"value": 6.2610235,
"description": "max of:",
"details": [{
"value": 6.1969156,
"description": "sum of:",
"details": [{
"value": 6.1969156,
"description": "weight(text:оксана in 561) [PerFieldSimilarity], result of:",
"details": [{
"value": 6.1969156,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 5.65998,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 3,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.49766606,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 19.0,
"description": "dl, length of field",
"details": []
}, {
"value": 24.105577,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}]
}, {
"value": 6.2610235,
"description": "sum of:",
"details": [{
"value": 6.2610235,
"description": "weight(title:оксана in 561) [PerFieldSimilarity], result of:",
"details": [{
"value": 6.2610235,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 5.4086657,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 4,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.52617776,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 4.0,
"description": "dl, length of field",
"details": []
}, {
"value": 5.99502,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}]
}]
}
}
doc 2
{
"_index": "test",
"_type": "_doc",
"_id": "2",
"matched": true,
"explanation": {
"value": 11.911896,
"description": "max of:",
"details": [{
"value": 11.911896,
"description": "sum of:",
"details": [{
"value": 5.4068284,
"description": "weight(title:оксана in 0) [PerFieldSimilarity], result of:",
"details": [{
"value": 5.4068284,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 5.4086657,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 4,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.45439103,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 6.0,
"description": "dl, length of field",
"details": []
}, {
"value": 5.99502,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}, {
"value": 6.505067,
"description": "weight(title:лифенко in 0) [PerFieldSimilarity], result of:",
"details": [{
"value": 6.505067,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 6.5072775,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 1,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.45439103,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 6.0,
"description": "dl, length of field",
"details": []
}, {
"value": 5.99502,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}]
}]
}
}
*/
// motivated by #554
#[test]
fn test_bm25_several_fields() {
let mut schema_builder = Schema::builder();
let title = schema_builder.add_text_field("title", TEXT);
let text = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.add_document(doc!(
// tf = 1 0
title => "Законы притяжения Оксана Кулакова",
// tf = 1 0
text => "Законы притяжения Оксана Кулакова] \n\nТема: Сексуальное искусство, Женственность\nТип товара: Запись вебинара (аудио)\nПродолжительность: 1,5 часа\n\nСсылка на вебинар:\n ",
));
index_writer.add_document(doc!(
// tf = 1 0
title => "Любимые русские пироги (Оксана Путан)",
// tf = 2 0
text => "http://i95.fastpic.ru/big/2017/0628/9a/615b9c8504d94a3893d7f496ac53539a.jpg \n\nОт издателя\nОксана Путан профессиональный повар, автор кулинарных книг и известный кулинарный блогер. Ее рецепты отличаются практичностью, доступностью и пользуются огромной популярностью в русскоязычном интернете. Это третья книга автора о самом вкусном и ароматном настоящих русских пирогах и выпечке!\nДаже новички на кухне легко готовят по ее рецептам. Оксана описывает процесс приготовления настолько подробно и понятно, что вам остается только наслаждаться готовкой и не тратить время на лишние усилия. Готовьте легко и просто!\n\nhttps://www.ozon.ru/context/detail/id/139872462/"
));
index_writer.add_document(doc!(
// tf = 1 1
title => "PDF Мастер Класс \"Морячок\" (Оксана Лифенко)",
// tf = 0 0
text => "https://i.ibb.co/pzvHrDN/I3d U T6 Gg TM.jpg\nhttps://i.ibb.co/NFrb6v6/N0ls Z9nwjb U.jpg\nВ описание входит штаны, кофта, берет, матросский воротник. Описание продается в формате PDF, состоит из 12 страниц формата А4 и может быть напечатано на любом принтере.\nОписание предназначено для кукол BJD RealPuki от FairyLand, но может подойти и другим подобным куклам. Также вы можете вязать этот наряд из обычной пряжи, и он подойдет для куколок побольше.\nhttps://vk.com/market 95724412?w=product 95724412_2212"
));
for _ in 0..1_000 {
index_writer.add_document(doc!(
title => "a b d e f g",
text => "maitre corbeau sur un arbre perche tenait dans son bec un fromage Maitre rnard par lodeur alleche lui tint a peu pres ce langage."
));
}
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![title, text]);
let query = query_parser
.parse_query("Оксана Лифенко")
.unwrap();
let weight = query.weight(&searcher, true).unwrap();
let mut scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
// let mut scores = vec![];
// while
println!("=====|");
scorer.advance();
dbg!("scorer.score()");
assert!(false);
// scores.push(scorer.score());
// assert_eq!(scores, &[0.8017307, 0.72233325, 1.0300813]);
}
// motivated by #554
#[test]
fn test_bm25_several_fields_bbb() {
let mut schema_builder = Schema::builder();
let text = schema_builder.add_text_field("text", TEXT);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
index_writer.add_document(doc!(
text => "Законы притяжения Оксана Кулакова] \n\nТема: Сексуальное искусство, Женственность\nТип товара: Запись вебинара (аудио)\nПродолжительность: 1,5 часа\n\nСсылка на вебинар:\n ",
));
index_writer.add_document(doc!(
text => "http://i95.fastpic.ru/big/2017/0628/9a/615b9c8504d94a3893d7f496ac53539a.jpg \n\nОт издателя\nОксана Путан профессиональный повар, автор кулинарных книг и известный кулинарный блогер. Ее рецепты отличаются практичностью, доступностью и пользуются огромной популярностью в русскоязычном интернете. Это третья книга автора о самом вкусном и ароматном настоящих русских пирогах и выпечке!\nДаже новички на кухне легко готовят по ее рецептам. Оксана описывает процесс приготовления настолько подробно и понятно, что вам остается только наслаждаться готовкой и не тратить время на лишние усилия. Готовьте легко и просто!\n\nhttps://www.ozon.ru/context/detail/id/139872462/"
));
index_writer.add_document(doc!(
text => "https://i.ibb.co/pzvHrDN/I3d U T6 Gg TM.jpg\nhttps://i.ibb.co/NFrb6v6/N0ls Z9nwjb U.jpg\nВ описание входит штаны, кофта, берет, матросский воротник. Описание продается в формате PDF, состоит из 12 страниц формата А4 и может быть напечатано на любом принтере.\nОписание предназначено для кукол BJD RealPuki от FairyLand, но может подойти и другим подобным куклам. Также вы можете вязать этот наряд из обычной пряжи, и он подойдет для куколок побольше.\nhttps://vk.com/market 95724412?w=product 95724412_2212"
));
for _ in 0..100 {
index_writer.add_document(doc!(
text => "maitre corbeau sur un arbre perche tenait dans son bec un fromage Maitre rnard par lodeur alleche lui tint a peu pres ce langage."
));
}
index_writer.commit().unwrap();
let reader = index.reader().unwrap();
let searcher = reader.searcher();
let query_parser = QueryParser::for_index(&index, vec![text]);
let query = query_parser
.parse_query("Оксана Лифенко")
.unwrap();
let weight = query.weight(&searcher, true).unwrap();
let mut scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
let mut scores = vec![];
while scorer.advance() {
scores.push(scorer.score());
}
assert_eq!(scores, &[0.8017307, 0.72233325, 1.0300813]);
index_writer.commit().unwrap();
}
}

View File

@@ -214,6 +214,102 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOt
}
}
// `ahead` is assumed to be initialized (ahead.advance() has been called at least once,
// and this returned true).
//
// If behind is either uninitialized or `ahead.doc() > behind.doc()`.
fn next_in_intersection<'a, TScorer: Scorer>(
ahead: &'a mut TScorer,
behind: &'a mut TScorer,
) -> Option<DocId> {
let candidate = ahead.doc();
match behind.skip_next(candidate) {
SkipResult::Reached => Some(candidate),
SkipResult::OverStep => {
// yeah for tail-recursion
next_in_intersection(behind, ahead)
}
SkipResult::End => None,
}
}
enum SkipResultComplex {
Reached,
Overstep { other_ord: usize, candidate: DocId },
End,
}
fn skip_several_scorers<TDocSet: DocSet>(
others: &mut [TDocSet],
except_candidate_ord: usize,
target: DocId,
) -> SkipResultComplex {
for (ord, docset) in others.iter_mut().enumerate() {
// `candidate_ord` is already at the
// right position.
//
// Calling `skip_next` would advance this docset
// and miss it.
if ord == except_candidate_ord {
continue;
}
match docset.skip_next(target) {
SkipResult::Reached => {}
SkipResult::OverStep => {
return SkipResultComplex::Overstep {
other_ord: ord,
candidate: docset.doc(),
};
}
SkipResult::End => {
return SkipResultComplex::End;
}
}
}
SkipResultComplex::Reached
}
fn for_each<'a, TScorer: Scorer, TOtherscorer: Scorer>(
left: &'a mut TScorer,
right: &'a mut TScorer,
others: &'a mut [TOtherscorer],
callback: &mut FnMut(DocId, Score),
) {
let mut other_candidate_ord: usize = usize::max_value();
if !left.advance() {
return;
}
while let Some(candidate) = next_in_intersection(left, right) {
// test the remaining scorers
match skip_several_scorers(others, other_candidate_ord, candidate) {
SkipResultComplex::Reached => {
let intersection_score: Score = left.score()
+ right.score()
+ others.iter_mut().map(|other| other.score()).sum::<Score>();
callback(candidate, intersection_score);
if !left.advance() {
return;
}
}
SkipResultComplex::Overstep {
other_ord,
candidate,
} => match left.skip_next(candidate) {
SkipResult::End => {
return;
}
SkipResult::Reached => {
other_candidate_ord = other_ord;
}
SkipResult::OverStep => other_candidate_ord = usize::max_value(),
},
SkipResultComplex::End => {
return;
}
}
}
}
impl<TScorer, TOtherScorer> Scorer for Intersection<TScorer, TOtherScorer>
where
TScorer: Scorer,
@@ -224,6 +320,10 @@ where
+ self.right.score()
+ self.others.iter_mut().map(Scorer::score).sum::<Score>()
}
fn for_each(&mut self, callback: &mut FnMut(DocId, Score)) {
for_each(&mut self.left, &mut self.right, &mut self.others, callback);
}
}
#[cfg(test)]

View File

@@ -16,6 +16,9 @@ pub trait Scorer: downcast_rs::Downcast + DocSet + 'static {
/// Iterates through all of the document matched by the DocSet
/// `DocSet` and push the scored documents to the collector.
///
/// This method assumes that the Scorer is brand new, and `.advance()`
/// and `.skip()` haven't been called yet.
fn for_each(&mut self, callback: &mut FnMut(DocId, Score)) {
while self.advance() {
callback(self.doc(), self.score());

View File

@@ -260,6 +260,23 @@ where
fn score(&mut self) -> Score {
self.score
}
fn for_each(&mut self, callback: &mut FnMut(DocId, Score)) {
// TODO how do we deal with the fact that people may have called .advance() before.
while self.refill() {
let offset = self.offset;
for cursor in 0..HORIZON_NUM_TINYBITSETS {
while let Some(val) = self.bitsets[cursor].pop_lowest() {
let delta = val + (cursor as u32) * 64;
let doc = offset + delta;
let score_combiner = &mut self.scores[delta as usize];
let score = score_combiner.score();
callback(doc, score);
score_combiner.clear();
}
}
}
}
}
#[cfg(test)]