Compare commits


3 Commits

Author         SHA1         Message                                       Date
Paul Masurel   a7a98b11d7   exploratory                                   2019-05-22 10:18:53 +09:00
Paul Masurel   a18932165f   for_each in union                             2019-05-07 08:08:55 +09:00
Paul Masurel   8f82d0b773   Added impl for for_each specific to unions.   2019-05-05 17:31:32 +09:00
22 changed files with 226 additions and 529 deletions

View File

@@ -61,9 +61,6 @@ before_script:
 script:
 - bash ci/script.sh
-after_success:
-- cargo doc-upload
 before_deploy:
 - sh ci/before_deploy.sh

View File

@@ -9,20 +9,14 @@ Tantivy 0.10.0
 Minor
 ---------
 - Small simplification of the code.
-  Calling .freq() or .doc() when .advance() has never been called
+  Calling .freq() or .doc() when .advance() has never
   on segment postings should panic from now on.
 - Tokens exceeding `u16::max_value() - 4` chars are discarded silently instead of panicking.
 - Fast fields are now preloaded when the `SegmentReader` is created.
-- `IndexMeta` is now public. (@hntd187)
-- `IndexWriter` `add_document`, `delete_term`. `IndexWriter` is `Sync`, making it possible to use it with a
-  `Arc<RwLock<IndexWriter>>`. `add_document` and `delete_term` can
-  only require a read lock. (@pmasurel)
-- Introducing `Opstamp` as an expressive type alias for `u64`. (@petr-tik)
-- Stamper now relies on `AtomicU64` on all platforms (@petr-tik)

 ## How to update?

-Your existing indexes are usable as is, but you may need some
+Your existing indexes are usable as is. Your may or may need some
 trivial updates.

 ### Fast fields

View File

@@ -1,107 +0,0 @@
// # Indexing from different threads.
//
// It is fairly common to have to index from different threads.
// Tantivy forbids creating more than one `IndexWriter` at a time.
//
// This `IndexWriter` itself has its own multithreaded layer, so managing your own
// indexing threads will not help. However, it can still be useful for some applications.
//
// For instance, if preparing documents to send to tantivy before indexing is the bottleneck of
// your application, it is reasonable to have multiple threads.
//
// Another very common reason to want to index from multiple threads is implementing a webserver
// with CRUD capabilities. The server framework will most likely handle requests from
// different threads.
//
// The recommended way to address both of these use cases is to wrap your `IndexWriter` in an
// `Arc<RwLock<IndexWriter>>`.
//
// While this is counterintuitive, adding and deleting documents does not require mutability
// over the `IndexWriter`, so several threads will be able to do these operations concurrently.
//
// The example below does not represent an actual real-life use case (who would spawn a thread to
// index a single document?), but aims at demonstrating the mechanism that makes indexing
// from several threads possible.
extern crate tempdir;

// ---
// Importing tantivy...
#[macro_use]
extern crate tantivy;
use std::sync::{Arc, RwLock};
use std::thread;
use std::time::Duration;
use tantivy::schema::{Schema, STORED, TEXT};
use tantivy::Opstamp;
use tantivy::{Index, IndexWriter};

fn main() -> tantivy::Result<()> {
    // # Defining the schema
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT | STORED);
    let body = schema_builder.add_text_field("body", TEXT);
    let schema = schema_builder.build();
    let index = Index::create_in_ram(schema);
    let index_writer: Arc<RwLock<IndexWriter>> = Arc::new(RwLock::new(index.writer(50_000_000)?));

    // # First indexing thread.
    let index_writer_clone_1 = index_writer.clone();
    thread::spawn(move || {
        // We index the document 100 times... for the sake of the example.
        for i in 0..100 {
            let opstamp = {
                // A read lock is sufficient here.
                let index_writer_rlock = index_writer_clone_1.read().unwrap();
                index_writer_rlock.add_document(doc!(
                    title => "Of Mice and Men",
                    body => "A few miles south of Soledad, the Salinas River drops in close to the hillside \
                             bank and runs deep and green. The water is warm too, for it has slipped twinkling \
                             over the yellow sands in the sunlight before reaching the narrow pool. On one \
                             side of the river the golden foothill slopes curve up to the strong and rocky \
                             Gabilan Mountains, but on the valley side the water is lined with trees—willows \
                             fresh and green with every spring, carrying in their lower leaf junctures the \
                             debris of the winters flooding; and sycamores with mottled, white, recumbent \
                             limbs and branches that arch over the pool"
                ))
            };
            println!("add doc {} from thread 1 - opstamp {}", i, opstamp);
            thread::sleep(Duration::from_millis(20));
        }
    });

    // # Second indexing thread.
    let index_writer_clone_2 = index_writer.clone();
    // For convenience, tantivy also comes with a macro to
    // reduce the boilerplate above.
    thread::spawn(move || {
        // We index the document 100 times... for the sake of the example.
        for i in 0..100 {
            // A read lock is sufficient here.
            let opstamp = {
                let index_writer_rlock = index_writer_clone_2.read().unwrap();
                index_writer_rlock.add_document(doc!(
                    title => "Manufacturing consent",
                    body => "Some great book description..."
                ))
            };
            println!("add doc {} from thread 2 - opstamp {}", i, opstamp);
            thread::sleep(Duration::from_millis(10));
        }
    });

    // # In the main thread, we commit 10 times, once every 500ms.
    for _ in 0..10 {
        let opstamp: Opstamp = {
            // Committing or rolling back, on the other hand, requires a write lock. This will block other threads.
            let mut index_writer_wlock = index_writer.write().unwrap();
            index_writer_wlock.commit().unwrap()
        };
        println!("committed with opstamp {}", opstamp);
        thread::sleep(Duration::from_millis(500));
    }

    Ok(())
}

View File

@@ -2,7 +2,6 @@ use core::SegmentMeta;
 use schema::Schema;
 use serde_json;
 use std::fmt;
-use Opstamp;

 /// Meta information about the `Index`.
 ///
@@ -16,7 +15,7 @@ use Opstamp;
 pub struct IndexMeta {
     pub segments: Vec<SegmentMeta>,
     pub schema: Schema,
-    pub opstamp: Opstamp,
+    pub opstamp: u64,
     #[serde(skip_serializing_if = "Option::is_none")]
     pub payload: Option<String>,
 }

View File

@@ -10,7 +10,6 @@ use schema::Schema;
 use std::fmt;
 use std::path::PathBuf;
 use std::result;
-use Opstamp;
 use Result;

 /// A segment is a piece of the index.
@@ -51,7 +50,7 @@ impl Segment {
     }

     #[doc(hidden)]
-    pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
+    pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment {
         Segment {
             index: self.index,
             meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),

View File

@@ -5,7 +5,6 @@ use serde;
 use std::collections::HashSet;
 use std::fmt;
 use std::path::PathBuf;
-use Opstamp;

 lazy_static! {
     static ref INVENTORY: Inventory<InnerSegmentMeta> = { Inventory::new() };
@@ -14,7 +13,7 @@ lazy_static! {
 #[derive(Clone, Debug, Serialize, Deserialize)]
 struct DeleteMeta {
     num_deleted_docs: u32,
-    opstamp: Opstamp,
+    opstamp: u64,
 }

 /// `SegmentMeta` contains simple meta information about a segment.
@@ -137,9 +136,9 @@ impl SegmentMeta {
         self.max_doc() - self.num_deleted_docs()
     }

-    /// Returns the `Opstamp` of the last delete operation
+    /// Returns the opstamp of the last delete operation
     /// taken into account in this segment.
-    pub fn delete_opstamp(&self) -> Option<Opstamp> {
+    pub fn delete_opstamp(&self) -> Option<u64> {
         self.tracked
             .deletes
             .as_ref()
@@ -153,7 +152,7 @@ impl SegmentMeta {
     }

     #[doc(hidden)]
-    pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
+    pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta {
         let delete_meta = DeleteMeta {
             num_deleted_docs,
             opstamp,

View File

@@ -48,14 +48,14 @@ impl RetryPolicy {
 ///
 /// It is transparently associated to a lock file, that gets deleted
 /// on `Drop`. The lock is released automatically on `Drop`.
-pub struct DirectoryLock(Box<Drop + Send + Sync + 'static>);
+pub struct DirectoryLock(Box<Drop + Send + 'static>);

 struct DirectoryLockGuard {
     directory: Box<Directory>,
     path: PathBuf,
 }

-impl<T: Drop + Send + Sync + 'static> From<Box<T>> for DirectoryLock {
+impl<T: Drop + Send + 'static> From<Box<T>> for DirectoryLock {
     fn from(underlying: Box<T>) -> Self {
         DirectoryLock(underlying)
     }
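
The pattern relied on here — releasing the lock by deleting a file when the guard is dropped — is easy to sketch outside tantivy. The guard below is a hypothetical stand-in (none of these names are tantivy's), shown only to illustrate the RAII release:

use std::fs::{self, File};
use std::path::PathBuf;

// Hypothetical stand-in for a directory lock guard: creating the guard
// creates the lock file; dropping it removes the file, which is what
// releases the lock.
struct FileLockGuard {
    path: PathBuf,
}

impl FileLockGuard {
    fn acquire(path: PathBuf) -> std::io::Result<FileLockGuard> {
        File::create(&path)?; // the lock file materializes the lock
        Ok(FileLockGuard { path })
    }
}

impl Drop for FileLockGuard {
    fn drop(&mut self) {
        // Releasing the lock is just deleting the file; errors are ignored
        // on purpose, since panicking inside Drop would abort the process.
        let _ = fs::remove_file(&self.path);
    }
}

fn main() -> std::io::Result<()> {
    {
        let _guard = FileLockGuard::acquire(PathBuf::from("index.lock"))?;
        // ... exclusive section ...
    } // _guard dropped here: index.lock is removed, lock released
    assert!(!PathBuf::from("index.lock").exists());
    Ok(())
}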

View File

@@ -142,6 +142,11 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
         unboxed.size_hint()
     }

+    fn append_to_bitset(&mut self, bitset: &mut BitSet) {
+        let unboxed: &mut TDocSet = self.borrow_mut();
+        unboxed.append_to_bitset(bitset);
+    }
+
     fn count(&mut self, delete_bitset: &DeleteBitSet) -> u32 {
         let unboxed: &mut TDocSet = self.borrow_mut();
         unboxed.count(delete_bitset)
@@ -151,9 +156,4 @@ impl<TDocSet: DocSet + ?Sized> DocSet for Box<TDocSet> {
         let unboxed: &mut TDocSet = self.borrow_mut();
         unboxed.count_including_deleted()
     }
-
-    fn append_to_bitset(&mut self, bitset: &mut BitSet) {
-        let unboxed: &mut TDocSet = self.borrow_mut();
-        unboxed.append_to_bitset(bitset);
-    }
 }

View File

@@ -2,7 +2,6 @@ use super::operation::DeleteOperation;
 use std::mem;
 use std::ops::DerefMut;
 use std::sync::{Arc, RwLock};
-use Opstamp;

 // The DeleteQueue is conceptually similar to a multiple
 // consumer, single producer broadcast channel.
@@ -185,7 +184,7 @@ impl DeleteCursor {
     /// queue are consumed and the next get will return None.
     /// - the next get will return the first operation with an
     ///   `opstamp >= target_opstamp`.
-    pub fn skip_to(&mut self, target_opstamp: Opstamp) {
+    pub fn skip_to(&mut self, target_opstamp: u64) {
         // TODO Can be optimized as we work with blocks.
         while self.is_behind_opstamp(target_opstamp) {
             self.advance();
@@ -193,7 +192,7 @@ impl DeleteCursor {
     }

     #[cfg_attr(feature = "cargo-clippy", allow(clippy::wrong_self_convention))]
-    fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool {
+    fn is_behind_opstamp(&mut self, target_opstamp: u64) -> bool {
         self.get()
             .map(|operation| operation.opstamp < target_opstamp)
             .unwrap_or(false)

View File

@@ -1,6 +1,5 @@
 use std::sync::Arc;
 use DocId;
-use Opstamp;

 // Doc to opstamp is used to identify which
 // documents should be deleted.
@@ -24,7 +23,7 @@ pub enum DocToOpstampMapping {
 }

 impl From<Vec<u64>> for DocToOpstampMapping {
-    fn from(opstamps: Vec<Opstamp>) -> DocToOpstampMapping {
+    fn from(opstamps: Vec<u64>) -> DocToOpstampMapping {
         DocToOpstampMapping::WithMap(Arc::new(opstamps))
     }
 }
@@ -36,7 +35,7 @@ impl DocToOpstampMapping {
     //
     // The edge case opstamp == some doc opstamp is in practice
     // never encountered.
-    pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId {
+    pub fn compute_doc_limit(&self, target_opstamp: u64) -> DocId {
         match *self {
             DocToOpstampMapping::WithMap(ref doc_opstamps) => {
                 match doc_opstamps.binary_search(&target_opstamp) {
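
The `binary_search` above is the whole trick: `doc_opstamps` is sorted because stamps are handed out monotonically, so the number of documents affected by `target_opstamp` is just the partition point. A standalone sketch (not tantivy's code) of how both the `Ok` and `Err` arms can yield that limit:

// Sketch: doc_opstamps[doc] is the opstamp of `doc`, sorted because
// opstamps are assigned in increasing order. Every doc whose opstamp is
// strictly below `target_opstamp` is concerned, so the doc limit is the
// partition point returned by binary search in both arms.
fn compute_doc_limit(doc_opstamps: &[u64], target_opstamp: u64) -> u32 {
    match doc_opstamps.binary_search(&target_opstamp) {
        // Whether the exact opstamp is present or not, the returned
        // index equals the count of opstamps `< target_opstamp`.
        Ok(doc) | Err(doc) => doc as u32,
    }
}

fn main() {
    let opstamps = vec![3u64, 5, 8, 13];
    assert_eq!(compute_doc_limit(&opstamps, 9), 3); // docs 0..3 concerned
    assert_eq!(compute_doc_limit(&opstamps, 8), 2); // equality edge case
}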

View File

@@ -30,7 +30,6 @@ use std::ops::Range;
 use std::sync::Arc;
 use std::thread;
 use std::thread::JoinHandle;
-use Opstamp;
 use Result;

 // Size of the margin for the heap. A segment is closed when the remaining memory
@@ -100,7 +99,7 @@ pub struct IndexWriter {
     delete_queue: DeleteQueue,

     stamper: Stamper,
-    committed_opstamp: Opstamp,
+    committed_opstamp: u64,
 }

 /// Open a new index writer. Attempts to acquire a lockfile.
@@ -178,7 +177,7 @@ pub fn compute_deleted_bitset(
     segment_reader: &SegmentReader,
     delete_cursor: &mut DeleteCursor,
     doc_opstamps: &DocToOpstampMapping,
-    target_opstamp: Opstamp,
+    target_opstamp: u64,
 ) -> Result<bool> {
     let mut might_have_changed = false;

@@ -220,7 +219,7 @@ pub fn compute_deleted_bitset(
 pub fn advance_deletes(
     mut segment: Segment,
     segment_entry: &mut SegmentEntry,
-    target_opstamp: Opstamp,
+    target_opstamp: u64,
 ) -> Result<()> {
     {
         if segment_entry.meta().delete_opstamp() == Some(target_opstamp) {
@@ -300,11 +299,11 @@ fn index_documents(
     // the worker thread.
     assert!(num_docs > 0);

-    let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;
+    let doc_opstamps: Vec<u64> = segment_writer.finalize()?;

     let segment_meta = SegmentMeta::new(segment_id, num_docs);

-    let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());
+    let last_docstamp: u64 = *(doc_opstamps.last().unwrap());

     let delete_bitset_opt = if delete_cursor.get().is_some() {
         let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
@@ -495,7 +494,7 @@ impl IndexWriter {
     /// state as it was after the last commit.
     ///
     /// The opstamp at the last commit is returned.
-    pub fn rollback(&mut self) -> Result<Opstamp> {
+    pub fn rollback(&mut self) -> Result<()> {
         info!("Rolling back to opstamp {}", self.committed_opstamp);

         // marks the segment updater as killed. From now on, all
@@ -530,7 +529,7 @@ impl IndexWriter {
         // was dropped with the index_writer.
         for _ in document_receiver.clone() {}

-        Ok(self.committed_opstamp)
+        Ok(())
     }

     /// Prepares a commit.
@@ -568,7 +567,7 @@ impl IndexWriter {
         info!("Preparing commit");

         // this will drop the current document channel
-        // and recreate a new one.
+        // and recreate a new one channels.
         self.recreate_document_channel();

         let former_workers_join_handle = mem::replace(&mut self.workers_join_handle, Vec::new());
@@ -602,7 +601,7 @@ impl IndexWriter {
     /// Commit returns the `opstamp` of the last document
     /// that made it in the commit.
     ///
-    pub fn commit(&mut self) -> Result<Opstamp> {
+    pub fn commit(&mut self) -> Result<u64> {
         self.prepare_commit()?.commit()
     }

@@ -618,7 +617,7 @@ impl IndexWriter {
     ///
     /// Like adds, the deletion itself will be visible
     /// only after calling `commit()`.
-    pub fn delete_term(&self, term: Term) -> Opstamp {
+    pub fn delete_term(&mut self, term: Term) -> u64 {
         let opstamp = self.stamper.stamp();
         let delete_operation = DeleteOperation { opstamp, term };
         self.delete_queue.push(delete_operation);
@@ -632,7 +631,7 @@ impl IndexWriter {
     ///
     /// This is also the opstamp of the commit that is currently
     /// available for searchers.
-    pub fn commit_opstamp(&self) -> Opstamp {
+    pub fn commit_opstamp(&self) -> u64 {
         self.committed_opstamp
     }

@@ -646,7 +645,7 @@ impl IndexWriter {
     ///
     /// Currently it represents the number of documents that
     /// have been added since the creation of the index.
-    pub fn add_document(&self, document: Document) -> Opstamp {
+    pub fn add_document(&mut self, document: Document) -> u64 {
         let opstamp = self.stamper.stamp();
         let add_operation = AddOperation { opstamp, document };
         let send_result = self.operation_sender.send(vec![add_operation]);
@@ -663,7 +662,7 @@ impl IndexWriter {
     /// The total number of stamps generated by this method is `count + 1`;
     /// each operation gets a stamp from the `stamps` iterator and `last_opstamp`
     /// is for the batch itself.
-    fn get_batch_opstamps(&self, count: Opstamp) -> (Opstamp, Range<Opstamp>) {
+    fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range<u64>) {
         let Range { start, end } = self.stamper.stamps(count + 1u64);
         let last_opstamp = end - 1;
         let stamps = Range {
@@ -689,7 +688,7 @@ impl IndexWriter {
     /// Like adds and deletes (see `IndexWriter.add_document` and
     /// `IndexWriter.delete_term`), the changes made by calling `run` will be
     /// visible to readers only after calling `commit()`.
-    pub fn run(&self, user_operations: Vec<UserOperation>) -> Opstamp {
+    pub fn run(&mut self, user_operations: Vec<UserOperation>) -> u64 {
         let count = user_operations.len() as u64;
         if count == 0 {
             return self.stamper.stamp();
@@ -740,7 +739,7 @@ mod tests {
         let mut schema_builder = schema::Schema::builder();
         let text_field = schema_builder.add_text_field("text", schema::TEXT);
         let index = Index::create_in_ram(schema_builder.build());
-        let index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
+        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
         let operations = vec![
             UserOperation::Add(doc!(text_field=>"a")),
             UserOperation::Add(doc!(text_field=>"b")),
@@ -802,7 +801,7 @@ mod tests {
     fn test_empty_operations_group() {
         let schema_builder = schema::Schema::builder();
         let index = Index::create_in_ram(schema_builder.build());
-        let index_writer = index.writer(3_000_000).unwrap();
+        let mut index_writer = index.writer(3_000_000).unwrap();
         let operations1 = vec![];
         let batch_opstamp1 = index_writer.run(operations1);
         assert_eq!(batch_opstamp1, 0u64);

View File

@@ -1,6 +1,5 @@
 use census::{Inventory, TrackedObject};
 use std::collections::HashSet;
-use Opstamp;
 use SegmentId;

 #[derive(Default)]
@@ -18,8 +17,8 @@ impl MergeOperationInventory {
     }
 }

-/// A `MergeOperation` has two roles.
-/// It carries all of the information required to describe a merge:
+/// A `MergeOperation` has two role.
+/// It carries all of the information required to describe a merge :
 /// - `target_opstamp` is the opstamp up to which we want to consume the
 ///   delete queue and reflect their deletes.
 /// - `segment_ids` is the list of segments to be merged.
@@ -36,14 +35,14 @@ pub struct MergeOperation {
 }

 struct InnerMergeOperation {
-    target_opstamp: Opstamp,
+    target_opstamp: u64,
     segment_ids: Vec<SegmentId>,
 }

 impl MergeOperation {
     pub fn new(
         inventory: &MergeOperationInventory,
-        target_opstamp: Opstamp,
+        target_opstamp: u64,
         segment_ids: Vec<SegmentId>,
     ) -> MergeOperation {
         let inner_merge_operation = InnerMergeOperation {
@@ -55,7 +54,7 @@ impl MergeOperation {
         }
     }

-    pub fn target_opstamp(&self) -> Opstamp {
+    pub fn target_opstamp(&self) -> u64 {
         self.inner.target_opstamp
     }

View File

@@ -1,18 +1,17 @@
 use schema::Document;
 use schema::Term;
-use Opstamp;

 /// Timestamped Delete operation.
 #[derive(Clone, Eq, PartialEq, Debug)]
 pub struct DeleteOperation {
-    pub opstamp: Opstamp,
+    pub opstamp: u64,
     pub term: Term,
 }

 /// Timestamped Add operation.
 #[derive(Eq, PartialEq, Debug)]
 pub struct AddOperation {
-    pub opstamp: Opstamp,
+    pub opstamp: u64,
     pub document: Document,
 }

View File

@@ -1,16 +1,15 @@
 use super::IndexWriter;
-use Opstamp;
 use Result;

 /// A prepared commit
 pub struct PreparedCommit<'a> {
     index_writer: &'a mut IndexWriter,
     payload: Option<String>,
-    opstamp: Opstamp,
+    opstamp: u64,
 }

 impl<'a> PreparedCommit<'a> {
-    pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp) -> PreparedCommit {
+    pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
         PreparedCommit {
             index_writer,
             payload: None,
@@ -18,7 +17,7 @@ impl<'a> PreparedCommit<'a> {
         }
     }

-    pub fn opstamp(&self) -> Opstamp {
+    pub fn opstamp(&self) -> u64 {
         self.opstamp
     }

@@ -26,11 +25,11 @@ impl<'a> PreparedCommit<'a> {
         self.payload = Some(payload.to_string())
     }

-    pub fn abort(self) -> Result<Opstamp> {
+    pub fn abort(self) -> Result<()> {
         self.index_writer.rollback()
     }

-    pub fn commit(self) -> Result<Opstamp> {
+    pub fn commit(self) -> Result<u64> {
         info!("committing {}", self.opstamp);
         self.index_writer
             .segment_updater()

View File

@@ -36,7 +36,6 @@ use std::sync::Arc;
 use std::sync::RwLock;
 use std::thread;
 use std::thread::JoinHandle;
-use Opstamp;
 use Result;

 /// Save the index meta file.
@@ -225,7 +224,7 @@ impl SegmentUpdater {
     ///
     /// The method returns copies of the segment entries,
     /// updated with the delete information.
-    fn purge_deletes(&self, target_opstamp: Opstamp) -> Result<Vec<SegmentEntry>> {
+    fn purge_deletes(&self, target_opstamp: u64) -> Result<Vec<SegmentEntry>> {
         let mut segment_entries = self.0.segment_manager.segment_entries();
         for segment_entry in &mut segment_entries {
             let segment = self.0.index.segment(segment_entry.meta().clone());
@@ -234,7 +233,7 @@ impl SegmentUpdater {
         Ok(segment_entries)
     }

-    pub fn save_metas(&self, opstamp: Opstamp, commit_message: Option<String>) {
+    pub fn save_metas(&self, opstamp: u64, commit_message: Option<String>) {
         if self.is_alive() {
             let index = &self.0.index;
             let directory = index.directory();
@@ -281,7 +280,7 @@ impl SegmentUpdater {
             .garbage_collect(|| self.0.segment_manager.list_files());
     }

-    pub fn commit(&self, opstamp: Opstamp, payload: Option<String>) -> Result<()> {
+    pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
         self.run_async(move |segment_updater| {
             if segment_updater.is_alive() {
                 let segment_entries = segment_updater

View File

@@ -16,7 +16,6 @@ use tokenizer::BoxedTokenizer;
 use tokenizer::FacetTokenizer;
 use tokenizer::{TokenStream, Tokenizer};
 use DocId;
-use Opstamp;
 use Result;

 /// A `SegmentWriter` is in charge of creating segment index from a
@@ -30,7 +29,7 @@ pub struct SegmentWriter {
     segment_serializer: SegmentSerializer,
     fast_field_writers: FastFieldsWriter,
     fieldnorms_writer: FieldNormsWriter,
-    doc_opstamps: Vec<Opstamp>,
+    doc_opstamps: Vec<u64>,
     tokenizers: Vec<Option<Box<BoxedTokenizer>>>,
 }

View File

@@ -1,27 +1,70 @@
 use std::ops::Range;
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::atomic::Ordering;
 use std::sync::Arc;
-use Opstamp;

-/// Stamper provides Opstamps, which is just an auto-increment id to label
-/// an operation.
-///
-/// Cloning does not "fork" the stamp generation. The stamper actually wraps an `Arc`.
+// AtomicU64 has not landed in stable.
+// For the moment let's just use AtomicUsize on
+// x86/64 bit platforms, and a mutex on other platforms.
+#[cfg(target_arch = "x86_64")]
+mod archicture_impl {
+    use std::sync::atomic::{AtomicUsize, Ordering};
+
+    #[derive(Default)]
+    pub struct AtomicU64Ersatz(AtomicUsize);
+
+    impl AtomicU64Ersatz {
+        pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
+            AtomicU64Ersatz(AtomicUsize::new(first_opstamp as usize))
+        }
+
+        pub fn fetch_add(&self, val: u64, order: Ordering) -> u64 {
+            self.0.fetch_add(val as usize, order) as u64
+        }
+    }
+}
+
+#[cfg(not(target_arch = "x86_64"))]
+mod archicture_impl {
+    use std::sync::atomic::Ordering;
+    /// Under other architectures, we rely on a mutex.
+    use std::sync::RwLock;
+
+    #[derive(Default)]
+    pub struct AtomicU64Ersatz(RwLock<u64>);
+
+    impl AtomicU64Ersatz {
+        pub fn new(first_opstamp: u64) -> AtomicU64Ersatz {
+            AtomicU64Ersatz(RwLock::new(first_opstamp))
+        }
+
+        pub fn fetch_add(&self, incr: u64, _order: Ordering) -> u64 {
+            let mut lock = self.0.write().unwrap();
+            let previous_val = *lock;
+            *lock = previous_val + incr;
+            previous_val
+        }
+    }
+}
+
+use self::archicture_impl::AtomicU64Ersatz;

 #[derive(Clone, Default)]
-pub struct Stamper(Arc<AtomicU64>);
+pub struct Stamper(Arc<AtomicU64Ersatz>);

 impl Stamper {
-    pub fn new(first_opstamp: Opstamp) -> Stamper {
-        Stamper(Arc::new(AtomicU64::new(first_opstamp)))
+    pub fn new(first_opstamp: u64) -> Stamper {
+        Stamper(Arc::new(AtomicU64Ersatz::new(first_opstamp)))
     }

-    pub fn stamp(&self) -> Opstamp {
+    pub fn stamp(&self) -> u64 {
         self.0.fetch_add(1u64, Ordering::SeqCst) as u64
     }

     /// Given a desired count `n`, `stamps` returns an iterator that
     /// will supply `n` number of u64 stamps.
-    pub fn stamps(&self, n: u64) -> Range<Opstamp> {
+    pub fn stamps(&self, n: u64) -> Range<u64> {
         let start = self.0.fetch_add(n, Ordering::SeqCst);
         Range {
             start,
@@ -49,5 +92,4 @@ mod test {
         assert_eq!(stamper.stamps(3u64), (12..15));
         assert_eq!(stamper.stamp(), 15u64);
     }
 }
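
The contract the lock-based fallback must preserve is that `fetch_add` returns the value *before* the increment; that is what keeps stamps unique and monotonic across threads. A standalone sketch of that property, reusing the test values above:

use std::sync::RwLock;

// Sketch: the RwLock-backed counter must behave exactly like the atomic
// version, i.e. fetch_add hands back the PREVIOUS value.
struct MutexU64(RwLock<u64>);

impl MutexU64 {
    fn fetch_add(&self, incr: u64) -> u64 {
        let mut guard = self.0.write().unwrap();
        let previous = *guard;
        *guard = previous + incr;
        previous
    }
}

fn main() {
    let counter = MutexU64(RwLock::new(10));
    // Each call returns the pre-increment value, so consecutive stamps
    // are unique: 10, 11, then a batch reserving the range 12..15.
    assert_eq!(counter.fetch_add(1), 10);
    assert_eq!(counter.fetch_add(1), 11);
    let start = counter.fetch_add(3);
    assert_eq!(start..start + 3, 12..15);
}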

View File

@@ -226,7 +226,7 @@ mod docset;
 pub use self::docset::{DocSet, SkipResult};
 pub use core::SegmentComponent;
-pub use core::{Index, Searcher, Segment, SegmentId, SegmentMeta, IndexMeta};
+pub use core::{Index, Searcher, Segment, SegmentId, SegmentMeta};
 pub use core::{InvertedIndexReader, SegmentReader};
 pub use directory::Directory;
 pub use indexer::IndexWriter;
@@ -254,16 +254,6 @@ pub mod merge_policy {
 /// as they are added in the segment.
 pub type DocId = u32;

-/// A u64 assigned to every operation incrementally.
-///
-/// All operations modifying the index receive a monotonic Opstamp.
-/// The resulting state of the index is consistent with the opstamp ordering.
-///
-/// For instance, a commit with opstamp `32_423` will reflect all Add and Delete operations
-/// with an opstamp `<= 32_423`. A delete operation with opstamp `n` will not affect a document added
-/// with opstamp `n+1`.
-pub type Opstamp = u64;

 /// A f32 that represents the relevance of the document to the query
 ///
 /// This is modelled internally as a `f32`. The
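
The removed doc comment pins down an ordering contract that is easy to see from client code. A sketch assuming the 0.10-era API shown in this diff, where `add_document` and `commit` hand back a plain `u64` opstamp:

// Sketch of the ordering contract described above: stamps are assigned
// monotonically, and a commit covers every operation stamped at or
// before it. Assumes the era of the API in this diff, where
// add_document/commit return the opstamp directly.
use tantivy::schema::{Schema, TEXT};
use tantivy::{doc, Index};

fn main() -> tantivy::Result<()> {
    let mut schema_builder = Schema::builder();
    let title = schema_builder.add_text_field("title", TEXT);
    let index = Index::create_in_ram(schema_builder.build());
    let mut index_writer = index.writer(50_000_000)?;

    let add_stamp = index_writer.add_document(doc!(title => "of mice and men"));
    let commit_stamp = index_writer.commit()?;
    // The commit reflects every operation with a smaller or equal stamp,
    // so the add above is part of this commit.
    assert!(add_stamp <= commit_stamp);
    Ok(())
}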

View File

@@ -205,332 +205,4 @@ mod tests {
            assert_eq!(score_docs(&boolean_query), vec![0.977973, 0.84699446]);
        }
    }
/*
DoC 0
{
"_index": "test",
"_type": "_doc",
"_id": "0",
"matched": true,
"explanation": {
"value": 6.2610235,
"description": "max of:",
"details": [{
"value": 6.1969156,
"description": "sum of:",
"details": [{
"value": 6.1969156,
"description": "weight(text:оксана in 561) [PerFieldSimilarity], result of:",
"details": [{
"value": 6.1969156,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 5.65998,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 3,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.49766606,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 19.0,
"description": "dl, length of field",
"details": []
}, {
"value": 24.105577,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}]
}, {
"value": 6.2610235,
"description": "sum of:",
"details": [{
"value": 6.2610235,
"description": "weight(title:оксана in 561) [PerFieldSimilarity], result of:",
"details": [{
"value": 6.2610235,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 5.4086657,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 4,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.52617776,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 4.0,
"description": "dl, length of field",
"details": []
}, {
"value": 5.99502,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}]
}]
}
}
doc 2
{
"_index": "test",
"_type": "_doc",
"_id": "2",
"matched": true,
"explanation": {
"value": 11.911896,
"description": "max of:",
"details": [{
"value": 11.911896,
"description": "sum of:",
"details": [{
"value": 5.4068284,
"description": "weight(title:оксана in 0) [PerFieldSimilarity], result of:",
"details": [{
"value": 5.4068284,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 5.4086657,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 4,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.45439103,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 6.0,
"description": "dl, length of field",
"details": []
}, {
"value": 5.99502,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}, {
"value": 6.505067,
"description": "weight(title:лифенко in 0) [PerFieldSimilarity], result of:",
"details": [{
"value": 6.505067,
"description": "score(freq=1.0), product of:",
"details": [{
"value": 2.2,
"description": "boost",
"details": []
}, {
"value": 6.5072775,
"description": "idf, computed as log(1 + (N - n + 0.5) / (n + 0.5)) from:",
"details": [{
"value": 1,
"description": "n, number of documents containing term",
"details": []
}, {
"value": 1004,
"description": "N, total number of documents with field",
"details": []
}]
}, {
"value": 0.45439103,
"description": "tf, computed as freq / (freq + k1 * (1 - b + b * dl / avgdl)) from:",
"details": [{
"value": 1.0,
"description": "freq, occurrences of term within document",
"details": []
}, {
"value": 1.2,
"description": "k1, term saturation parameter",
"details": []
}, {
"value": 0.75,
"description": "b, length normalization parameter",
"details": []
}, {
"value": 6.0,
"description": "dl, length of field",
"details": []
}, {
"value": 5.99502,
"description": "avgdl, average length of field",
"details": []
}]
}]
}]
}]
}]
}
}
*/
    // motivated by #554
    #[test]
    fn test_bm25_several_fields() {
        let mut schema_builder = Schema::builder();
        let title = schema_builder.add_text_field("title", TEXT);
        let text = schema_builder.add_text_field("text", TEXT);
        let schema = schema_builder.build();
        let index = Index::create_in_ram(schema);
        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
        index_writer.add_document(doc!(
            // tf = 1 0
            title => "Законы притяжения Оксана Кулакова",
            // tf = 1 0
            text => "Законы притяжения Оксана Кулакова] \n\nТема: Сексуальное искусство, Женственность\nТип товара: Запись вебинара (аудио)\nПродолжительность: 1,5 часа\n\nСсылка на вебинар:\n ",
        ));
        index_writer.add_document(doc!(
            // tf = 1 0
            title => "Любимые русские пироги (Оксана Путан)",
            // tf = 2 0
            text => "http://i95.fastpic.ru/big/2017/0628/9a/615b9c8504d94a3893d7f496ac53539a.jpg \n\nОт издателя\nОксана Путан профессиональный повар, автор кулинарных книг и известный кулинарный блогер. Ее рецепты отличаются практичностью, доступностью и пользуются огромной популярностью в русскоязычном интернете. Это третья книга автора о самом вкусном и ароматном настоящих русских пирогах и выпечке!\nДаже новички на кухне легко готовят по ее рецептам. Оксана описывает процесс приготовления настолько подробно и понятно, что вам остается только наслаждаться готовкой и не тратить время на лишние усилия. Готовьте легко и просто!\n\nhttps://www.ozon.ru/context/detail/id/139872462/"
        ));
        index_writer.add_document(doc!(
            // tf = 1 1
            title => "PDF Мастер Класс \"Морячок\" (Оксана Лифенко)",
            // tf = 0 0
            text => "https://i.ibb.co/pzvHrDN/I3d U T6 Gg TM.jpg\nhttps://i.ibb.co/NFrb6v6/N0ls Z9nwjb U.jpg\nВ описание входит штаны, кофта, берет, матросский воротник. Описание продается в формате PDF, состоит из 12 страниц формата А4 и может быть напечатано на любом принтере.\nОписание предназначено для кукол BJD RealPuki от FairyLand, но может подойти и другим подобным куклам. Также вы можете вязать этот наряд из обычной пряжи, и он подойдет для куколок побольше.\nhttps://vk.com/market 95724412?w=product 95724412_2212"
        ));
        for _ in 0..1_000 {
            index_writer.add_document(doc!(
                title => "a b d e f g",
                text => "maitre corbeau sur un arbre perche tenait dans son bec un fromage Maitre rnard par lodeur alleche lui tint a peu pres ce langage."
            ));
        }
        index_writer.commit().unwrap();
        let reader = index.reader().unwrap();
        let searcher = reader.searcher();
        let query_parser = QueryParser::for_index(&index, vec![title, text]);
        let query = query_parser
            .parse_query("Оксана Лифенко")
            .unwrap();
        let weight = query.weight(&searcher, true).unwrap();
        let mut scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
        // let mut scores = vec![];
        // while
        println!("=====|");
        scorer.advance();
        dbg!("scorer.score()");
        assert!(false);
        // scores.push(scorer.score());
        // assert_eq!(scores, &[0.8017307, 0.72233325, 1.0300813]);
    }

    // motivated by #554
    #[test]
    fn test_bm25_several_fields_bbb() {
        let mut schema_builder = Schema::builder();
        let text = schema_builder.add_text_field("text", TEXT);
        let schema = schema_builder.build();
        let index = Index::create_in_ram(schema);
        let mut index_writer = index.writer_with_num_threads(1, 3_000_000).unwrap();
        index_writer.add_document(doc!(
            text => "Законы притяжения Оксана Кулакова] \n\nТема: Сексуальное искусство, Женственность\nТип товара: Запись вебинара (аудио)\nПродолжительность: 1,5 часа\n\nСсылка на вебинар:\n ",
        ));
        index_writer.add_document(doc!(
            text => "http://i95.fastpic.ru/big/2017/0628/9a/615b9c8504d94a3893d7f496ac53539a.jpg \n\nОт издателя\nОксана Путан профессиональный повар, автор кулинарных книг и известный кулинарный блогер. Ее рецепты отличаются практичностью, доступностью и пользуются огромной популярностью в русскоязычном интернете. Это третья книга автора о самом вкусном и ароматном настоящих русских пирогах и выпечке!\nДаже новички на кухне легко готовят по ее рецептам. Оксана описывает процесс приготовления настолько подробно и понятно, что вам остается только наслаждаться готовкой и не тратить время на лишние усилия. Готовьте легко и просто!\n\nhttps://www.ozon.ru/context/detail/id/139872462/"
        ));
        index_writer.add_document(doc!(
            text => "https://i.ibb.co/pzvHrDN/I3d U T6 Gg TM.jpg\nhttps://i.ibb.co/NFrb6v6/N0ls Z9nwjb U.jpg\nВ описание входит штаны, кофта, берет, матросский воротник. Описание продается в формате PDF, состоит из 12 страниц формата А4 и может быть напечатано на любом принтере.\nОписание предназначено для кукол BJD RealPuki от FairyLand, но может подойти и другим подобным куклам. Также вы можете вязать этот наряд из обычной пряжи, и он подойдет для куколок побольше.\nhttps://vk.com/market 95724412?w=product 95724412_2212"
        ));
        for _ in 0..100 {
            index_writer.add_document(doc!(
                text => "maitre corbeau sur un arbre perche tenait dans son bec un fromage Maitre rnard par lodeur alleche lui tint a peu pres ce langage."
            ));
        }
        index_writer.commit().unwrap();
        let reader = index.reader().unwrap();
        let searcher = reader.searcher();
        let query_parser = QueryParser::for_index(&index, vec![text]);
        let query = query_parser
            .parse_query("Оксана Лифенко")
            .unwrap();
        let weight = query.weight(&searcher, true).unwrap();
        let mut scorer = weight.scorer(searcher.segment_reader(0u32)).unwrap();
        let mut scores = vec![];
        while scorer.advance() {
            scores.push(scorer.score());
        }
        assert_eq!(scores, &[0.8017307, 0.72233325, 1.0300813]);
        index_writer.commit().unwrap();
    }
}

View File

@@ -214,6 +214,102 @@ impl<TDocSet: DocSet, TOtherDocSet: DocSet> DocSet for Intersection<TDocSet, TOtherDocSet> {
     }
 }

+// `ahead` is assumed to be initialized (`ahead.advance()` has been called at least once,
+// and this returned true).
+//
+// `behind` is either uninitialized, or `ahead.doc() > behind.doc()`.
+fn next_in_intersection<'a, TScorer: Scorer>(
+    ahead: &'a mut TScorer,
+    behind: &'a mut TScorer,
+) -> Option<DocId> {
+    let candidate = ahead.doc();
+    match behind.skip_next(candidate) {
+        SkipResult::Reached => Some(candidate),
+        SkipResult::OverStep => {
+            // yeah for tail-recursion
+            next_in_intersection(behind, ahead)
+        }
+        SkipResult::End => None,
+    }
+}
+
+enum SkipResultComplex {
+    Reached,
+    Overstep { other_ord: usize, candidate: DocId },
+    End,
+}
+
+fn skip_several_scorers<TDocSet: DocSet>(
+    others: &mut [TDocSet],
+    except_candidate_ord: usize,
+    target: DocId,
+) -> SkipResultComplex {
+    for (ord, docset) in others.iter_mut().enumerate() {
+        // The docset at `except_candidate_ord` is already at the
+        // right position.
+        //
+        // Calling `skip_next` would advance this docset
+        // and miss it.
+        if ord == except_candidate_ord {
+            continue;
+        }
+        match docset.skip_next(target) {
+            SkipResult::Reached => {}
+            SkipResult::OverStep => {
+                return SkipResultComplex::Overstep {
+                    other_ord: ord,
+                    candidate: docset.doc(),
+                };
+            }
+            SkipResult::End => {
+                return SkipResultComplex::End;
+            }
+        }
+    }
+    SkipResultComplex::Reached
+}
+
+fn for_each<'a, TScorer: Scorer, TOtherscorer: Scorer>(
+    left: &'a mut TScorer,
+    right: &'a mut TScorer,
+    others: &'a mut [TOtherscorer],
+    callback: &mut FnMut(DocId, Score),
+) {
+    let mut other_candidate_ord: usize = usize::max_value();
+    if !left.advance() {
+        return;
+    }
+    while let Some(candidate) = next_in_intersection(left, right) {
+        // test the remaining scorers
+        match skip_several_scorers(others, other_candidate_ord, candidate) {
+            SkipResultComplex::Reached => {
+                let intersection_score: Score = left.score()
+                    + right.score()
+                    + others.iter_mut().map(|other| other.score()).sum::<Score>();
+                callback(candidate, intersection_score);
+                if !left.advance() {
+                    return;
+                }
+            }
+            SkipResultComplex::Overstep {
+                other_ord,
+                candidate,
+            } => match left.skip_next(candidate) {
+                SkipResult::End => {
+                    return;
+                }
+                SkipResult::Reached => {
+                    other_candidate_ord = other_ord;
+                }
+                SkipResult::OverStep => other_candidate_ord = usize::max_value(),
+            },
+            SkipResultComplex::End => {
+                return;
+            }
+        }
+    }
+}
+
 impl<TScorer, TOtherScorer> Scorer for Intersection<TScorer, TOtherScorer>
 where
     TScorer: Scorer,
@@ -224,6 +320,10 @@ where
             + self.right.score()
             + self.others.iter_mut().map(Scorer::score).sum::<Score>()
     }
+
+    fn for_each(&mut self, callback: &mut FnMut(DocId, Score)) {
+        for_each(&mut self.left, &mut self.right, &mut self.others, callback);
+    }
 }

 #[cfg(test)]
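
`next_in_intersection` is the classic leapfrog step: whichever docset is behind skips to the candidate of the one ahead, and the roles swap on overstep. A standalone sketch of the same idea over sorted slices (no tantivy types involved):

// Standalone sketch of the leapfrog intersection idea: whichever cursor
// is behind advances toward the other's candidate; a document is emitted
// only when both cursors land on the same value.
fn leapfrog_intersection(a: &[u32], b: &[u32]) -> Vec<u32> {
    let (mut i, mut j) = (0usize, 0usize);
    let mut out = Vec::new();
    while i < a.len() && j < b.len() {
        if a[i] == b[j] {
            out.push(a[i]); // both cursors reached the candidate
            i += 1;
            j += 1;
        } else if a[i] < b[j] {
            i += 1; // `a` is behind: advance it toward b[j]
        } else {
            j += 1; // `b` is behind: advance it toward a[i]
        }
    }
    out
}

fn main() {
    let left = [1u32, 3, 5, 7, 9];
    let right = [2u32, 3, 4, 7, 10];
    assert_eq!(leapfrog_intersection(&left, &right), vec![3, 7]);
}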

View File

@@ -16,6 +16,9 @@ pub trait Scorer: downcast_rs::Downcast + DocSet + 'static {
     /// Iterates through all of the documents matched by the `DocSet`
     /// and pushes the scored documents to the collector.
+    ///
+    /// This method assumes that the `Scorer` is brand new, and `.advance()`
+    /// and `.skip()` haven't been called yet.
     fn for_each(&mut self, callback: &mut FnMut(DocId, Score)) {
         while self.advance() {
             callback(self.doc(), self.score());
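
The new doc comment matters because the default implementation simply drains the scorer. A self-contained mock of the `advance()`/`doc()`/`score()` protocol (illustrative names, not tantivy's types) showing why `for_each` only works on a brand-new scorer:

// Mock scorer over a fixed list of (doc, score) hits. The cursor starts
// *before* the first hit, which is exactly the "brand new" state the
// doc comment above requires.
struct VecScorer {
    hits: Vec<(u32, f32)>,
    cursor: Option<usize>, // None: advance() has never been called
}

impl VecScorer {
    fn advance(&mut self) -> bool {
        let next = self.cursor.map(|c| c + 1).unwrap_or(0);
        if next < self.hits.len() {
            self.cursor = Some(next);
            true
        } else {
            false
        }
    }
    fn doc(&self) -> u32 { self.hits[self.cursor.unwrap()].0 }
    fn score(&self) -> f32 { self.hits[self.cursor.unwrap()].1 }

    // Same shape as the trait's default implementation above.
    fn for_each(&mut self, callback: &mut dyn FnMut(u32, f32)) {
        while self.advance() {
            callback(self.doc(), self.score());
        }
    }
}

fn main() {
    let mut scorer = VecScorer { hits: vec![(0, 0.5), (4, 1.25)], cursor: None };
    let mut collected = Vec::new();
    scorer.for_each(&mut |doc, score| collected.push((doc, score)));
    assert_eq!(collected, vec![(0, 0.5), (4, 1.25)]);
    // A second for_each call would yield nothing: the scorer is spent,
    // matching the "brand new scorer" precondition documented above.
}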

View File

@@ -260,6 +260,23 @@
     fn score(&mut self) -> Score {
         self.score
     }
+
+    fn for_each(&mut self, callback: &mut FnMut(DocId, Score)) {
+        // TODO how do we deal with the fact that people may have called .advance() before.
+        while self.refill() {
+            let offset = self.offset;
+            for cursor in 0..HORIZON_NUM_TINYBITSETS {
+                while let Some(val) = self.bitsets[cursor].pop_lowest() {
+                    let delta = val + (cursor as u32) * 64;
+                    let doc = offset + delta;
+                    let score_combiner = &mut self.scores[delta as usize];
+                    let score = score_combiner.score();
+                    callback(doc, score);
+                    score_combiner.clear();
+                }
+            }
+        }
+    }
 }

 #[cfg(test)]
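
The union's `for_each` drains one 64-bit bucket at a time, mapping each popped bit back to a doc id through the bucket's offset. A standalone sketch of that pop-lowest draining pattern, assuming nothing about `TinySet` itself:

// Standalone sketch of the pop_lowest draining pattern used above:
// repeatedly extract the lowest set bit of a 64-bit bucket and map it
// back to a doc id using the bucket's base offset.
fn drain_bucket(mut bits: u64, offset: u32, callback: &mut dyn FnMut(u32)) {
    while bits != 0 {
        let lowest = bits.trailing_zeros(); // position of the lowest set bit
        bits &= bits - 1; // clear that bit
        callback(offset + lowest);
    }
}

fn main() {
    let mut docs = Vec::new();
    // Bits 1, 3 and 40 set in the bucket starting at doc 128.
    let bucket: u64 = (1 << 1) | (1 << 3) | (1 << 40);
    drain_bucket(bucket, 128, &mut |doc| docs.push(doc));
    assert_eq!(docs, vec![129, 131, 168]);
}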